
Data Engineering Pipeline: Building Reliable ETL/ELT Systems in 2026

Data engineering pipeline in 2026 — ETL vs ELT, Apache Airflow, dbt, data warehouse design, real-time vs batch processing, and production patterns for reliable pipelines.

Viprasol Tech Team
April 21, 2026
13 min read

Data pipelines are the plumbing of analytics and ML systems. When they work, nobody notices. When they break — missing data, stale dashboards, wrong metrics — business decisions get made on bad numbers.

Building reliable data pipelines requires thinking about failure modes from the start: idempotent processing, schema evolution, late-arriving data, upstream API changes, and the monitoring that catches problems before they cascade.

This guide covers the modern data engineering stack: ingestion patterns, transformation with dbt, orchestration with Airflow, and the architectural choices that make pipelines maintainable.


ETL vs ELT — The Paradigm Shift

ETL (Extract, Transform, Load): Transform data before loading it into the warehouse. Common in legacy systems where compute was expensive.

ELT (Extract, Load, Transform): Load raw data into the warehouse first, then transform it with SQL. This is the modern approach: compute inside warehouses (Snowflake, BigQuery, Redshift) is cheap and elastic, and keeping the raw data around enables reprocessing.

ETL (legacy):
Source DB → [Extract] → [Transform in Python] → [Load to warehouse]
                              (compute on pipeline server)

ELT (modern):
Source DB → [Extract] → [Load raw to warehouse] → [Transform with dbt/SQL]
                         (cheap, keep everything)    (warehouse compute)

The ELT advantage: When business logic changes, you can reprocess historical data by re-running the SQL transformation — no need to re-extract from the source.
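
To make that concrete, here is a minimal sketch of what a reprocessing job can look like under assumed names: the raw extract (raw.orders) is still in the warehouse, so a changed business rule (say, excluding refunds from revenue) only requires rebuilding the derived table with new SQL. The analytics.fct_revenue table, connection details, and environment variable are hypothetical.

# reprocess.py
# Rebuild a derived table from retained raw data (illustrative sketch)
import os
import snowflake.connector

conn = snowflake.connector.connect(
    account="my_account",                        # hypothetical connection details
    user="etl_user",
    password=os.environ["SNOWFLAKE_PASSWORD"],
    warehouse="TRANSFORM_WH",
    database="ANALYTICS",
)

# Business logic changed: cancellations and refunds no longer count as revenue.
# No re-extraction needed: just rebuild the derived table from raw history.
REBUILD_SQL = """
CREATE OR REPLACE TABLE analytics.fct_revenue AS
SELECT
    DATE_TRUNC('day', created_at) AS order_date,
    SUM(total_cents) / 100.0      AS gross_revenue_usd
FROM raw.orders
WHERE status NOT IN ('cancelled', 'refunded')   -- the revised rule
GROUP BY 1
"""

with conn.cursor() as cur:
    cur.execute(REBUILD_SQL)
conn.close()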


Ingestion Layer

Batch Ingestion from PostgreSQL

# ingestion/postgres_to_warehouse.py
# Incremental extraction using updated_at watermark

import psycopg2
import snowflake.connector
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

def extract_incremental(
    source_conn,
    table: str,
    watermark_col: str,
    last_watermark: datetime,
    batch_size: int = 10000
) -> list[dict]:
    """Extract rows updated since last watermark."""
    records = []
    offset = 0
    
    while True:
        with source_conn.cursor() as cur:
            cur.execute(f"""
                SELECT * FROM {table}
                WHERE {watermark_col} > %s
                  AND {watermark_col} <= NOW()
                ORDER BY {watermark_col}, id
                LIMIT %s OFFSET %s
            """, (last_watermark, batch_size, offset))
            
            columns = [desc[0] for desc in cur.description]
            rows = cur.fetchall()
            
            if not rows:
                break
            
            records.extend([dict(zip(columns, row)) for row in rows])
            offset += len(rows)
            
            if len(rows) < batch_size:
                break
    
    logger.info(f"Extracted {len(records)} rows from {table} since {last_watermark}")
    return records


def load_to_staging(records: list[dict], table: str, conn) -> None:
    """Load records to warehouse staging area (idempotent via MERGE)."""
    if not records:
        return
    
    # Stage to temp table first, then MERGE
    staging_table = f"staging.{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    
    with conn.cursor() as cur:
        # Create temp staging table
        cur.execute(f"""
            CREATE TEMPORARY TABLE {staging_table} 
            LIKE raw.{table}
        """)
        
        # Bulk insert to staging
        cur.executemany(
            f"INSERT INTO {staging_table} VALUES ({','.join(['%s'] * len(records[0]))})",
            [list(r.values()) for r in records]
        )
        
        # MERGE into target (upsert — handles re-runs idempotently)
        cur.execute(f"""
            MERGE INTO raw.{table} AS target
            USING {staging_table} AS source
            ON target.id = source.id
            WHEN MATCHED THEN
                UPDATE SET target._extracted_at = CURRENT_TIMESTAMP,
                           {', '.join(f'target.{k} = source.{k}' for k in records[0] if k != 'id')}
            WHEN NOT MATCHED THEN
                INSERT ({', '.join(records[0].keys())}, _extracted_at)
                VALUES ({', '.join(f'source.{k}' for k in records[0])}, CURRENT_TIMESTAMP)
        """)
        
        conn.commit()
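
The extract step above needs a persisted watermark so each run resumes where the previous one stopped. Where you keep that state is a design choice; the sketch below stores it in a small control table in the warehouse and subtracts a short lookback window to tolerate late-arriving updates, which is safe because the MERGE load is idempotent. The etl_state table and helper names are illustrative, not part of any specific library.

# ingestion/watermark_state.py
# Persist the per-table watermark between runs (illustrative sketch)
from datetime import datetime, timedelta

LOOKBACK = timedelta(minutes=15)  # re-read a small window to catch late-arriving updates

def get_last_watermark(conn, table: str) -> datetime:
    """Read the stored watermark; fall back to epoch for the first full load."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT last_watermark FROM etl_state WHERE table_name = %s", (table,)
        )
        row = cur.fetchone()
    watermark = row[0] if row else datetime(1970, 1, 1)
    return watermark - LOOKBACK  # safe to re-read: the MERGE load is idempotent

def save_watermark(conn, table: str, new_watermark: datetime) -> None:
    """Advance the watermark only after the load has committed."""
    with conn.cursor() as cur:
        cur.execute(
            """
            MERGE INTO etl_state AS t
            USING (SELECT %s AS table_name, %s AS last_watermark) AS s
            ON t.table_name = s.table_name
            WHEN MATCHED THEN UPDATE SET t.last_watermark = s.last_watermark
            WHEN NOT MATCHED THEN INSERT (table_name, last_watermark)
                VALUES (s.table_name, s.last_watermark)
            """,
            (table, new_watermark),
        )
        conn.commit()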

Real-Time Ingestion with Kafka + Flink

For event streams that can't wait for batch windows:

# Real-time ingestion: Kafka → Iceberg (via PyFlink)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Kafka source
t_env.execute_sql("""
    CREATE TABLE kafka_events (
        event_id     STRING,
        event_type   STRING,
        user_id      STRING,
        properties   STRING,   -- JSON
        occurred_at  TIMESTAMP(3),
        WATERMARK FOR occurred_at AS occurred_at - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-ingestion',
        'format' = 'json',
        'scan.startup.mode' = 'earliest-offset'
    )
""")

# Iceberg sink (table format for analytics)
t_env.execute_sql("""
    CREATE TABLE iceberg_events (
        event_id    STRING,
        event_type  STRING,
        user_id     STRING,
        properties  STRING,
        occurred_at TIMESTAMP(3),
        event_date  DATE
    ) PARTITIONED BY (event_date)
    WITH (
        'connector' = 'iceberg',
        'catalog-name' = 'glue',
        'warehouse' = 's3://my-data-lake/warehouse',
        'format-version' = '2'
    )
""")

# Continuous insert: stream Kafka events into the partitioned Iceberg table
t_env.execute_sql("""
    INSERT INTO iceberg_events
    SELECT
        event_id,
        event_type,
        user_id,
        properties,
        occurred_at,
        CAST(occurred_at AS DATE) AS event_date
    FROM kafka_events
""")

Transformation with dbt

dbt (data build tool) is the standard for data transformation. SQL models, version control, testing, documentation, and lineage — all in one tool.

-- models/staging/stg_orders.sql
-- Staging layer: rename columns, cast types, add metadata

{{ config(materialized='view') }}

SELECT
    id                          AS order_id,
    user_id,
    status,
    total_cents / 100.0         AS total_usd,
    created_at::TIMESTAMPTZ     AS created_at,
    updated_at::TIMESTAMPTZ     AS updated_at,
    CURRENT_TIMESTAMP           AS _dbt_loaded_at
FROM {{ source('raw', 'orders') }}
WHERE created_at >= '2024-01-01'  -- Exclude test data

-- models/marts/orders/fct_orders.sql
-- Fact table: denormalized, analytics-ready

{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='sync_all_columns'
) }}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
    {% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

users AS (
    SELECT * FROM {{ ref('stg_users') }}
),

order_items AS (
    SELECT
        order_id,
        COUNT(*) AS item_count,
        SUM(quantity) AS total_quantity
    FROM {{ ref('stg_order_items') }}
    GROUP BY order_id
)

SELECT
    o.order_id,
    o.user_id,
    u.email,
    u.plan,
    u.signup_date,
    o.status,
    o.total_usd,
    oi.item_count,
    oi.total_quantity,
    o.created_at,
    DATE_TRUNC('day', o.created_at)   AS order_date,
    DATE_TRUNC('week', o.created_at)  AS order_week,
    DATE_TRUNC('month', o.created_at) AS order_month,
    o._dbt_loaded_at
FROM orders o
LEFT JOIN users u ON o.user_id = u.user_id
LEFT JOIN order_items oi ON o.order_id = oi.order_id

# models/marts/orders/schema.yml
# dbt tests: run automatically in CI

version: 2

models:
  - name: fct_orders
    description: "One row per order, fully denormalized for analytics"
    columns:
      - name: order_id
        tests:
          - not_null
          - unique

      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'paid', 'fulfilled', 'cancelled', 'refunded']

      - name: total_usd
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

      - name: user_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_users')
              field: user_id

Orchestration with Apache Airflow

# dags/daily_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_data_pipeline',
    default_args=default_args,
    description='Daily ETL: extract from production → transform with dbt → load to BI',
    schedule_interval='0 3 * * *',  # 3 AM UTC daily
    start_date=days_ago(1),
    catchup=False,  # Don't backfill missed runs
    tags=['production', 'daily'],
) as dag:

    # Step 1: Extract from production databases
    extract_orders = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders_incremental,
        op_kwargs={
            'table': 'orders',
            'watermark_col': 'updated_at',
        },
    )

    extract_users = PythonOperator(
        task_id='extract_users',
        python_callable=extract_users_incremental,
        op_kwargs={'table': 'users', 'watermark_col': 'updated_at'},
    )

    # Step 2: Run dbt transformations
    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command="""
            cd /opt/dbt && \
            dbt run \
              --profiles-dir /opt/dbt/profiles \
              --target production \
              --select tag:daily \
              --no-version-check
        """,
    )

    # Step 3: Run dbt tests
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command="""
            cd /opt/dbt && \
            dbt test \
              --profiles-dir /opt/dbt/profiles \
              --target production \
              --select tag:daily
        """,
    )

    # Step 4: Notify stakeholders
    notify_success = PythonOperator(
        task_id='notify_success',
        python_callable=send_slack_notification,
        op_kwargs={'message': 'Daily pipeline completed ✅'},
    )

    # DAG dependencies
    [extract_orders, extract_users] >> dbt_run >> dbt_test >> notify_success
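
The DAG references helper callables (extract_orders_incremental, extract_users_incremental, send_slack_notification) that live outside the DAG file. As one possible implementation of the notifier, the sketch below posts to a Slack incoming webhook stored as an Airflow Variable; the Variable name is an assumption, and the official Slack provider operators are an equally valid choice.

# dags/notifications.py
# Minimal Slack notifier used by the DAG above (illustrative sketch)
import requests
from airflow.models import Variable

def send_slack_notification(message: str) -> None:
    """Post a message to a Slack incoming webhook configured as an Airflow Variable."""
    webhook_url = Variable.get("slack_data_alerts_webhook")  # assumed Variable name
    response = requests.post(webhook_url, json={"text": message}, timeout=10)
    response.raise_for_status()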

Data Quality Monitoring

# data_quality/checks.py
# Run after each pipeline execution

import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest

def run_data_quality_checks(df, suite_name: str) -> bool:
    """Run GX checks. Returns True if all pass."""
    context = ge.get_context()
    
    validator = context.get_validator(
        batch_request=RuntimeBatchRequest(
            datasource_name="pandas",
            data_connector_name="runtime",
            data_asset_name=suite_name,
            runtime_parameters={"batch_data": df},
            batch_identifiers={"default_identifier_name": "default"},
        ),
        expectation_suite_name=suite_name,
    )

    # Row count expectation
    validator.expect_table_row_count_to_be_between(min_value=1000)
    
    # Completeness
    validator.expect_column_values_to_not_be_null("order_id")
    validator.expect_column_values_to_not_be_null("user_id")
    
    # Sanity check on distribution: average order value should stay within expected bounds
    validator.expect_column_mean_to_be_between("total_usd", min_value=20, max_value=500)
    
    results = validator.validate()
    return results.success
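
To make these checks actually gate the pipeline, run them immediately after a load and fail loudly when they do not pass; in Airflow, a raised exception marks the task as failed and triggers the retries and alert emails configured in default_args. The file path and suite name below are placeholders.

# data_quality/airflow_task.py
# PythonOperator callable that blocks downstream tasks on failed checks (sketch)
import pandas as pd

from data_quality.checks import run_data_quality_checks

def check_orders_quality(**context) -> None:
    """Raise if the freshly loaded orders fail their Great Expectations suite."""
    df = pd.read_parquet("s3://my-data-lake/exports/orders_latest.parquet")  # placeholder path
    if not run_data_quality_checks(df, suite_name="orders_suite"):
        raise ValueError("Data quality checks failed for orders; blocking downstream tasks")

In the daily DAG, this would sit between dbt_test and notify_success as a PythonOperator (e.g. task_id='dq_orders', python_callable=check_orders_quality).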

Modern Data Stack (2026)

Layer | Options | Recommendation
Ingestion | Fivetran, Airbyte, custom Python | Fivetran for standard connectors; custom for proprietary sources
Storage | S3/GCS + Iceberg/Delta Lake | S3 + Iceberg for open format
Warehouse | Snowflake, BigQuery, Redshift | BigQuery for GCP; Snowflake for multi-cloud
Transformation | dbt | dbt (industry standard)
Orchestration | Airflow, Prefect, Dagster | Airflow for mature teams; Prefect for simplicity
BI | Metabase, Looker, Tableau | Metabase (open source, fast); Looker for enterprise

Implementation Cost

Scope | Timeline | Investment
Ingestion pipeline (3–5 sources) | 3–6 weeks | $15,000–$35,000
dbt project setup + models | 2–4 weeks | $10,000–$25,000
Airflow orchestration | 2–4 weeks | $8,000–$20,000
Full data platform | 3–6 months | $80,000–$200,000

Infrastructure: Snowflake/BigQuery ($500–$5,000/month at scale), Airflow on ECS (~$100–$300/month), dbt Cloud ($50–$500/month).


Working With Viprasol

We build data engineering platforms — ingestion pipelines, dbt transformation layers, Airflow orchestration, and data quality monitoring.

Data pipeline consultation →
Data Analytics Consulting →
AI & Machine Learning Services →



About the Author

Viprasol Tech Team

Custom Software Development Specialists

The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.

MT4/MT5 EA Development · AI Agent Systems · SaaS Development · Algorithmic Trading
