PostgreSQL Logical Replication in 2026: CDC, Replication Slots, and Streaming to Data Warehouse
Use PostgreSQL logical replication for change data capture: replication slots, publications, Debezium CDC, streaming to Redshift/BigQuery, and monitoring slot lag.
Logical replication turns PostgreSQL into an event stream. Every INSERT, UPDATE, and DELETE on replicated tables is captured and forwarded: to a replica, a data warehouse, a search index, or a cache invalidation system. This change data capture (CDC) is the basis for keeping analytics in sync with production without expensive full-table ETL jobs.
This post covers PostgreSQL logical replication internals, replication slots and publications, Debezium for managed CDC, streaming to Redshift/BigQuery via Kafka, the dangerous slot lag problem, and monitoring.
Logical vs Physical Replication
| Aspect | Physical (Streaming) | Logical |
|---|---|---|
| Granularity | Block-level (full WAL) | Row-level (decoded changes) |
| Cross-version | ❌ Same major version only | ✅ Across major versions |
| Cross-platform | ❌ Same OS/arch only | ✅ Any PostgreSQL |
| Selective tables | ❌ All or nothing | ✅ Per-table publications |
| Destination | PostgreSQL replica | Any system (Kafka, S3, Redshift) |
| DML only | No (all changes, including DDL) | Yes (DDL is not replicated) |
| Use case | HA/failover replica | CDC, data pipelines, migration |
PostgreSQL Configuration
-- postgresql.conf changes required (restart needed):
-- wal_level = logical (default is 'replica'; must change to 'logical')
-- max_replication_slots = 10 (default 10; increase for multiple consumers)
-- max_wal_senders = 10 (default 10; connections for logical replication)
-- Check current settings:
SHOW wal_level; -- Should show 'logical'
SHOW max_replication_slots;
SHOW max_wal_senders;
-- On AWS RDS/Aurora: set via parameter group
-- rds.logical_replication = 1 (requires reboot)
Publications: What to Replicate
-- Publish all tables in the current database
CREATE PUBLICATION all_tables FOR ALL TABLES;
-- Publish specific tables
CREATE PUBLICATION app_events FOR TABLE
orders,
order_items,
customers,
subscriptions;
-- Publish with row filter (PostgreSQL 15+): only publish active subscriptions
CREATE PUBLICATION active_subscriptions
FOR TABLE subscriptions (id, customer_id, plan, status, updated_at)
WHERE (status != 'cancelled');
-- Publish only specific columns (column list filtering, PostgreSQL 15+)
CREATE PUBLICATION customers_public
FOR TABLE customers (id, email, name, created_at);
-- Excludes: phone, address, payment_method, notes
-- List publications
SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete
FROM pg_publication;
Replication Slots
-- Create a logical replication slot (for direct subscription)
SELECT pg_create_logical_replication_slot(
'my_consumer', -- Slot name (must be unique)
'pgoutput' -- Output plugin ('pgoutput' = standard, 'wal2json' = JSON)
);
-- List all slots and their lag
SELECT
slot_name,
plugin,
active,
active_pid,
confirmed_flush_lsn,
pg_current_wal_lsn() - confirmed_flush_lsn AS lag_bytes,
pg_size_pretty(pg_current_wal_lsn() - confirmed_flush_lsn) AS lag_pretty
FROM pg_replication_slots;
-- ⚠️ CRITICAL: unused slots accumulate WAL indefinitely (can fill disk!)
-- Always monitor slot lag and drop unused slots immediately
SELECT pg_drop_replication_slot('my_consumer');
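The lag arithmetic above can also be done client-side. PostgreSQL formats an LSN as two hex words separated by a slash (e.g. `1/6B3748`), encoding a 64-bit byte position in the WAL. A minimal sketch, with function names of our own choosing:

```typescript
// An LSN string "high/low" encodes a 64-bit WAL byte position:
// position = high * 2^32 + low (both words are hexadecimal).
function lsnToBytes(lsn: string): bigint {
  const [high, low] = lsn.split("/");
  return (BigInt(`0x${high}`) << 32n) + BigInt(`0x${low}`);
}

// Lag in bytes between pg_current_wal_lsn() and a slot's confirmed_flush_lsn
function slotLagBytes(currentLsn: string, confirmedFlushLsn: string): bigint {
  return lsnToBytes(currentLsn) - lsnToBytes(confirmedFlushLsn);
}
```

This mirrors what the server-side `pg_lsn` subtraction operator computes.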
Native Subscription (PostgreSQL → PostgreSQL)
-- On the SUBSCRIBER (replica) database:
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=primary-db.example.com port=5432 dbname=myapp user=replication password=...'
PUBLICATION app_events
WITH (
copy_data = true, -- Copy initial snapshot
connect = true,
enabled = true,
slot_name = 'my_sub_slot', -- Auto-created on publisher
synchronous_commit = off -- Don't wait for subscriber confirmation
);
-- Monitor subscription status
SELECT subname, subenabled, subslotname, subpublications
FROM pg_subscription;
-- Check replication lag on subscriber
SELECT
application_name,
state,
sent_lsn,
write_lsn,
flush_lsn,
replay_lsn,
(sent_lsn - replay_lsn) AS lag_bytes
FROM pg_stat_replication;
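One operational gotcha with native subscriptions: tables added to a publication after the subscription was created are not picked up automatically. The subscriber must refresh (table name illustrative):

```sql
-- On the publisher: add a new table to an existing publication
ALTER PUBLICATION app_events ADD TABLE invoices;
-- On the subscriber: start replicating it (copy_data snapshots existing rows)
ALTER SUBSCRIPTION my_sub REFRESH PUBLICATION WITH (copy_data = true);
```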
Debezium: CDC to Kafka
Debezium is the standard for PostgreSQL CDC. It reads the WAL via a replication slot and publishes change events to Kafka topics:
# debezium/postgres-connector.json
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "primary-db.example.com",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${file:/secrets.properties:db.password}",
"database.dbname": "myapp",
"topic.prefix": "myapp",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "app_events",
"table.include.list": "public.orders,public.customers,public.subscriptions",
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO debezium_heartbeat (id) VALUES (1) ON CONFLICT (id) DO UPDATE SET updated_at = now()",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,source.ts_ms",
"snapshot.mode": "initial",
"decimal.handling.mode": "double",
"time.precision.mode": "connect"
}
}
-- Required: replication user with permissions
CREATE USER debezium WITH REPLICATION PASSWORD 'strong-password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
-- Heartbeat table (keeps the slot's confirmed_flush_lsn advancing during quiet periods)
CREATE TABLE debezium_heartbeat (
id INTEGER PRIMARY KEY DEFAULT 1,
updated_at TIMESTAMPTZ DEFAULT now()
);
GRANT INSERT, UPDATE ON debezium_heartbeat TO debezium;
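For reference, a raw Debezium change event (before the `unwrap` transform above flattens it) carries before/after row images, an op code, and source metadata. A sketch of the envelope as a TypeScript type; the sample row values are invented:

```typescript
// Debezium change-event payload, pre-unwrap
interface DebeziumEnvelope<Row> {
  before: Row | null; // null for inserts and snapshot reads
  after: Row | null;  // null for deletes
  op: "c" | "u" | "d" | "r"; // create, update, delete, snapshot read
  ts_ms: number;      // when the connector processed the event
  source: { table: string; lsn: number; ts_ms: number };
}

const sample: DebeziumEnvelope<{ id: string; status: string }> = {
  before: null,
  after: { id: "ord_123", status: "paid" },
  op: "c",
  ts_ms: 1767225600000,
  source: { table: "orders", lsn: 123456789, ts_ms: 1767225599000 },
};
```

With `ExtractNewRecordState` applied, consumers receive just the flattened `after` image plus whatever metadata `add.fields` copies in.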
Kafka → Data Warehouse (Custom Consumer)
// consumers/warehouse-sink.ts
// Consumes Debezium CDC events from Kafka, sinks to Redshift/BigQuery
import { Kafka } from "kafkajs";
import { Pool } from "pg"; // Redshift speaks the PostgreSQL wire protocol (but not every PG feature)
const kafka = new Kafka({
clientId: "warehouse-sink",
brokers: process.env.KAFKA_BROKERS!.split(","),
ssl: true,
sasl: { mechanism: "scram-sha-256", username: process.env.KAFKA_USER!, password: process.env.KAFKA_PASS! },
});
const redshift = new Pool({ connectionString: process.env.REDSHIFT_URL });
const consumer = kafka.consumer({ groupId: "warehouse-sink-group" });
async function processChangeEvent(event: {
op: "c" | "u" | "d" | "r"; // create, update, delete, read (snapshot)
id: string;
[key: string]: unknown;
}) {
switch (event.op) {
case "c":
case "r": { // Snapshot read = same as create
// Redshift does not support ON CONFLICT (it is based on an older PostgreSQL),
// so emulate the upsert with DELETE + INSERT on one connection,
// or use Redshift's MERGE statement
const client = await redshift.connect();
try {
await client.query("BEGIN");
await client.query(`DELETE FROM orders_dw WHERE id = $1`, [event.id]);
await client.query(
`INSERT INTO orders_dw (id, customer_id, total, status, created_at, synced_at)
VALUES ($1, $2, $3, $4, $5, now())`,
[event.id, event.customer_id, event.total, event.status, event.created_at]
);
await client.query("COMMIT");
} catch (err) {
await client.query("ROLLBACK");
throw err;
} finally {
client.release();
}
break;
}
case "u":
await redshift.query(
`UPDATE orders_dw SET
total = $2,
status = $3,
synced_at = now()
WHERE id = $1`,
[event.id, event.total, event.status]
);
break;
case "d":
await redshift.query(
`UPDATE orders_dw SET deleted_at = now() WHERE id = $1`,
[event.id]
);
break;
}
}
async function run() {
await consumer.connect();
await consumer.subscribe({ topics: ["myapp.public.orders"], fromBeginning: false });
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
for (const message of batch.messages) {
const event = JSON.parse(message.value!.toString());
await processChangeEvent(event);
resolveOffset(message.offset);
await heartbeat();
}
},
});
}
run().catch(console.error);
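Debezium plus Kafka gives at-least-once delivery: if the consumer crashes between writing to the warehouse and committing its offset, the same event is redelivered. Upserts are naturally idempotent, but an append-only sink needs deduplication. A sketch using the monotonically increasing source LSN (in-memory here; in production, persist it alongside the sink data):

```typescript
// Track the highest Debezium source LSN applied per table; events at or
// below it were already processed and can be skipped on redelivery.
const appliedLsn = new Map<string, number>();

function isDuplicate(table: string, sourceLsn: number): boolean {
  const seen = appliedLsn.get(table) ?? -1;
  if (sourceLsn <= seen) return true;
  appliedLsn.set(table, sourceLsn);
  return false;
}
```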
Slot Lag Monitoring (Critical)
-- Monitor slot lag; alert if above threshold (e.g., 1 GB)
SELECT
slot_name,
active,
pg_size_pretty(pg_current_wal_lsn() - confirmed_flush_lsn) AS lag,
(pg_current_wal_lsn() - confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
ORDER BY lag_bytes DESC NULLS LAST;
-- Alert query for monitoring (CloudWatch custom metric or Prometheus):
-- Trigger alarm if lag_bytes > 1073741824 (1GB)
// lib/monitoring/slot-lag.ts
// Run every 5 minutes via EventBridge cron
import { db } from "@/lib/db";
import { CloudWatchClient, PutMetricDataCommand } from "@aws-sdk/client-cloudwatch";
const cw = new CloudWatchClient({ region: process.env.AWS_REGION });
export async function reportSlotLag() {
const slots = await db.$queryRaw<{
slot_name: string;
lag_bytes: bigint;
active: boolean;
}[]>`
SELECT
slot_name,
pg_current_wal_lsn() - confirmed_flush_lsn AS lag_bytes,
active
FROM pg_replication_slots
`;
await cw.send(new PutMetricDataCommand({
Namespace: "MyApp/Database",
MetricData: slots.map((slot) => ({
MetricName: "ReplicationSlotLagBytes",
Dimensions: [{ Name: "SlotName", Value: slot.slot_name }],
Value: Number(slot.lag_bytes ?? 0),
Unit: "Bytes",
})),
}));
}
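The metric then needs an alarm. A sketch of the parameters one might pass to CloudWatch's `PutMetricAlarm` (the SNS topic ARN is a placeholder): alert when any slot exceeds 1 GiB of lag for two consecutive 5-minute periods.

```typescript
const GIB = 1024 ** 3; // 1073741824 bytes

// Parameter object for CloudWatch PutMetricAlarm (topic ARN is a placeholder)
const slotLagAlarm = {
  AlarmName: "pg-replication-slot-lag",
  Namespace: "MyApp/Database",
  MetricName: "ReplicationSlotLagBytes",
  Statistic: "Maximum",
  Period: 300,          // seconds per evaluation period
  EvaluationPeriods: 2, // two consecutive breaches before alarming
  Threshold: GIB,
  ComparisonOperator: "GreaterThanThreshold",
  AlarmActions: ["arn:aws:sns:us-east-1:123456789012:oncall"],
};
```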
Cost and Timeline
| Component | Timeline | Cost (USD) |
|---|---|---|
| PostgreSQL CDC setup (publications + slot) | 0.5 day | $300–$500 |
| Debezium connector configuration | 0.5–1 day | $400–$800 |
| Kafka consumer + warehouse sink | 1–2 days | $800–$1,600 |
| Slot lag monitoring | 0.5 day | $300–$500 |
| Full CDC pipeline | 2–4 weeks | $10,000–$20,000 |
See Also
- PostgreSQL Partitioning Advanced – Partitioned tables with replication
- AWS SQS SNS Patterns – Alternative event streaming for smaller scale
- AWS EventBridge – Lighter alternative for simple CDC use cases
- PostgreSQL EXPLAIN ANALYZE – Query performance on the replica
Working With Viprasol
We design and implement PostgreSQL CDC pipelines for SaaS products, from simple publication/subscription replication through full Debezium + Kafka + data warehouse pipelines. Our data engineering team has shipped CDC systems processing millions of change events per day.
What we deliver:
- PostgreSQL wal_level=logical configuration and publication setup
- Replication user with minimal required permissions
- Debezium connector configuration with heartbeat and slot monitoring
- Kafka consumer → Redshift/BigQuery sink
- Slot lag CloudWatch alarm (alert before disk fills)
Explore our web development services or contact us to build your PostgreSQL CDC pipeline.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.