
Data Pipeline Architecture: Batch vs Streaming, Airflow vs Prefect, dbt, and Warehouse Design

Design production data pipelines: choose between batch and streaming, compare Airflow vs Prefect vs Dagster, implement dbt transformations, and architect a modern data warehouse with medallion layers.

Viprasol Tech Team
September 18, 2026
14 min read

The modern data stack has stabilized around a set of proven patterns: ELT over ETL, cloud warehouses with near-unlimited compute, orchestrators that are actually debuggable, and dbt as the SQL transformation layer that makes warehouse code testable.

What still requires careful thought: batch vs. streaming, orchestrator choice, warehouse selection, and how to structure the medallion layers for your specific query patterns.


ELT vs ETL: The Modern Default

ETL (Extract-Transform-Load): Transform data before loading into the warehouse. Requires maintaining transform logic in the ETL tool, often loses raw data.

ELT (Extract-Load-Transform): Load raw data first, transform inside the warehouse using SQL. This is the correct default in 2026:

  • Raw data is preserved — you can re-derive any transformation
  • Warehouse compute is cheap (Snowflake, BigQuery, Redshift all have auto-scaling)
  • SQL is more accessible than Spark/Java ETL code
  • dbt makes SQL transformations testable, documented, and version-controlled

When ETL is still right: Sensitive data that must be masked before storage (PII), extremely high volume requiring pre-aggregation before warehouse ingestion.
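
For the PII case, masking has to happen at extract time, before anything lands in storage. A minimal sketch of deterministic masking (the helper names are illustrative; HMAC-SHA256 keeps the field joinable without ever storing the raw value):

```python
import hashlib
import hmac

def mask_email(email: str, secret: bytes) -> str:
    """Deterministically pseudonymize an email: same input always yields the
    same token, so joins still work, but the raw address never reaches storage."""
    digest = hmac.new(secret, email.lower().strip().encode(), hashlib.sha256)
    return digest.hexdigest()[:16]

def mask_row(row: dict, secret: bytes) -> dict:
    # Mask sensitive fields during extraction — the "T" that justifies ETL here
    masked = dict(row)
    if masked.get("email"):
        masked["email"] = mask_email(masked["email"], secret)
    return masked
```

Keep the secret outside the warehouse (e.g. in a secrets manager); anyone with only warehouse access then cannot reverse the tokens.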


Batch vs Streaming

Choose BATCH when:
✅ Data freshness of 1–24 hours is acceptable
✅ Source systems can handle bulk reads
✅ Queries don't need to answer "what happened in the last 5 minutes"
✅ You have < 5 engineers on the data team
✅ Total pipeline latency < 1 hour is fine

Choose STREAMING when:
✅ Real-time dashboards or alerts required
✅ Fraud detection, anomaly detection
✅ Operational use cases (not just analytics)
✅ Event-driven downstream consumers
✅ Data volume > 10 GB/hour continuous

Most analytics workloads: START WITH BATCH.
Add streaming selectively for the 5–10% of use cases that need it.


Medallion Architecture

Bronze (Raw)     → Exact copy of source data, immutable
Silver (Cleaned) → Validated, standardized, de-duped
Gold (Business)  → Business logic applied, aggregated, presentation-ready

Example flow for orders data:
                    
Source DB ──► Bronze ──► Silver ──► Gold ──► Dashboard
             orders     orders     daily_    BI tools
             (raw)      (cleaned)  revenue   APIs
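
The layer/table/partition convention in the diagram is worth centralizing so every pipeline writes to the same layout. A small sketch (bucket layout and function name are illustrative):

```python
from datetime import date

VALID_LAYERS = {"bronze", "silver", "gold"}

def lake_key(layer: str, table: str, dt: date) -> str:
    """Build the object-store key prefix for a medallion layer,
    e.g. bronze/orders/dt=2026-09-18/."""
    if layer not in VALID_LAYERS:
        raise ValueError(f"unknown layer: {layer}")
    return f"{layer}/{table}/dt={dt.isoformat()}/"
```

One function means one place to change the partitioning scheme later.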

Bronze Layer: Raw Ingestion

# src/pipelines/ingest/orders_ingest.py
# Bronze: exact copy of source, no transformations

import boto3
import psycopg2
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime, timezone

def ingest_orders_to_bronze(
    source_conn_string: str,
    s3_bucket: str,
    watermark: datetime
) -> int:
    """
    Extract orders from production Postgres, write to S3 as Parquet.
    No transformations — exact copy for auditability.
    """
    s3 = boto3.client("s3")
    
    with psycopg2.connect(source_conn_string) as conn:
        # Use server-side cursor for large tables
        with conn.cursor(name="orders_extract") as cursor:
            cursor.itersize = 10_000  # Fetch 10k rows at a time
            cursor.execute(
                """
                SELECT 
                    id, user_id, tenant_id, status, total_cents, currency,
                    created_at, updated_at, metadata
                FROM orders
                WHERE updated_at > %s
                ORDER BY updated_at ASC
                """,
                (watermark,)
            )
            
            columns = [desc[0] for desc in cursor.description]
            # The server streams in itersize chunks, but fetchall() still holds
            # every row in memory — batch with fetchmany() for very large tables
            rows = cursor.fetchall()
    
    if not rows:
        return 0
    
    # Write as Parquet (columnar, compressed, schema-aware)
    table = pa.Table.from_pydict(
        {col: [row[i] for row in rows] for i, col in enumerate(columns)}
    )
    
    # Partition by date for efficient querying
    extraction_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    s3_key = f"bronze/orders/dt={extraction_date}/batch_{datetime.now().timestamp():.0f}.parquet"
    
    # Write to buffer then upload
    buffer = pa.BufferOutputStream()
    pq.write_table(table, buffer, compression="snappy")
    s3.put_object(
        Bucket=s3_bucket,
        Key=s3_key,
        Body=buffer.getvalue().to_pybytes()
    )
    
    print(f"Ingested {len(rows)} orders to s3://{s3_bucket}/{s3_key}")
    return len(rows)
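
The `watermark` argument above has to come from somewhere. Production versions usually persist the high-water mark in S3, a parameter store, or a control table; a file-backed sketch of the same idea (path and JSON format are illustrative):

```python
import json
from datetime import datetime, timezone
from pathlib import Path

def read_watermark(path: Path) -> datetime:
    """Return the last successfully ingested updated_at, or epoch on first run."""
    if not path.exists():
        return datetime(1970, 1, 1, tzinfo=timezone.utc)
    return datetime.fromisoformat(json.loads(path.read_text())["updated_at"])

def write_watermark(path: Path, value: datetime) -> None:
    # Advance the watermark only AFTER the batch is safely written,
    # so a failed run simply retries from the old mark (at-least-once).
    path.write_text(json.dumps({"updated_at": value.isoformat()}))
```

The ordering matters: write data, then the watermark. The reverse order can silently skip rows on a crash between the two writes.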

Orchestration: Airflow vs Prefect vs Dagster

Factor              Airflow                      Prefect                     Dagster
Learning curve      High                         Low                         Medium
Local development   Painful (Docker required)    Excellent (Python native)   Good
Dynamic pipelines   Workarounds required         First-class                 First-class
Data awareness      None                         Limited                     Full (assets)
Observability       Basic UI                     Good UI                     Excellent (asset catalog)
Self-hosted cost    High (ops burden)            Medium                      Medium
Managed cost        Astronomer: $300–$1,500/mo   Prefect Cloud: $0–$500/mo   Dagster Cloud: $0–$1,000/mo
Best for            Existing Airflow shops       New pipelines, simplicity   Data asset management

Recommendation: Prefect for new projects under 20 pipelines. Dagster when you need asset lineage and a data catalog. Stay on Airflow only if you're already on it and it works.

Prefect Flow Example

# src/pipelines/flows/daily_revenue.py
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
from typing import Optional
import os
import pandas as pd

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_orders(date: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"Extracting orders for {date}")
    
    # Read from Bronze (Parquet on S3)
    df = pd.read_parquet(
        f"s3://data-lake/bronze/orders/dt={date}/",
        storage_options={"anon": False}
    )
    
    logger.info(f"Extracted {len(df)} rows")
    return df

@task
def transform_to_silver(df: pd.DataFrame) -> pd.DataFrame:
    # Clean and standardize
    df = df.copy()
    df["total_usd"] = df["total_cents"] / 100
    df["created_at"] = pd.to_datetime(df["created_at"], utc=True)
    df["status"] = df["status"].str.lower().str.strip()
    
    # Remove duplicates (keep latest updated_at)
    df = df.sort_values("updated_at").drop_duplicates(subset=["id"], keep="last")
    
    # Validate
    assert df["total_usd"].ge(0).all(), "Negative order totals found"
    assert df["id"].notna().all(), "Null order IDs found"
    
    return df

@task
def load_to_warehouse(df: pd.DataFrame, table: str) -> int:
    """Load to Snowflake using COPY INTO for efficiency."""
    import snowflake.connector
    
    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse="transform_wh",
        database="analytics",
        schema="silver",
    )
    
    # Write to stage then COPY INTO (much faster than row-by-row INSERT)
    df.to_parquet("/tmp/load.parquet", index=False)
    cursor = conn.cursor()
    cursor.execute(f"PUT file:///tmp/load.parquet @%{table}")
    cursor.execute(f"COPY INTO {table} FROM @%{table} FILE_FORMAT=(TYPE=PARQUET)")
    
    return len(df)

@flow(name="daily-revenue-pipeline", log_prints=True)
def daily_revenue_pipeline(date: Optional[str] = None) -> None:
    from datetime import date as date_cls
    
    run_date = date or date_cls.today().isoformat()
    
    # Tasks run sequentially (add .submit() for parallel)
    raw_df = extract_orders(run_date)
    clean_df = transform_to_silver(raw_df)
    rows_loaded = load_to_warehouse(clean_df, "orders")
    
    print(f"Pipeline complete: {rows_loaded} rows loaded for {run_date}")

# Run locally: python src/pipelines/flows/daily_revenue.py
if __name__ == "__main__":
    daily_revenue_pipeline()
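
A flow parameterized by date also gives you backfills almost for free: loop over the missing days and trigger one run per date. A sketch (the `backfill` helper is illustrative; calling the flow requires your Prefect environment):

```python
from datetime import date, timedelta
from typing import Iterator

def date_range(start: date, end: date) -> Iterator[str]:
    """Yield ISO dates from start to end, inclusive."""
    d = start
    while d <= end:
        yield d.isoformat()
        d += timedelta(days=1)

def backfill(start: date, end: date) -> None:
    # Each day is an independent flow run, so a failure mid-backfill
    # only loses one partition and can be retried in isolation.
    for day in date_range(start, end):
        daily_revenue_pipeline(date=day)  # the flow defined above
```

Because each run targets one `dt=` partition, re-running a day is idempotent as long as the load step replaces that partition.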


dbt Transformation Layer

dbt turns SQL into a testable, version-controlled transformation system:

-- models/silver/orders.sql
-- dbt model: transforms bronze → silver orders

{{
  config(
    materialized='incremental',
    unique_key='id',
    on_schema_change='append_new_columns',
    cluster_by=["date_trunc('day', created_at)"],
    tags=['silver', 'orders']
  )
}}

WITH source AS (
    SELECT *
    FROM {{ source('bronze', 'orders') }}
    {% if is_incremental() %}
        -- Incremental: only process records updated since last run
        WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

cleaned AS (
    SELECT
        id,
        user_id,
        tenant_id,
        LOWER(TRIM(status))                              AS status,
        total_cents / 100.0                              AS total_usd,
        UPPER(currency)                                  AS currency,
        DATE_TRUNC('day', created_at)                    AS order_date,
        created_at,
        updated_at,
        -- Parse JSON metadata safely
        TRY_PARSE_JSON(metadata)                         AS metadata,
        -- Deduplication: latest row per id
        ROW_NUMBER() OVER (
            PARTITION BY id ORDER BY updated_at DESC
        )                                                AS row_num
    FROM source
    WHERE id IS NOT NULL
)

SELECT
    id,
    user_id,
    tenant_id,
    status,
    total_usd,
    currency,
    order_date,
    created_at,
    updated_at,
    metadata
FROM cleaned
WHERE row_num = 1

# models/silver/schema.yml
version: 2

models:
  - name: orders
    description: "Cleaned and deduplicated orders from the production database"
    config:
      meta:
        owner: data-team
        freshness_warn_after: {hours: 4}
        freshness_error_after: {hours: 24}
    columns:
      - name: id
        description: "Order UUID (primary key)"
        tests:
          - unique
          - not_null
      - name: status
        description: "Order status"
        tests:
          - not_null
          - accepted_values:
              values: ['pending', 'processing', 'completed', 'cancelled', 'refunded']
      - name: total_usd
        description: "Order total in USD"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000
      - name: currency
        tests:
          - not_null
          - accepted_values:
              values: ['USD', 'EUR', 'GBP', 'INR', 'AUD', 'CAD']

-- models/gold/daily_revenue.sql
-- Gold layer: business-ready aggregation

{{
  config(
    materialized='table',
    tags=['gold', 'revenue', 'finance']
  )
}}

SELECT
    order_date,
    tenant_id,
    currency,
    COUNT(*)                                             AS order_count,
    SUM(total_usd)                                       AS gross_revenue_usd,
    SUM(CASE WHEN status = 'refunded' THEN total_usd ELSE 0 END) AS refunded_usd,
    SUM(CASE WHEN status != 'refunded' THEN total_usd ELSE 0 END) AS net_revenue_usd,
    COUNT(DISTINCT user_id)                              AS unique_customers,
    AVG(total_usd)                                       AS avg_order_value_usd,
    MEDIAN(total_usd)                                    AS median_order_value_usd
FROM {{ ref('orders') }}
WHERE status != 'cancelled'
GROUP BY 1, 2, 3
ORDER BY 1 DESC, 3 DESC
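
The gold model's cancelled/refunded logic is easy to sanity-check off-warehouse before trusting a dashboard. A pure-Python mirror of the aggregation above (the sample rows in the usage are made up):

```python
def daily_revenue(orders: list[dict]) -> dict:
    """Mirror of the gold aggregation: exclude cancelled orders entirely,
    then split refunds out of gross revenue."""
    kept = [o for o in orders if o["status"] != "cancelled"]
    gross = sum(o["total_usd"] for o in kept)
    refunded = sum(o["total_usd"] for o in kept if o["status"] == "refunded")
    return {
        "order_count": len(kept),
        "gross_revenue_usd": gross,
        "refunded_usd": refunded,
        "net_revenue_usd": gross - refunded,
    }
```

Running the same fixture through the SQL model (e.g. via a dbt unit test) and through this function catches sign and filter mistakes early.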

Warehouse Selection

Warehouse    Best For                               Pricing Model               2026 Cost (1 TB data, 10K queries/day)
Snowflake    Enterprise, complex queries            Compute credits + storage   $400–$800/mo
BigQuery     Event data, ad-hoc queries             Per-query + storage         $200–$600/mo
Redshift     AWS ecosystem, predictable workloads   Reserved instances          $300–$700/mo
DuckDB       Single-node, local/edge analytics      Free (open source)          $0 (+ infra)
ClickHouse   High-volume events, time-series        Open source or managed      $100–$400/mo

Recommendation: BigQuery for early-stage (no upfront, pay-per-query during development). Snowflake for scale (better query performance, easier cost predictability).


Streaming Addition: Kafka + Flink

When you need real-time:

# src/streaming/consumers/fraud_detector.py
from pyflink.table import EnvironmentSettings, TableEnvironment

env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Read from Kafka
env.execute_sql("""
    CREATE TABLE payments (
        payment_id     STRING,
        user_id        STRING,
        amount_usd     DOUBLE,
        country        STRING,
        device_id      STRING,
        `timestamp`    TIMESTAMP(3),
        WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'payments.events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Detect high-velocity transactions (> 5 in 60 seconds per user).
# In production, wrap this SELECT in an INSERT INTO an alerts sink table;
# a bare SELECT only returns a TableResult for local inspection.
env.execute_sql("""
    SELECT
        user_id,
        COUNT(*) AS tx_count,
        SUM(amount_usd) AS total_amount,
        TUMBLE_START(`timestamp`, INTERVAL '60' SECOND) AS window_start
    FROM payments
    GROUP BY
        user_id,
        TUMBLE(`timestamp`, INTERVAL '60' SECOND)
    HAVING COUNT(*) > 5
""")


Working With Viprasol

We design and build data pipelines that scale from startup to enterprise: Prefect/Airflow orchestration, dbt transformation layers, Snowflake/BigQuery warehouse design, and the medallion architecture that keeps raw data available for reprocessing. Our clients have reduced pipeline failure rates by 80% and cut warehouse costs by 40% through proper architecture.

Cloud & data engineering services → | Start a project →

About the Author

Viprasol Tech Team

Custom Software Development Specialists

The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.

MT4/MT5 EA Development · AI Agent Systems · SaaS Development · Algorithmic Trading
