
PostgreSQL Logical Decoding: Change Data Capture, Debezium, and Real-Time Data Pipelines

Implement Change Data Capture with PostgreSQL logical decoding. Covers wal2json and pgoutput plugins, Debezium connector setup, consuming WAL events in Node.js, replication slot management, lag monitoring, and building real-time sync pipelines.

Viprasol Tech Team
May 20, 2027
12 min read

Change Data Capture (CDC) with PostgreSQL logical decoding lets you stream every INSERT, UPDATE, and DELETE from your database to downstream systems — without polling, without triggers, and without touching application code. It's the backbone of real-time search index updates, event sourcing, audit trails, and multi-database sync.

The WAL (Write-Ahead Log) is PostgreSQL's truth. Logical decoding reads it in real time and converts it into structured change events.

Prerequisites and Configuration

-- Check current WAL level (must be 'logical' for CDC)
SHOW wal_level;
-- If 'replica' or 'minimal': change in postgresql.conf and restart

-- postgresql.conf changes required:
-- wal_level = logical          -- Enable logical decoding
-- max_replication_slots = 10   -- Max concurrent replication slots
-- max_wal_senders = 10         -- Max concurrent WAL sender processes

-- Verify after restart
SHOW wal_level;    -- Should be 'logical'

-- Grant replication permission to your CDC user
CREATE USER cdc_user WITH REPLICATION LOGIN PASSWORD 'strong_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO cdc_user;
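It helps to fail fast at application startup if the server is not actually configured for logical decoding. A minimal sketch, assuming you feed it the values returned by `SHOW wal_level`, `SHOW max_replication_slots`, and `SHOW max_wal_senders` on an ordinary connection — the `assertCdcConfig` helper and its error messages are ours, not part of any library:

```typescript
// Hypothetical preflight helper: pass in the values of the three
// SHOW commands above; throws with a readable message if CDC can't work.
export function assertCdcConfig(settings: Record<string, string>): void {
  if (settings["wal_level"] !== "logical") {
    throw new Error(
      `wal_level is '${settings["wal_level"]}' — set wal_level = logical ` +
      `in postgresql.conf and restart before using logical decoding`
    );
  }
  for (const key of ["max_replication_slots", "max_wal_senders"]) {
    const n = parseInt(settings[key] ?? "0", 10);
    if (n < 1) {
      throw new Error(`${key} must be at least 1 (got ${settings[key]})`);
    }
  }
}
```

Running this once at boot turns a silent "slot creation failed" at 2 a.m. into a clear configuration error at deploy time.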

Replication Slots

-- Create a logical replication slot with the wal2json output plugin
-- (the wal2json extension must be installed on the database server;
--  pgoutput is built in from PostgreSQL 10 onward)
SELECT pg_create_logical_replication_slot(
  'myapp_cdc',     -- slot name (unique per slot)
  'wal2json'       -- output plugin: wal2json | pgoutput | test_decoding
);

-- List all replication slots and their lag
SELECT
  slot_name,
  plugin,
  slot_type,
  active,
  restart_lsn,
  confirmed_flush_lsn,
  -- WAL lag in bytes (CRITICAL: monitor this — unread WAL blocks disk cleanup)
  pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_human
FROM pg_replication_slots;

-- ⚠️  CRITICAL WARNING: Replication slot lag
-- If a consumer stops reading, PostgreSQL retains ALL WAL since the slot's LSN
-- This can fill your disk. Monitor lag_bytes and alert > 1GB.

-- Drop a slot when no longer needed
SELECT pg_drop_replication_slot('myapp_cdc');
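The lag math can also be done client-side, which is handy for custom monitoring. A `pg_lsn` value prints as two hex numbers, `XXXXXXXX/XXXXXXXX` (high 32 bits / low 32 bits). A small sketch — the helper names are ours — that parses that form and mirrors `pg_wal_lsn_diff`:

```typescript
// Parse a PostgreSQL pg_lsn string (e.g. "16/B374D848") into a byte offset.
// The text form is <high 32 bits>/<low 32 bits>, both hexadecimal.
export function parseLsn(lsn: string): bigint {
  const [hi, lo] = lsn.split("/");
  if (hi === undefined || lo === undefined) {
    throw new Error(`invalid LSN: ${lsn}`);
  }
  return (BigInt(`0x${hi}`) << 32n) + BigInt(`0x${lo}`);
}

// Client-side equivalent of pg_wal_lsn_diff(current, confirmed):
// how many bytes of WAL the slot is holding back.
export function lsnDiffBytes(current: string, confirmed: string): bigint {
  return parseLsn(current) - parseLsn(confirmed);
}
```

For example, `lsnDiffBytes("0/2000", "0/1000")` is `4096n` — a 4 KB backlog.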

☁️ Is Your Cloud Costing Too Much?

Most teams overspend 30–40% on cloud — wrong instance types, no reserved pricing, bloated storage. We audit, right-size, and automate your infrastructure.

  • AWS, GCP, Azure certified engineers
  • Infrastructure as Code (Terraform, CDK)
  • Docker, Kubernetes, GitHub Actions CI/CD
  • Typical audit recovers $500–$3,000/month in savings

wal2json: Direct Consumption in Node.js

// lib/cdc/wal2json-consumer.ts
import { Client } from "pg";

interface Wal2JsonChange {
  kind:         "insert" | "update" | "delete";
  schema:       string;
  table:        string;
  columnnames?: string[];
  columnvalues?: unknown[];
  oldkeys?:     { keynames: string[]; keyvalues: unknown[] };
}

interface Wal2JsonPayload {
  change: Wal2JsonChange[];
}

export async function startCDCConsumer(
  slotName: string,
  tables:   string[],          // Tables to include in the stream
  handler:  (change: Wal2JsonChange) => Promise<void>,
  pollMs    = 500              // How often to poll the slot
): Promise<() => void> {
  // The CDC consumer needs its own dedicated connection — never share it
  // with the application's connection pool
  const client = new Client({
    host:     process.env.POSTGRES_HOST,
    port:     parseInt(process.env.POSTGRES_PORT ?? "5432", 10),
    database: process.env.POSTGRES_DB,
    user:     "cdc_user",
    password: process.env.CDC_PASSWORD,
  });

  await client.connect();

  let stopped = false;

  // pg_logical_slot_get_changes() returns the pending changes AND advances
  // the slot's confirmed_flush_lsn, so PostgreSQL can recycle the consumed
  // WAL. Plain `pg` queries cannot speak the walsender START_REPLICATION
  // protocol; for push-based streaming, use a protocol-aware library such
  // as pg-logical-replication instead of this polling loop.
  //
  // Note: get_changes consumes on read (at-most-once delivery). For
  // at-least-once, read with pg_logical_slot_peek_changes() and advance
  // the slot via pg_replication_slot_advance() only after the handler
  // has succeeded.
  const poll = async (): Promise<void> => {
    while (!stopped) {
      try {
        const { rows } = await client.query<{ data: string }>(
          `SELECT data FROM pg_logical_slot_get_changes(
             $1, NULL, NULL,
             'pretty-print', '0',
             'add-tables', $2
           )`,
          [slotName, tables.map((t) => `public.${t}`).join(",")]
        );

        // Each row holds one transaction's changes as a JSON document
        for (const row of rows) {
          const payload: Wal2JsonPayload = JSON.parse(row.data);
          for (const change of payload.change) {
            await handler(change);
          }
        }
      } catch (err) {
        console.error("[cdc] Error processing changes:", err);
      }
      await new Promise((resolve) => setTimeout(resolve, pollMs));
    }
  };

  void poll();

  return () => {
    stopped = true;
    void client.end();
  };
}

// Convert wal2json column arrays to an object
export function changeToRecord(change: Wal2JsonChange): Record<string, unknown> {
  if (!change.columnnames || !change.columnvalues) return {};
  return Object.fromEntries(
    change.columnnames.map((name, i) => [name, change.columnvalues![i]])
  );
}
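Once changes arrive as plain objects, the usual next step is routing each one to a per-table handler. A minimal sketch — the `dispatchChange` helper and the handler map are our addition, not part of wal2json:

```typescript
// Minimal shape of a decoded change for routing purposes
interface Wal2JsonChange {
  kind:   "insert" | "update" | "delete";
  schema: string;
  table:  string;
}

type ChangeHandler = (change: Wal2JsonChange) => void;

// Route a decoded change to the handler registered for "schema.table".
// Returns false when no handler is registered, so callers can log
// or count ignored tables.
export function dispatchChange(
  handlers: Record<string, ChangeHandler>,
  change:   Wal2JsonChange
): boolean {
  const handler = handlers[`${change.schema}.${change.table}`];
  if (!handler) return false; // No handler registered — ignore the change
  handler(change);
  return true;
}
```

Keeping the map keyed by `schema.table` (rather than table alone) avoids surprises when two schemas share a table name.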

Debezium: Production CDC with Kafka

For high-throughput production workloads, Debezium is the standard choice. It handles replication slot management, offset tracking, schema evolution, and at-least-once delivery.

# docker-compose.yml for local Debezium development
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.1
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: containers reach the broker at kafka:29092,
      # tools on the host at localhost:9092. A single advertised
      # localhost listener would break kafka-connect's connection.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-connect:
    image: debezium/connect:2.6
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID:          "1"
      CONFIG_STORAGE_TOPIC:   debezium-configs
      OFFSET_STORAGE_TOPIC:   debezium-offsets
      STATUS_STORAGE_TOPIC:   debezium-status

# Register the Debezium PostgreSQL connector via the Connect REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "myapp-postgres-connector",
    "config": {
      "connector.class":          "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname":        "postgres",
      "database.port":            "5432",
      "database.user":            "cdc_user",
      "database.password":        "strong_password",
      "database.dbname":          "myapp",
      "plugin.name":              "pgoutput",
      "slot.name":                "debezium_slot",
      "publication.name":         "debezium_publication",

      "table.include.list":       "public.users,public.workspaces,public.invoices",
      "column.exclude.list":      "public.users.password_hash",

      "transforms":               "unwrap",
      "transforms.unwrap.type":   "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",

      "topic.prefix":             "myapp",
      "heartbeat.interval.ms":    "10000"
    }
  }'

⚙️ DevOps Done Right — Zero Downtime, Full Automation

Ship faster without breaking things. We build CI/CD pipelines, monitoring stacks, and auto-scaling infrastructure that your team can actually maintain.

  • Staging + production environments with feature flags
  • Automated security scanning in the pipeline
  • Uptime monitoring + alerting + runbook automation
  • On-call support handover docs included

Consuming Debezium Events in Node.js (KafkaJS)

// workers/cdc-consumer.ts
import { Kafka } from "kafkajs";
import { prisma } from "@/lib/prisma";
import { searchIndex } from "@/lib/search";

const kafka = new Kafka({
  clientId: "myapp-cdc-consumer",
  brokers:  [process.env.KAFKA_BROKER!],
});

interface DebeziumEnvelope {
  before: Record<string, unknown> | null;
  after:  Record<string, unknown> | null;
  op:     "c" | "u" | "d" | "r";   // create/update/delete/read(snapshot)
  ts_ms:  number;
  source: { table: string; schema: string };
}

export async function startDebeziumConsumer(): Promise<void> {
  const consumer = kafka.consumer({ groupId: "myapp-cdc-group" });
  await consumer.connect();

  await consumer.subscribe({
    topics: [
      "myapp.public.users",
      "myapp.public.invoices",
    ],
    fromBeginning: false,
  });

  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      if (!message.value) return; // Tombstone (delete marker)

      const envelope: DebeziumEnvelope = JSON.parse(message.value.toString());
      const table = topic.split(".").pop()!;

      await handleChange(table, envelope);
    },
  });
}

async function handleChange(
  table:    string,
  envelope: DebeziumEnvelope
): Promise<void> {
  switch (table) {
    case "users":
      if (envelope.op === "u" || envelope.op === "c") {
        const user = envelope.after!;
        // Sync to search index
        await searchIndex.upsert("users", {
          id:       user.id as string,
          name:     user.name as string,
          email:    user.email as string,
        });
      } else if (envelope.op === "d") {
        await searchIndex.delete("users", envelope.before!.id as string);
      }
      break;

    case "invoices":
      // Invalidate cache, update analytics, etc.
      if (envelope.op === "u" && envelope.after?.status === "paid") {
        // Invoice marked paid — trigger downstream events
        await triggerInvoicePaidWebhooks(envelope.after.id as string);
      }
      break;
  }
}
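Reacting only to real transitions (say, `status` flipping to `paid`, rather than any update that happens to have `status = 'paid'`) is easier with a small diff of the `before` and `after` images. A hedged sketch — the `changedFields` helper is our addition:

```typescript
// List the top-level fields whose values differ between Debezium's
// `before` and `after` row images. JSON equality is adequate for the
// flat, scalar-valued rows Debezium emits.
export function changedFields(
  before: Record<string, unknown> | null,
  after:  Record<string, unknown> | null
): string[] {
  if (!before || !after) return []; // inserts and deletes have no diff
  const keys = new Set([...Object.keys(before), ...Object.keys(after)]);
  return [...keys].filter(
    (k) => JSON.stringify(before[k]) !== JSON.stringify(after[k])
  );
}
```

For example, `changedFields({ status: "open", total: 100 }, { status: "paid", total: 100 })` returns `["status"]`. Note that for updates the `before` image is only fully populated when the table's `REPLICA IDENTITY` is set to `FULL`; with the default identity it contains only the old key columns.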

Monitoring Replication Slot Lag

-- Alert when any slot lag exceeds 500MB
SELECT
  slot_name,
  pg_size_pretty(
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
  ) AS lag,
  pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
  active
FROM pg_replication_slots
WHERE pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) > 524288000; -- 500MB

-- CloudWatch custom metric for lag (run every minute)
-- aws cloudwatch put-metric-data --namespace PostgreSQL/Replication
--   --metric-name ReplicationLagBytes --value <lag_bytes>
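The threshold logic can also live in the monitoring script itself, so the SQL stays a plain `SELECT` and the alert policy is code-reviewed with the rest of the app. A sketch — the function and row shape are ours — that takes rows from `pg_replication_slots` with a precomputed `lag_bytes` and decides which slots to page on:

```typescript
// Hypothetical row shape: slot name, whether a consumer is attached,
// and the precomputed pg_wal_lsn_diff() result in bytes.
interface SlotLag {
  slot_name: string;
  active:    boolean;
  lag_bytes: number;
}

// Return the slots whose retained WAL exceeds the threshold.
// Inactive slots are the usual culprits: nothing is consuming them,
// so their lag grows without bound until the disk fills.
export function slotsOverThreshold(
  slots: SlotLag[],
  thresholdBytes = 500 * 1024 * 1024 // default 500MB, matching the query above
): SlotLag[] {
  return slots.filter((s) => s.lag_bytes > thresholdBytes);
}
```

Any slot this returns, active or not, warrants an alert; an *inactive* slot over threshold usually means a dead consumer whose slot should be restarted or dropped.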

Cost and Timeline Estimates

Scope                                      Team       Timeline    Cost Range
Direct wal2json consumer (Node.js)         1–2 devs   2–3 days    $800–1,500
Debezium + Kafka setup (local/dev)         1–2 devs   3–5 days    $1,200–2,500
Production Debezium + MSK + monitoring     2 devs     1–2 weeks   $4,000–9,000
AWS MSK (Kafka), m5.large × 3 brokers      —          —           ~$600/month

Working With Viprasol

CDC with logical decoding is the right answer for keeping multiple systems in sync — Elasticsearch for search, Redis for cache invalidation, analytics warehouses, audit logs. Our team implements the full pipeline: configuring wal_level=logical, managing replication slots with lag monitoring, and building consumers (wal2json for simple cases, Debezium+Kafka for production scale).

What we deliver:

  • postgresql.conf wal_level=logical setup with slot management and pg_replication_slots lag query
  • startCDCConsumer Node.js function: pg replication connection, wal2json JSON parse, LSN acknowledgment
  • changeToRecord helper: columnnames + columnvalues → plain object
  • Debezium connector JSON with column.exclude.list for PII, ExtractNewRecordState transform
  • KafkaJS consumer: eachMessage handler, Debezium envelope op c/u/d/r parsing
  • Monitoring query: slots with lag > 500MB (CloudWatch custom metric pattern)

Talk to our team about your real-time data pipeline architecture →

Or explore our cloud and data engineering services.

About the Author

Viprasol Tech Team

Custom Software Development Specialists

The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.

MT4/MT5 EA Development · AI Agent Systems · SaaS Development · Algorithmic Trading

Need DevOps & Cloud Expertise?

Scale your infrastructure with confidence. AWS, GCP, Azure certified team.

Free consultation • No commitment • Response within 24 hours

Viprasol · Big Data & Analytics

Making sense of your data at scale?

Viprasol builds end-to-end big data analytics solutions — ETL pipelines, data warehouses on Snowflake or BigQuery, and self-service BI dashboards. One reliable source of truth for your entire organisation.