Data Engineering Pipeline: Building Reliable ETL/ELT Systems in 2026
Data pipelines are the plumbing of analytics and ML systems. When they work, nobody notices. When they break — missing data, stale dashboards, wrong metrics — business decisions get made on bad numbers.
Building reliable data pipelines requires thinking about failure modes from the start: idempotent processing, schema evolution, late-arriving data, upstream API changes, and the monitoring that catches problems before they cascade.
This guide covers the modern data engineering stack: ingestion patterns, transformation with dbt, orchestration with Airflow, and the architectural choices that make pipelines maintainable.
ETL vs ELT — The Paradigm Shift
ETL (Extract, Transform, Load): Transform data before loading it into the warehouse. Common in legacy systems where compute was expensive.
ELT (Extract, Load, Transform): Load raw data into the warehouse first, then transform it with SQL. The modern approach — compute inside the warehouse (Snowflake, BigQuery, Redshift) is cheap, and keeping the raw data enables reprocessing.
ETL (legacy):
Source DB → [Extract] → [Transform in Python] → [Load to warehouse]
(compute on pipeline server)
ELT (modern):
Source DB → [Extract] → [Load raw to warehouse] → [Transform with dbt/SQL]
(cheap, keep everything) (warehouse compute)
The ELT advantage: When business logic changes, you can reprocess historical data by re-running the SQL transformation — no need to re-extract from the source.
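To make the contrast concrete, here is a minimal ELT-style loader sketch: raw JSON lands in the warehouse untouched, and all shaping happens later in SQL. The raw.events table, warehouse_conn, and the Snowflake-style PARSE_JSON / variant syntax are assumptions for illustration, not a specific stack.
# elt_load_sketch.py (illustrative): load raw now, transform later in the warehouse
import json

def load_raw_events(warehouse_conn, path: str) -> None:
    """Copy source records as-is into a raw table; no business logic here."""
    with open(path) as f:
        rows = [json.loads(line) for line in f]
    with warehouse_conn.cursor() as cur:
        for row in rows:
            # Snowflake-style: bind the raw JSON and parse it into a VARIANT column
            cur.execute(
                "INSERT INTO raw.events (payload, _extracted_at) "
                "SELECT PARSE_JSON(%s), CURRENT_TIMESTAMP",
                (json.dumps(row),),
            )
    warehouse_conn.commit()

# Business logic lives in SQL, so it can be re-run over all of history at any time
REPROCESS_SQL = """
CREATE OR REPLACE TABLE analytics.events AS
SELECT
    payload:event_id::STRING       AS event_id,
    payload:occurred_at::TIMESTAMP AS occurred_at
FROM raw.events
"""
Re-running REPROCESS_SQL after a logic change rebuilds the derived table from the untouched raw data, which is exactly the reprocessing advantage described above.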
Ingestion Layer
Batch Ingestion from PostgreSQL
# ingestion/postgres_to_warehouse.py
# Incremental extraction using updated_at watermark
import psycopg2
import snowflake.connector
from datetime import datetime, timedelta
import json
import logging

logger = logging.getLogger(__name__)

def extract_incremental(
    source_conn,
    table: str,
    watermark_col: str,
    last_watermark: datetime,
    batch_size: int = 10000,
) -> list[dict]:
    """Extract rows updated since last watermark."""
    records = []
    offset = 0
    while True:
        with source_conn.cursor() as cur:
            cur.execute(f"""
                SELECT * FROM {table}
                WHERE {watermark_col} > %s
                  AND {watermark_col} <= NOW()
                ORDER BY {watermark_col}, id
                LIMIT %s OFFSET %s
            """, (last_watermark, batch_size, offset))
            columns = [desc[0] for desc in cur.description]
            rows = cur.fetchall()
            if not rows:
                break
            records.extend([dict(zip(columns, row)) for row in rows])
            offset += len(rows)
            if len(rows) < batch_size:
                break
    logger.info(f"Extracted {len(records)} rows from {table} since {last_watermark}")
    return records
def load_to_staging(records: list[dict], table: str, conn) -> None:
    """Load records to warehouse staging area (idempotent via MERGE)."""
    if not records:
        return
    # Stage to a temp table first, then MERGE into the target
    staging_table = f"staging.{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    cols = list(records[0].keys())
    with conn.cursor() as cur:
        # Create temp staging table with the same shape as the target
        cur.execute(f"""
            CREATE TEMPORARY TABLE {staging_table}
            LIKE raw.{table}
        """)
        # Bulk insert to staging; name the columns explicitly because the target
        # also carries _extracted_at, which the source rows do not include
        cur.executemany(
            f"INSERT INTO {staging_table} ({', '.join(cols)}) "
            f"VALUES ({', '.join(['%s'] * len(cols))})",
            [[r[c] for c in cols] for r in records]
        )
        # MERGE into target (upsert — handles re-runs idempotently)
        cur.execute(f"""
            MERGE INTO raw.{table} AS target
            USING {staging_table} AS source
            ON target.id = source.id
            WHEN MATCHED THEN
                UPDATE SET target._extracted_at = CURRENT_TIMESTAMP,
                    {', '.join(f'target.{k} = source.{k}' for k in cols if k != 'id')}
            WHEN NOT MATCHED THEN
                INSERT ({', '.join(cols)}, _extracted_at)
                VALUES ({', '.join(f'source.{k}' for k in cols)}, CURRENT_TIMESTAMP)
        """)
    conn.commit()
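The extract step needs the previous run's watermark from somewhere durable, and the new watermark should only be advanced after the load commits. A minimal bookkeeping sketch follows; the etl_watermarks table and helper names are assumptions for illustration rather than part of any particular warehouse schema.
# ingestion/watermarks.py (hypothetical): watermark bookkeeping; table name is an assumption
from datetime import datetime, timezone

def get_last_watermark(conn, table: str) -> datetime:
    """Read the high-water mark recorded by the previous successful run."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT last_watermark FROM etl_watermarks WHERE table_name = %s",
            (table,),
        )
        row = cur.fetchone()
    # Fall back to the epoch for a first-ever run (full extract)
    return row[0] if row else datetime(1970, 1, 1, tzinfo=timezone.utc)

def save_watermark(conn, table: str, new_watermark: datetime) -> None:
    """Advance the watermark only after the load committed successfully."""
    with conn.cursor() as cur:
        cur.execute(
            """
            MERGE INTO etl_watermarks AS t
            USING (SELECT %s AS table_name, %s AS last_watermark) AS s
            ON t.table_name = s.table_name
            WHEN MATCHED THEN UPDATE SET t.last_watermark = s.last_watermark
            WHEN NOT MATCHED THEN INSERT (table_name, last_watermark)
                VALUES (s.table_name, s.last_watermark)
            """,
            (table, new_watermark),
        )
    conn.commit()
Advancing the watermark to the maximum updated_at actually extracted, rather than to NOW(), avoids skipping rows that change between extraction and the next run.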
Real-Time Ingestion with Kafka + Flink
For event streams that can't wait for batch windows:
# Real-time ingestion: Kafka → Iceberg (via PyFlink)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, TableConfig
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Kafka source
t_env.execute_sql("""
CREATE TABLE kafka_events (
event_id STRING,
event_type STRING,
user_id STRING,
properties STRING, -- JSON
occurred_at TIMESTAMP(3),
WATERMARK FOR occurred_at AS occurred_at - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-ingestion',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
)
""")
# Iceberg sink (table format for analytics)
t_env.execute_sql("""
CREATE TABLE iceberg_events (
event_id STRING,
event_type STRING,
user_id STRING,
properties STRING,
occurred_at TIMESTAMP(3),
event_date DATE
) PARTITIONED BY (event_date)
WITH (
'connector' = 'iceberg',
'catalog-name' = 'glue',
'warehouse' = 's3://my-data-lake/warehouse',
'format-version' = '2'
)
""")
# Continuous insert: stream events from Kafka into Iceberg, deriving the partition date
t_env.execute_sql("""
INSERT INTO iceberg_events
SELECT
event_id,
event_type,
user_id,
properties,
occurred_at,
CAST(occurred_at AS DATE) AS event_date
FROM kafka_events
""")
🤖 AI Is Not the Future — It Is Right Now
Businesses using AI automation cut manual work by 60–80%. We build production-ready AI systems — RAG pipelines, LLM integrations, custom ML models, and AI agent workflows.
- LLM integration (OpenAI, Anthropic, Gemini, local models)
- RAG systems that answer from your own data
- AI agents that take real actions — not just chat
- Custom ML models for prediction, classification, detection
Transformation with dbt
dbt (data build tool) is the standard for data transformation. SQL models, version control, testing, documentation, and lineage — all in one tool.
-- models/staging/stg_orders.sql
-- Staging layer: rename columns, cast types, add metadata
{{ config(materialized='view') }}
SELECT
id AS order_id,
user_id,
status,
total_cents / 100.0 AS total_usd,
created_at::TIMESTAMPTZ AS created_at,
updated_at::TIMESTAMPTZ AS updated_at,
CURRENT_TIMESTAMP AS _dbt_loaded_at
FROM {{ source('raw', 'orders') }}
WHERE created_at >= '2024-01-01' -- Exclude test data
-- models/marts/orders/fct_orders.sql
-- Fact table: denormalized, analytics-ready
{{ config(
materialized='incremental',
unique_key='order_id',
on_schema_change='sync_all_columns'
) }}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
),
users AS (
SELECT * FROM {{ ref('stg_users') }}
),
order_items AS (
SELECT
order_id,
COUNT(*) AS item_count,
SUM(quantity) AS total_quantity
FROM {{ ref('stg_order_items') }}
GROUP BY order_id
)
SELECT
o.order_id,
o.user_id,
u.email,
u.plan,
u.signup_date,
o.status,
o.total_usd,
oi.item_count,
oi.total_quantity,
o.created_at,
DATE_TRUNC('day', o.created_at) AS order_date,
DATE_TRUNC('week', o.created_at) AS order_week,
DATE_TRUNC('month', o.created_at) AS order_month,
o._dbt_loaded_at
FROM orders o
LEFT JOIN users u ON o.user_id = u.user_id
LEFT JOIN order_items oi ON o.order_id = oi.order_id
# models/marts/orders/schema.yml
# dbt tests: run automatically in CI
version: 2

models:
  - name: fct_orders
    description: "One row per order, fully denormalized for analytics"
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'paid', 'fulfilled', 'cancelled', 'refunded']
      - name: total_usd
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: user_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_users')
              field: user_id
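These schema tests run wherever dbt test runs. One option for CI is to invoke dbt programmatically instead of shelling out; a sketch assuming dbt-core 1.5+ (the selector and target names are illustrative):
# ci/run_dbt_tests.py: invoke dbt programmatically (requires dbt-core >= 1.5)
from dbt.cli.main import dbtRunner, dbtRunnerResult

def run_dbt_tests(select: str = "tag:daily", target: str = "ci") -> bool:
    """Run the selected tests; return True only if everything passed."""
    runner = dbtRunner()
    result: dbtRunnerResult = runner.invoke(
        ["test", "--select", select, "--target", target]
    )
    return result.success

if __name__ == "__main__":
    raise SystemExit(0 if run_dbt_tests() else 1)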
Orchestration with Apache Airflow
# dags/daily_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

# extract_orders_incremental, extract_users_incremental and send_slack_notification
# are ingestion/notification callables defined elsewhere in the project
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_data_pipeline',
    default_args=default_args,
    description='Daily ETL: extract from production → transform with dbt → load to BI',
    schedule_interval='0 3 * * *',  # 3 AM UTC daily
    start_date=days_ago(1),
    catchup=False,  # Don't backfill missed runs
    tags=['production', 'daily'],
) as dag:

    # Step 1: Extract from production databases
    extract_orders = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders_incremental,
        op_kwargs={
            'table': 'orders',
            'watermark_col': 'updated_at',
        },
    )

    extract_users = PythonOperator(
        task_id='extract_users',
        python_callable=extract_users_incremental,
        op_kwargs={'table': 'users', 'watermark_col': 'updated_at'},
    )

    # Step 2: Run dbt transformations
    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command="""
            cd /opt/dbt && \
            dbt run \
              --profiles-dir /opt/dbt/profiles \
              --target production \
              --select tag:daily \
              --no-version-check
        """,
    )

    # Step 3: Run dbt tests
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command="""
            cd /opt/dbt && \
            dbt test \
              --profiles-dir /opt/dbt/profiles \
              --target production \
              --select tag:daily
        """,
    )

    # Step 4: Notify stakeholders
    notify_success = PythonOperator(
        task_id='notify_success',
        python_callable=send_slack_notification,
        op_kwargs={'message': 'Daily pipeline completed ✅'},
    )

    # DAG dependencies
    [extract_orders, extract_users] >> dbt_run >> dbt_test >> notify_success
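The DAG references a send_slack_notification callable; one minimal version, assuming the incoming-webhook URL is stored in an Airflow Variable named slack_webhook_url (both the Variable name and the module path are illustrative):
# dags/notifications.py (hypothetical helper; Variable name is an assumption)
import requests
from airflow.models import Variable

def send_slack_notification(message: str) -> None:
    """Post a message to Slack via an incoming webhook."""
    webhook_url = Variable.get("slack_webhook_url")
    resp = requests.post(webhook_url, json={"text": message}, timeout=10)
    resp.raise_for_status()  # fail the task loudly if Slack rejects the call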

⚡ Your Competitors Are Already Using AI — Are You?
We build AI systems that actually work in production — not demos that die in a Colab notebook. From data pipeline to deployed model to real business outcomes.
- AI agent systems that run autonomously — not just chatbots
- Integrates with your existing tools (CRM, ERP, Slack, etc.)
- Explainable outputs — know why the model decided what it did
- Free AI opportunity audit for your business
Data Quality Monitoring
# data_quality/checks.py
# Run after each pipeline execution
import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest

def run_data_quality_checks(df, suite_name: str) -> bool:
    """Run GX checks. Returns True if all pass."""
    context = ge.get_context()
    validator = context.get_validator(
        batch_request=RuntimeBatchRequest(
            datasource_name="pandas",
            data_connector_name="runtime",
            data_asset_name=suite_name,
            runtime_parameters={"batch_data": df},
            batch_identifiers={"default_identifier_name": "default"},
        ),
        expectation_suite_name=suite_name,
    )
    # Row count expectation
    validator.expect_table_row_count_to_be_between(min_value=1000)
    # Completeness
    validator.expect_column_values_to_not_be_null("order_id")
    validator.expect_column_values_to_not_be_null("user_id")
    # Sanity check on order values: the mean should stay within an expected band
    validator.expect_column_mean_to_be_between("total_usd", min_value=20, max_value=500)
    results = validator.validate()
    return results.success
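Fixed bounds catch gross errors; a common refinement is to compare yesterday's volume against a trailing 30-day average and fail the pipeline when it drifts too far. A sketch with illustrative table name, thresholds, and Snowflake-style date arithmetic:
# data_quality/volume_check.py (illustrative): volume anomaly check
def check_daily_volume(conn, table: str = "analytics.fct_orders",
                       tolerance: float = 0.5) -> None:
    """Fail loudly if yesterday's row count strays too far from the 30-day average."""
    with conn.cursor() as cur:
        cur.execute(f"""
            SELECT
                SUM(CASE WHEN order_date = CURRENT_DATE - 1 THEN 1 ELSE 0 END),
                SUM(CASE WHEN order_date <  CURRENT_DATE - 1 THEN 1 ELSE 0 END) / 30.0
            FROM {table}
            WHERE order_date >= CURRENT_DATE - 31
              AND order_date <  CURRENT_DATE
        """)
        yesterday_count, avg_daily_count = cur.fetchone()
    yesterday_count = yesterday_count or 0
    avg_daily_count = avg_daily_count or 0
    lower = avg_daily_count * (1 - tolerance)
    upper = avg_daily_count * (1 + tolerance)
    if not (lower <= yesterday_count <= upper):
        # Raising here fails the orchestrator task and blocks downstream steps
        raise ValueError(
            f"Daily volume {yesterday_count} outside expected range "
            f"[{lower:.0f}, {upper:.0f}] for {table}"
        )
Calling this from a PythonOperator placed between dbt_test and notify_success stops the notification (and any downstream loads) when the volume looks wrong.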
Modern Data Stack (2026)
| Layer | Options | Recommendation |
|---|---|---|
| Ingestion | Fivetran, Airbyte, custom Python | Fivetran for standard connectors; custom for proprietary sources |
| Storage | S3/GCS + Iceberg/Delta Lake | S3 + Iceberg for open format |
| Warehouse | Snowflake, BigQuery, Redshift | BigQuery for GCP; Snowflake for multi-cloud |
| Transformation | dbt | dbt (industry standard) |
| Orchestration | Airflow, Prefect, Dagster | Airflow for mature teams; Prefect for simplicity |
| BI | Metabase, Looker, Tableau | Metabase (open source, fast); Looker for enterprise |
Implementation Cost
| Scope | Timeline | Investment |
|---|---|---|
| Ingestion pipeline (3–5 sources) | 3–6 weeks | $15,000–$35,000 |
| dbt project setup + models | 2–4 weeks | $10,000–$25,000 |
| Airflow orchestration | 2–4 weeks | $8,000–$20,000 |
| Full data platform | 3–6 months | $80,000–$200,000 |
Infrastructure: Snowflake/BigQuery ($500–$5,000/month at scale), Airflow on ECS (~$100–$300/month), dbt Cloud ($50–$500/month).
Our Approach at Viprasol
We build data engineering platforms — ingestion pipelines, dbt transformation layers, Airflow orchestration, and data quality monitoring.
→ Data pipeline consultation →
→ Data Analytics Consulting →
→ AI & Machine Learning Services →
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 1000+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement.
Want to Implement AI in Your Business?
From chatbots to predictive models — harness the power of AI with a team that delivers.
Free consultation • No commitment • Response within 24 hours
Ready to automate your business with AI agents?
We build custom multi-agent AI systems that handle sales, support, ops, and content — across Telegram, WhatsApp, Slack, and 20+ other platforms. We run our own business on these systems.