Event Sourcing and CQRS: Immutable Event Logs, Projections, and State Rebuilding

Implement event sourcing with CQRS — immutable event store design, aggregate roots, projections for read models, snapshots for performance, and when event sourc

Viprasol Tech Team
May 10, 2026
14 min read

Event sourcing changes the fundamental storage model: instead of storing current state ("the order is in status 'shipped'"), you store every event that led to the current state ("OrderPlaced, PaymentProcessed, PickedAndPacked, Shipped"). Current state is derived by replaying events.

The benefits are real: a complete audit trail, the ability to rebuild any past state, natural event-driven integration, and time-travel debugging. The costs are also real: significantly more moving parts, eventually consistent read models, and the burden of versioning event schemas whose stored history can never be rewritten. Most systems don't need event sourcing. Those that do benefit enormously from it.


When to Use Event Sourcing

Strong candidates:

  • Financial systems (every transaction must be auditable)
  • Compliance-heavy domains (healthcare, legal, regulated industries)
  • Collaborative editing (reconstruct document state at any point)
  • Debugging complex workflows (replay exactly what happened)
  • Event-driven microservices (events as the integration mechanism)

Not a good fit:

  • Simple CRUD applications
  • Reporting-heavy workloads (event streams are harder to query than relational tables, so every report needs its own projection)
  • Small teams without DDD experience
  • Systems where reads must never lag behind writes (projections are eventually consistent)

Core Concepts

Traditional: Store current state
  orders table: { id, status: 'shipped', amount: 9900 }

Event Sourcing: Store events, derive state
  events table:
    1. OrderPlaced     { orderId, amount: 9900, items: [...] }
    2. PaymentProcessed { orderId, paymentId, amount: 9900 }
    3. OrderShipped    { orderId, trackingNumber: 'UPS123' }

Current state = replay events 1, 2, 3
Past state = replay only events 1, 2 (before shipping)

Key terms:

  • Event: An immutable fact that has already happened, named in the past tense ("OrderPlaced")
  • Aggregate: The entity that events belong to (an Order, a BankAccount)
  • Event stream: All events for one aggregate instance, ordered
  • Projection: A read model built by processing events (current state, statistics)
  • Snapshot: Periodic state checkpoint to avoid replaying all events from the beginning
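
To make "current state = replay events" concrete, here is a minimal sketch of replay as a left fold over a stream. The event shapes mirror the three events in the diagram above; the names `evolve`, `replay`, and `OrderView` are illustrative assumptions, not part of any library.

```typescript
// Hypothetical event shapes, mirroring the three events in the diagram above.
type OrderEvent =
  | { type: 'OrderPlaced'; orderId: string; amount: number }
  | { type: 'PaymentProcessed'; orderId: string; paymentId: string; amount: number }
  | { type: 'OrderShipped'; orderId: string; trackingNumber: string };

interface OrderView {
  id: string;
  status: 'pending' | 'paid' | 'shipped';
  amount: number;
  trackingNumber?: string;
}

// evolve: given the state so far and one event, return the next state.
// It never mutates its input, so replaying the same events always
// yields the same result.
function evolve(state: OrderView | null, event: OrderEvent): OrderView {
  switch (event.type) {
    case 'OrderPlaced':
      return { id: event.orderId, status: 'pending', amount: event.amount };
    case 'PaymentProcessed':
      if (!state) throw new Error('PaymentProcessed before OrderPlaced');
      return { ...state, status: 'paid' };
    case 'OrderShipped':
      if (!state) throw new Error('OrderShipped before OrderPlaced');
      return { ...state, status: 'shipped', trackingNumber: event.trackingNumber };
  }
}

// replay: fold a stream into state. Passing a prefix of the stream
// reconstructs the state as it was at that point in time.
function replay(events: OrderEvent[]): OrderView | null {
  return events.reduce<OrderView | null>(evolve, null);
}

const stream: OrderEvent[] = [
  { type: 'OrderPlaced', orderId: 'abc123', amount: 9900 },
  { type: 'PaymentProcessed', orderId: 'abc123', paymentId: 'pay-1', amount: 9900 },
  { type: 'OrderShipped', orderId: 'abc123', trackingNumber: 'UPS123' },
];

const current = replay(stream);                    // status: 'shipped'
const beforeShipping = replay(stream.slice(0, 2)); // status: 'paid'
```

Keeping the reducer free of I/O is what makes time travel cheap: a past state is the same function applied to fewer events.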

Event Store Schema

-- Simple PostgreSQL event store
CREATE TABLE events (
    id              BIGSERIAL PRIMARY KEY,
    stream_id       TEXT NOT NULL,         -- e.g., 'order-abc123'
    stream_type     TEXT NOT NULL,         -- e.g., 'Order'
    event_type      TEXT NOT NULL,         -- e.g., 'OrderPlaced'
    event_version   INTEGER NOT NULL,      -- Monotonic version per stream
    data            JSONB NOT NULL,        -- Event payload
    metadata        JSONB NOT NULL DEFAULT '{}',  -- User, timestamp, correlation ID
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Optimistic concurrency: prevent two writers at same version
    UNIQUE (stream_id, event_version)
);

-- The UNIQUE constraint above already creates an index on (stream_id, event_version),
-- so per-stream reads need no separate index
CREATE INDEX idx_events_type ON events (event_type, created_at);
CREATE INDEX idx_events_created ON events (created_at);

-- Snapshots for performance optimization
CREATE TABLE snapshots (
    stream_id       TEXT PRIMARY KEY,
    stream_type     TEXT NOT NULL,
    version         INTEGER NOT NULL,
    state           JSONB NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
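
The `UNIQUE (stream_id, event_version)` constraint is what turns a plain table into an optimistic lock: a writer that loaded the stream at version N may only append version N+1. The sketch below reproduces that rule in memory so the behavior can be seen without a database. `InMemoryEventStore`, `append`, `read`, and `ConcurrencyError` are hypothetical names chosen for illustration.

```typescript
interface StoredEvent {
  streamId: string;
  eventVersion: number; // 1-based and contiguous within a stream
  eventType: string;
  data: unknown;
}

class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  // Append succeeds only if expectedVersion equals the stream's current
  // version. This plays the role of the UNIQUE constraint rejecting a
  // second row with the same (stream_id, event_version).
  append(
    streamId: string,
    expectedVersion: number,
    events: Array<{ eventType: string; data: unknown }>,
  ): number {
    const stream = this.streams.get(streamId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(
        `expected version ${expectedVersion}, stream is at ${stream.length}`,
      );
    }
    const stored = events.map((e, i) => ({
      streamId,
      eventVersion: expectedVersion + i + 1,
      ...e,
    }));
    this.streams.set(streamId, [...stream, ...stored]);
    return expectedVersion + events.length; // new stream version
  }

  read(streamId: string, afterVersion = 0): StoredEvent[] {
    return (this.streams.get(streamId) ?? []).filter(e => e.eventVersion > afterVersion);
  }
}

const store = new InMemoryEventStore();
store.append('order-abc123', 0, [{ eventType: 'OrderPlaced', data: { amount: 9900 } }]);

// Two writers both loaded the stream at version 1.
store.append('order-abc123', 1, [{ eventType: 'PaymentProcessed', data: {} }]); // first writer wins

let conflict = false;
try {
  store.append('order-abc123', 1, [{ eventType: 'OrderCancelled', data: {} }]); // second writer is rejected
} catch (err) {
  conflict = err instanceof Error && err.name === 'ConcurrencyError';
}
```

The losing writer has not corrupted anything. It simply has to reload the stream, see the payment, and decide whether its command still makes sense.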

Aggregate Implementation

// domain/order/Order.ts
interface OrderState {
  id: string;
  status: 'pending' | 'paid' | 'shipped' | 'cancelled';
  // price is the unit price in cents, so totalCents stays an integer
  items: Array<{ productId: string; quantity: number; price: number }>;
  totalCents: number;
  customerId: string;
}

// All events the Order aggregate can emit
type OrderPlaced = {
  type: 'OrderPlaced';
  data: { orderId: string; customerId: string; items: OrderState['items']; totalCents: number };
};
type PaymentProcessed = {
  type: 'PaymentProcessed';
  data: { orderId: string; paymentId: string; amountCents: number };
};
type OrderShipped = {
  type: 'OrderShipped';
  data: { orderId: string; trackingNumber: string; carrier: string };
};
type OrderCancelled = {
  type: 'OrderCancelled';
  data: { orderId: string; reason: string };
};

type DomainEvent = OrderPlaced | PaymentProcessed | OrderShipped | OrderCancelled;

export class Order {
  private state: Partial<OrderState> = {};
  private pendingEvents: DomainEvent[] = [];
  version = 0;

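  // Apply an event to in-memory state. It mutates the aggregate but is
  // deterministic and does no I/O, so replaying the same events always
  // produces the same state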
  private apply(event: DomainEvent): void {
    switch (event.type) {
      case 'OrderPlaced':
        this.state = {
          id: event.data.orderId,
          status: 'pending',
          items: event.data.items,
          totalCents: event.data.totalCents,
          customerId: event.data.customerId,
        };
        break;
      case 'PaymentProcessed':
        this.state.status = 'paid';
        break;
      case 'OrderShipped':
        this.state.status = 'shipped';
        break;
      case 'OrderCancelled':
        this.state.status = 'cancelled';
        break;
    }
    this.version++;
  }

  // Command: place order — validates and emits event
  static place(params: {
    orderId: string;
    customerId: string;
    items: OrderState['items'];
  }): Order {
    const order = new Order();

    if (params.items.length === 0) {
      throw new Error('Order must have at least one item');
    }

    const totalCents = params.items.reduce(
      (sum, item) => sum + item.price * item.quantity, 0
    );

    const event: OrderPlaced = {
      type: 'OrderPlaced',
      data: { orderId: params.orderId, customerId: params.customerId,
              items: params.items, totalCents },
    };

    order.apply(event);
    order.pendingEvents.push(event);
    return order;
  }

  // Command: process payment
  processPayment(paymentId: string, amountCents: number): void {
    if (this.state.status !== 'pending') {
      throw new Error(`Cannot process payment for order in status: ${this.state.status}`);
    }
    if (amountCents !== this.state.totalCents) {
      throw new Error('Payment amount does not match order total');
    }

    const event: PaymentProcessed = {
      type: 'PaymentProcessed',
      data: { orderId: this.state.id!, paymentId, amountCents },
    };
    this.apply(event);
    this.pendingEvents.push(event);
  }

  // Reconstitute from event history
  static fromEvents(events: DomainEvent[]): Order {
    const order = new Order();
    for (const event of events) {
      order.apply(event);
    }
    return order;
  }

  // Access pending events (for persistence)
  flushEvents(): DomainEvent[] {
    const events = [...this.pendingEvents];
    this.pendingEvents = [];
    return events;
  }

  getState(): Readonly<Partial<OrderState>> {
    return this.state;
  }
}

Event Store Repository

// infrastructure/OrderRepository.ts
import { Pool } from 'pg';
import { Order } from '../domain/order/Order';

export class OrderRepository {
  constructor(private db: Pool) {}

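  // expectedVersion is the stream version the aggregate was loaded at (0 for a new order)
  async save(order: Order, expectedVersion: number): Promise<void> {
    // flushEvents() clears the aggregate's pending list, so if the insert
    // below fails, reload the aggregate and re-run the command before retrying
    const events = order.flushEvents();
    if (events.length === 0) return;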

    const client = await this.db.connect();
    try {
      await client.query('BEGIN');

      for (let i = 0; i < events.length; i++) {
        const event = events[i];
        const version = expectedVersion + i + 1;

        // UNIQUE constraint on (stream_id, event_version) prevents
        // concurrent writes at same version (optimistic concurrency)
        await client.query(
          `INSERT INTO events (stream_id, stream_type, event_type, event_version, data, metadata)
           VALUES ($1, 'Order', $2, $3, $4, $5)`,
          [
            `order-${order.getState().id}`,
            event.type,
            version,
            JSON.stringify(event.data),
            JSON.stringify({ timestamp: new Date().toISOString() }),
          ]
        );
      }

      await client.query('COMMIT');
    } catch (err: any) {
      await client.query('ROLLBACK');
      if (err.code === '23505') {  // Unique violation = optimistic concurrency conflict
        throw new Error('Concurrent modification detected — please retry');
      }
      throw err;
    } finally {
      client.release();
    }
  }

  async findById(orderId: string): Promise<Order | null> {
    // Replay the full stream. Order exposes no way to restore state from a
    // snapshot row, so skipping ahead to a snapshot's version here would drop
    // every earlier event and return wrong state. Loading from snapshots
    // needs a restore method on the aggregate first.
    const result = await this.db.query(
      `SELECT event_type, data FROM events
       WHERE stream_id = $1
       ORDER BY event_version ASC`,
      [`order-${orderId}`]
    );

    if (result.rows.length === 0) return null;

    const events = result.rows.map(row => ({
      type: row.event_type,
      data: row.data,
    }));

    // Rows are trusted to match the DomainEvent union; validate here if the
    // store can contain older event shapes
    return Order.fromEvents(events as any);
  }
}
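
When `save` reports a concurrent modification, the caller has to reload the aggregate, re-run the command against the fresh state, and try again. The loop below is a minimal sketch of that pattern. `withRetry`, `maxAttempts`, and the `ConcurrencyError` class are assumptions for illustration; the repository above throws a plain `Error`, so it would need to throw a distinguishable error type for this to work.

```typescript
// Hypothetical error type a repository could throw on a version conflict.
class ConcurrencyError extends Error {
  constructor(message = 'Concurrent modification detected') {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

// Each call to `attempt` should perform one full load -> command -> save cycle.
// On a conflict the whole cycle is repeated, so the command is validated
// again against freshly loaded state. Any other error is rethrown at once.
async function withRetry<T>(attempt: () => Promise<T>, maxAttempts = 3): Promise<T> {
  for (let n = 1; ; n++) {
    try {
      return await attempt();
    } catch (err) {
      const isConflict = err instanceof Error && err.name === 'ConcurrencyError';
      if (!isConflict || n >= maxAttempts) throw err;
    }
  }
}

// Usage sketch against the repository above (not executed here):
//
//   await withRetry(async () => {
//     const order = await repository.findById(orderId);
//     if (!order) throw new Error('Order not found');
//     const expectedVersion = order.version;   // version as loaded
//     order.processPayment(paymentId, amountCents);
//     await repository.save(order, expectedVersion);
//   });
```

Retrying only the `save` call would not be enough: the pending events were computed from stale state, and the command might no longer be valid (for example, the order may have been cancelled in the meantime).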

Projections (Read Models)

CQRS separates the write side (commands that append events) from the read side (queries against projections). Projections consume events and build read models optimized for specific queries:

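// projections/OrderSummaryProjection.ts
// Runs as a background process or triggered by event store subscription
import { Pool } from 'pg';

interface StoredEvent {
  type: string;
  data: any;
  streamId: string;
  createdAt: Date;  // events.created_at: when the event was recorded
}

export class OrderSummaryProjection {
  constructor(private readDb: Pool) {}

  // Timestamps come from the event, not NOW(), so a rebuild reproduces
  // the original times instead of stamping every row with the rebuild time
  async handle(event: StoredEvent): Promise<void> {
    switch (event.type) {
      case 'OrderPlaced':
        await this.readDb.query(
          `INSERT INTO order_summaries (id, customer_id, status, total_cents, item_count, placed_at)
           VALUES ($1, $2, 'pending', $3, $4, $5)`,
          [event.data.orderId, event.data.customerId,
           event.data.totalCents, event.data.items.length, event.createdAt]
        );
        break;

      case 'PaymentProcessed':
        await this.readDb.query(
          'UPDATE order_summaries SET status = $1, paid_at = $2 WHERE id = $3',
          ['paid', event.createdAt, event.data.orderId]
        );
        break;

      case 'OrderShipped':
        await this.readDb.query(
          `UPDATE order_summaries
           SET status = 'shipped', tracking_number = $1, shipped_at = $2
           WHERE id = $3`,
          [event.data.trackingNumber, event.createdAt, event.data.orderId]
        );
        break;

      case 'OrderCancelled':
        // Keep the read model in step with the aggregate's fourth event type
        await this.readDb.query(
          `UPDATE order_summaries SET status = 'cancelled' WHERE id = $1`,
          [event.data.orderId]
        );
        break;
    }
  }

  // Rebuild projection from scratch (when schema changes or bugs fixed).
  // Assumes the events table is reachable through the same pool; with a
  // separate write database, read the events from that connection instead
  async rebuild(): Promise<void> {
    await this.readDb.query('TRUNCATE order_summaries');

    // Loads every Order event into memory; page through by id for large stores
    const events = await this.readDb.query(
      `SELECT event_type, data, stream_id, created_at FROM events
       WHERE stream_type = 'Order'
       ORDER BY id ASC`
    );

    for (const row of events.rows) {
      await this.handle({
        type: row.event_type,
        data: row.data,
        streamId: row.stream_id,
        createdAt: row.created_at,
      });
    }
  }
}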

The power of projections: you can add a new one retroactively (a new analytics view, a new reporting table) and build it from the full event history.
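
As an illustration of a retroactive projection, the sketch below adds a revenue-per-customer view long after the orders were placed and fills it from history. It runs in memory to stay self-contained. The `RevenueByCustomerProjection` class, the `position` field, and the checkpoint logic are assumptions for this example, with `position` standing in for the `id` column of the events table.

```typescript
interface RecordedEvent {
  position: number; // global order of the event, like events.id
  type: string;
  data: Record<string, any>;
}

// Hypothetical read model introduced after the fact: paid revenue per customer.
class RevenueByCustomerProjection {
  private revenueCents = new Map<string, number>();
  private customerByOrder = new Map<string, string>();
  private checkpoint = 0; // position of the last event applied

  handle(event: RecordedEvent): void {
    // Skip anything at or below the checkpoint so a redelivered event
    // is not counted twice.
    if (event.position <= this.checkpoint) return;

    if (event.type === 'OrderPlaced') {
      this.customerByOrder.set(event.data.orderId, event.data.customerId);
    } else if (event.type === 'PaymentProcessed') {
      const customerId = this.customerByOrder.get(event.data.orderId);
      if (customerId !== undefined) {
        const soFar = this.revenueCents.get(customerId) ?? 0;
        this.revenueCents.set(customerId, soFar + event.data.amountCents);
      }
    }
    this.checkpoint = event.position;
  }

  revenueFor(customerId: string): number {
    return this.revenueCents.get(customerId) ?? 0;
  }
}

// Events that were recorded before this projection existed.
const history: RecordedEvent[] = [
  { position: 1, type: 'OrderPlaced', data: { orderId: 'o1', customerId: 'c1', totalCents: 9900 } },
  { position: 2, type: 'PaymentProcessed', data: { orderId: 'o1', paymentId: 'p1', amountCents: 9900 } },
  { position: 3, type: 'OrderPlaced', data: { orderId: 'o2', customerId: 'c1', totalCents: 500 } },
  { position: 4, type: 'PaymentProcessed', data: { orderId: 'o2', paymentId: 'p2', amountCents: 500 } },
];

const revenue = new RevenueByCustomerProjection();
for (const event of history) revenue.handle(event);
for (const event of history) revenue.handle(event); // redelivery: every event is skipped

const c1Revenue = revenue.revenueFor('c1'); // 10400
```

The checkpoint matters once the projection runs continuously: most delivery mechanisms are at-least-once, so a handler that cannot recognize an event it has already applied will drift from the truth.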


Snapshots for Performance

When an aggregate has thousands of events, replaying them all becomes slow. Snapshots cache the state periodically:

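// Take a snapshot roughly every 100 events
const SNAPSHOT_THRESHOLD = 100;

// repository (an OrderRepository) and db (a pg Pool) are assumed to be in scope.
// expectedVersion is the stream version the order was loaded at.
async function saveWithSnapshot(order: Order, expectedVersion: number): Promise<void> {
  await repository.save(order, expectedVersion);

  // order.version counts every applied event, so after a successful save it is
  // the version of the last stored event (assuming the order was loaded by
  // replaying its stream from the start). The snapshot must carry this version:
  // storing expectedVersion would cause the new events to be replayed on top of
  // state that already includes them.
  const newVersion = order.version;
  const crossedThreshold =
    Math.floor(newVersion / SNAPSHOT_THRESHOLD) > Math.floor(expectedVersion / SNAPSHOT_THRESHOLD);

  if (crossedThreshold) {
    await db.query(
      `INSERT INTO snapshots (stream_id, stream_type, version, state)
       VALUES ($1, 'Order', $2, $3)
       ON CONFLICT (stream_id) DO UPDATE
       SET version = $2, state = $3, created_at = NOW()`,
      [`order-${order.getState().id}`, newVersion, JSON.stringify(order.getState())]
    );
  }
}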
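
Reading a snapshot back is the other half of the technique: start from the snapshot's state and apply only the events recorded after its version. The sketch below shows that logic generically and in memory. `rehydrate`, `Snapshot`, and `VersionedEvent` are hypothetical names, and the running-balance example stands in for a real aggregate.

```typescript
interface Snapshot<S> {
  version: number; // version of the last event folded into `state`
  state: S;
}

interface VersionedEvent<E> {
  version: number;
  event: E;
}

// Rebuild state from an optional snapshot plus the events recorded after it.
// `evolve` is the same kind of (state, event) => state reducer used for a
// full replay, so both paths must end in the same state.
function rehydrate<S, E>(
  initial: S,
  snapshot: Snapshot<S> | null,
  events: VersionedEvent<E>[],
  evolve: (state: S, event: E) => S,
): { state: S; version: number } {
  let state = snapshot ? snapshot.state : initial;
  let version = snapshot ? snapshot.version : 0;

  for (const { version: v, event } of events) {
    if (v <= version) continue; // already included in the snapshot
    if (v !== version + 1) {
      throw new Error(`gap in stream: expected version ${version + 1}, got ${v}`);
    }
    state = evolve(state, event);
    version = v;
  }
  return { state, version };
}

// Example: a running balance where each event is a deposit amount.
const deposits: VersionedEvent<number>[] = [10, 20, 30, 40].map((amount, i) => ({
  version: i + 1,
  event: amount,
}));
const add = (balance: number, amount: number): number => balance + amount;

// Full replay from the beginning of the stream.
const full = rehydrate(0, null, deposits, add);

// Snapshot taken at version 2 (balance 30), then only events 3 and 4 are applied.
const snapshotAtTwo: Snapshot<number> = { version: 2, state: 30 };
const fromSnapshot = rehydrate(0, snapshotAtTwo, deposits.filter(d => d.version > 2), add);
```

Both paths arrive at a balance of 100 at version 4. That equivalence is worth asserting in tests, because a snapshot saved with the wrong version fails silently by double-applying or skipping events.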
