Data Pipeline Architecture: Batch vs Streaming, Airflow vs Prefect, dbt, and Warehouse Design
Design production data pipelines: choose between batch and streaming, compare Airflow vs Prefect vs Dagster, implement dbt transformations, and architect a modern data warehouse with medallion layers.
The modern data stack has stabilized around a set of proven patterns: ELT over ETL, cloud warehouses with near-unlimited compute, orchestrators that are actually debuggable, and dbt as the SQL transformation layer that makes warehouse code testable.
The decisions that still require careful thought: batch vs. streaming, orchestrator choice, warehouse selection, and how to structure the medallion layers for your specific query patterns.
ELT vs ETL: The Modern Default
ETL (Extract-Transform-Load): Transform data before loading into the warehouse. Requires maintaining transform logic in the ETL tool, often loses raw data.
ELT (Extract-Load-Transform): Load raw data first, transform inside the warehouse using SQL. This is the correct default in 2026:
- Raw data is preserved — you can re-derive any transformation
- Warehouse compute is cheap (Snowflake, BigQuery, Redshift all have auto-scaling)
- SQL is more accessible than Spark/Java ETL code
- dbt makes SQL transformations testable, documented, and version-controlled
When ETL is still right: Sensitive data that must be masked before storage (PII), extremely high volume requiring pre-aggregation before warehouse ingestion.
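For the PII case, a minimal masking sketch (field names and the salt are hypothetical; a production setup would use a managed secret and a vetted tokenization scheme) that hashes sensitive fields before anything is written to storage:

```python
import hashlib

def mask_pii(record: dict, fields: tuple = ("email", "phone"), salt: str = "s3cret") -> dict:
    """Return a copy of the record with PII fields replaced by salted hashes."""
    masked = dict(record)
    for field in fields:
        if masked.get(field) is not None:
            digest = hashlib.sha256((salt + str(masked[field])).encode()).hexdigest()
            # Truncated hash: values stay joinable across tables but unreadable
            masked[field] = digest[:16]
    return masked
```

Because the hash is deterministic, masked values can still be joined and counted downstream, which is usually all analytics needs from a PII column.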
Batch vs Streaming
Choose BATCH when:
✅ Data freshness of 1–24 hours is acceptable
✅ Source systems can handle bulk reads
✅ Queries don't need to answer "what happened in the last 5 minutes"
✅ You have < 5 engineers on the data team
✅ Total pipeline latency < 1 hour is fine
Choose STREAMING when:
✅ Real-time dashboards or alerts required
✅ Fraud detection, anomaly detection
✅ Operational use cases (not just analytics)
✅ Event-driven downstream consumers
✅ Data volume > 10 GB/hour continuous
Most analytics workloads: START WITH BATCH.
Add streaming selectively for the 5–10% of use cases that need it.
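The checklist above can be encoded as a rough heuristic; the thresholds mirror this article's rules of thumb, not hard limits:

```python
def recommend_mode(freshness_minutes: int, gb_per_hour: float, operational: bool) -> str:
    """Pick batch vs streaming from the three strongest signals:
    required data freshness, continuous volume, and operational use."""
    if operational or freshness_minutes < 60 or gb_per_hour > 10:
        return "streaming"
    return "batch"
```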
Medallion Architecture
Bronze (Raw) → Exact copy of source data, immutable
Silver (Cleaned) → Validated, standardized, de-duped
Gold (Business) → Business logic applied, aggregated, presentation-ready
Example flow for orders data:
```
Source DB ──► Bronze  ──► Silver    ──► Gold     ──► Dashboards
              orders      orders        daily_       BI tools
              (raw)       (cleaned)     revenue      APIs
```
Bronze Layer: Raw Ingestion
```python
# src/pipelines/ingest/orders_ingest.py
# Bronze: exact copy of source, no transformations
import boto3
import psycopg2
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime, timezone


def ingest_orders_to_bronze(
    source_conn_string: str,
    s3_bucket: str,
    watermark: datetime,
) -> int:
    """
    Extract orders from production Postgres, write to S3 as Parquet.
    No transformations — exact copy for auditability.
    """
    s3 = boto3.client("s3")
    with psycopg2.connect(source_conn_string) as conn:
        # Server-side (named) cursor streams rows from Postgres in chunks
        with conn.cursor(name="orders_extract") as cursor:
            cursor.itersize = 10_000  # Fetch 10k rows per round trip
            cursor.execute(
                """
                SELECT
                    id, user_id, tenant_id, status, total_cents, currency,
                    created_at, updated_at, metadata
                FROM orders
                WHERE updated_at > %s
                ORDER BY updated_at ASC
                """,
                (watermark,),
            )
            columns = [desc[0] for desc in cursor.description]
            # fetchall() still materializes everything in memory; for very
            # large tables, iterate the cursor and write multiple files
            rows = cursor.fetchall()

    if not rows:
        return 0

    # Write as Parquet (columnar, compressed, schema-aware)
    table = pa.Table.from_pydict(
        {col: [row[i] for row in rows] for i, col in enumerate(columns)}
    )

    # Partition by date for efficient querying
    extraction_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    s3_key = (
        f"bronze/orders/dt={extraction_date}/"
        f"batch_{datetime.now(timezone.utc).timestamp():.0f}.parquet"
    )

    # Write to an in-memory buffer, then upload
    buffer = pa.BufferOutputStream()
    pq.write_table(table, buffer, compression="snappy")
    s3.put_object(
        Bucket=s3_bucket,
        Key=s3_key,
        Body=buffer.getvalue().to_pybytes(),
    )
    print(f"Ingested {len(rows)} orders to s3://{s3_bucket}/{s3_key}")
    return len(rows)
```
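The `watermark` argument implies some persisted state between runs. A minimal sketch of watermark persistence, assuming a local JSON file as the store (the path is a stand-in; production pipelines typically keep this in S3 or a small state table):

```python
import json
from datetime import datetime
from pathlib import Path

# Hypothetical local path; swap for S3 or a warehouse state table in production
STATE_PATH = Path("/tmp/orders_watermark.json")

def load_watermark(default: datetime) -> datetime:
    """Return the last persisted watermark, or the default on first run."""
    if STATE_PATH.exists():
        return datetime.fromisoformat(json.loads(STATE_PATH.read_text())["watermark"])
    return default

def save_watermark(ts: datetime) -> None:
    """Persist the high-water mark after a successful ingest."""
    STATE_PATH.write_text(json.dumps({"watermark": ts.isoformat()}))
```

Saving only after a successful load makes the pipeline safe to re-run: a failed run leaves the watermark untouched, so the next run re-extracts the same window.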
Orchestration: Airflow vs Prefect vs Dagster
| Factor | Airflow | Prefect | Dagster |
|---|---|---|---|
| Learning curve | High | Low | Medium |
| Local development | Painful (Docker required) | Excellent (Python native) | Good |
| Dynamic pipelines | Workarounds required | First-class | First-class |
| Data awareness | None | Limited | Full (assets) |
| Observability | Basic UI | Good UI | Excellent (asset catalog) |
| Self-hosted cost | High (ops burden) | Medium | Medium |
| Managed cost | Astronomer: $300–$1,500/mo | Prefect Cloud: $0–$500/mo | Dagster Cloud: $0–$1,000/mo |
| Best for | Existing Airflow shops | New pipelines, simplicity | Data asset management |
Recommendation: Prefect for new projects under 20 pipelines. Dagster when you need asset lineage and a data catalog. Stay on Airflow only if you're already on it and it works.
Prefect Flow Example
```python
# src/pipelines/flows/daily_revenue.py
import os
from datetime import timedelta
from typing import Optional

import pandas as pd
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash


@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_orders(date: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"Extracting orders for {date}")
    # Read from Bronze (Parquet on S3)
    df = pd.read_parquet(
        f"s3://data-lake/bronze/orders/dt={date}/",
        storage_options={"anon": False},
    )
    logger.info(f"Extracted {len(df)} rows")
    return df


@task
def transform_to_silver(df: pd.DataFrame) -> pd.DataFrame:
    # Clean and standardize
    df = df.copy()
    df["total_usd"] = df["total_cents"] / 100
    df["created_at"] = pd.to_datetime(df["created_at"], utc=True)
    df["status"] = df["status"].str.lower().str.strip()

    # Remove duplicates (keep latest updated_at)
    df = df.sort_values("updated_at").drop_duplicates(subset=["id"], keep="last")

    # Validate
    assert df["total_usd"].ge(0).all(), "Negative order totals found"
    assert df["id"].notna().all(), "Null order IDs found"
    return df


@task
def load_to_warehouse(df: pd.DataFrame, table: str) -> int:
    """Load to Snowflake using COPY INTO for efficiency."""
    import snowflake.connector

    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse="transform_wh",
        database="analytics",
        schema="silver",
    )
    # Stage the file, then COPY INTO (much faster than row-by-row INSERT)
    df.to_parquet("/tmp/load.parquet", index=False)
    cursor = conn.cursor()
    cursor.execute(f"PUT file:///tmp/load.parquet @%{table}")
    cursor.execute(f"COPY INTO {table} FROM @%{table} FILE_FORMAT=(TYPE=PARQUET)")
    return len(df)


@flow(name="daily-revenue-pipeline", log_prints=True)
def daily_revenue_pipeline(date: Optional[str] = None) -> None:
    from datetime import date as date_cls

    run_date = date or date_cls.today().isoformat()
    # Tasks run sequentially (use .submit() for parallel execution)
    raw_df = extract_orders(run_date)
    clean_df = transform_to_silver(raw_df)
    rows_loaded = load_to_warehouse(clean_df, "orders")
    print(f"Pipeline complete: {rows_loaded} rows loaded for {run_date}")


# Run locally: python src/pipelines/flows/daily_revenue.py
if __name__ == "__main__":
    daily_revenue_pipeline()
```
dbt Transformation Layer
dbt turns SQL into a testable, version-controlled transformation system:
```sql
-- models/silver/orders.sql
-- dbt model: transforms bronze → silver orders
{{
    config(
        materialized='incremental',
        unique_key='id',
        on_schema_change='append_new_columns',
        cluster_by=["date_trunc('day', created_at)"],
        tags=['silver', 'orders']
    )
}}

WITH source AS (
    SELECT *
    FROM {{ source('bronze', 'orders') }}
    {% if is_incremental() %}
    -- Incremental: only process records updated since last run
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

cleaned AS (
    SELECT
        id,
        user_id,
        tenant_id,
        LOWER(TRIM(status)) AS status,
        total_cents / 100.0 AS total_usd,
        UPPER(currency) AS currency,
        DATE_TRUNC('day', created_at) AS order_date,
        created_at,
        updated_at,
        -- Parse JSON metadata safely
        TRY_PARSE_JSON(metadata) AS metadata,
        -- Deduplication: latest row per id
        ROW_NUMBER() OVER (
            PARTITION BY id ORDER BY updated_at DESC
        ) AS row_num
    FROM source
    WHERE id IS NOT NULL
)

SELECT
    id,
    user_id,
    tenant_id,
    status,
    total_usd,
    currency,
    order_date,
    created_at,
    updated_at,
    metadata
FROM cleaned
WHERE row_num = 1
```
```yaml
# models/silver/schema.yml
version: 2

models:
  - name: orders
    description: "Cleaned and deduplicated orders from the production database"
    config:
      meta:
        owner: data-team
        freshness_warn_after: {hours: 4}
        freshness_error_after: {hours: 24}
    columns:
      - name: id
        description: "Order UUID (primary key)"
        tests:
          - unique
          - not_null
      - name: status
        description: "Order status"
        tests:
          - not_null
          - accepted_values:
              values: ['pending', 'processing', 'completed', 'cancelled', 'refunded']
      - name: total_usd
        description: "Order total in USD"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000
      - name: currency
        tests:
          - not_null
          - accepted_values:
              values: ['USD', 'EUR', 'GBP', 'INR', 'AUD', 'CAD']
```
```sql
-- models/gold/daily_revenue.sql
-- Gold layer: business-ready aggregation
{{
    config(
        materialized='table',
        tags=['gold', 'revenue', 'finance']
    )
}}

SELECT
    order_date,
    tenant_id,
    currency,
    COUNT(*) AS order_count,
    SUM(total_usd) AS gross_revenue_usd,
    SUM(CASE WHEN status = 'refunded' THEN total_usd ELSE 0 END) AS refunded_usd,
    SUM(CASE WHEN status != 'refunded' THEN total_usd ELSE 0 END) AS net_revenue_usd,
    COUNT(DISTINCT user_id) AS unique_customers,
    AVG(total_usd) AS avg_order_value_usd,
    MEDIAN(total_usd) AS median_order_value_usd
FROM {{ ref('orders') }}
WHERE status != 'cancelled'
GROUP BY 1, 2, 3
ORDER BY 1 DESC, 3 DESC
```
Warehouse Selection
| Warehouse | Best For | Pricing Model | 2026 Cost (1TB data, 10K queries/day) |
|---|---|---|---|
| Snowflake | Enterprise, complex queries | Compute credits + storage | $400–$800/mo |
| BigQuery | Event data, ad-hoc | Per-query + storage | $200–$600/mo |
| Redshift | AWS ecosystem, predictable workloads | Reserved instances | $300–$700/mo |
| DuckDB | Single-node, local/edge analytics | Free (open source) | $0 (+ infra) |
| ClickHouse | High-volume events, time-series | Open source or managed | $100–$400/mo |
Recommendation: BigQuery for early-stage teams (no upfront commitment, pay-per-query during development). Snowflake at scale (better query performance and more predictable costs).
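For pay-per-query warehouses, rough cost math is worth doing before committing. A back-of-envelope estimator for BigQuery's on-demand model (the per-TiB price below is an assumption; check current pricing, and note it ignores storage, caching, and partition pruning, all of which reduce the real bill):

```python
def estimate_bigquery_cost(
    bytes_scanned_per_query: float,
    queries_per_day: int,
    price_per_tib: float = 6.25,  # assumed on-demand USD/TiB; verify current pricing
) -> float:
    """Rough monthly on-demand query cost in USD (30-day month)."""
    tib_per_month = bytes_scanned_per_query * queries_per_day * 30 / (1024 ** 4)
    return tib_per_month * price_per_tib
```

This is also why partitioning and clustering matter so much on BigQuery: cost scales with bytes scanned, not query count.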
Streaming Addition: Kafka + Flink
When you need real-time:
```python
# src/streaming/consumers/fraud_detector.py
from pyflink.table import EnvironmentSettings, TableEnvironment

env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Read from Kafka
env.execute_sql("""
    CREATE TABLE payments (
        payment_id STRING,
        user_id STRING,
        amount_usd DOUBLE,
        country STRING,
        device_id STRING,
        `timestamp` TIMESTAMP(3),
        WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'payments.events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Detect high-velocity transactions (> 5 in 60 seconds per user)
result = env.sql_query("""
    SELECT
        user_id,
        COUNT(*) AS tx_count,
        SUM(amount_usd) AS total_amount,
        TUMBLE_START(`timestamp`, INTERVAL '60' SECOND) AS window_start
    FROM payments
    GROUP BY
        user_id,
        TUMBLE(`timestamp`, INTERVAL '60' SECOND)
    HAVING COUNT(*) > 5
""")
```
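The tumbling-window logic in that query is easy to lose in the SQL. A plain-Python sketch of the same idea, with events as hypothetical `(user_id, epoch_seconds, amount)` tuples, shows what Flink is computing per window:

```python
from collections import defaultdict

def flag_high_velocity(events, window_seconds: int = 60, threshold: int = 5) -> set:
    """Return (user_id, window_start) pairs whose transaction count within
    a tumbling window exceeds the threshold. events: iterable of
    (user_id, epoch_seconds, amount_usd)."""
    counts = defaultdict(int)
    for user_id, ts, _amount in events:
        # Floor the timestamp to the start of its tumbling window
        window_start = int(ts // window_seconds) * window_seconds
        counts[(user_id, window_start)] += 1
    return {key for key, n in counts.items() if n > threshold}
```

Unlike this batch sketch, Flink evaluates windows continuously as events arrive and uses the watermark to decide when a window is complete despite out-of-order events.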
See Also
- Real-Time Analytics Architecture — streaming analytics patterns
- PostgreSQL Full-Text Search — Postgres as analytics engine
- Event Sourcing Architecture — event-driven data pipelines
- Infrastructure Cost Tagging — tagging data pipeline costs
Working With Viprasol
We design and build data pipelines that scale from startup to enterprise: Prefect/Airflow orchestration, dbt transformation layers, Snowflake/BigQuery warehouse design, and the medallion architecture that keeps raw data available for reprocessing. Our clients have reduced pipeline failure rates by 80% and cut warehouse costs by 40% through proper architecture.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.