Event Sourcing and CQRS: Immutable Event Logs, Projections, and State Rebuilding
Event sourcing changes the fundamental storage model: instead of storing current state ("the order is in status 'shipped'"), you store every event that led to the current state ("OrderPlaced, PaymentProcessed, PickedAndPacked, Shipped"). Current state is derived by replaying events.
The benefits are real: complete audit trail, ability to rebuild any past state, natural event-driven integration, and time-travel debugging. The costs are also real: significantly higher complexity, eventual consistency challenges, and schema migration complexity. Most systems don't need event sourcing. Those that do benefit enormously from it.
When to Use Event Sourcing
Strong candidates:
- Financial systems (every transaction must be auditable)
- Compliance-heavy domains (healthcare, legal, regulated industries)
- Collaborative editing (reconstruct document state at any point)
- Debugging complex workflows (replay exactly what happened)
- Event-driven microservices (events as the integration mechanism)
Not a good fit:
- Simple CRUD applications
- Reporting-heavy workloads (harder to query event streams than relational tables)
- Small teams without DDD experience
- Systems where eventual consistency is unacceptable
Core Concepts
Traditional: Store current state
orders table: { id, status: 'shipped', amount: 9900 }
Event Sourcing: Store events, derive state
events table:
1. OrderPlaced { orderId, amount: 9900, items: [...] }
2. PaymentProcessed { orderId, paymentId, amount: 9900 }
3. OrderShipped { orderId, trackingNumber: 'UPS123' }
Current state = replay events 1, 2, 3
Past state = replay only events 1, 2 (before shipping)
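The replay idea above can be sketched as a fold over the event list. This is a minimal illustration rather than the aggregate used later in this article: the `applyEvent` and `replay` helpers, the `OrderView` shape, and the sample payloads are assumptions made for the example.

```typescript
// Hypothetical event shapes for illustration; field names are assumptions.
type OrderEvent =
  | { type: 'OrderPlaced'; orderId: string; amount: number }
  | { type: 'PaymentProcessed'; orderId: string; paymentId: string }
  | { type: 'OrderShipped'; orderId: string; trackingNumber: string };

interface OrderView {
  id: string;
  status: 'pending' | 'paid' | 'shipped';
  amount: number;
}

// Fold one event into the previous state and return the next state.
function applyEvent(state: OrderView | null, event: OrderEvent): OrderView | null {
  switch (event.type) {
    case 'OrderPlaced':
      return { id: event.orderId, status: 'pending', amount: event.amount };
    case 'PaymentProcessed':
      return state ? { ...state, status: 'paid' } : state;
    case 'OrderShipped':
      return state ? { ...state, status: 'shipped' } : state;
  }
}

// Replay the first `upTo` events (all of them by default).
function replay(events: OrderEvent[], upTo: number = events.length): OrderView | null {
  return events.slice(0, upTo).reduce<OrderView | null>(applyEvent, null);
}

const history: OrderEvent[] = [
  { type: 'OrderPlaced', orderId: 'abc123', amount: 9900 },
  { type: 'PaymentProcessed', orderId: 'abc123', paymentId: 'pay-1' },
  { type: 'OrderShipped', orderId: 'abc123', trackingNumber: 'UPS123' },
];

const current = replay(history);           // status is 'shipped'
const beforeShipping = replay(history, 2); // status is 'paid'
```

Because state is derived rather than stored, "what did this order look like before it shipped?" is the same function call with a shorter event list.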
Key terms:
- Event: An immutable fact that has already happened, named in the past tense ("OrderPlaced")
- Aggregate: The entity that events belong to (an Order, a BankAccount)
- Event stream: All events for one aggregate instance, ordered
- Projection: A read model built by processing events (current state, statistics)
- Snapshot: Periodic state checkpoint to avoid replaying all events from the beginning
Event Store Schema
-- Simple PostgreSQL event store
CREATE TABLE events (
  id            BIGSERIAL PRIMARY KEY,
  stream_id     TEXT NOT NULL,                 -- e.g., 'order-abc123'
  stream_type   TEXT NOT NULL,                 -- e.g., 'Order'
  event_type    TEXT NOT NULL,                 -- e.g., 'OrderPlaced'
  event_version INTEGER NOT NULL,              -- Monotonic version per stream
  data          JSONB NOT NULL,                -- Event payload
  metadata      JSONB NOT NULL DEFAULT '{}',   -- User, timestamp, correlation ID
  created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- Optimistic concurrency: prevent two writers at the same version.
  -- PostgreSQL backs this constraint with a unique index on
  -- (stream_id, event_version), so no separate stream index is needed.
  UNIQUE (stream_id, event_version)
);
CREATE INDEX idx_events_type ON events (event_type, created_at);
CREATE INDEX idx_events_created ON events (created_at);
-- Snapshots for performance optimization
CREATE TABLE snapshots (
  stream_id   TEXT PRIMARY KEY,
  stream_type TEXT NOT NULL,
  version     INTEGER NOT NULL,
  state       JSONB NOT NULL,
  created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
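To see what the UNIQUE (stream_id, event_version) constraint buys you, here is an in-memory sketch of the same rule. `InMemoryEventStore`, `append`, and `ConcurrencyError` are hypothetical names invented for this example, not part of any library; a real store would rely on the database constraint rather than a check in application code.

```typescript
interface StoredEvent {
  streamId: string;
  version: number; // 1-based position within the stream
  type: string;
  data: unknown;
}

class ConcurrencyError extends Error {}

class InMemoryEventStore {
  private events: StoredEvent[] = [];

  // Current version of a stream = number of events stored for it.
  currentVersion(streamId: string): number {
    return this.events.filter(e => e.streamId === streamId).length;
  }

  // Append only if the caller's view of the stream is still up to date.
  append(
    streamId: string,
    expectedVersion: number,
    newEvents: Array<{ type: string; data: unknown }>
  ): void {
    const actual = this.currentVersion(streamId);
    if (actual !== expectedVersion) {
      throw new ConcurrencyError(
        `Stream ${streamId} is at version ${actual}, expected ${expectedVersion}`
      );
    }
    newEvents.forEach((e, i) => {
      this.events.push({ streamId, version: expectedVersion + i + 1, type: e.type, data: e.data });
    });
  }

  read(streamId: string): StoredEvent[] {
    return this.events.filter(e => e.streamId === streamId);
  }
}

const store = new InMemoryEventStore();
store.append('order-abc123', 0, [{ type: 'OrderPlaced', data: { totalCents: 9900 } }]);

// Two writers both loaded the stream at version 1. The first one wins.
store.append('order-abc123', 1, [{ type: 'PaymentProcessed', data: { paymentId: 'pay-1' } }]);

// The second writer still expects version 1 and is rejected.
let conflict = false;
try {
  store.append('order-abc123', 1, [{ type: 'OrderCancelled', data: { reason: 'customer request' } }]);
} catch (err) {
  conflict = err instanceof ConcurrencyError;
}
```

The losing writer has to reload the stream and decide again; in this example the cancellation would now be evaluated against an order that has already been paid.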
Aggregate Implementation
// domain/order/Order.ts
interface OrderState {
  id: string;
  status: 'pending' | 'paid' | 'shipped' | 'cancelled';
  // price is the unit price in cents, so totals stay integer
  items: Array<{ productId: string; quantity: number; price: number }>;
  totalCents: number;
  customerId: string;
}

// All events the Order aggregate can emit
type OrderPlaced = {
  type: 'OrderPlaced';
  data: { orderId: string; customerId: string; items: OrderState['items']; totalCents: number };
};
type PaymentProcessed = {
  type: 'PaymentProcessed';
  data: { orderId: string; paymentId: string; amountCents: number };
};
type OrderShipped = {
  type: 'OrderShipped';
  data: { orderId: string; trackingNumber: string; carrier: string };
};
type OrderCancelled = {
  type: 'OrderCancelled';
  data: { orderId: string; reason: string };
};
type DomainEvent = OrderPlaced | PaymentProcessed | OrderShipped | OrderCancelled;

export class Order {
  private state: Partial<OrderState> = {};
  private pendingEvents: DomainEvent[] = [];
  // Number of events applied so far (the stream version of this aggregate)
  version = 0;

  // Apply an event to in-memory state. Deterministic and free of I/O:
  // it only mutates this aggregate, so replaying the same events always
  // produces the same state.
  private apply(event: DomainEvent): void {
    switch (event.type) {
      case 'OrderPlaced':
        this.state = {
          id: event.data.orderId,
          status: 'pending',
          items: event.data.items,
          totalCents: event.data.totalCents,
          customerId: event.data.customerId,
        };
        break;
      case 'PaymentProcessed':
        this.state.status = 'paid';
        break;
      case 'OrderShipped':
        this.state.status = 'shipped';
        break;
      case 'OrderCancelled':
        this.state.status = 'cancelled';
        break;
    }
    this.version++;
  }

  // Command: place order. Validates input, then emits an event.
  static place(params: {
    orderId: string;
    customerId: string;
    items: OrderState['items'];
  }): Order {
    const order = new Order();
    if (params.items.length === 0) {
      throw new Error('Order must have at least one item');
    }
    const totalCents = params.items.reduce(
      (sum, item) => sum + item.price * item.quantity, 0
    );
    const event: OrderPlaced = {
      type: 'OrderPlaced',
      data: { orderId: params.orderId, customerId: params.customerId,
              items: params.items, totalCents },
    };
    order.apply(event);
    order.pendingEvents.push(event);
    return order;
  }

  // Command: process payment
  processPayment(paymentId: string, amountCents: number): void {
    if (this.state.status !== 'pending') {
      throw new Error(`Cannot process payment for order in status: ${this.state.status}`);
    }
    if (amountCents !== this.state.totalCents) {
      throw new Error('Payment amount does not match order total');
    }
    const event: PaymentProcessed = {
      type: 'PaymentProcessed',
      data: { orderId: this.state.id!, paymentId, amountCents },
    };
    this.apply(event);
    this.pendingEvents.push(event);
  }

  // Reconstitute from event history
  static fromEvents(events: DomainEvent[]): Order {
    const order = new Order();
    for (const event of events) {
      order.apply(event);
    }
    return order;
  }

  // Hand over pending events for persistence and clear the pending list
  flushEvents(): DomainEvent[] {
    const events = [...this.pendingEvents];
    this.pendingEvents = [];
    return events;
  }

  getState(): Readonly<Partial<OrderState>> {
    return this.state;
  }
}
Event Store Repository
// infrastructure/OrderRepository.ts
import { Pool } from 'pg';
import { Order } from '../domain/order/Order';

export class OrderRepository {
  constructor(private db: Pool) {}

  // expectedVersion is the stream version the caller loaded, i.e. the
  // version before the pending events are appended.
  async save(order: Order, expectedVersion: number): Promise<void> {
    // flushEvents() clears the aggregate's pending list, so if this save
    // fails the caller should reload the aggregate and re-run the command
    // rather than call save() again on the same instance.
    const events = order.flushEvents();
    if (events.length === 0) return;
    const streamId = `order-${order.getState().id}`;
    const client = await this.db.connect();
    try {
      await client.query('BEGIN');
      for (let i = 0; i < events.length; i++) {
        const event = events[i];
        const version = expectedVersion + i + 1;
        // UNIQUE constraint on (stream_id, event_version) prevents
        // concurrent writes at same version (optimistic concurrency)
        await client.query(
          `INSERT INTO events (stream_id, stream_type, event_type, event_version, data, metadata)
           VALUES ($1, 'Order', $2, $3, $4, $5)`,
          [
            streamId,
            event.type,
            version,
            JSON.stringify(event.data),
            JSON.stringify({ timestamp: new Date().toISOString() }),
          ]
        );
      }
      await client.query('COMMIT');
    } catch (err: any) {
      await client.query('ROLLBACK');
      if (err.code === '23505') { // unique_violation = optimistic concurrency conflict
        throw new Error('Concurrent modification detected, please retry');
      }
      throw err;
    } finally {
      client.release();
    }
  }

  async findById(orderId: string): Promise<Order | null> {
    const streamId = `order-${orderId}`;
    // Check for a snapshot first (avoids replaying all events)
    const snapshot = await this.db.query(
      'SELECT version, state FROM snapshots WHERE stream_id = $1',
      [streamId]
    );
    let startVersion = 0;
    if (snapshot.rows.length > 0) {
      // Start from the snapshot instead of event 1
      startVersion = snapshot.rows[0].version;
      // Reconstitute from snapshot...
      // NOTE: this step is elided. The snapshot's state and version must be
      // loaded into the aggregate before the later events are applied.
      // Order.fromEvents() below starts from an empty aggregate, so as
      // written the snapshot state is not restored.
    }
    // Load events since the snapshot (or from the beginning)
    const result = await this.db.query(
      `SELECT event_type, data FROM events
       WHERE stream_id = $1 AND event_version > $2
       ORDER BY event_version ASC`,
      [streamId, startVersion]
    );
    if (result.rows.length === 0 && snapshot.rows.length === 0) return null;
    const events = result.rows.map(row => ({
      type: row.event_type,
      data: row.data,
    }));
    return Order.fromEvents(events as any);
  }
}
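The repository reports a version conflict and asks the caller to retry. A retry only helps if each attempt reloads the aggregate and re-runs the command against fresh state, which might look like the sketch below. `ConflictError`, `retryOnConflict`, `failingTimes`, and the attempt limit are hypothetical names and values for this example; the repository above throws a generic Error, so it would need to throw a dedicated error class for this kind of check to work.

```typescript
// Hypothetical error type a repository could throw on a version conflict.
class ConflictError extends Error {}

// Runs `attempt` until it succeeds or the attempt budget is used up.
// Each attempt is expected to load the aggregate, run the command, and save,
// so that a retry works against the latest events rather than stale state.
async function retryOnConflict<T>(
  attempt: () => Promise<T>,
  maxAttempts = 3
): Promise<T> {
  for (let i = 1; ; i++) {
    try {
      return await attempt();
    } catch (err) {
      const retryable = err instanceof ConflictError && i < maxAttempts;
      if (!retryable) throw err; // other errors, or budget exhausted
    }
  }
}

// Stand-in for "load, run command, save" that conflicts a fixed number of times.
function failingTimes(conflicts: number): () => Promise<string> {
  let calls = 0;
  return async () => {
    calls++;
    if (calls <= conflicts) throw new ConflictError('stale version');
    return `saved after ${calls} attempts`;
  };
}

retryOnConflict(failingTimes(2)).then(message => console.log(message));
// logs: saved after 3 attempts
```

Only conflicts are retried. A validation failure inside the command (for example, paying an order that is no longer pending) is a real answer and should propagate to the caller.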
Projections (Read Models)
This is where CQRS comes in: commands write events to the store, while queries read from projections. A projection consumes events and maintains a read model optimized for a particular query:
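// projections/OrderSummaryProjection.ts
// Runs as a background process or triggered by event store subscription
import { Pool } from 'pg';

interface ProjectedEvent {
  type: string;
  data: any;
  streamId: string;
  createdAt: Date; // when the event was recorded, not when it is projected
}

export class OrderSummaryProjection {
  constructor(private readDb: Pool) {}

  // Timestamps come from the event, so a rebuild reproduces the original
  // placed_at / paid_at / shipped_at values instead of the rebuild time.
  async handle(event: ProjectedEvent): Promise<void> {
    switch (event.type) {
      case 'OrderPlaced':
        await this.readDb.query(
          `INSERT INTO order_summaries (id, customer_id, status, total_cents, item_count, placed_at)
           VALUES ($1, $2, 'pending', $3, $4, $5)`,
          [event.data.orderId, event.data.customerId,
           event.data.totalCents, event.data.items.length, event.createdAt]
        );
        break;
      case 'PaymentProcessed':
        await this.readDb.query(
          'UPDATE order_summaries SET status = $1, paid_at = $2 WHERE id = $3',
          ['paid', event.createdAt, event.data.orderId]
        );
        break;
      case 'OrderShipped':
        await this.readDb.query(
          `UPDATE order_summaries
           SET status = 'shipped', tracking_number = $1, shipped_at = $2
           WHERE id = $3`,
          [event.data.trackingNumber, event.createdAt, event.data.orderId]
        );
        break;
      case 'OrderCancelled':
        await this.readDb.query(
          'UPDATE order_summaries SET status = $1 WHERE id = $2',
          ['cancelled', event.data.orderId]
        );
        break;
    }
  }

  // Rebuild projection from scratch (when the schema changes or a bug is fixed).
  // Assumes the events table is reachable through the same pool as the read
  // model; with a separate read database, query the event store's pool here.
  // For large stores, read the events in batches instead of one query.
  async rebuild(): Promise<void> {
    await this.readDb.query('TRUNCATE order_summaries');
    const events = await this.readDb.query(
      `SELECT event_type, data, stream_id, created_at FROM events
       WHERE stream_type = 'Order'
       ORDER BY id ASC`
    );
    for (const row of events.rows) {
      await this.handle({
        type: row.event_type,
        data: row.data,
        streamId: row.stream_id,
        createdAt: row.created_at,
      });
    }
  }
}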
The power of projections: you can add a new projection long after the events were written (a new analytics view, a new reporting table) and build it from the full event history.
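As an illustration of a projection added after the fact, the sketch below folds an existing history into a per-customer revenue view. It keeps everything in memory to stay short; `RevenueByCustomerProjection`, the `HistoryEvent` shapes, and the sample data are assumptions for this example, and a real projection would write to a table as the order summary projection does.

```typescript
// Hypothetical subset of the order events, reduced to the fields this view needs.
type HistoryEvent =
  | { type: 'OrderPlaced'; data: { orderId: string; customerId: string; totalCents: number } }
  | { type: 'PaymentProcessed'; data: { orderId: string; amountCents: number } }
  | { type: 'OrderShipped'; data: { orderId: string; trackingNumber: string } };

class RevenueByCustomerProjection {
  private customerByOrder: Record<string, string> = {};
  private paidCents: Record<string, number> = {};

  handle(event: HistoryEvent): void {
    switch (event.type) {
      case 'OrderPlaced':
        // Remember who owns the order; payments only carry the order ID.
        this.customerByOrder[event.data.orderId] = event.data.customerId;
        break;
      case 'PaymentProcessed': {
        const customerId = this.customerByOrder[event.data.orderId];
        if (customerId === undefined) break; // payment for an unknown order: skip
        this.paidCents[customerId] = (this.paidCents[customerId] ?? 0) + event.data.amountCents;
        break;
      }
      default:
        break; // events this view does not care about
    }
  }

  // Start from empty and process the whole history in order.
  rebuild(history: HistoryEvent[]): void {
    this.customerByOrder = {};
    this.paidCents = {};
    for (const event of history) this.handle(event);
  }

  revenueFor(customerId: string): number {
    return this.paidCents[customerId] ?? 0;
  }
}

const pastEvents: HistoryEvent[] = [
  { type: 'OrderPlaced', data: { orderId: 'o1', customerId: 'cust-1', totalCents: 9900 } },
  { type: 'PaymentProcessed', data: { orderId: 'o1', amountCents: 9900 } },
  { type: 'OrderPlaced', data: { orderId: 'o2', customerId: 'cust-2', totalCents: 4000 } },
  { type: 'OrderPlaced', data: { orderId: 'o3', customerId: 'cust-1', totalCents: 2500 } },
  { type: 'OrderShipped', data: { orderId: 'o1', trackingNumber: 'UPS123' } },
  { type: 'PaymentProcessed', data: { orderId: 'o3', amountCents: 2500 } },
];

const revenue = new RevenueByCustomerProjection();
revenue.rebuild(pastEvents);
console.log(revenue.revenueFor('cust-1')); // 12400
console.log(revenue.revenueFor('cust-2')); // 0 (order placed but not paid)
```

Nothing about the stored events had to change to support this view, which is the practical payoff of keeping the full history.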
Snapshots for Performance
When an aggregate has thousands of events, replaying them all becomes slow. Snapshots cache the state periodically:
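// Take a snapshot roughly every 100 events
const SNAPSHOT_THRESHOLD = 100;
async function saveWithSnapshot(order: Order, expectedVersion: number): Promise<void> {
  await repository.save(order, expectedVersion);
  // order.version already counts the events just saved, so it is the version
  // the snapshot state corresponds to (expectedVersion is the version before them).
  const currentVersion = order.version;
  // Snapshot when this save crossed a multiple of the threshold, even if
  // several events were written at once.
  const crossedThreshold =
    Math.floor(currentVersion / SNAPSHOT_THRESHOLD) >
    Math.floor(expectedVersion / SNAPSHOT_THRESHOLD);
  if (crossedThreshold) {
    await db.query(
      `INSERT INTO snapshots (stream_id, stream_type, version, state)
       VALUES ($1, 'Order', $2, $3)
       ON CONFLICT (stream_id) DO UPDATE
       SET version = EXCLUDED.version, state = EXCLUDED.state, created_at = NOW()`,
      [`order-${order.getState().id}`, currentVersion, JSON.stringify(order.getState())]
    );
  }
}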
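Loading from a snapshot means restoring the saved state and version first, then applying only the events with a higher version. The sketch below shows that rule on a deliberately small in-memory model; `rehydrate`, `applyTo`, and the `Snapshot` and `VersionedEvent` shapes are hypothetical and simpler than the Order aggregate above.

```typescript
interface VersionedEvent {
  version: number; // position in the stream, starting at 1
  type: 'OrderPlaced' | 'PaymentProcessed' | 'OrderShipped';
}

interface SnapshotState {
  status: 'new' | 'pending' | 'paid' | 'shipped';
}

interface Snapshot {
  version: number; // version of the last event folded into `state`
  state: SnapshotState;
}

function applyTo(state: SnapshotState, event: VersionedEvent): SnapshotState {
  switch (event.type) {
    case 'OrderPlaced':
      return { status: 'pending' };
    case 'PaymentProcessed':
      return { status: 'paid' };
    case 'OrderShipped':
      return { status: 'shipped' };
  }
}

// Restore from the snapshot if there is one, then apply only newer events.
function rehydrate(
  events: VersionedEvent[],
  snapshot?: Snapshot
): { state: SnapshotState; version: number } {
  let state: SnapshotState = snapshot ? { ...snapshot.state } : { status: 'new' };
  let version = snapshot ? snapshot.version : 0;
  for (const event of events) {
    if (event.version <= version) continue; // already included in the snapshot
    state = applyTo(state, event);
    version = event.version;
  }
  return { state, version };
}

const stream: VersionedEvent[] = [
  { version: 1, type: 'OrderPlaced' },
  { version: 2, type: 'PaymentProcessed' },
  { version: 3, type: 'OrderShipped' },
];
const snapshotAtTwo: Snapshot = { version: 2, state: { status: 'paid' } };

const fromSnapshot = rehydrate(stream, snapshotAtTwo); // applies only event 3
const fromScratch = rehydrate(stream);                 // applies events 1 to 3
```

Both calls should end in the same state and version. If they ever diverge, the snapshot was written with the wrong version or from state that did not match it, which is why the version stored with a snapshot has to be the version of the last event it includes.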
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.