Data Engineering Pipeline: Building Reliable ETL/ELT Systems in 2026
Data pipelines are the plumbing of analytics and ML systems. When they work, nobody notices. When they break — missing data, stale dashboards, wrong metrics — business decisions get made on bad numbers.
Building reliable data pipelines requires thinking about failure modes from the start: idempotent processing, schema evolution, late-arriving data, upstream API changes, and the monitoring that catches problems before they cascade.
This guide covers the modern data engineering stack: ingestion patterns, transformation with dbt, orchestration with Airflow, and the architectural choices that make pipelines maintainable.
ETL vs ELT — The Paradigm Shift
ETL (Extract, Transform, Load): Transform data before loading it into the warehouse. Common in legacy systems where compute was expensive.
ELT (Extract, Load, Transform): Load raw data into the warehouse first, then transform using SQL. The modern approach: warehouse compute (Snowflake, BigQuery, Redshift) is cheap, and keeping the raw data enables reprocessing.
```
ETL (legacy):
Source DB → [Extract] → [Transform in Python] → [Load to warehouse]
                         (compute on pipeline server)

ELT (modern):
Source DB → [Extract] → [Load raw to warehouse] → [Transform with dbt/SQL]
                         (cheap, keep everything)   (warehouse compute)
```
The ELT advantage: When business logic changes, you can reprocess historical data by re-running the SQL transformation — no need to re-extract from the source.
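To make that concrete, here is a minimal sketch of why retained raw data makes reprocessing cheap: the transform is a pure function over the raw rows, so changing the business logic and re-running it rebuilds history. The table shape and the refund rule are illustrative, not from any real pipeline.

```python
# Illustrative raw rows as they would sit in the warehouse's raw layer
raw_orders = [
    {"id": 1, "total_cents": 1999, "status": "paid"},
    {"id": 2, "total_cents": 500, "status": "refunded"},
]

def transform(rows, include_refunds: bool):
    """Business logic lives here; change it and re-run to reprocess history."""
    return [
        {"order_id": r["id"], "total_usd": r["total_cents"] / 100}
        for r in rows
        if include_refunds or r["status"] != "refunded"
    ]

# Original logic counted refunds; the corrected logic excludes them.
# No re-extraction needed: just re-run the transform over the raw rows.
v1 = transform(raw_orders, include_refunds=True)   # 2 rows
v2 = transform(raw_orders, include_refunds=False)  # 1 row
```

In an ELT stack the same re-run happens in SQL (e.g. `dbt run --full-refresh`) rather than Python, but the principle is identical.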
Ingestion Layer
Batch Ingestion from PostgreSQL
```python
# ingestion/postgres_to_warehouse.py
# Incremental extraction using an updated_at watermark
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


def extract_incremental(
    source_conn,
    table: str,
    watermark_col: str,
    last_watermark: datetime,
    batch_size: int = 10000,
) -> list[dict]:
    """Extract rows updated since the last watermark."""
    records = []
    offset = 0
    # Pin the upper bound once, so paging stays consistent even while
    # source rows keep changing during the extraction
    extraction_ts = datetime.utcnow()
    while True:
        with source_conn.cursor() as cur:
            cur.execute(f"""
                SELECT * FROM {table}
                WHERE {watermark_col} > %s
                  AND {watermark_col} <= %s
                ORDER BY {watermark_col}, id
                LIMIT %s OFFSET %s
            """, (last_watermark, extraction_ts, batch_size, offset))
            columns = [desc[0] for desc in cur.description]
            rows = cur.fetchall()
        if not rows:
            break
        records.extend(dict(zip(columns, row)) for row in rows)
        offset += len(rows)
        if len(rows) < batch_size:
            break
    logger.info(f"Extracted {len(records)} rows from {table} since {last_watermark}")
    return records


def load_to_staging(records: list[dict], table: str, conn) -> None:
    """Load records to the warehouse staging area (idempotent via MERGE)."""
    if not records:
        return
    # Stage to a temp table first, then MERGE
    staging_table = f"staging.{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    columns = list(records[0].keys())
    with conn.cursor() as cur:
        # Create a temp staging table with the target's schema
        cur.execute(f"""
            CREATE TEMPORARY TABLE {staging_table}
            LIKE raw.{table}
        """)
        # Bulk insert into staging
        cur.executemany(
            f"INSERT INTO {staging_table} VALUES ({','.join(['%s'] * len(columns))})",
            [list(r.values()) for r in records],
        )
        # MERGE into target (upsert — handles re-runs idempotently)
        cur.execute(f"""
            MERGE INTO raw.{table} AS target
            USING {staging_table} AS source
            ON target.id = source.id
            WHEN MATCHED THEN
              UPDATE SET target._extracted_at = CURRENT_TIMESTAMP,
                {', '.join(f'target.{k} = source.{k}' for k in columns if k != 'id')}
            WHEN NOT MATCHED THEN
              INSERT ({', '.join(columns)}, _extracted_at)
              VALUES ({', '.join(f'source.{k}' for k in columns)}, CURRENT_TIMESTAMP)
        """)
    conn.commit()
```
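The extract function needs the previous watermark from somewhere, and a run must advance it only after a successful load. A minimal sketch of that state-keeping follows; in production the state would live in a warehouse table or a small metadata store, and the `WatermarkStore` class and its method names are hypothetical, shown with an in-memory dict so the pattern is easy to see.

```python
# Persist per-table watermarks so each run resumes where the last one finished.
from datetime import datetime

class WatermarkStore:
    """Hypothetical watermark store: table name → last successfully loaded timestamp."""

    def __init__(self):
        self._state: dict[str, datetime] = {}

    def get(self, table: str) -> datetime:
        # First run: fall back to the epoch so everything is extracted
        return self._state.get(table, datetime(1970, 1, 1))

    def advance(self, table: str, new_watermark: datetime) -> None:
        # Only move forward: a retried or out-of-order run must never rewind it
        self._state[table] = max(self.get(table), new_watermark)

store = WatermarkStore()
since = store.get("orders")  # epoch on the first run
# ... extract_incremental(...) and load_to_staging(...) would run here ...
store.advance("orders", datetime(2026, 1, 15, 3, 0))
```

Advancing the watermark only after the MERGE commits is what makes a failed run safe: the next run simply re-extracts the same window, and the MERGE absorbs the duplicates.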
Real-Time Ingestion with Kafka + Flink
For event streams that can't wait for batch windows:
```python
# Real-time ingestion: Kafka → Iceberg (via PyFlink)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Kafka source
t_env.execute_sql("""
    CREATE TABLE kafka_events (
        event_id STRING,
        event_type STRING,
        user_id STRING,
        properties STRING,  -- JSON
        occurred_at TIMESTAMP(3),
        WATERMARK FOR occurred_at AS occurred_at - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-ingestion',
        'format' = 'json',
        'scan.startup.mode' = 'earliest-offset'
    )
""")

# Iceberg sink (table format for analytics)
t_env.execute_sql("""
    CREATE TABLE iceberg_events (
        event_id STRING,
        event_type STRING,
        user_id STRING,
        properties STRING,
        occurred_at TIMESTAMP(3),
        event_date DATE
    ) PARTITIONED BY (event_date)
    WITH (
        'connector' = 'iceberg',
        'catalog-name' = 'glue',
        'warehouse' = 's3://my-data-lake/warehouse',
        'format-version' = '2'
    )
""")

# Continuous insert: stream events into Iceberg, deriving the partition date
t_env.execute_sql("""
    INSERT INTO iceberg_events
    SELECT
        event_id,
        event_type,
        user_id,
        properties,
        occurred_at,
        CAST(occurred_at AS DATE) AS event_date
    FROM kafka_events
""")
```
Transformation with dbt
dbt (data build tool) is the standard for data transformation. SQL models, version control, testing, documentation, and lineage — all in one tool.
```sql
-- models/staging/stg_orders.sql
-- Staging layer: rename columns, cast types, add metadata
{{ config(materialized='view') }}

SELECT
    id AS order_id,
    user_id,
    status,
    total_cents / 100.0 AS total_usd,
    created_at::TIMESTAMPTZ AS created_at,
    updated_at::TIMESTAMPTZ AS updated_at,
    CURRENT_TIMESTAMP AS _dbt_loaded_at
FROM {{ source('raw', 'orders') }}
WHERE created_at >= '2024-01-01'  -- Exclude test data
```

```sql
-- models/marts/orders/fct_orders.sql
-- Fact table: denormalized, analytics-ready
{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='sync_all_columns'
) }}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
    {% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

users AS (
    SELECT * FROM {{ ref('stg_users') }}
),

order_items AS (
    SELECT
        order_id,
        COUNT(*) AS item_count,
        SUM(quantity) AS total_quantity
    FROM {{ ref('stg_order_items') }}
    GROUP BY order_id
)

SELECT
    o.order_id,
    o.user_id,
    u.email,
    u.plan,
    u.signup_date,
    o.status,
    o.total_usd,
    oi.item_count,
    oi.total_quantity,
    o.created_at,
    DATE_TRUNC('day', o.created_at) AS order_date,
    DATE_TRUNC('week', o.created_at) AS order_week,
    DATE_TRUNC('month', o.created_at) AS order_month,
    o._dbt_loaded_at
FROM orders o
LEFT JOIN users u ON o.user_id = u.user_id
LEFT JOIN order_items oi ON o.order_id = oi.order_id
```
```yaml
# models/marts/orders/schema.yml
# dbt tests: run automatically in CI
version: 2

models:
  - name: fct_orders
    description: "One row per order, fully denormalized for analytics"
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'paid', 'fulfilled', 'cancelled', 'refunded']
      - name: total_usd
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: user_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_users')
              field: user_id
```
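To make these tests actually gate a deploy, CI needs to inspect the outcome. dbt writes a `run_results.json` artifact into `target/` after every invocation; a sketch of checking it, with a synthetic artifact standing in for the real file (only the fields this check reads are shown):

```python
# Gate CI on dbt test results by parsing the run_results.json artifact.
import json

def failed_tests(run_results: dict) -> list[str]:
    """Return the unique_ids of any non-passing results."""
    return [
        r["unique_id"]
        for r in run_results.get("results", [])
        if r.get("status") not in ("pass", "success")
    ]

# Synthetic artifact standing in for target/run_results.json:
artifact = {
    "results": [
        {"unique_id": "test.not_null_fct_orders_order_id", "status": "pass"},
        {"unique_id": "test.unique_fct_orders_order_id", "status": "fail"},
    ]
}
failures = failed_tests(artifact)
if failures:
    print(f"dbt tests failed: {failures}")  # CI would exit non-zero here
```

In practice `dbt test` already exits non-zero on failure, so the simplest gate is its exit code; parsing the artifact is useful when you want to report which tests failed or treat warnings differently.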
Orchestration with Apache Airflow
```python
# dags/daily_pipeline.py
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# The extract callables and Slack helper are assumed to be defined elsewhere
# in the project (e.g. the ingestion and alerting modules); import paths
# here are illustrative.
from ingestion.tasks import extract_orders_incremental, extract_users_incremental
from alerting.slack import send_slack_notification

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_data_pipeline',
    default_args=default_args,
    description='Daily ETL: extract from production → transform with dbt → load to BI',
    schedule_interval='0 3 * * *',  # 3 AM UTC daily
    start_date=days_ago(1),
    catchup=False,  # Don't backfill missed runs
    tags=['production', 'daily'],
) as dag:
    # Step 1: Extract from production databases
    extract_orders = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders_incremental,
        op_kwargs={
            'table': 'orders',
            'watermark_col': 'updated_at',
        },
    )
    extract_users = PythonOperator(
        task_id='extract_users',
        python_callable=extract_users_incremental,
        op_kwargs={'table': 'users', 'watermark_col': 'updated_at'},
    )

    # Step 2: Run dbt transformations
    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command="""
            cd /opt/dbt && \
            dbt run \
              --profiles-dir /opt/dbt/profiles \
              --target production \
              --select tag:daily \
              --no-version-check
        """,
    )

    # Step 3: Run dbt tests
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command="""
            cd /opt/dbt && \
            dbt test \
              --profiles-dir /opt/dbt/profiles \
              --target production \
              --select tag:daily
        """,
    )

    # Step 4: Notify stakeholders
    notify_success = PythonOperator(
        task_id='notify_success',
        python_callable=send_slack_notification,
        op_kwargs={'message': 'Daily pipeline completed ✅'},
    )

    # DAG dependencies
    [extract_orders, extract_users] >> dbt_run >> dbt_test >> notify_success
```
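The `send_slack_notification` callable referenced above can be a few lines around a Slack incoming webhook. A minimal sketch, assuming such a webhook exists; the URL is a placeholder and the helper names are illustrative.

```python
# Post a pipeline status message to Slack via an incoming webhook.
import json
import urllib.request

def build_slack_payload(message: str) -> dict:
    """Build the JSON body Slack incoming webhooks accept."""
    return {"text": message}

def send_slack_notification(message: str,
                            webhook_url: str = "https://hooks.slack.com/services/PLACEHOLDER") -> None:
    body = json.dumps(build_slack_payload(message)).encode("utf-8")
    req = urllib.request.Request(
        webhook_url, data=body, headers={"Content-Type": "application/json"}
    )
    # urlopen raises on non-2xx responses, which fails the Airflow task
    # and triggers the DAG's retry/alerting policy
    urllib.request.urlopen(req)
```

Letting the HTTP error propagate is deliberate: a silently swallowed notification failure is exactly the kind of quiet breakage this article warns about.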
Data Quality Monitoring
```python
# data_quality/checks.py
# Run after each pipeline execution
import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest


def run_data_quality_checks(df, suite_name: str) -> bool:
    """Run GX checks. Returns True if all pass."""
    context = ge.get_context()
    validator = context.get_validator(
        batch_request=RuntimeBatchRequest(
            datasource_name="pandas",
            data_connector_name="runtime",
            data_asset_name=suite_name,
            runtime_parameters={"batch_data": df},
            batch_identifiers={"default_identifier_name": "default"},
        ),
        expectation_suite_name=suite_name,
    )
    # Row count expectation
    validator.expect_table_row_count_to_be_between(min_value=1000)
    # Completeness
    validator.expect_column_values_to_not_be_null("order_id")
    validator.expect_column_values_to_not_be_null("user_id")
    # Distribution check: static bounds here; in production, derive them
    # from a trailing 30-day average so the check tracks the data
    validator.expect_column_mean_to_be_between("total_usd", min_value=20, max_value=500)
    results = validator.validate()
    return results.success
```
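Static bounds like `min_value=20, max_value=500` drift out of date as the business changes. A sketch of deriving them from a trailing 30-day average instead, with a 50% tolerance; the history list and helper name are illustrative.

```python
# Derive distribution-check bounds from a trailing average so the check
# tracks seasonality instead of relying on hardcoded numbers.
def bounds_from_history(daily_means: list[float], tolerance: float = 0.5) -> tuple[float, float]:
    """Return (min, max) as ±tolerance around the trailing 30-day average."""
    window = daily_means[-30:]            # most recent 30 days
    avg = sum(window) / len(window)
    return avg * (1 - tolerance), avg * (1 + tolerance)

# Illustrative history: mean order value per day
history = [100.0] * 28 + [110.0, 90.0]   # trailing average is 100.0
low, high = bounds_from_history(history)
# Feed these into expect_column_mean_to_be_between(min_value=low, max_value=high)
```

Recomputing the bounds at check time means yesterday's anomaly widens tomorrow's window only slightly, while a genuine shift in order value eventually becomes the new baseline.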
Modern Data Stack (2026)
| Layer | Options | Recommendation |
|---|---|---|
| Ingestion | Fivetran, Airbyte, custom Python | Fivetran for standard connectors; custom for proprietary sources |
| Storage | S3/GCS + Iceberg/Delta Lake | S3 + Iceberg for open format |
| Warehouse | Snowflake, BigQuery, Redshift | BigQuery for GCP; Snowflake for multi-cloud |
| Transformation | dbt | dbt (industry standard) |
| Orchestration | Airflow, Prefect, Dagster | Airflow for mature teams; Prefect for simplicity |
| BI | Metabase, Looker, Tableau | Metabase (open source, fast); Looker for enterprise |
Implementation Cost
| Scope | Timeline | Investment |
|---|---|---|
| Ingestion pipeline (3–5 sources) | 3–6 weeks | $15,000–$35,000 |
| dbt project setup + models | 2–4 weeks | $10,000–$25,000 |
| Airflow orchestration | 2–4 weeks | $8,000–$20,000 |
| Full data platform | 3–6 months | $80,000–$200,000 |
Infrastructure: Snowflake/BigQuery ($500–$5,000/month at scale), Airflow on ECS (~$100–$300/month), dbt Cloud ($50–$500/month).
Working With Viprasol
We build data engineering platforms — ingestion pipelines, dbt transformation layers, Airflow orchestration, and data quality monitoring.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.