Back to Blog

Event-Driven Microservices: Kafka Patterns, Saga Orchestration, and Idempotency

Build event-driven microservices that work in production: design Kafka topic schemas with Avro, implement the saga pattern for distributed transactions, enforce idempotency to handle duplicate events, and handle consumer group rebalancing without data loss.

Viprasol Tech Team
October 27, 2026
13 min read

Event-driven microservices communicate by publishing and consuming events from a message broker (Kafka, RabbitMQ, AWS EventBridge). Each service is decoupled — the order service doesn't call the inventory service; it publishes an order.placed event, and the inventory service consumes it at its own pace.

The decoupling is real, but the complexity moves: you gain loose coupling and lose synchronous consistency. Distributed transactions now require the saga pattern, duplicate events require idempotency, and consumer failures require careful offset management.


Kafka Topic Design

// src/events/schemas/order.ts
// Event schemas are the API contract between services

// Versioned event schemas — never break the contract
// Use Avro or JSON Schema; this example uses TypeScript for clarity

export interface OrderPlacedV1 {
  eventId: string;        // UUID — for deduplication
  eventType: "order.placed";
  eventVersion: 1;
  occurredAt: string;     // ISO 8601
  aggregateId: string;    // Order ID
  aggregateType: "order";

  payload: {
    orderId: string;
    customerId: string;
    tenantId: string;
    items: Array<{
      productId: string;
      quantity: number;
      unitPriceUsd: number;
    }>;
    totalAmountUsd: number;
    shippingAddressId: string;
  };
}

export interface OrderCancelledV1 {
  eventId: string;
  eventType: "order.cancelled";
  eventVersion: 1;
  occurredAt: string;
  aggregateId: string;
  aggregateType: "order";
  payload: {
    orderId: string;
    customerId: string;
    reason: "customer_requested" | "payment_failed" | "out_of_stock";
  };
}

// Union type for all order events
export type OrderEvent = OrderPlacedV1 | OrderCancelledV1;
Kafka topic naming convention:
  {service}.{aggregate}.{event-type}
  orders.order.placed          — Order service, order aggregate, placed event
  inventory.product.reserved   — Inventory service
  payments.charge.succeeded    — Payments service

Partitioning:
  Partition by aggregateId (customerId, orderId, tenantId)
  → All events for the same entity go to the same partition
  → Guarantees ordering for the same entity
  
Retention:
  Transactional events: 7 days (for replay)
  Audit/compliance: 365 days
  Analytics: unlimited (BigQuery sink)

Kafka Producer

// src/events/producer.ts
import { Kafka, Producer, CompressionTypes } from "kafkajs";

const kafka = new Kafka({
  clientId: "orders-service",
  brokers: process.env.KAFKA_BROKERS!.split(","),
  ssl: process.env.NODE_ENV === "production",
  sasl:
    process.env.NODE_ENV === "production"
      ? {
          mechanism: "scram-sha-512",
          username: process.env.KAFKA_USERNAME!,
          password: process.env.KAFKA_PASSWORD!,
        }
      : undefined,
});

let producer: Producer | null = null;

export async function getProducer(): Promise<Producer> {
  if (!producer) {
    producer = kafka.producer({
      transactionalId: "orders-service-producer",  // Enable exactly-once semantics
      maxInFlightRequests: 5,
      idempotent: true,  // Prevent duplicate messages on retry
    });
    await producer.connect();
  }
  return producer;
}

export async function publishEvent<T extends { eventId: string; aggregateId: string }>(
  topic: string,
  event: T
): Promise<void> {
  const prod = await getProducer();

  await prod.send({
    topic,
    messages: [
      {
        // Partition key: route all events for same aggregate to same partition
        key: event.aggregateId,
        value: JSON.stringify(event),
        headers: {
          "event-id": event.eventId,
          "content-type": "application/json",
          "schema-version": "1",
        },
      },
    ],
    compression: CompressionTypes.GZIP,
    acks: -1, // All replicas must acknowledge (strongest durability guarantee)
  });
}

// Usage in order service:
export async function placeOrder(orderData: CreateOrderInput): Promise<Order> {
  const order = await db.transaction(async (trx) => {
    const order = await trx.insert(orders).values(orderData).returning();
    return order[0];
  });

  // Publish event AFTER successful DB write
  await publishEvent("orders.order.placed", {
    eventId: crypto.randomUUID(),
    eventType: "order.placed" as const,
    eventVersion: 1,
    occurredAt: new Date().toISOString(),
    aggregateId: order.id,
    aggregateType: "order",
    payload: {
      orderId: order.id,
      customerId: order.customerId,
      tenantId: order.tenantId,
      items: order.items,
      totalAmountUsd: order.totalAmountUsd,
      shippingAddressId: order.shippingAddressId,
    },
  } satisfies OrderPlacedV1);

  return order;
}

☁️ Is Your Cloud Costing Too Much?

Most teams overspend 30–40% on cloud — wrong instance types, no reserved pricing, bloated storage. We audit, right-size, and automate your infrastructure.

  • AWS, GCP, Azure certified engineers
  • Infrastructure as Code (Terraform, CDK)
  • Docker, Kubernetes, GitHub Actions CI/CD
  • Typical audit recovers $500–$3,000/month in savings

Idempotent Consumer

// src/events/consumer.ts
import { Kafka, Consumer, EachMessagePayload } from "kafkajs";

const consumer = kafka.consumer({
  groupId: "inventory-service",
  sessionTimeout: 30_000,
  heartbeatInterval: 3_000,
});

// Idempotency: track processed event IDs to handle duplicates
async function isEventAlreadyProcessed(eventId: string): Promise<boolean> {
  const { rows } = await db.query(
    "SELECT 1 FROM processed_events WHERE event_id = $1",
    [eventId]
  );
  return rows.length > 0;
}

async function markEventProcessed(
  eventId: string,
  topic: string
): Promise<void> {
  await db.query(
    `INSERT INTO processed_events (event_id, topic, processed_at)
     VALUES ($1, $2, NOW())
     ON CONFLICT (event_id) DO NOTHING`,
    [eventId, topic]
  );
}

async function handleOrderPlaced(event: OrderPlacedV1): Promise<void> {
  // Business logic: reserve inventory for each item
  await db.transaction(async (trx) => {
    for (const item of event.payload.items) {
      const result = await trx.query<{ quantity: number }>(
        `UPDATE inventory
         SET reserved = reserved + $1,
             available = available - $1
         WHERE product_id = $2
           AND available >= $1
         RETURNING quantity`,
        [item.quantity, item.productId]
      );

      if (result.rows.length === 0) {
        // Insufficient inventory — publish compensating event
        await publishEvent("inventory.product.reservation-failed", {
          eventId: crypto.randomUUID(),
          eventType: "inventory.reservation.failed",
          eventVersion: 1,
          occurredAt: new Date().toISOString(),
          aggregateId: event.payload.orderId,
          aggregateType: "order",
          payload: {
            orderId: event.payload.orderId,
            productId: item.productId,
            requestedQuantity: item.quantity,
          },
        });
        return; // Stop processing — saga will handle compensation
      }
    }

    // Mark event as processed within the same transaction
    await markEventProcessed(event.eventId, "orders.order.placed");
  });
}

// Message handler with idempotency check
async function processMessage({
  topic,
  message,
}: EachMessagePayload): Promise<void> {
  const rawEvent = JSON.parse(message.value!.toString());
  const eventId = message.headers?.["event-id"]?.toString();

  if (!eventId) {
    console.warn("Event missing event-id header, cannot guarantee idempotency");
  } else if (await isEventAlreadyProcessed(eventId)) {
    console.log(`Skipping duplicate event: ${eventId}`);
    return; // Already processed — safe to skip
  }

  switch (rawEvent.eventType) {
    case "order.placed":
      await handleOrderPlaced(rawEvent as OrderPlacedV1);
      break;
    case "order.cancelled":
      await handleOrderCancelled(rawEvent as OrderCancelledV1);
      break;
    default:
      console.warn(`Unknown event type: ${rawEvent.eventType}`);
  }
}

Saga Pattern for Distributed Transactions

// src/sagas/order-fulfillment.saga.ts
// Choreography saga: services react to events and publish compensating events on failure

// Happy path:
// order.placed → inventory.reserved → payment.charged → shipment.created

// Compensation (rollback) path if payment fails:
// payment.failed → inventory.release-reservation (compensating event)
//                → order.cancelled

type SagaState = "inventory_reserved" | "payment_charging" | "completed" | "cancelled";

async function handleInventoryReservationFailed(
  event: InventoryReservationFailedEvent
): Promise<void> {
  // Compensation: cancel the order
  await db.query(
    "UPDATE orders SET status = 'cancelled', cancelled_reason = 'out_of_stock' WHERE id = $1",
    [event.payload.orderId]
  );

  await publishEvent("orders.order.cancelled", {
    eventId: crypto.randomUUID(),
    eventType: "order.cancelled",
    eventVersion: 1,
    occurredAt: new Date().toISOString(),
    aggregateId: event.payload.orderId,
    aggregateType: "order",
    payload: {
      orderId: event.payload.orderId,
      customerId: event.payload.customerId,
      reason: "out_of_stock",
    },
  } satisfies OrderCancelledV1);
}

async function handlePaymentFailed(event: PaymentFailedEvent): Promise<void> {
  // Compensation step 1: release inventory reservation
  await publishEvent("inventory.reservation.release", {
    eventId: crypto.randomUUID(),
    eventType: "inventory.reservation.release",
    eventVersion: 1,
    occurredAt: new Date().toISOString(),
    aggregateId: event.payload.orderId,
    aggregateType: "order",
    payload: { orderId: event.payload.orderId },
  });

  // Compensation step 2: cancel order
  await publishEvent("orders.order.cancelled", {
    eventId: crypto.randomUUID(),
    eventType: "order.cancelled",
    eventVersion: 1,
    occurredAt: new Date().toISOString(),
    aggregateId: event.payload.orderId,
    aggregateType: "order",
    payload: {
      orderId: event.payload.orderId,
      customerId: event.payload.customerId,
      reason: "payment_failed",
    },
  } satisfies OrderCancelledV1);
}

⚙️ DevOps Done Right — Zero Downtime, Full Automation

Ship faster without breaking things. We build CI/CD pipelines, monitoring stacks, and auto-scaling infrastructure that your team can actually maintain.

  • Staging + production environments with feature flags
  • Automated security scanning in the pipeline
  • Uptime monitoring + alerting + runbook automation
  • On-call support handover docs included

Consumer Group Rebalancing Safety

// Commit offsets only AFTER successful processing (not before)
await consumer.run({
  autoCommit: false,  // Manual offset commits only
  eachMessage: async (payload) => {
    try {
      await processMessage(payload);

      // Commit offset only after successful processing
      await consumer.commitOffsets([
        {
          topic: payload.topic,
          partition: payload.partition,
          offset: (BigInt(payload.message.offset) + 1n).toString(),
        },
      ]);
    } catch (error) {
      console.error("Message processing failed:", error);
      // Don't commit offset — message will be redelivered after rebalance
      // Ensure your handler is idempotent (it will be called again)
      throw error; // KafkaJS will pause and retry
    }
  },
});

See Also


Working With Viprasol

Event-driven microservices shift complexity from synchronous APIs to asynchronous event flows. We design Kafka topic schemas, implement idempotent consumers that handle duplicate events gracefully, build saga patterns for distributed transactions, and instrument consumer lag monitoring so teams can detect processing failures before they cascade.

Microservices architecture → | Talk to our engineers →

Share this article:

About the Author

V

Viprasol Tech Team

Custom Software Development Specialists

The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.

MT4/MT5 EA DevelopmentAI Agent SystemsSaaS DevelopmentAlgorithmic Trading

Need DevOps & Cloud Expertise?

Scale your infrastructure with confidence. AWS, GCP, Azure certified team.

Free consultation • No commitment • Response within 24 hours

Viprasol · Big Data & Analytics

Making sense of your data at scale?

Viprasol builds end-to-end big data analytics solutions — ETL pipelines, data warehouses on Snowflake or BigQuery, and self-service BI dashboards. One reliable source of truth for your entire organisation.