Event Sourcing with PostgreSQL: Append-Only Event Log, Projections, and Snapshots

Implement event sourcing with PostgreSQL. Covers append-only event store design, aggregate reconstruction, projections with event handlers, snapshot optimization, and query patterns for event-sourced systems.

Viprasol Tech Team
March 19, 2027
14 min read

Most applications update records in place — when a user changes their subscription plan, you run UPDATE subscriptions SET plan = 'pro'. But you lose the history: what was the old plan? When did it change? What triggered it? Event sourcing flips this model: instead of storing current state, you store every event that changed the state. Current state is reconstructed by replaying events.

This guide builds a production event sourcing system on PostgreSQL, covering the event store schema, aggregate pattern, projections, and snapshot optimization.
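
To make "current state is reconstructed by replaying events" concrete before the PostgreSQL details, here is a minimal in-memory sketch using the subscription example from the introduction. The event names and field names (`SubscriptionStarted`, `PlanChanged`, `at`, and so on) are illustrative assumptions, not part of the schema built later in this guide.

```typescript
// Hypothetical event shapes for the subscription example in the introduction.
type SubscriptionEvent =
  | { type: "SubscriptionStarted"; plan: string; at: string }
  | { type: "PlanChanged"; plan: string; at: string }
  | { type: "SubscriptionCancelled"; at: string };

interface SubscriptionState {
  plan: string | null;
  active: boolean;
  changedAt: string | null;
}

const initialSubscription: SubscriptionState = { plan: null, active: false, changedAt: null };

// Pure transition: previous state + one event -> next state.
function applySubscriptionEvent(
  state: SubscriptionState,
  ev: SubscriptionEvent
): SubscriptionState {
  switch (ev.type) {
    case "SubscriptionStarted":
      return { plan: ev.plan, active: true, changedAt: ev.at };
    case "PlanChanged":
      return { ...state, plan: ev.plan, changedAt: ev.at };
    case "SubscriptionCancelled":
      return { ...state, active: false, changedAt: ev.at };
  }
}

// Current state is a left fold over the event log.
function replaySubscription(events: SubscriptionEvent[]): SubscriptionState {
  return events.reduce(applySubscriptionEvent, initialSubscription);
}

const subscriptionLog: SubscriptionEvent[] = [
  { type: "SubscriptionStarted", plan: "free", at: "2027-01-05T10:00:00Z" },
  { type: "PlanChanged", plan: "pro", at: "2027-02-11T09:30:00Z" },
];

const currentSubscription = replaySubscription(subscriptionLog);
// The old plan is still recoverable: replay only the first event.
const beforeUpgrade = replaySubscription(subscriptionLog.slice(0, 1));
console.log(currentSubscription.plan, beforeUpgrade.plan);
```

Because nothing is overwritten, the questions an in-place UPDATE cannot answer (old plan, time of change) fall out of replaying a prefix of the log.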

When to Use Event Sourcing

Event sourcing is worth the complexity when you need:

  • Full audit history without separate audit tables
  • Temporal queries ("what did this look like on date X?")
  • Event replay to rebuild broken projections
  • Event-driven integration — other services react to domain events
  • Debugging — reproduce bugs by replaying to the exact failure state

Don't use it for simple CRUD, when audit logs are good enough, or when your team isn't ready for the conceptual shift.

Event Store Schema

-- The core event store: append-only, never update or delete
CREATE TABLE domain_events (
  id              BIGSERIAL PRIMARY KEY,       -- Global ordering sequence
  event_id        UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
  aggregate_type  TEXT NOT NULL,               -- 'Order', 'Subscription', 'User'
  aggregate_id    UUID NOT NULL,
  event_type      TEXT NOT NULL,               -- 'OrderPlaced', 'OrderShipped'
  event_version   INTEGER NOT NULL,            -- Per-aggregate sequence number
  payload         JSONB NOT NULL,              -- Event data
  metadata        JSONB NOT NULL DEFAULT '{}', -- Correlation ID, causation ID, user, IP
  occurred_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),

  -- Optimistic concurrency: prevent two writers from writing version N+1 simultaneously
  UNIQUE(aggregate_id, event_version)
);

-- Projections (read models) — rebuilt from events, optimized for queries
CREATE TABLE order_projections (
  aggregate_id    UUID PRIMARY KEY,
  order_number    TEXT NOT NULL UNIQUE,
  customer_id     UUID NOT NULL,
  status          TEXT NOT NULL,
  line_items      JSONB NOT NULL DEFAULT '[]',
  subtotal        NUMERIC(12,2) NOT NULL DEFAULT 0,
  total           NUMERIC(12,2) NOT NULL DEFAULT 0,
  shipping_addr   JSONB,
  placed_at       TIMESTAMPTZ,
  shipped_at      TIMESTAMPTZ,
  delivered_at    TIMESTAMPTZ,
  cancelled_at    TIMESTAMPTZ,
  last_event_id   BIGINT NOT NULL,             -- Track which events are processed
  version         INTEGER NOT NULL,
  updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Snapshots: periodic aggregate state cache to avoid replaying 10K+ events
CREATE TABLE aggregate_snapshots (
  aggregate_type  TEXT NOT NULL,
  aggregate_id    UUID NOT NULL,
  version         INTEGER NOT NULL,            -- Event version at snapshot time
  state           JSONB NOT NULL,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),

  PRIMARY KEY (aggregate_type, aggregate_id, version)
);

-- Indexes for common access patterns
-- (aggregate_id, event_version) is already indexed by the UNIQUE constraint above,
-- and id by the primary key (used for projection catch-up), so neither needs a
-- separate index.
CREATE INDEX idx_events_type      ON domain_events(aggregate_type, occurred_at DESC);
CREATE INDEX idx_events_occurred  ON domain_events(occurred_at);
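
The UNIQUE(aggregate_id, event_version) constraint is what turns the table into an optimistic concurrency guard. The following is a rough in-memory sketch of that behaviour, not database code: the `Set` stands in for the unique index, and `InMemoryEventLog` and `VersionConflict` are hypothetical names used only for illustration.

```typescript
// Hypothetical error type; the name check avoids relying on instanceof for subclasses.
class VersionConflict extends Error {
  constructor(message: string) {
    super(message);
    this.name = "VersionConflict";
  }
}

// In-memory stand-in for domain_events; the Set plays the role of
// UNIQUE(aggregate_id, event_version).
class InMemoryEventLog {
  private taken = new Set<string>();
  private rows: Array<{ aggregateId: string; eventVersion: number; eventType: string }> = [];

  insert(aggregateId: string, eventVersion: number, eventType: string): void {
    const key = `${aggregateId}:${eventVersion}`;
    if (this.taken.has(key)) {
      throw new VersionConflict(`version ${eventVersion} already written for ${aggregateId}`);
    }
    this.taken.add(key);
    this.rows.push({ aggregateId, eventVersion, eventType });
  }

  count(aggregateId: string): number {
    return this.rows.filter((r) => r.aggregateId === aggregateId).length;
  }
}

const log = new InMemoryEventLog();
const orderId = "order-1";
log.insert(orderId, 0, "OrderPlaced");

// Two writers both loaded the aggregate at version 0 and both try to write version 1.
log.insert(orderId, 1, "OrderShipped"); // first writer wins
let secondWriterRejected = false;
try {
  log.insert(orderId, 1, "OrderCancelled"); // second writer collides on the same key
} catch (err) {
  secondWriterRejected = err instanceof Error && err.name === "VersionConflict";
}
console.log(secondWriterRejected, log.count(orderId));
```

The losing writer is expected to reload the aggregate and decide again against the new history rather than blindly re-sending its event.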

Event Type Definitions

// domain/events/order-events.ts
import { z } from "zod";

// Base event schema
const BaseEventSchema = z.object({
  eventId: z.string().uuid(),
  aggregateId: z.string().uuid(),
  occurredAt: z.string().datetime(),
  metadata: z.object({
    userId: z.string().optional(),
    correlationId: z.string().optional(),
    causationId: z.string().optional(),
  }).default({}),
});

// Individual event schemas
export const OrderPlacedSchema = BaseEventSchema.extend({
  eventType: z.literal("OrderPlaced"),
  payload: z.object({
    orderNumber: z.string(),
    customerId: z.string().uuid(),
    lineItems: z.array(z.object({
      productId: z.string(),
      sku: z.string(),
      name: z.string(),
      quantity: z.number().int().positive(),
      unitPrice: z.number().positive(),
    })),
    shippingAddress: z.object({
      line1: z.string(),
      city: z.string(),
      country: z.string(),
      postalCode: z.string(),
    }),
    subtotal: z.number(),
    shippingCost: z.number(),
    total: z.number(),
  }),
});

export const OrderShippedSchema = BaseEventSchema.extend({
  eventType: z.literal("OrderShipped"),
  payload: z.object({
    trackingNumber: z.string(),
    carrier: z.string(),
    estimatedDelivery: z.string().datetime().optional(),
  }),
});

export const OrderDeliveredSchema = BaseEventSchema.extend({
  eventType: z.literal("OrderDelivered"),
  payload: z.object({
    deliveredAt: z.string().datetime(),
    signedBy: z.string().optional(),
  }),
});

export const OrderCancelledSchema = BaseEventSchema.extend({
  eventType: z.literal("OrderCancelled"),
  payload: z.object({
    reason: z.string(),
    cancelledBy: z.string().uuid(),
    refundAmount: z.number().optional(),
  }),
});

// Union type for all order events
export type OrderEvent =
  | z.infer<typeof OrderPlacedSchema>
  | z.infer<typeof OrderShippedSchema>
  | z.infer<typeof OrderDeliveredSchema>
  | z.infer<typeof OrderCancelledSchema>;
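
One benefit of a discriminated union like OrderEvent is that the compiler can check that every event type is handled. The sketch below illustrates this with hand-written, trimmed mirrors of the event types so that it runs without zod; `OrderEventSketch`, `assertNever`, and `describeOrderEvent` are hypothetical names, and the payloads are shortened for brevity.

```typescript
// Trimmed, hand-written mirrors of the inferred event types (payloads shortened).
type OrderPlaced = { eventType: "OrderPlaced"; payload: { orderNumber: string; total: number } };
type OrderShipped = { eventType: "OrderShipped"; payload: { trackingNumber: string; carrier: string } };
type OrderDelivered = { eventType: "OrderDelivered"; payload: { deliveredAt: string } };
type OrderCancelled = { eventType: "OrderCancelled"; payload: { reason: string } };
type OrderEventSketch = OrderPlaced | OrderShipped | OrderDelivered | OrderCancelled;

// If a new member is added to the union and not handled below,
// the call to assertNever stops compiling.
function assertNever(value: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(value)}`);
}

function describeOrderEvent(ev: OrderEventSketch): string {
  switch (ev.eventType) {
    case "OrderPlaced":
      return `Order ${ev.payload.orderNumber} placed for ${ev.payload.total.toFixed(2)}`;
    case "OrderShipped":
      return `Shipped via ${ev.payload.carrier} (${ev.payload.trackingNumber})`;
    case "OrderDelivered":
      return `Delivered at ${ev.payload.deliveredAt}`;
    case "OrderCancelled":
      return `Cancelled: ${ev.payload.reason}`;
    default:
      return assertNever(ev);
  }
}

const shippedLine = describeOrderEvent({
  eventType: "OrderShipped",
  payload: { trackingNumber: "TRK123", carrier: "DHL" },
});
const placedLine = describeOrderEvent({
  eventType: "OrderPlaced",
  payload: { orderNumber: "ORD-1", total: 49.5 },
});
console.log(shippedLine);
console.log(placedLine);
```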

Event Store Repository

// infrastructure/event-store.ts
import { prisma } from "@/lib/prisma";

export interface StoredEvent {
  id: bigint;
  eventId: string;
  aggregateType: string;
  aggregateId: string;
  eventType: string;
  eventVersion: number;
  payload: Record<string, unknown>;
  metadata: Record<string, unknown>;
  occurredAt: Date;
}

export class EventStore {
  async append(params: {
    aggregateType: string;
    aggregateId: string;
    events: Array<{
      eventType: string;
      payload: Record<string, unknown>;
      metadata?: Record<string, unknown>;
    }>;
    expectedVersion: number; // -1 for new aggregate, N for optimistic concurrency
  }): Promise<StoredEvent[]> {
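    try {
      return await prisma.$transaction(async (tx) => {
        // Verify current version (optimistic concurrency check)
        const latestEvent = await tx.domainEvent.findFirst({
          where: { aggregateId: params.aggregateId },
          orderBy: { eventVersion: "desc" },
          select: { eventVersion: true },
        });

        const currentVersion = latestEvent?.eventVersion ?? -1;

        if (currentVersion !== params.expectedVersion) {
          throw new ConcurrencyError(
            `Aggregate ${params.aggregateId} expected version ${params.expectedVersion}, got ${currentVersion}`
          );
        }

        // Append all events atomically
        const stored: StoredEvent[] = [];
        let version = currentVersion;

        for (const event of params.events) {
          version++;
          const record = await tx.domainEvent.create({
            data: {
              aggregateType: params.aggregateType,
              aggregateId: params.aggregateId,
              eventType: event.eventType,
              eventVersion: version,
              payload: event.payload,
              metadata: event.metadata ?? {},
            },
          });
          stored.push(record as unknown as StoredEvent);
        }

        return stored;
      });
    } catch (err) {
      // Two concurrent writers can both pass the version check above. The loser
      // then violates UNIQUE(aggregate_id, event_version), which Prisma reports
      // as error code P2002. Surface it as a ConcurrencyError so callers see
      // one error type for both cases.
      if ((err as { code?: string } | null)?.code === "P2002") {
        throw new ConcurrencyError(
          `Aggregate ${params.aggregateId} was modified concurrently (expected version ${params.expectedVersion})`
        );
      }
      throw err;
    }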
  }

  async loadEvents(
    aggregateId: string,
    fromVersion = 0
  ): Promise<StoredEvent[]> {
    const events = await prisma.domainEvent.findMany({
      where: {
        aggregateId,
        eventVersion: { gte: fromVersion },
      },
      orderBy: { eventVersion: "asc" },
    });
    return events as unknown as StoredEvent[];
  }

  async loadEventsFromPosition(
    afterId: bigint,
    limit = 100
  ): Promise<StoredEvent[]> {
    const events = await prisma.domainEvent.findMany({
      where: { id: { gt: afterId } },
      orderBy: { id: "asc" },
      take: limit,
    });
    return events as unknown as StoredEvent[];
  }
}

export class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConcurrencyError";
  }
}
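
When append throws a ConcurrencyError, the usual response is to reload the aggregate and run the command again against the fresh history. The following sketch shows that retry loop in isolation. It is synchronous and uses a simulated store purely for brevity; `retryOnConflict`, `ConcurrencyErrorSketch`, and `shipOrderOnce` are hypothetical names, and a real command handler would be async and await each step.

```typescript
// Local stand-in for the ConcurrencyError class defined in the event store module.
class ConcurrencyErrorSketch extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConcurrencyError";
  }
}

// Re-run the whole load -> decide -> append cycle when a conflict is reported.
function retryOnConflict<T>(operation: (attempt: number) => T, maxAttempts = 3): T {
  for (let attempt = 1; ; attempt++) {
    try {
      return operation(attempt);
    } catch (err) {
      const isConflict = err instanceof Error && err.name === "ConcurrencyError";
      if (!isConflict || attempt >= maxAttempts) throw err;
      // Otherwise loop: the next call to `operation` reloads fresh state.
    }
  }
}

// Simulated store: another writer bumps the version once,
// between our load and our append.
let storedVersion = 4;
let interferingWrites = 1;

function shipOrderOnce(): number {
  const loadedVersion = storedVersion; // load
  if (interferingWrites > 0) {
    interferingWrites--;
    storedVersion++; // a concurrent writer appends first
  }
  if (storedVersion !== loadedVersion) {
    throw new ConcurrencyErrorSketch(`expected ${loadedVersion}, got ${storedVersion}`);
  }
  storedVersion++; // our append succeeds
  return storedVersion;
}

let attemptsUsed = 0;
const finalVersion = retryOnConflict((attempt) => {
  attemptsUsed = attempt;
  return shipOrderOnce();
});
console.log(finalVersion, attemptsUsed);
```

Retrying the whole cycle matters: the command's preconditions have to be re-checked against the events the other writer added, since they may no longer hold.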

Order Aggregate

// domain/aggregates/order.ts
import type { StoredEvent } from "@/infrastructure/event-store";
import type { OrderEvent } from "@/domain/events/order-events";

type OrderStatus = "pending" | "placed" | "shipped" | "delivered" | "cancelled";

interface OrderState {
  id: string;
  orderNumber: string;
  customerId: string;
  status: OrderStatus;
  lineItems: Array<{
    productId: string;
    sku: string;
    name: string;
    quantity: number;
    unitPrice: number;
  }>;
  total: number;
  shippingAddress: Record<string, string> | null;
  placedAt: Date | null;
  shippedAt: Date | null;
  deliveredAt: Date | null;
  cancelledAt: Date | null;
  version: number;
}

export class Order {
  private state: OrderState;
  private pendingEvents: Array<{
    eventType: string;
    payload: Record<string, unknown>;
  }> = [];

  constructor(id: string) {
    this.state = {
      id,
      orderNumber: "",
      customerId: "",
      status: "pending",
      lineItems: [],
      total: 0,
      shippingAddress: null,
      placedAt: null,
      shippedAt: null,
      deliveredAt: null,
      cancelledAt: null,
      version: -1,
    };
  }

  // Rebuild state from event history
  static fromEvents(id: string, events: StoredEvent[]): Order {
    const order = new Order(id);
    for (const event of events) {
      order.apply(event.eventType, event.payload, event.occurredAt);
      order.state.version = event.eventVersion;
    }
    return order;
  }

  // Rebuild from snapshot + subsequent events
  static fromSnapshot(
    snapshot: { state: OrderState; version: number },
    subsequentEvents: StoredEvent[]
  ): Order {
    const order = new Order(snapshot.state.id);
    // Snapshots round-trip through JSONB, so dates come back as ISO strings
    const toDate = (v: Date | string | null) => (v ? new Date(v) : null);
    order.state = {
      ...snapshot.state,
      placedAt: toDate(snapshot.state.placedAt),
      shippedAt: toDate(snapshot.state.shippedAt),
      deliveredAt: toDate(snapshot.state.deliveredAt),
      cancelledAt: toDate(snapshot.state.cancelledAt),
      version: snapshot.version,
    };

    for (const event of subsequentEvents) {
      order.apply(event.eventType, event.payload, event.occurredAt);
      order.state.version = event.eventVersion;
    }
    return order;
  }

  // Command: place order
  place(params: {
    orderNumber: string;
    customerId: string;
    lineItems: OrderState["lineItems"];
    shippingAddress: Record<string, string>;
    subtotal: number;
    shippingCost: number;
    total: number;
  }): void {
    if (this.state.status !== "pending") {
      throw new Error("Order has already been placed");
    }

    this.recordEvent("OrderPlaced", {
      orderNumber: params.orderNumber,
      customerId: params.customerId,
      lineItems: params.lineItems,
      shippingAddress: params.shippingAddress,
      subtotal: params.subtotal,
      shippingCost: params.shippingCost,
      total: params.total,
    });
  }

  // Command: ship order
  ship(params: {
    trackingNumber: string;
    carrier: string;
    estimatedDelivery?: Date;
  }): void {
    if (this.state.status !== "placed") {
      throw new Error(`Cannot ship order in status: ${this.state.status}`);
    }

    this.recordEvent("OrderShipped", {
      trackingNumber: params.trackingNumber,
      carrier: params.carrier,
      estimatedDelivery: params.estimatedDelivery?.toISOString(),
    });
  }

  // Command: cancel order
  cancel(params: { reason: string; cancelledBy: string; refundAmount?: number }): void {
    if (["delivered", "cancelled"].includes(this.state.status)) {
      throw new Error(`Cannot cancel order in status: ${this.state.status}`);
    }

    this.recordEvent("OrderCancelled", {
      reason: params.reason,
      cancelledBy: params.cancelledBy,
      refundAmount: params.refundAmount,
    });
  }

  private recordEvent(
    eventType: string,
    payload: Record<string, unknown>
  ): void {
    this.pendingEvents.push({ eventType, payload });
    // Apply immediately so state is updated for subsequent commands.
    // The in-memory timestamp is provisional; after a reload, the stored
    // occurred_at value is used instead.
    this.apply(eventType, payload, new Date());
  }

  // State transition: deterministic. Timestamps come from the event itself,
  // never from the clock, so replaying history always yields the same state.
  private apply(
    eventType: string,
    payload: Record<string, unknown>,
    occurredAt: Date
  ): void {
    switch (eventType) {
      case "OrderPlaced":
        this.state.status = "placed";
        this.state.orderNumber = payload.orderNumber as string;
        this.state.customerId = payload.customerId as string;
        this.state.lineItems = payload.lineItems as OrderState["lineItems"];
        this.state.total = payload.total as number;
        this.state.shippingAddress = payload.shippingAddress as Record<string, string>;
        this.state.placedAt = occurredAt;
        break;
      case "OrderShipped":
        this.state.status = "shipped";
        this.state.shippedAt = occurredAt;
        break;
      case "OrderDelivered":
        this.state.status = "delivered";
        this.state.deliveredAt = new Date(payload.deliveredAt as string);
        break;
      case "OrderCancelled":
        this.state.status = "cancelled";
        this.state.cancelledAt = occurredAt;
        break;
    }
  }

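  get id() { return this.state.id; }
  get status() { return this.state.status; }
  get version() { return this.state.version; }
  get uncommittedEvents() { return [...this.pendingEvents]; }
  clearUncommittedEvents() { this.pendingEvents = []; }
  // JSON-safe copy of the state for the snapshot store (dates become ISO strings)
  toSnapshot() { return JSON.parse(JSON.stringify(this.state)); }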
}
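
The guards at the top of each command amount to a small state machine. As a rough sketch, the same rules can be written as a transition table, which makes them easy to review and test in isolation. `allowedFrom` and `canExecute` are hypothetical names; the allowed statuses are read directly off the `place`, `ship`, and `cancel` guards in the aggregate.

```typescript
type OrderStatusSketch = "pending" | "placed" | "shipped" | "delivered" | "cancelled";
type OrderCommand = "place" | "ship" | "cancel";

// Which statuses each command may be executed from, mirroring the aggregate's guards:
// place requires "pending", ship requires "placed",
// cancel is rejected only for "delivered" and "cancelled".
const allowedFrom: Record<OrderCommand, OrderStatusSketch[]> = {
  place: ["pending"],
  ship: ["placed"],
  cancel: ["pending", "placed", "shipped"],
};

function canExecute(command: OrderCommand, current: OrderStatusSketch): boolean {
  return allowedFrom[command].includes(current);
}

console.log(canExecute("ship", "pending"));     // shipping an unplaced order
console.log(canExecute("cancel", "shipped"));   // cancelling in transit
console.log(canExecute("cancel", "delivered")); // cancelling after delivery
```

Note that the guards only decide whether an event may be recorded. The state change itself still happens in `apply`, so replayed history is never re-validated.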

Command Handler

// application/commands/place-order.ts
import { EventStore } from "@/infrastructure/event-store";
import { Order } from "@/domain/aggregates/order";
import { OrderProjectionHandler } from "@/infrastructure/projections/order-projection";

const eventStore = new EventStore();
const projectionHandler = new OrderProjectionHandler();

interface PlaceOrderCommand {
  orderId: string;
  customerId: string;
  lineItems: Array<{
    productId: string;
    sku: string;
    name: string;
    quantity: number;
    unitPrice: number;
  }>;
  shippingAddress: Record<string, string>;
}

export async function placeOrder(cmd: PlaceOrderCommand): Promise<void> {
  // Load aggregate (new order, so no existing events)
  const order = new Order(cmd.orderId);

  // generateOrderNumber and calculateShipping are application helpers not shown in this guide
  const orderNumber = await generateOrderNumber();
  const subtotal = cmd.lineItems.reduce(
    (sum, item) => sum + item.quantity * item.unitPrice,
    0
  );
  const shippingCost = calculateShipping(cmd.shippingAddress, subtotal);

  // Execute command — raises domain events
  order.place({
    orderNumber,
    customerId: cmd.customerId,
    lineItems: cmd.lineItems,
    shippingAddress: cmd.shippingAddress,
    subtotal,
    shippingCost,
    total: subtotal + shippingCost,
  });

  // Persist events
  const storedEvents = await eventStore.append({
    aggregateType: "Order",
    aggregateId: order.id,
    events: order.uncommittedEvents,
    expectedVersion: order.version, // -1 for new aggregate
  });

  order.clearUncommittedEvents();

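  // Update the read projection inline. This runs after the append has committed,
  // so a failure here leaves the projection behind the event log until it is
  // caught up again from last_event_id.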
  for (const event of storedEvents) {
    await projectionHandler.handle(event);
  }
}

Projection Handler

// infrastructure/projections/order-projection.ts
import { prisma } from "@/lib/prisma";
import type { StoredEvent } from "@/infrastructure/event-store";

export class OrderProjectionHandler {
  async handle(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case "OrderPlaced":
        await this.onOrderPlaced(event);
        break;
      case "OrderShipped":
        await this.onOrderShipped(event);
        break;
      case "OrderDelivered":
        await this.onOrderDelivered(event);
        break;
      case "OrderCancelled":
        await this.onOrderCancelled(event);
        break;
    }
  }

  private async onOrderPlaced(event: StoredEvent): Promise<void> {
    const p = event.payload;
    await prisma.orderProjection.upsert({
      where: { aggregateId: event.aggregateId },
      create: {
        aggregateId: event.aggregateId,
        orderNumber: p.orderNumber as string,
        customerId: p.customerId as string,
        status: "placed",
        lineItems: p.lineItems as any,
        subtotal: p.subtotal as number,
        total: p.total as number,
        shippingAddr: p.shippingAddress as any,
        placedAt: event.occurredAt,
        lastEventId: Number(event.id),
        version: event.eventVersion,
      },
      update: {
        status: "placed",
        lastEventId: Number(event.id),
        version: event.eventVersion,
      },
    });
  }

  private async onOrderShipped(event: StoredEvent): Promise<void> {
    await prisma.orderProjection.update({
      where: { aggregateId: event.aggregateId },
      data: {
        status: "shipped",
        shippedAt: event.occurredAt,
        lastEventId: Number(event.id),
        version: event.eventVersion,
        updatedAt: new Date(),
      },
    });
  }

  private async onOrderDelivered(event: StoredEvent): Promise<void> {
    await prisma.orderProjection.update({
      where: { aggregateId: event.aggregateId },
      data: {
        status: "delivered",
        deliveredAt: new Date(event.payload.deliveredAt as string),
        lastEventId: Number(event.id),
        version: event.eventVersion,
        updatedAt: new Date(),
      },
    });
  }

  private async onOrderCancelled(event: StoredEvent): Promise<void> {
    await prisma.orderProjection.update({
      where: { aggregateId: event.aggregateId },
      data: {
        status: "cancelled",
        cancelledAt: event.occurredAt,
        lastEventId: Number(event.id),
        version: event.eventVersion,
        updatedAt: new Date(),
      },
    });
  }
}
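
The `last_event_id` column and `loadEventsFromPosition` exist so a projection can skip events it has already seen and catch up from a checkpoint. The handler above does not show that logic, so here is a minimal in-memory sketch of one way it could work. `OrderStatusProjector`, `LoggedEvent`, and the batch size are assumptions for illustration, and the event log is assumed to be sorted by global id.

```typescript
// Hypothetical in-memory shapes; `id` mirrors the global BIGSERIAL position.
interface LoggedEvent { id: number; aggregateId: string; eventType: string }
interface OrderRow { status: string; lastEventId: number }

const statusFor: Record<string, string> = {
  OrderPlaced: "placed",
  OrderShipped: "shipped",
  OrderDelivered: "delivered",
  OrderCancelled: "cancelled",
};

class OrderStatusProjector {
  readonly rows = new Map<string, OrderRow>();
  checkpoint = 0; // highest global event id already applied

  // Applies one event. Events at or below the row's lastEventId are skipped,
  // so delivering the same event twice leaves the row unchanged.
  handle(ev: LoggedEvent): void {
    const row = this.rows.get(ev.aggregateId);
    if (row && ev.id <= row.lastEventId) return;
    const nextStatus = statusFor[ev.eventType];
    if (nextStatus === undefined) return; // unknown event types are ignored
    this.rows.set(ev.aggregateId, { status: nextStatus, lastEventId: ev.id });
  }

  // Reads the log in batches from the checkpoint, in the spirit of loadEventsFromPosition.
  catchUp(eventLog: LoggedEvent[], batchSize = 2): void {
    for (;;) {
      const batch = eventLog.filter((e) => e.id > this.checkpoint).slice(0, batchSize);
      if (batch.length === 0) return;
      for (const ev of batch) {
        this.handle(ev);
        this.checkpoint = ev.id;
      }
    }
  }
}

const firstEvent: LoggedEvent = { id: 1, aggregateId: "a", eventType: "OrderPlaced" };
const globalLog: LoggedEvent[] = [
  firstEvent,
  { id: 2, aggregateId: "b", eventType: "OrderPlaced" },
  { id: 3, aggregateId: "a", eventType: "OrderShipped" },
];

const projector = new OrderStatusProjector();
projector.catchUp(globalLog);
projector.handle(firstEvent); // duplicate delivery is a no-op
console.log(projector.rows.get("a")?.status, projector.rows.get("b")?.status, projector.checkpoint);
```

Rebuilding a broken projection is then the same loop with the rows cleared and the checkpoint reset to zero.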

Snapshot Optimization

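// infrastructure/snapshot-store.ts
import { prisma } from "@/lib/prisma";
import { EventStore } from "@/infrastructure/event-store";
import { Order } from "@/domain/aggregates/order";

const SNAPSHOT_INTERVAL = 50; // Take snapshot every 50 events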

export async function saveSnapshotIfNeeded(
  aggregateType: string,
  aggregate: Order
): Promise<void> {
  if (aggregate.version > 0 && aggregate.version % SNAPSHOT_INTERVAL === 0) {
    await prisma.aggregateSnapshot.upsert({
      where: {
        aggregateType_aggregateId_version: {
          aggregateType,
          aggregateId: aggregate.id,
          version: aggregate.version,
        },
      },
      create: {
        aggregateType,
        aggregateId: aggregate.id,
        version: aggregate.version,
        state: aggregate.toSnapshot(),
      },
      update: {},
    });
  }
}

export async function loadAggregate(
  aggregateType: string,
  aggregateId: string,
  eventStore: EventStore
): Promise<Order> {
  // Find latest snapshot
  const snapshot = await prisma.aggregateSnapshot.findFirst({
    where: { aggregateType, aggregateId },
    orderBy: { version: "desc" },
  });

  if (snapshot) {
    // Load only events after snapshot
    const subsequentEvents = await eventStore.loadEvents(
      aggregateId,
      snapshot.version + 1
    );
    return Order.fromSnapshot(
      { state: snapshot.state as any, version: snapshot.version },
      subsequentEvents
    );
  }

  // No snapshot: replay all events
  const events = await eventStore.loadEvents(aggregateId);
  return Order.fromEvents(aggregateId, events);
}
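
A snapshot is only safe if loading from it gives the same state as a full replay. The sketch below checks that property on a deliberately simple, hypothetical counter aggregate (not the Order aggregate) and counts how many events each path has to apply. The interval of 50 and the `version > 0 && version % interval === 0` rule follow the code above; all other names are assumptions.

```typescript
interface CounterEvent { eventVersion: number; delta: number }
interface CounterState { total: number; version: number }
interface CounterSnapshot { version: number; state: CounterState }

const SNAPSHOT_EVERY = 50;
const emptyCounter: CounterState = { total: 0, version: -1 };

function applyCounterEvent(state: CounterState, ev: CounterEvent): CounterState {
  return { total: state.total + ev.delta, version: ev.eventVersion };
}

// Rebuilds state from an optional snapshot plus the events after it,
// and reports how many events had to be applied.
function rebuild(events: CounterEvent[], snapshot?: CounterSnapshot) {
  const start = snapshot ? snapshot.state : emptyCounter;
  const fromVersion = snapshot ? snapshot.version + 1 : 0;
  const tail = events.filter((e) => e.eventVersion >= fromVersion);
  return { state: tail.reduce(applyCounterEvent, start), applied: tail.length };
}

// 120 events, versions 0..119, each adding 1.
const counterEvents: CounterEvent[] = Array.from({ length: 120 }, (_, i) => ({
  eventVersion: i,
  delta: 1,
}));

// Take snapshots with the same rule as saveSnapshotIfNeeded: versions 50 and 100.
const snapshots: CounterSnapshot[] = [];
let running = emptyCounter;
for (const ev of counterEvents) {
  running = applyCounterEvent(running, ev);
  if (running.version > 0 && running.version % SNAPSHOT_EVERY === 0) {
    snapshots.push({ version: running.version, state: running });
  }
}

const latestSnapshot = snapshots[snapshots.length - 1];
const full = rebuild(counterEvents);
const fast = rebuild(counterEvents, latestSnapshot);
console.log(full.applied, fast.applied, full.state.total === fast.state.total);
```

The two paths agree only while the transition function is deterministic, which is why state transitions should not read the clock or any other outside input.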

Temporal Queries

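-- What was the order status on a specific date?
-- Event payloads carry no status field, so derive it from the latest event type.
SELECT
  aggregate_id,
  event_type,
  CASE event_type
    WHEN 'OrderPlaced'    THEN 'placed'
    WHEN 'OrderShipped'   THEN 'shipped'
    WHEN 'OrderDelivered' THEN 'delivered'
    WHEN 'OrderCancelled' THEN 'cancelled'
  END AS status_after_event,
  occurred_at
FROM domain_events
WHERE aggregate_id = $1
  AND occurred_at <= $2  -- Target date
ORDER BY event_version DESC
LIMIT 1;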

-- Replay all events for an aggregate (for debugging)
SELECT
  event_version,
  event_type,
  payload,
  metadata,
  occurred_at
FROM domain_events
WHERE aggregate_id = $1
ORDER BY event_version ASC;

-- Event counts by type over time (operational analytics)
SELECT
  DATE_TRUNC('day', occurred_at) AS day,
  event_type,
  COUNT(*) AS count
FROM domain_events
WHERE occurred_at > NOW() - INTERVAL '30 days'
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC;
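
The first query answers "what was the status on date X" from the single latest event. For state that depends on more than the last event, the same question can be answered in application code by replaying only the events up to the target date. A minimal sketch, assuming events shaped like rows of domain_events (`TimedEvent` and `statusAsOf` are hypothetical names, and the timestamps are made-up sample data):

```typescript
interface TimedEvent { eventVersion: number; eventType: string; occurredAt: string }

const statusByEventType: Record<string, string> = {
  OrderPlaced: "placed",
  OrderShipped: "shipped",
  OrderDelivered: "delivered",
  OrderCancelled: "cancelled",
};

// Replays only the events that had occurred by `asOf`, in version order.
// Returns null when the aggregate did not exist yet.
function statusAsOf(events: TimedEvent[], asOf: Date): string | null {
  const visible = events
    .filter((e) => new Date(e.occurredAt).getTime() <= asOf.getTime())
    .sort((a, b) => a.eventVersion - b.eventVersion);
  let result: string | null = null;
  for (const e of visible) {
    result = statusByEventType[e.eventType] ?? result;
  }
  return result;
}

const orderTimeline: TimedEvent[] = [
  { eventVersion: 0, eventType: "OrderPlaced", occurredAt: "2027-03-01T09:00:00Z" },
  { eventVersion: 1, eventType: "OrderShipped", occurredAt: "2027-03-03T14:00:00Z" },
  { eventVersion: 2, eventType: "OrderDelivered", occurredAt: "2027-03-06T11:30:00Z" },
];

const midTransit = statusAsOf(orderTimeline, new Date("2027-03-04T00:00:00Z"));
const beforeOrder = statusAsOf(orderTimeline, new Date("2027-02-28T00:00:00Z"));
const afterDelivery = statusAsOf(orderTimeline, new Date("2027-03-10T00:00:00Z"));
console.log(midTransit, beforeOrder, afterDelivery);
```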

Cost and Timeline Estimates

| Scope | Team | Timeline | Cost Range |
|---|---|---|---|
| Basic event store + one aggregate | 1–2 devs | 1 week | $2,000–4,000 |
| Full CQRS (commands + projections + queries) | 2 devs | 3–4 weeks | $8,000–16,000 |
| Event sourcing migration from CRUD | 2–3 devs | 4–8 weeks | $15,000–35,000 |
| Enterprise (event bus, saga orchestration, replay tooling) | 3+ devs | 8–16 weeks | $30,000–80,000 |

Working With Viprasol

Event sourcing is a significant architectural commitment — the benefits are real but so is the complexity. Our team has implemented event-sourced systems for fintech and e-commerce products where audit history, temporal queries, and event replay are non-negotiable requirements. We help teams decide whether event sourcing is the right fit and implement it correctly when it is.

What we deliver:

  • Append-only event store schema with optimistic concurrency
  • Aggregate pattern with command/event separation
  • Read projection handlers that rebuild query models
  • Snapshot mechanism for aggregates with long histories
  • Projection rebuild tooling for schema migrations
