Event Sourcing with PostgreSQL: Append-Only Event Log, Projections, and Snapshots
Implement event sourcing with PostgreSQL. Covers append-only event store design, aggregate reconstruction, projections with event handlers, snapshot optimization, and query patterns for event-sourced systems.
Most applications update records in place — when a user changes their subscription plan, you run UPDATE subscriptions SET plan = 'pro'. But you lose the history: what was the old plan? When did it change? What triggered it? Event sourcing flips this model: instead of storing current state, you store every event that changed the state. Current state is reconstructed by replaying events.
This guide builds a production event sourcing system on PostgreSQL, covering the event store schema, aggregate pattern, projections, and snapshot optimization.
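As a quick illustration of "current state is reconstructed by replaying events", here is a minimal sketch of the subscription example as a fold over an event list. The event names and fields (`SubscriptionStarted`, `PlanChanged`, `reason`, and so on) are hypothetical and chosen only for this illustration; they are not part of the schema built later in the guide.

```typescript
// Hypothetical event shapes for the subscription example (illustration only).
type SubscriptionEvent =
  | { type: "SubscriptionStarted"; plan: string; at: string }
  | { type: "PlanChanged"; plan: string; reason: string; at: string }
  | { type: "SubscriptionCancelled"; at: string };

interface SubscriptionState {
  plan: string | null;
  active: boolean;
  lastChangedAt: string | null;
}

const initialState: SubscriptionState = { plan: null, active: false, lastChangedAt: null };

// Pure transition: previous state plus one event gives the next state.
function evolve(state: SubscriptionState, event: SubscriptionEvent): SubscriptionState {
  switch (event.type) {
    case "SubscriptionStarted":
      return { plan: event.plan, active: true, lastChangedAt: event.at };
    case "PlanChanged":
      return { ...state, plan: event.plan, lastChangedAt: event.at };
    case "SubscriptionCancelled":
      return { ...state, active: false, lastChangedAt: event.at };
  }
}

// Current state is a left fold over the full history.
function replay(events: SubscriptionEvent[]): SubscriptionState {
  return events.reduce(evolve, initialState);
}

const history: SubscriptionEvent[] = [
  { type: "SubscriptionStarted", plan: "free", at: "2024-01-01T00:00:00Z" },
  { type: "PlanChanged", plan: "pro", reason: "upgrade-button", at: "2024-03-15T09:30:00Z" },
];

const current = replay(history);
console.log(current.plan); // "pro"
```

The old plan, the time of the change, and its trigger all remain available in `history`, which is exactly what the in-place `UPDATE` discards.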
When to Use Event Sourcing
Event sourcing is worth the complexity when you need:
- Full audit history without separate audit tables
- Temporal queries ("what did this look like on date X?")
- Event replay to rebuild broken projections
- Event-driven integration — other services react to domain events
- Debugging — reproduce bugs by replaying to the exact failure state
Don't use it for simple CRUD, when audit logs are good enough, or when your team isn't ready for the conceptual shift.
Event Store Schema
-- The core event store: append-only, never update or delete
CREATE TABLE domain_events (
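id BIGSERIAL PRIMARY KEY, -- Global ordering sequence (assigned at insert time, not commit time)
event_id UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(), -- Built in since PostgreSQL 13; older versions need pgcrypto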
aggregate_type TEXT NOT NULL, -- 'Order', 'Subscription', 'User'
aggregate_id UUID NOT NULL,
event_type TEXT NOT NULL, -- 'OrderPlaced', 'OrderShipped'
event_version INTEGER NOT NULL, -- Per-aggregate sequence number
payload JSONB NOT NULL, -- Event data
metadata JSONB NOT NULL DEFAULT '{}', -- Correlation ID, causation ID, user, IP
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Optimistic concurrency: prevent two writers from writing version N+1 simultaneously
UNIQUE(aggregate_id, event_version)
);
-- Projections (read models) — rebuilt from events, optimized for queries
CREATE TABLE order_projections (
aggregate_id UUID PRIMARY KEY,
order_number TEXT NOT NULL UNIQUE,
customer_id UUID NOT NULL,
status TEXT NOT NULL,
line_items JSONB NOT NULL DEFAULT '[]',
subtotal NUMERIC(12,2) NOT NULL DEFAULT 0,
total NUMERIC(12,2) NOT NULL DEFAULT 0,
shipping_addr JSONB,
placed_at TIMESTAMPTZ,
shipped_at TIMESTAMPTZ,
delivered_at TIMESTAMPTZ,
cancelled_at TIMESTAMPTZ,
last_event_id BIGINT NOT NULL, -- Track which events are processed
version INTEGER NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Snapshots: periodic aggregate state cache to avoid replaying 10K+ events
CREATE TABLE aggregate_snapshots (
aggregate_type TEXT NOT NULL,
aggregate_id UUID NOT NULL,
version INTEGER NOT NULL, -- Event version at snapshot time
state JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (aggregate_type, aggregate_id, version)
);
-- Indexes for common access patterns
-- (aggregate_id, event_version) is already indexed by the UNIQUE constraint above,
-- and id by the primary key (used for projection catch-up), so neither needs its own index.
CREATE INDEX idx_events_type ON domain_events(aggregate_type, occurred_at DESC);
CREATE INDEX idx_events_occurred ON domain_events(occurred_at);
Event Type Definitions
// domain/events/order-events.ts
import { z } from "zod";
// Base event schema
const BaseEventSchema = z.object({
eventId: z.string().uuid(),
aggregateId: z.string().uuid(),
occurredAt: z.string().datetime(),
metadata: z.object({
userId: z.string().optional(),
correlationId: z.string().optional(),
causationId: z.string().optional(),
}).default({}),
});
// Individual event schemas
export const OrderPlacedSchema = BaseEventSchema.extend({
eventType: z.literal("OrderPlaced"),
payload: z.object({
orderNumber: z.string(),
customerId: z.string().uuid(),
lineItems: z.array(z.object({
productId: z.string(),
sku: z.string(),
name: z.string(),
quantity: z.number().int().positive(),
unitPrice: z.number().positive(),
})),
shippingAddress: z.object({
line1: z.string(),
city: z.string(),
country: z.string(),
postalCode: z.string(),
}),
subtotal: z.number(),
shippingCost: z.number(),
total: z.number(),
}),
});
export const OrderShippedSchema = BaseEventSchema.extend({
eventType: z.literal("OrderShipped"),
payload: z.object({
trackingNumber: z.string(),
carrier: z.string(),
estimatedDelivery: z.string().datetime().optional(),
}),
});
export const OrderDeliveredSchema = BaseEventSchema.extend({
eventType: z.literal("OrderDelivered"),
payload: z.object({
deliveredAt: z.string().datetime(),
signedBy: z.string().optional(),
}),
});
export const OrderCancelledSchema = BaseEventSchema.extend({
eventType: z.literal("OrderCancelled"),
payload: z.object({
reason: z.string(),
cancelledBy: z.string().uuid(),
refundAmount: z.number().optional(),
}),
});
// Union type for all order events
export type OrderEvent =
| z.infer<typeof OrderPlacedSchema>
| z.infer<typeof OrderShippedSchema>
| z.infer<typeof OrderDeliveredSchema>
| z.infer<typeof OrderCancelledSchema>;
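Because `OrderEvent` is a union discriminated by `eventType`, a `switch` over it can be checked for exhaustiveness at compile time. The sketch below uses simplified hand-written stand-ins for the zod-inferred types (only the fields it needs), and `assertNever` and `describeEvent` are hypothetical helper names introduced for this example.

```typescript
// Simplified stand-ins for the zod-inferred event types; only the fields used here.
type OrderPlaced = { eventType: "OrderPlaced"; payload: { orderNumber: string; total: number } };
type OrderShipped = { eventType: "OrderShipped"; payload: { trackingNumber: string; carrier: string } };
type OrderDelivered = { eventType: "OrderDelivered"; payload: { deliveredAt: string } };
type OrderCancelled = { eventType: "OrderCancelled"; payload: { reason: string } };
type OrderEvent = OrderPlaced | OrderShipped | OrderDelivered | OrderCancelled;

// If a new member is added to OrderEvent without a matching case,
// the call in the default branch stops compiling.
function assertNever(value: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(value)}`);
}

function describeEvent(event: OrderEvent): string {
  switch (event.eventType) {
    case "OrderPlaced":
      // Narrowed to OrderPlaced: payload.orderNumber and payload.total are typed.
      return `Order ${event.payload.orderNumber} placed, total ${event.payload.total.toFixed(2)}`;
    case "OrderShipped":
      return `Shipped via ${event.payload.carrier} (${event.payload.trackingNumber})`;
    case "OrderDelivered":
      return `Delivered at ${event.payload.deliveredAt}`;
    case "OrderCancelled":
      return `Cancelled: ${event.payload.reason}`;
    default:
      return assertNever(event);
  }
}

const shipped: OrderEvent = {
  eventType: "OrderShipped",
  payload: { trackingNumber: "TRACK-123", carrier: "ExampleCarrier" },
};
console.log(describeEvent(shipped)); // "Shipped via ExampleCarrier (TRACK-123)"
```

The same pattern could be applied to aggregate and projection handlers so that adding a new event type surfaces every place that needs a new case.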
Event Store Repository
// infrastructure/event-store.ts
import { prisma } from "@/lib/prisma";
export interface StoredEvent {
id: bigint;
eventId: string;
aggregateType: string;
aggregateId: string;
eventType: string;
eventVersion: number;
payload: Record<string, unknown>;
metadata: Record<string, unknown>;
occurredAt: Date;
}
export class EventStore {
async append(params: {
aggregateType: string;
aggregateId: string;
events: Array<{
eventType: string;
payload: Record<string, unknown>;
metadata?: Record<string, unknown>;
}>;
expectedVersion: number; // -1 for new aggregate, N for optimistic concurrency
}): Promise<StoredEvent[]> {
return prisma.$transaction(async (tx) => {
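// Verify current version (optimistic concurrency check).
// Two concurrent writers can both pass this read; the UNIQUE(aggregate_id, event_version)
// constraint then rejects the loser's insert with a unique-violation error
// rather than ConcurrencyError, so callers should be prepared for both.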
const latestEvent = await tx.domainEvent.findFirst({
where: { aggregateId: params.aggregateId },
orderBy: { eventVersion: "desc" },
select: { eventVersion: true },
});
const currentVersion = latestEvent?.eventVersion ?? -1;
if (currentVersion !== params.expectedVersion) {
throw new ConcurrencyError(
`Aggregate ${params.aggregateId} expected version ${params.expectedVersion}, got ${currentVersion}`
);
}
// Append all events atomically
const stored: StoredEvent[] = [];
let version = currentVersion;
for (const event of params.events) {
version++;
const record = await tx.domainEvent.create({
data: {
aggregateType: params.aggregateType,
aggregateId: params.aggregateId,
eventType: event.eventType,
eventVersion: version,
payload: event.payload,
metadata: event.metadata ?? {},
},
});
stored.push(record as unknown as StoredEvent);
}
return stored;
});
}
async loadEvents(
aggregateId: string,
fromVersion = 0
): Promise<StoredEvent[]> {
const events = await prisma.domainEvent.findMany({
where: {
aggregateId,
eventVersion: { gte: fromVersion },
},
orderBy: { eventVersion: "asc" },
});
return events as unknown as StoredEvent[];
}
async loadEventsFromPosition(
afterId: bigint,
limit = 100
): Promise<StoredEvent[]> {
const events = await prisma.domainEvent.findMany({
where: { id: { gt: afterId } },
orderBy: { id: "asc" },
take: limit,
});
return events as unknown as StoredEvent[];
}
}
export class ConcurrencyError extends Error {
constructor(message: string) {
super(message);
this.name = "ConcurrencyError";
}
}
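To make the `expectedVersion` contract concrete, here is a small in-memory sketch of the same check. `InMemoryEventStore` and `MemEvent` are hypothetical stand-ins for the Prisma-backed store and the `domain_events` table, and `ConcurrencyError` is redefined so the snippet is self-contained. It only illustrates the version comparison; in the real store the `UNIQUE(aggregate_id, event_version)` constraint remains the final safeguard.

```typescript
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConcurrencyError";
  }
}

interface MemEvent {
  aggregateId: string;
  eventVersion: number;
  eventType: string;
}

// Hypothetical in-memory stand-in for the domain_events table.
class InMemoryEventStore {
  private events: MemEvent[] = [];

  // -1 means the aggregate has no events yet, matching the convention above.
  currentVersion(aggregateId: string): number {
    const versions = this.events
      .filter((e) => e.aggregateId === aggregateId)
      .map((e) => e.eventVersion);
    return versions.length === 0 ? -1 : Math.max(...versions);
  }

  append(aggregateId: string, eventTypes: string[], expectedVersion: number): MemEvent[] {
    const current = this.currentVersion(aggregateId);
    if (current !== expectedVersion) {
      throw new ConcurrencyError(
        `Aggregate ${aggregateId} expected version ${expectedVersion}, got ${current}`
      );
    }
    const stored = eventTypes.map((eventType, i) => ({
      aggregateId,
      eventVersion: current + 1 + i,
      eventType,
    }));
    this.events.push(...stored);
    return stored;
  }
}

const store = new InMemoryEventStore();
store.append("order-1", ["OrderPlaced"], -1); // new aggregate, version becomes 0

// Two writers both loaded the aggregate at version 0.
store.append("order-1", ["OrderShipped"], 0); // first writer wins, version becomes 1

let conflictName = "none";
try {
  store.append("order-1", ["OrderCancelled"], 0); // second writer is stale
} catch (err) {
  if (err instanceof Error) conflictName = err.name;
}
console.log(conflictName); // "ConcurrencyError"
```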
Order Aggregate
// domain/aggregates/order.ts
import type { StoredEvent } from "@/infrastructure/event-store";
import type { OrderEvent } from "@/domain/events/order-events";
type OrderStatus = "pending" | "placed" | "shipped" | "delivered" | "cancelled";
interface OrderState {
id: string;
orderNumber: string;
customerId: string;
status: OrderStatus;
lineItems: Array<{
productId: string;
sku: string;
name: string;
quantity: number;
unitPrice: number;
}>;
total: number;
shippingAddress: Record<string, string> | null;
placedAt: Date | null;
shippedAt: Date | null;
deliveredAt: Date | null;
cancelledAt: Date | null;
version: number;
}
export class Order {
private state: OrderState;
private pendingEvents: Array<{
eventType: string;
payload: Record<string, unknown>;
}> = [];
constructor(id: string) {
this.state = {
id,
orderNumber: "",
customerId: "",
status: "pending",
lineItems: [],
total: 0,
shippingAddress: null,
placedAt: null,
shippedAt: null,
deliveredAt: null,
cancelledAt: null,
version: -1,
};
}
// Rebuild state from event history
static fromEvents(id: string, events: StoredEvent[]): Order {
const order = new Order(id);
for (const event of events) {
// Pass the stored timestamp so replay reproduces the original state
order.apply(event.eventType, event.payload as Record<string, unknown>, event.occurredAt);
order.state.version = event.eventVersion;
}
return order;
}
// Rebuild from snapshot + subsequent events
static fromSnapshot(
snapshot: { state: OrderState; version: number },
subsequentEvents: StoredEvent[]
): Order {
const order = new Order(snapshot.state.id);
// Copy the top-level state and revive dates, which JSONB returns as ISO strings
const toDate = (value: Date | string | null): Date | null =>
value ? new Date(value) : null;
order.state = {
...snapshot.state,
placedAt: toDate(snapshot.state.placedAt),
shippedAt: toDate(snapshot.state.shippedAt),
deliveredAt: toDate(snapshot.state.deliveredAt),
cancelledAt: toDate(snapshot.state.cancelledAt),
version: snapshot.version,
};
for (const event of subsequentEvents) {
order.apply(event.eventType, event.payload as Record<string, unknown>, event.occurredAt);
order.state.version = event.eventVersion;
}
return order;
}
// Command: place order
place(params: {
orderNumber: string;
customerId: string;
lineItems: OrderState["lineItems"];
shippingAddress: Record<string, string>;
subtotal: number;
shippingCost: number;
total: number;
}): void {
if (this.state.status !== "pending") {
throw new Error("Order has already been placed");
}
this.recordEvent("OrderPlaced", {
orderNumber: params.orderNumber,
customerId: params.customerId,
lineItems: params.lineItems,
shippingAddress: params.shippingAddress,
subtotal: params.subtotal,
shippingCost: params.shippingCost,
total: params.total,
});
}
// Command: ship order
ship(params: {
trackingNumber: string;
carrier: string;
estimatedDelivery?: Date;
}): void {
if (this.state.status !== "placed") {
throw new Error(`Cannot ship order in status: ${this.state.status}`);
}
this.recordEvent("OrderShipped", {
trackingNumber: params.trackingNumber,
carrier: params.carrier,
estimatedDelivery: params.estimatedDelivery?.toISOString(),
});
}
// Command: cancel order
cancel(params: { reason: string; cancelledBy: string; refundAmount?: number }): void {
if (["delivered", "cancelled"].includes(this.state.status)) {
throw new Error(`Cannot cancel order in status: ${this.state.status}`);
}
this.recordEvent("OrderCancelled", {
reason: params.reason,
cancelledBy: params.cancelledBy,
refundAmount: params.refundAmount,
});
}
private recordEvent(
eventType: string,
payload: Record<string, unknown>
): void {
this.pendingEvents.push({ eventType, payload });
// Apply immediately so state is updated for subsequent commands.
// The event is not stored yet, so the current time stands in for occurred_at.
this.apply(eventType, payload, new Date());
}
// State transition: deterministic, depends only on the event and its timestamp
private apply(
eventType: string,
payload: Record<string, unknown>,
occurredAt: Date
): void {
switch (eventType) {
case "OrderPlaced":
this.state.status = "placed";
this.state.orderNumber = payload.orderNumber as string;
this.state.customerId = payload.customerId as string;
this.state.lineItems = payload.lineItems as OrderState["lineItems"];
this.state.total = payload.total as number;
this.state.shippingAddress = payload.shippingAddress as Record<string, string>;
this.state.placedAt = occurredAt;
break;
case "OrderShipped":
this.state.status = "shipped";
this.state.shippedAt = occurredAt;
break;
case "OrderDelivered":
this.state.status = "delivered";
this.state.deliveredAt = new Date(payload.deliveredAt as string);
break;
case "OrderCancelled":
this.state.status = "cancelled";
this.state.cancelledAt = occurredAt;
break;
}
}
get id() { return this.state.id; }
get status() { return this.state.status; }
get version() { return this.state.version; }
get uncommittedEvents() { return [...this.pendingEvents]; }
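clearUncommittedEvents() { this.pendingEvents = []; }
// Plain-JSON copy of the state for the snapshot store (Dates become ISO strings)
toSnapshot() { return JSON.parse(JSON.stringify(this.state)); }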
}
Command Handler
// application/commands/place-order.ts
import { EventStore } from "@/infrastructure/event-store";
import { Order } from "@/domain/aggregates/order";
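import { OrderProjectionHandler } from "@/infrastructure/projections/order-projection";
// generateOrderNumber() and calculateShipping() are application-specific helpers
// assumed to be defined elsewhere; they are not shown in this guide.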
const eventStore = new EventStore();
const projectionHandler = new OrderProjectionHandler();
interface PlaceOrderCommand {
orderId: string;
customerId: string;
lineItems: Array<{
productId: string;
sku: string;
name: string;
quantity: number;
unitPrice: number;
}>;
shippingAddress: Record<string, string>;
}
export async function placeOrder(cmd: PlaceOrderCommand): Promise<void> {
// Load aggregate (new order, so no existing events)
const order = new Order(cmd.orderId);
const orderNumber = await generateOrderNumber();
const subtotal = cmd.lineItems.reduce(
(sum, item) => sum + item.quantity * item.unitPrice,
0
);
const shippingCost = calculateShipping(cmd.shippingAddress, subtotal);
// Execute command — raises domain events
order.place({
orderNumber,
customerId: cmd.customerId,
lineItems: cmd.lineItems,
shippingAddress: cmd.shippingAddress,
subtotal,
shippingCost,
total: subtotal + shippingCost,
});
// Persist events
const storedEvents = await eventStore.append({
aggregateType: "Order",
aggregateId: order.id,
events: order.uncommittedEvents,
expectedVersion: order.version, // -1 for new aggregate
});
order.clearUncommittedEvents();
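// Update the read projection inline, before returning.
// This runs outside the append transaction: if it fails, the events are already
// stored and the projection stays stale until it is replayed from the log.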
for (const event of storedEvents) {
await projectionHandler.handle(event);
}
}
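A command handler that loads an existing aggregate can lose the optimistic-concurrency race and receive a `ConcurrencyError`. One common response is to reload and retry a bounded number of times. The helper below is a sketch under that assumption: `withConcurrencyRetry` and its `maxAttempts` parameter are hypothetical names, and `ConcurrencyError` is redefined so the snippet stands alone.

```typescript
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConcurrencyError";
  }
}

// Hypothetical helper: re-run a command when it loses an optimistic-concurrency race.
// Each attempt should reload the aggregate itself so it sees the latest version.
async function withConcurrencyRetry<T>(
  attempt: () => Promise<T>,
  maxAttempts = 3
): Promise<T> {
  for (let i = 1; ; i++) {
    try {
      return await attempt();
    } catch (err) {
      const isConflict = err instanceof Error && err.name === "ConcurrencyError";
      // Anything other than a version conflict, or the final attempt, is rethrown.
      if (!isConflict || i >= maxAttempts) throw err;
    }
  }
}

// Simulated command that conflicts twice before succeeding.
let calls = 0;
const result = withConcurrencyRetry(async () => {
  calls++;
  if (calls < 3) throw new ConcurrencyError("stale version");
  return `succeeded on attempt ${calls}`;
});

result.then((message) => console.log(message)); // "succeeded on attempt 3"
```

Retrying is only safe when the command is re-evaluated against the reloaded state, since the business rule that allowed it the first time may no longer hold.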
Projection Handler
// infrastructure/projections/order-projection.ts
import { prisma } from "@/lib/prisma";
import type { StoredEvent } from "@/infrastructure/event-store";
export class OrderProjectionHandler {
async handle(event: StoredEvent): Promise<void> {
switch (event.eventType) {
case "OrderPlaced":
await this.onOrderPlaced(event);
break;
case "OrderShipped":
await this.onOrderShipped(event);
break;
case "OrderDelivered":
await this.onOrderDelivered(event);
break;
case "OrderCancelled":
await this.onOrderCancelled(event);
break;
}
}
private async onOrderPlaced(event: StoredEvent): Promise<void> {
const p = event.payload;
await prisma.orderProjection.upsert({
where: { aggregateId: event.aggregateId },
create: {
aggregateId: event.aggregateId,
orderNumber: p.orderNumber as string,
customerId: p.customerId as string,
status: "placed",
lineItems: p.lineItems as any,
subtotal: p.subtotal as number,
total: p.total as number,
shippingAddr: p.shippingAddress as any,
placedAt: event.occurredAt,
lastEventId: Number(event.id),
version: event.eventVersion,
},
update: {
status: "placed",
lastEventId: Number(event.id),
version: event.eventVersion,
},
});
}
private async onOrderShipped(event: StoredEvent): Promise<void> {
await prisma.orderProjection.update({
where: { aggregateId: event.aggregateId },
data: {
status: "shipped",
shippedAt: event.occurredAt,
lastEventId: Number(event.id),
version: event.eventVersion,
updatedAt: new Date(),
},
});
}
private async onOrderDelivered(event: StoredEvent): Promise<void> {
await prisma.orderProjection.update({
where: { aggregateId: event.aggregateId },
data: {
status: "delivered",
deliveredAt: new Date(event.payload.deliveredAt as string),
lastEventId: Number(event.id),
version: event.eventVersion,
updatedAt: new Date(),
},
});
}
private async onOrderCancelled(event: StoredEvent): Promise<void> {
await prisma.orderProjection.update({
where: { aggregateId: event.aggregateId },
data: {
status: "cancelled",
cancelledAt: event.occurredAt,
lastEventId: Number(event.id),
version: event.eventVersion,
updatedAt: new Date(),
},
});
}
}
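The same handlers can rebuild a projection from scratch by paging through the log in global `id` order, which is what `loadEventsFromPosition` is for. The sketch below shows that loop against a hypothetical in-memory log; `eventLog`, `loadFromPosition`, and `rebuildProjection` are illustrative names, and the read model is reduced to a status per aggregate.

```typescript
interface LoggedEvent {
  id: number;
  aggregateId: string;
  eventType: string;
}

// Hypothetical in-memory log standing in for domain_events, ordered by global id.
const eventLog: LoggedEvent[] = [
  { id: 1, aggregateId: "order-1", eventType: "OrderPlaced" },
  { id: 2, aggregateId: "order-2", eventType: "OrderPlaced" },
  { id: 3, aggregateId: "order-1", eventType: "OrderShipped" },
  { id: 4, aggregateId: "order-2", eventType: "OrderCancelled" },
  { id: 5, aggregateId: "order-1", eventType: "OrderDelivered" },
];

// Same contract as loadEventsFromPosition: events with id > afterId, oldest first.
function loadFromPosition(afterId: number, limit: number): LoggedEvent[] {
  return eventLog.filter((e) => e.id > afterId).slice(0, limit);
}

const statusByEvent: Record<string, string | undefined> = {
  OrderPlaced: "placed",
  OrderShipped: "shipped",
  OrderDelivered: "delivered",
  OrderCancelled: "cancelled",
};

// Rebuild the read model from an empty state by paging through the log.
function rebuildProjection(batchSize: number): {
  statuses: Record<string, string>;
  checkpoint: number;
} {
  const statuses: Record<string, string> = {};
  let checkpoint = 0;
  for (;;) {
    const batch = loadFromPosition(checkpoint, batchSize);
    if (batch.length === 0) break;
    for (const event of batch) {
      const status = statusByEvent[event.eventType];
      if (status !== undefined) statuses[event.aggregateId] = status;
      checkpoint = event.id; // advance only after the event has been applied
    }
  }
  return { statuses, checkpoint };
}

const rebuilt = rebuildProjection(2);
console.log(rebuilt.statuses["order-1"]); // "delivered"
console.log(rebuilt.checkpoint); // 5
```

In the real projection the checkpoint corresponds to `last_event_id`, so a catch-up worker can resume from where it stopped instead of starting from zero.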
Snapshot Optimization
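// infrastructure/snapshot-store.ts
import { prisma } from "@/lib/prisma";
import { Order } from "@/domain/aggregates/order";
import { EventStore } from "@/infrastructure/event-store";
const SNAPSHOT_INTERVAL = 50; // Snapshot whenever the version is a multiple of 50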
export async function saveSnapshotIfNeeded(
aggregateType: string,
aggregate: Order
): Promise<void> {
if (aggregate.version > 0 && aggregate.version % SNAPSHOT_INTERVAL === 0) {
await prisma.aggregateSnapshot.upsert({
where: {
aggregateType_aggregateId_version: {
aggregateType,
aggregateId: aggregate.id,
version: aggregate.version,
},
},
create: {
aggregateType,
aggregateId: aggregate.id,
version: aggregate.version,
state: aggregate.toSnapshot(),
},
update: {},
});
}
}
export async function loadAggregate(
aggregateType: string,
aggregateId: string,
eventStore: EventStore
): Promise<Order> {
// Find latest snapshot
const snapshot = await prisma.aggregateSnapshot.findFirst({
where: { aggregateType, aggregateId },
orderBy: { version: "desc" },
});
if (snapshot) {
// Load only events after snapshot
const subsequentEvents = await eventStore.loadEvents(
aggregateId,
snapshot.version + 1
);
return Order.fromSnapshot(
{ state: snapshot.state as any, version: snapshot.version },
subsequentEvents
);
}
// No snapshot: replay all events
const events = await eventStore.loadEvents(aggregateId);
return Order.fromEvents(aggregateId, events);
}
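The property that makes snapshots safe is that a snapshot plus the events after it yields the same state as a full replay. The sketch below checks that on a deliberately simple, hypothetical aggregate (a running balance in cents) rather than the `Order` class; `LedgerEvent`, `LedgerState`, and `applyEvent` are illustrative names.

```typescript
interface LedgerEvent {
  eventVersion: number;
  amountCents: number;
}

interface LedgerState {
  balanceCents: number;
  version: number;
}

const emptyState: LedgerState = { balanceCents: 0, version: -1 };

function applyEvent(state: LedgerState, event: LedgerEvent): LedgerState {
  return {
    balanceCents: state.balanceCents + event.amountCents,
    version: event.eventVersion,
  };
}

// 120 events with versions 0..119; event N is worth N + 1 cents.
const allEvents: LedgerEvent[] = [];
for (let v = 0; v < 120; v++) {
  allEvents.push({ eventVersion: v, amountCents: v + 1 });
}

// Snapshot taken at version 100, a multiple of the interval of 50 used above.
const SNAPSHOT_VERSION = 100;
const snapshot = allEvents
  .filter((e) => e.eventVersion <= SNAPSHOT_VERSION)
  .reduce(applyEvent, emptyState);

// Load only the tail, mirroring loadEvents(aggregateId, snapshot.version + 1).
const tail = allEvents.filter((e) => e.eventVersion >= snapshot.version + 1);
const fromSnapshot = tail.reduce(applyEvent, snapshot);
const fullReplay = allEvents.reduce(applyEvent, emptyState);

console.log(tail.length); // 19
console.log(fromSnapshot.balanceCents === fullReplay.balanceCents); // true
```

Here 19 events are applied instead of 120. The saving grows with the length of the history, while the snapshot itself can always be discarded and recomputed from the log.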
Temporal Queries
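-- What was the order status on a specific date?
-- Event payloads carry no status field, so derive it from the latest event type.
SELECT
aggregate_id,
event_type,
CASE event_type
WHEN 'OrderPlaced' THEN 'placed'
WHEN 'OrderShipped' THEN 'shipped'
WHEN 'OrderDelivered' THEN 'delivered'
WHEN 'OrderCancelled' THEN 'cancelled'
END AS status_after_event,
occurred_at
FROM domain_events
WHERE aggregate_id = $1
AND occurred_at <= $2 -- Target date
ORDER BY event_version DESC
LIMIT 1;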
-- Replay all events for an aggregate (for debugging)
SELECT
event_version,
event_type,
payload,
metadata,
occurred_at
FROM domain_events
WHERE aggregate_id = $1
ORDER BY event_version ASC;
-- Event counts by type over time (operational analytics)
SELECT
DATE_TRUNC('day', occurred_at) AS day,
event_type,
COUNT(*) AS count
FROM domain_events
WHERE occurred_at > NOW() - INTERVAL '30 days'
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC;
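The "status on a specific date" query can also be answered in application code by replaying an aggregate's events up to a cutoff. The following is a minimal sketch assuming the events for one order have already been loaded; `TimedEvent`, `statusAfter`, and `statusAsOf` are hypothetical names, and the timestamps are made-up sample data.

```typescript
interface TimedEvent {
  eventVersion: number;
  eventType: string;
  occurredAt: string; // ISO 8601 timestamp
}

const statusAfter: Record<string, string | undefined> = {
  OrderPlaced: "placed",
  OrderShipped: "shipped",
  OrderDelivered: "delivered",
  OrderCancelled: "cancelled",
};

// Status of one aggregate as of a point in time:
// replay in version order and stop at the first event after the cutoff.
function statusAsOf(events: TimedEvent[], asOf: Date): string {
  let status = "pending";
  const ordered = [...events].sort((a, b) => a.eventVersion - b.eventVersion);
  for (const event of ordered) {
    if (new Date(event.occurredAt).getTime() > asOf.getTime()) break;
    status = statusAfter[event.eventType] ?? status;
  }
  return status;
}

// Made-up sample history for a single order.
const orderHistory: TimedEvent[] = [
  { eventVersion: 0, eventType: "OrderPlaced", occurredAt: "2024-05-01T10:00:00Z" },
  { eventVersion: 1, eventType: "OrderShipped", occurredAt: "2024-05-03T08:00:00Z" },
  { eventVersion: 2, eventType: "OrderDelivered", occurredAt: "2024-05-06T14:30:00Z" },
];

console.log(statusAsOf(orderHistory, new Date("2024-05-04T00:00:00Z"))); // "shipped"
```

Replaying in code is convenient when the answer needs more than the last event type, for example the line items or totals as they stood on that date.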
Cost and Timeline Estimates
| Scope | Team | Timeline | Cost Range |
|---|---|---|---|
| Basic event store + one aggregate | 1–2 devs | 1 week | $2,000–4,000 |
| Full CQRS (commands + projections + queries) | 2 devs | 3–4 weeks | $8,000–16,000 |
| Event sourcing migration from CRUD | 2–3 devs | 4–8 weeks | $15,000–35,000 |
| Enterprise (event bus, saga orchestration, replay tooling) | 3+ devs | 8–16 weeks | $30,000–80,000 |