Data Pipeline Architecture: Batch vs Streaming, Airflow vs Prefect, dbt, and Warehouse Design
Design production data pipelines: choose between batch and streaming, compare Airflow vs Prefect vs Dagster, implement dbt transformations, and architect a modern data warehouse with medallion layers.
The modern data stack has stabilized around a set of proven patterns: ELT over ETL, cloud warehouses with near-unlimited compute, orchestrators that are actually debuggable, and dbt as the SQL transformation layer that makes warehouse code testable.
The decisions that still require careful thought: batch vs. streaming, orchestrator choice, warehouse selection, and how to structure the medallion layers for your specific query patterns.
ELT vs ETL: The Modern Default
ETL (Extract-Transform-Load): Transform data before loading into the warehouse. Transform logic lives in the ETL tool, and the raw, untransformed data is often discarded.
ELT (Extract-Load-Transform): Load raw data first, transform inside the warehouse using SQL. This is the correct default in 2026:
- Raw data is preserved — you can re-derive any transformation
- Warehouse compute is cheap (Snowflake, BigQuery, Redshift all have auto-scaling)
- SQL is more accessible than Spark/Java ETL code
- dbt makes SQL transformations testable, documented, and version-controlled
When ETL is still right: Sensitive data that must be masked before storage (PII), extremely high volume requiring pre-aggregation before warehouse ingestion.
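For the PII case, the masking can be as small as a hash-before-load step in the extraction script. A minimal sketch, assuming a hypothetical `email` column and a salt supplied via an environment variable; the column name and salt handling are illustrative, not part of the pipeline below:

# Illustrative only: mask PII in flight so raw emails never land in the warehouse
import hashlib
import os

import pandas as pd

def mask_pii(df: pd.DataFrame) -> pd.DataFrame:
    """Replace direct identifiers with salted hashes before loading."""
    salt = os.environ["PII_HASH_SALT"]  # assumed env var; keep out of code and logs
    df = df.copy()
    # Assumes non-null email values; handle missing data as needed
    df["email_hash"] = df["email"].apply(
        lambda e: hashlib.sha256((salt + e.lower()).encode()).hexdigest()
    )
    return df.drop(columns=["email"])  # drop the raw value entirely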
Batch vs Streaming
Choose BATCH when:
✅ Data freshness of 1–24 hours is acceptable
✅ Source systems can handle bulk reads
✅ Queries don't need to answer "what happened in the last 5 minutes"
✅ You have < 5 engineers on the data team
✅ End-to-end pipeline runtime of up to an hour per run is acceptable
Choose STREAMING when:
✅ Real-time dashboards or alerts required
✅ Fraud detection, anomaly detection
✅ Operational use cases (not just analytics)
✅ Event-driven downstream consumers
✅ Data volume > 10 GB/hour continuous
Most analytics workloads: START WITH BATCH.
Add streaming selectively for the 5–10% of use cases that need it.
Medallion Architecture
Bronze (Raw) → Exact copy of source data, immutable
Silver (Cleaned) → Validated, standardized, de-duped
Gold (Business) → Business logic applied, aggregated, presentation-ready
Example flow for orders data:
Source DB ──► Bronze ──► Silver ──► Gold ──► Dashboard
              orders     orders     daily_   BI tools,
              (raw)      (cleaned)  revenue  APIs
Bronze Layer: Raw Ingestion
# src/pipelines/ingest/orders_ingest.py
# Bronze: exact copy of source, no transformations
import boto3
import psycopg2
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime, timezone


def ingest_orders_to_bronze(
    source_conn_string: str,
    s3_bucket: str,
    watermark: datetime,
) -> int:
    """
    Extract orders from production Postgres, write to S3 as Parquet.
    No transformations — exact copy for auditability.
    """
    s3 = boto3.client("s3")
    with psycopg2.connect(source_conn_string) as conn:
        # Server-side (named) cursor so Postgres streams the result set
        with conn.cursor(name="orders_extract") as cursor:
            cursor.itersize = 10_000  # Fetch 10k rows per round trip
            cursor.execute(
                """
                SELECT
                    id, user_id, tenant_id, status, total_cents, currency,
                    created_at, updated_at, metadata
                FROM orders
                WHERE updated_at > %s
                ORDER BY updated_at ASC
                """,
                (watermark,),
            )
            columns = [desc[0] for desc in cursor.description]
            # Note: fetchall() still holds the whole batch in client memory;
            # for very large extracts, iterate the cursor in chunks and write
            # one Parquet file per chunk instead
            rows = cursor.fetchall()
            if not rows:
                return 0

            # Write as Parquet (columnar, compressed, schema-aware)
            table = pa.Table.from_pydict(
                {col: [row[i] for row in rows] for i, col in enumerate(columns)}
            )

            # Partition by date for efficient querying
            extraction_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
            s3_key = (
                f"bronze/orders/dt={extraction_date}/"
                f"batch_{datetime.now().timestamp():.0f}.parquet"
            )

            # Write to an in-memory buffer, then upload
            buffer = pa.BufferOutputStream()
            pq.write_table(table, buffer, compression="snappy")
            s3.put_object(
                Bucket=s3_bucket,
                Key=s3_key,
                Body=buffer.getvalue().to_pybytes(),
            )
            print(f"Ingested {len(rows)} orders to s3://{s3_bucket}/{s3_key}")
            return len(rows)
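The `watermark` argument is what makes this an incremental extract, so it has to be persisted between runs. One lightweight option is a small JSON object stored next to the data; this is a sketch under that assumption (the key `bronze/orders/_watermark.json` is illustrative), and an orchestrator variable or a control table works just as well:

# Illustrative watermark helpers: persist the last successful updated_at in S3
import json
from datetime import datetime, timezone

import boto3

WATERMARK_KEY = "bronze/orders/_watermark.json"  # assumed location

def read_watermark(s3_bucket: str) -> datetime:
    s3 = boto3.client("s3")
    try:
        obj = s3.get_object(Bucket=s3_bucket, Key=WATERMARK_KEY)
        return datetime.fromisoformat(json.loads(obj["Body"].read())["updated_at"])
    except s3.exceptions.NoSuchKey:
        # First run: extract everything
        return datetime(1970, 1, 1, tzinfo=timezone.utc)

def write_watermark(s3_bucket: str, value: datetime) -> None:
    boto3.client("s3").put_object(
        Bucket=s3_bucket,
        Key=WATERMARK_KEY,
        Body=json.dumps({"updated_at": value.isoformat()}),
    )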
Orchestration: Airflow vs Prefect vs Dagster
| Factor | Airflow | Prefect | Dagster |
|---|---|---|---|
| Learning curve | High | Low | Medium |
| Local development | Painful (Docker required) | Excellent (Python native) | Good |
| Dynamic pipelines | Workarounds required | First-class | First-class |
| Data awareness | None | Limited | Full (assets) |
| Observability | Basic UI | Good UI | Excellent (asset catalog) |
| Self-hosted cost | High (ops burden) | Medium | Medium |
| Managed cost | Astronomer: $300–$1,500/mo | Prefect Cloud: $0–$500/mo | Dagster Cloud: $0–$1,000/mo |
| Best for | Existing Airflow shops | New pipelines, simplicity | Data asset management |
Recommendation: Prefect for new projects under 20 pipelines. Dagster when you need asset lineage and a data catalog. Stay on Airflow only if you're already on it and it works.
Prefect Flow Example
# src/pipelines/flows/daily_revenue.py
import os
from datetime import timedelta
from typing import Optional

import pandas as pd
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash


@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_orders(date: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"Extracting orders for {date}")
    # Read from Bronze (Parquet on S3)
    df = pd.read_parquet(
        f"s3://data-lake/bronze/orders/dt={date}/",
        storage_options={"anon": False},
    )
    logger.info(f"Extracted {len(df)} rows")
    return df


@task
def transform_to_silver(df: pd.DataFrame) -> pd.DataFrame:
    # Clean and standardize
    df = df.copy()
    df["total_usd"] = df["total_cents"] / 100
    df["created_at"] = pd.to_datetime(df["created_at"], utc=True)
    df["status"] = df["status"].str.lower().str.strip()
    # Remove duplicates (keep latest updated_at)
    df = df.sort_values("updated_at").drop_duplicates(subset=["id"], keep="last")
    # Validate
    assert df["total_usd"].ge(0).all(), "Negative order totals found"
    assert df["id"].notna().all(), "Null order IDs found"
    return df


@task
def load_to_warehouse(df: pd.DataFrame, table: str) -> int:
    """Load to Snowflake using COPY INTO for efficiency."""
    import snowflake.connector

    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse="transform_wh",
        database="analytics",
        schema="silver",
    )
    try:
        # Write to the table stage, then COPY INTO (much faster than row-by-row INSERT)
        df.to_parquet("/tmp/load.parquet", index=False)
        cursor = conn.cursor()
        cursor.execute(f"PUT file:///tmp/load.parquet @%{table}")
        cursor.execute(
            f"COPY INTO {table} FROM @%{table} "
            "FILE_FORMAT=(TYPE=PARQUET) MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE"
        )
    finally:
        conn.close()
    return len(df)


@flow(name="daily-revenue-pipeline", log_prints=True)
def daily_revenue_pipeline(date: Optional[str] = None) -> None:
    from datetime import date as date_cls

    run_date = date or date_cls.today().isoformat()
    # Tasks run sequentially (add .submit() for parallel execution)
    raw_df = extract_orders(run_date)
    clean_df = transform_to_silver(raw_df)
    rows_loaded = load_to_warehouse(clean_df, "orders")
    print(f"Pipeline complete: {rows_loaded} rows loaded for {run_date}")


# Run locally: python src/pipelines/flows/daily_revenue.py
if __name__ == "__main__":
    daily_revenue_pipeline()
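Running the file directly executes the flow once. To keep it on a schedule without separate scheduler infrastructure, Prefect's `flow.serve()` entrypoint (available in recent 2.x and 3.x releases) can replace the direct call; the deployment name and cron string below are illustrative:

# Alternative __main__: register a scheduled deployment instead of a one-off run
if __name__ == "__main__":
    daily_revenue_pipeline.serve(
        name="daily-revenue-prod",   # illustrative deployment name
        cron="0 6 * * *",            # run daily at 06:00 UTC
        tags=["analytics", "revenue"],
    )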

dbt Transformation Layer
dbt turns SQL into a testable, version-controlled transformation system:
-- models/silver/orders.sql
-- dbt model: transforms bronze → silver orders
{{
    config(
        materialized='incremental',
        unique_key='id',
        on_schema_change='append_new_columns',
        cluster_by=["date_trunc('day', created_at)"],
        tags=['silver', 'orders']
    )
}}

WITH source AS (
    SELECT *
    FROM {{ source('bronze', 'orders') }}
    {% if is_incremental() %}
    -- Incremental: only process records updated since last run
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

cleaned AS (
    SELECT
        id,
        user_id,
        tenant_id,
        LOWER(TRIM(status)) AS status,
        total_cents / 100.0 AS total_usd,
        UPPER(currency) AS currency,
        DATE_TRUNC('day', created_at) AS order_date,
        created_at,
        updated_at,
        -- Parse JSON metadata safely
        TRY_PARSE_JSON(metadata) AS metadata,
        -- Deduplication: latest row per id
        ROW_NUMBER() OVER (
            PARTITION BY id ORDER BY updated_at DESC
        ) AS row_num
    FROM source
    WHERE id IS NOT NULL
)

SELECT
    id,
    user_id,
    tenant_id,
    status,
    total_usd,
    currency,
    order_date,
    created_at,
    updated_at,
    metadata
FROM cleaned
WHERE row_num = 1
# models/silver/schema.yml
version: 2

models:
  - name: orders
    description: "Cleaned and deduplicated orders from the production database"
    config:
      meta:
        owner: data-team
        freshness_warn_after: {hours: 4}
        freshness_error_after: {hours: 24}
    columns:
      - name: id
        description: "Order UUID (primary key)"
        tests:
          - unique
          - not_null
      - name: status
        description: "Order status"
        tests:
          - not_null
          - accepted_values:
              values: ['pending', 'processing', 'completed', 'cancelled', 'refunded']
      - name: total_usd
        description: "Order total in USD"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000
      - name: currency
        tests:
          - not_null
          - accepted_values:
              values: ['USD', 'EUR', 'GBP', 'INR', 'AUD', 'CAD']
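The silver model references `{{ source('bronze', 'orders') }}`, so the project also needs a sources definition. A minimal sketch, assuming the bronze Parquet data is exposed to the warehouse as a `bronze` schema (for example via external tables); the file path, schema name, and freshness thresholds are illustrative:

# models/silver/sources.yml (illustrative)
version: 2

sources:
  - name: bronze
    schema: bronze            # assumed schema exposing the raw Parquet data
    loaded_at_field: updated_at
    freshness:
      warn_after: {count: 6, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: orders
        description: "Raw orders landed from the production database"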
-- models/gold/daily_revenue.sql
-- Gold layer: business-ready aggregation
{{
    config(
        materialized='table',
        tags=['gold', 'revenue', 'finance']
    )
}}

SELECT
    order_date,
    tenant_id,
    currency,
    COUNT(*) AS order_count,
    SUM(total_usd) AS gross_revenue_usd,
    SUM(CASE WHEN status = 'refunded' THEN total_usd ELSE 0 END) AS refunded_usd,
    SUM(CASE WHEN status != 'refunded' THEN total_usd ELSE 0 END) AS net_revenue_usd,
    COUNT(DISTINCT user_id) AS unique_customers,
    AVG(total_usd) AS avg_order_value_usd,
    MEDIAN(total_usd) AS median_order_value_usd
FROM {{ ref('orders') }}
WHERE status != 'cancelled'
GROUP BY 1, 2, 3
ORDER BY 1 DESC, 3 DESC
Warehouse Selection
| Warehouse | Best For | Pricing Model | 2026 Cost (1TB data, 10K queries/day) |
|---|---|---|---|
| Snowflake | Enterprise, complex queries | Compute credits + storage | $400–$800/mo |
| BigQuery | Event data, ad-hoc | Per-query + storage | $200–$600/mo |
| Redshift | AWS ecosystem, predictable workloads | Reserved instances | $300–$700/mo |
| DuckDB | Single-node, local/edge analytics | Free (open source) | $0 (+ infra) |
| ClickHouse | High-volume events, time-series | Open source or managed | $100–$400/mo |
Recommendation: BigQuery for early-stage teams (no upfront commitment, pay-per-query during development). Snowflake for scale (strong query performance, more predictable costs).
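One concrete advantage of BigQuery's per-query pricing during development is that query cost can be estimated before anything runs, via a dry run. A sketch using the google-cloud-bigquery client; the project ID, table reference, and the $6.25/TiB on-demand rate are assumptions to check against your own contract:

# Estimate a query's on-demand cost with a BigQuery dry run (no bytes are billed)
from google.cloud import bigquery

client = bigquery.Client(project="my-analytics-project")  # illustrative project ID

sql = "SELECT order_date, SUM(total_usd) FROM gold.daily_revenue GROUP BY 1"
job = client.query(
    sql,
    job_config=bigquery.QueryJobConfig(dry_run=True, use_query_cache=False),
)

tib_scanned = job.total_bytes_processed / 1024**4
print(f"Would scan {tib_scanned:.4f} TiB, roughly ${tib_scanned * 6.25:.2f} on-demand")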
Streaming Addition: Kafka + Flink
When you need real-time:
# src/streaming/consumers/fraud_detector.py
from pyflink.table import EnvironmentSettings, TableEnvironment

# Streaming Table API environment
env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Read from Kafka
env.execute_sql("""
    CREATE TABLE payments (
        payment_id STRING,
        user_id STRING,
        amount_usd DOUBLE,
        country STRING,
        device_id STRING,
        `timestamp` TIMESTAMP(3),
        WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'payments.events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Detect high-velocity transactions (> 5 in 60 seconds per user)
env.execute_sql("""
    SELECT
        user_id,
        COUNT(*) AS tx_count,
        SUM(amount_usd) AS total_amount,
        TUMBLE_START(`timestamp`, INTERVAL '60' SECOND) AS window_start
    FROM payments
    GROUP BY
        user_id,
        TUMBLE(`timestamp`, INTERVAL '60' SECOND)
    HAVING COUNT(*) > 5
""")
Additional Resources
- Real-Time Analytics Architecture — streaming analytics patterns
- PostgreSQL Full-Text Search — Postgres as analytics engine
- Event Sourcing Architecture — event-driven data pipelines
- Infrastructure Cost Tagging — tagging data pipeline costs
The Viprasol Method
We design and build data pipelines that scale from startup to enterprise: Prefect/Airflow orchestration, dbt transformation layers, Snowflake/BigQuery warehouse design, and the medallion architecture that keeps raw data available for reprocessing. Our clients have reduced pipeline failure rates by 80% and cut warehouse costs by 40% through proper architecture.