PostgreSQL Logical Decoding: Change Data Capture, Debezium, and Real-Time Data Pipelines
Implement Change Data Capture with PostgreSQL logical decoding. Covers wal2json and pgoutput plugins, Debezium connector setup, consuming WAL events in Node.js, replication slot management, lag monitoring, and building real-time sync pipelines.
Change Data Capture (CDC) with PostgreSQL logical decoding lets you stream every INSERT, UPDATE, and DELETE from your database to downstream systems — without polling, without triggers, and without touching application code. It's the backbone of real-time search index updates, event sourcing, audit trails, and multi-database sync.
The WAL (Write-Ahead Log) is PostgreSQL's source of truth: every committed change is written there before anything else. Logical decoding reads it in near real time and converts it into structured change events.
Prerequisites and Configuration
-- Check current WAL level (must be 'logical' for CDC)
SHOW wal_level;
-- If 'replica' or 'minimal': change in postgresql.conf and restart
-- postgresql.conf changes required:
-- wal_level = logical -- Enable logical decoding
-- max_replication_slots = 10 -- Max concurrent replication slots
-- max_wal_senders = 10 -- Max concurrent WAL sender processes
-- Verify after restart
SHOW wal_level; -- Should be 'logical'
-- Grant replication permission to your CDC user
CREATE USER cdc_user WITH REPLICATION LOGIN PASSWORD 'strong_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO cdc_user;
Replication Slots
-- Create a logical replication slot with wal2json plugin
SELECT pg_create_logical_replication_slot(
  'myapp_cdc',  -- slot name (unique per slot)
  'wal2json'    -- output plugin: wal2json | pgoutput | test_decoding
);
-- List all replication slots and their lag
SELECT
  slot_name,
  plugin,
  slot_type,
  active,
  restart_lsn,
  confirmed_flush_lsn,
  -- WAL lag in bytes (CRITICAL: monitor this — unread WAL blocks disk cleanup)
  pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_human
FROM pg_replication_slots;
-- ⚠️ CRITICAL WARNING: Replication slot lag
-- If a consumer stops reading, PostgreSQL retains ALL WAL since the slot's LSN
-- This can fill your disk. Monitor lag_bytes and alert > 1GB.
-- Drop a slot when no longer needed
SELECT pg_drop_replication_slot('myapp_cdc');
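If you collect slot positions into a dashboard outside SQL, the same lag arithmetic can be done client-side: a `pg_lsn` value is two 32-bit hex words, `XXXXXXXX/YYYYYYYY`, and the byte position is `high * 2^32 + low`. A minimal sketch (the helper names `lsnToBytes` and `slotLagBytes` are ours, not a pg API):

```typescript
// Parse a pg_lsn string ("high/low" in hex) into an absolute byte position.
function lsnToBytes(lsn: string): bigint {
  const [high, low] = lsn.split("/");
  return (BigInt(`0x${high}`) << 32n) + BigInt(`0x${low}`);
}

// Client-side equivalent of pg_wal_lsn_diff(current, confirmed): lag in bytes.
function slotLagBytes(currentLsn: string, confirmedFlushLsn: string): bigint {
  return lsnToBytes(currentLsn) - lsnToBytes(confirmedFlushLsn);
}

// Example: server is at 0/16C0000, slot has confirmed through 0/16B3748
console.log(slotLagBytes("0/16C0000", "0/16B3748")); // 51384n (~50 KB behind)
```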
wal2json: Direct Consumption in Node.js
// lib/cdc/wal2json-consumer.ts
import { Client } from "pg";
interface Wal2JsonChange {
  kind: "insert" | "update" | "delete";
  schema: string;
  table: string;
  columnnames?: string[];
  columnvalues?: unknown[];
  oldkeys?: { keynames: string[]; keyvalues: unknown[] };
}

interface Wal2JsonPayload {
  change: Wal2JsonChange[];
}
export async function startCDCConsumer(
  slotName: string,
  tables: string[], // Tables to filter
  handler: (change: Wal2JsonChange) => Promise<void>,
  pollIntervalMs = 1000
): Promise<() => void> {
  // The CDC consumer needs its own dedicated connection
  const client = new Client({
    host: process.env.POSTGRES_HOST,
    port: parseInt(process.env.POSTGRES_PORT ?? "5432", 10),
    database: process.env.POSTGRES_DB,
    user: "cdc_user",
    password: process.env.CDC_PASSWORD,
  });
  await client.connect();

  // Note: node-postgres does not speak the streaming replication protocol
  // itself (there is no START_REPLICATION support or replication-message
  // event on Client). For push-based streaming, use a package such as
  // pg-logical-replication. Here we poll pg_logical_slot_get_changes(),
  // which consumes changes through plain SQL — and, unlike the *_peek_*
  // variant, advances confirmed_flush_lsn so PostgreSQL can recycle the
  // WAL we have already processed.
  let stopped = false;
  const poll = async (): Promise<void> => {
    while (!stopped) {
      const { rows } = await client.query(
        `SELECT lsn, xid, data
           FROM pg_logical_slot_get_changes(
             $1, NULL, NULL,
             'pretty-print', '0',
             'include-lsn', '1',
             'include-timestamp', '1',
             'add-tables', $2
           )`,
        [slotName, tables.map((t) => `public.${t}`).join(",")]
      );
      for (const row of rows) {
        const payload: Wal2JsonPayload = JSON.parse(row.data);
        for (const change of payload.change ?? []) {
          await handler(change);
        }
      }
      await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
    }
  };
  poll().catch((err) => console.error("[cdc] Consumer error:", err));

  return () => {
    stopped = true;
    void client.end();
  };
}
// Convert wal2json column arrays to a keyed object
export function changeToRecord(change: Wal2JsonChange): Record<string, unknown> {
  if (!change.columnnames || !change.columnvalues) return {};
  return Object.fromEntries(
    change.columnnames.map((name, i) => [name, change.columnvalues![i]])
  );
}
Debezium: Production CDC with Kafka
For high-throughput production workloads, Debezium is the standard choice. It handles replication slot management, offset tracking, schema evolution, and at-least-once delivery.
# docker-compose.yml for local Debezium development
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.6.1
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: containers reach the broker at kafka:29092,
      # host tools at localhost:9092. Advertising only localhost would
      # break kafka-connect, which runs inside the compose network.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-connect:
    image: debezium/connect:2.6
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: "1"
      CONFIG_STORAGE_TOPIC: debezium-configs
      OFFSET_STORAGE_TOPIC: debezium-offsets
      STATUS_STORAGE_TOPIC: debezium-status
# Register the Debezium PostgreSQL connector
# (database.server.name was removed in Debezium 2.x; topic.prefix replaces it)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "myapp-postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "cdc_user",
      "database.password": "strong_password",
      "database.dbname": "myapp",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot",
      "publication.name": "debezium_publication",
      "table.include.list": "public.users,public.workspaces,public.invoices",
      "column.exclude.list": "public.users.password_hash",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "topic.prefix": "myapp",
      "heartbeat.interval.ms": "10000"
    }
  }'
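When the same connector shape is registered for several databases, it helps to generate the config rather than hand-edit JSON. A hedged sketch (the `buildConnectorConfig` helper and `ConnectorSpec` type are ours, not part of Debezium):

```typescript
// Hypothetical helper: generate Debezium connector config for a table list,
// keeping slot, publication, and topic naming consistent in one place.
interface ConnectorSpec {
  name: string;            // e.g. "myapp" — used for slot/publication/topic prefix
  tables: string[];        // bare table names in schema "public"
  excludeColumns?: string[]; // fully qualified, e.g. "public.users.password_hash"
}

function buildConnectorConfig(spec: ConnectorSpec): Record<string, string> {
  return {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "slot.name": `${spec.name}_slot`,
    "publication.name": `${spec.name}_publication`,
    "table.include.list": spec.tables.map((t) => `public.${t}`).join(","),
    // Only emit column.exclude.list when there is something to exclude
    ...(spec.excludeColumns?.length
      ? { "column.exclude.list": spec.excludeColumns.join(",") }
      : {}),
    "topic.prefix": spec.name,
  };
}

const cfg = buildConnectorConfig({ name: "myapp", tables: ["users", "invoices"] });
console.log(cfg["table.include.list"]); // public.users,public.invoices
```

The generated object is what goes under the `"config"` key of the `POST /connectors` payload shown above.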
Consuming Debezium Events in Node.js (KafkaJS)
// workers/cdc-consumer.ts
import { Kafka } from "kafkajs";
import { searchIndex } from "@/lib/search";

const kafka = new Kafka({
  clientId: "myapp-cdc-consumer",
  brokers: [process.env.KAFKA_BROKER!],
});

// Raw Debezium envelope. Note: this shape only arrives if the connector
// does NOT apply the ExtractNewRecordState (unwrap) transform — with
// unwrap enabled, each message is just the flattened `after` record.
interface DebeziumEnvelope {
  before: Record<string, unknown> | null;
  after: Record<string, unknown> | null;
  op: "c" | "u" | "d" | "r"; // create / update / delete / read (snapshot)
  ts_ms: number;
  source: { table: string; schema: string };
}

export async function startDebeziumConsumer(): Promise<void> {
  const consumer = kafka.consumer({ groupId: "myapp-cdc-group" });
  await consumer.connect();
  await consumer.subscribe({
    topics: ["myapp.public.users", "myapp.public.invoices"],
    fromBeginning: false,
  });
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      if (!message.value) return; // Tombstone (delete marker)
      const envelope: DebeziumEnvelope = JSON.parse(message.value.toString());
      const table = topic.split(".").pop()!;
      await handleChange(table, envelope);
    },
  });
}

async function handleChange(
  table: string,
  envelope: DebeziumEnvelope
): Promise<void> {
  switch (table) {
    case "users":
      if (envelope.op === "c" || envelope.op === "u") {
        const user = envelope.after!;
        // Sync to search index
        await searchIndex.upsert("users", {
          id: user.id as string,
          name: user.name as string,
          email: user.email as string,
        });
      } else if (envelope.op === "d") {
        await searchIndex.delete("users", envelope.before!.id as string);
      }
      break;
    case "invoices":
      // Invalidate cache, update analytics, etc.
      // Guard on the before-state so the webhook fires once, on the
      // transition to "paid", not on every later update of a paid invoice.
      // (Requires REPLICA IDENTITY FULL on the table so `before` carries
      // non-key columns.)
      if (
        envelope.op === "u" &&
        envelope.after?.status === "paid" &&
        envelope.before?.status !== "paid"
      ) {
        await triggerInvoicePaidWebhooks(envelope.after.id as string);
      }
      break;
  }
}
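Most sink handlers only ever take two actions, so the four op codes collapse cleanly. A tiny pure helper (the name `opToAction` is ours) keeps that mapping out of every `switch` arm:

```typescript
type DebeziumOp = "c" | "u" | "d" | "r";

// Map a Debezium op code to the action a sink should take.
// "r" (snapshot read) is treated as an upsert so the initial snapshot
// populates the sink the same way live inserts do.
function opToAction(op: DebeziumOp): "upsert" | "delete" {
  return op === "d" ? "delete" : "upsert";
}

console.log(opToAction("r")); // upsert
```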
Monitoring Replication Slot Lag
-- Alert when any slot lag exceeds 500MB
SELECT
slot_name,
pg_size_pretty(
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
) AS lag,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
active
FROM pg_replication_slots
WHERE pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) > 524288000; -- 500MB
-- CloudWatch custom metric for lag (run every minute)
-- aws cloudwatch put-metric-data --namespace PostgreSQL/Replication
-- --metric-name ReplicationLagBytes --value <lag_bytes>
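On the alerting side, it is useful to classify the lag value the metric script reads. A sketch with the 500MB warning threshold from the query above plus a critical tier at 1GB (the tier names and the 1GB cutoff are our choices):

```typescript
// Classify replication slot lag for alerting. Thresholds: warn mirrors the
// 500MB SQL alert above; critical at 1GB is our suggested escalation point.
const WARN_BYTES = 500 * 1024 * 1024;      // 524288000
const CRIT_BYTES = 1024 * 1024 * 1024;     // 1073741824

function lagSeverity(lagBytes: number): "ok" | "warn" | "critical" {
  if (lagBytes >= CRIT_BYTES) return "critical";
  if (lagBytes >= WARN_BYTES) return "warn";
  return "ok";
}

console.log(lagSeverity(600 * 1024 * 1024)); // warn
```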
Cost and Timeline Estimates
| Scope | Team | Timeline | Cost Range |
|---|---|---|---|
| Direct wal2json consumer (Node.js) | 1–2 devs | 2–3 days | $800–1,500 |
| Debezium + Kafka setup (local/dev) | 1–2 devs | 3–5 days | $1,200–2,500 |
| Production Debezium + MSK + monitoring | 2 devs | 1–2 weeks | $4,000–9,000 |
| AWS MSK (Kafka) m5.large × 3 brokers | — | — | ~$600/month |
See Also
- PostgreSQL Logical Replication
- PostgreSQL Event Sourcing
- AWS SQS SNS Patterns
- PostgreSQL Triggers and Audit Logging
- AWS OpenSearch Analytics
Working With Viprasol
CDC with logical decoding is the right answer for keeping multiple systems in sync — Elasticsearch for search, Redis for cache invalidation, analytics warehouses, audit logs. Our team implements the full pipeline: configuring wal_level=logical, managing replication slots with lag monitoring, and building consumers (wal2json for simple cases, Debezium+Kafka for production scale).
What we deliver:
- `postgresql.conf` `wal_level=logical` setup with slot management and a `pg_replication_slots` lag query
- `startCDCConsumer` Node.js function: `pg` replication connection, wal2json JSON parsing, LSN acknowledgment
- `changeToRecord` helper: columnnames + columnvalues → plain object
- Debezium connector JSON with `column.exclude.list` for PII and the `ExtractNewRecordState` transform
- KafkaJS consumer: `eachMessage` handler, Debezium envelope `op` c/u/d/r parsing
- Monitoring query: slots with lag > 500MB (CloudWatch custom metric pattern)
Talk to our team about your real-time data pipeline architecture →
Or explore our cloud and data engineering services.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.