Event-Driven Microservices: Kafka Patterns, Saga Orchestration, and Idempotency
Build event-driven microservices that work in production: design Kafka topic schemas with Avro, implement the saga pattern for distributed transactions, enforce idempotency to handle duplicate events, and handle consumer group rebalancing without data loss.
Event-driven microservices communicate by publishing and consuming events from a message broker (Kafka, RabbitMQ, AWS EventBridge). Each service is decoupled — the order service doesn't call the inventory service; it publishes an order.placed event, and the inventory service consumes it at its own pace.
The decoupling is real, but the complexity moves: you gain loose coupling and lose synchronous consistency. Distributed transactions now require the saga pattern, duplicate events require idempotency, and consumer failures require careful offset management.
Kafka Topic Design
// src/events/schemas/order.ts
// Event schemas are the API contract between services
// Versioned event schemas — never break the contract
// Use Avro or JSON Schema; this example uses TypeScript for clarity
export interface OrderPlacedV1 {
  eventId: string; // UUID — for deduplication
  eventType: "order.placed";
  eventVersion: 1;
  occurredAt: string; // ISO 8601
  aggregateId: string; // Order ID
  aggregateType: "order";
  payload: {
    orderId: string;
    customerId: string;
    tenantId: string;
    items: Array<{
      productId: string;
      quantity: number;
      unitPriceUsd: number;
    }>;
    totalAmountUsd: number;
    shippingAddressId: string;
  };
}

export interface OrderCancelledV1 {
  eventId: string;
  eventType: "order.cancelled";
  eventVersion: 1;
  occurredAt: string;
  aggregateId: string;
  aggregateType: "order";
  payload: {
    orderId: string;
    customerId: string;
    reason: "customer_requested" | "payment_failed" | "out_of_stock";
  };
}

// Union type for all order events
export type OrderEvent = OrderPlacedV1 | OrderCancelledV1;
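A discriminated-union dispatcher is one way consumers can handle this union safely; the compiler then flags any variant added to the union but not handled. The sketch below inlines trimmed copies of the types so it stands alone, and `describeEvent` is an illustrative name, not part of the service code:

```typescript
// Trimmed copies of the event types, inlined so this snippet is self-contained
interface OrderPlacedV1 {
  eventType: "order.placed";
  payload: { orderId: string };
}
interface OrderCancelledV1 {
  eventType: "order.cancelled";
  payload: { orderId: string; reason: string };
}
type OrderEvent = OrderPlacedV1 | OrderCancelledV1;

function describeEvent(event: OrderEvent): string {
  switch (event.eventType) {
    case "order.placed":
      return `placed:${event.payload.orderId}`;
    case "order.cancelled":
      return `cancelled:${event.payload.orderId}:${event.payload.reason}`;
    default: {
      // Exhaustiveness check: this line fails to compile if a new union
      // member is added without a matching case above
      const _exhaustive: never = event;
      throw new Error(`Unhandled event: ${JSON.stringify(_exhaustive)}`);
    }
  }
}
```

The `never` assignment in the default branch is the idiomatic TypeScript way to turn "forgot a case" from a runtime surprise into a compile error.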
Kafka topic naming convention: {service}.{aggregate}.{event-type}
- orders.order.placed — order service, order aggregate, placed event
- inventory.product.reserved — inventory service, product aggregate, reserved event
- payments.charge.succeeded — payments service, charge aggregate, succeeded event

Partitioning: partition by aggregateId (orderId, customerId, or tenantId). All events for the same entity land on the same partition, which guarantees per-entity ordering — Kafka only orders messages within a partition, never across a whole topic.

Retention:
- Transactional events: 7 days (long enough to replay after an incident)
- Audit/compliance events: 365 days
- Analytics: effectively unlimited, offloaded to a BigQuery sink
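The per-entity ordering guarantee comes from deterministic key hashing: the same key always maps to the same partition. The toy hash below only illustrates the mechanism — Kafka's default partitioner actually uses murmur2, not this function:

```typescript
// Toy stand-in for Kafka's key partitioner: a stable 32-bit hash of the
// partition key, reduced modulo the partition count. The exact hash doesn't
// matter for the argument — only that it is deterministic, so every event
// carrying the same aggregateId lands on the same partition.
function toyHash(key: string): number {
  let h = 0;
  for (let i = 0; i < key.length; i++) {
    h = (h * 31 + key.charCodeAt(i)) >>> 0; // keep it unsigned 32-bit
  }
  return h;
}

function partitionFor(key: string, numPartitions: number): number {
  return toyHash(key) % numPartitions;
}
```

Note the flip side: changing the partition count remaps keys to different partitions and breaks the ordering guarantee for in-flight entities, which is why topics are usually sized generously up front rather than repartitioned later.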
Kafka Producer
// src/events/producer.ts
import { Kafka, Producer, CompressionTypes } from "kafkajs";
const kafka = new Kafka({
  clientId: "orders-service",
  brokers: process.env.KAFKA_BROKERS!.split(","),
  ssl: process.env.NODE_ENV === "production",
  sasl:
    process.env.NODE_ENV === "production"
      ? {
          mechanism: "scram-sha-512",
          username: process.env.KAFKA_USERNAME!,
          password: process.env.KAFKA_PASSWORD!,
        }
      : undefined,
});
let producer: Producer | null = null;
export async function getProducer(): Promise<Producer> {
  if (!producer) {
    producer = kafka.producer({
      idempotent: true, // Broker deduplicates retried sends
      // KafkaJS allows at most 1 in-flight request when idempotent is enabled
      maxInFlightRequests: 1,
      // For full transactional exactly-once, also set transactionalId and
      // wrap sends in producer.transaction()
    });
    await producer.connect();
  }
  return producer;
}
export async function publishEvent<T extends { eventId: string; aggregateId: string }>(
  topic: string,
  event: T
): Promise<void> {
  const prod = await getProducer();
  await prod.send({
    topic,
    messages: [
      {
        // Partition key: route all events for same aggregate to same partition
        key: event.aggregateId,
        value: JSON.stringify(event),
        headers: {
          "event-id": event.eventId,
          "content-type": "application/json",
          "schema-version": "1",
        },
      },
    ],
    compression: CompressionTypes.GZIP,
    acks: -1, // All in-sync replicas must acknowledge (strongest durability)
  });
}
// Usage in order service:
export async function placeOrder(orderData: CreateOrderInput): Promise<Order> {
  const order = await db.transaction(async (trx) => {
    const inserted = await trx.insert(orders).values(orderData).returning();
    return inserted[0];
  });

  // Publish AFTER the DB commit. Caveat: a crash between the commit and this
  // publish loses the event — the transactional outbox pattern closes that gap.
  await publishEvent("orders.order.placed", {
    eventId: crypto.randomUUID(),
    eventType: "order.placed" as const,
    eventVersion: 1,
    occurredAt: new Date().toISOString(),
    aggregateId: order.id,
    aggregateType: "order",
    payload: {
      orderId: order.id,
      customerId: order.customerId,
      tenantId: order.tenantId,
      items: order.items,
      totalAmountUsd: order.totalAmountUsd,
      shippingAddressId: order.shippingAddressId,
    },
  } satisfies OrderPlacedV1);

  return order;
}
Idempotent Consumer
// src/events/consumer.ts
import { Kafka, EachMessagePayload } from "kafkajs";

// Same broker/SASL configuration as in producer.ts
const kafka = new Kafka({
  clientId: "inventory-service",
  brokers: process.env.KAFKA_BROKERS!.split(","),
});

const consumer = kafka.consumer({
  groupId: "inventory-service",
  sessionTimeout: 30_000, // Broker evicts the consumer after 30s without a heartbeat
  heartbeatInterval: 3_000,
});
// Idempotency: track processed event IDs to handle duplicates
async function isEventAlreadyProcessed(eventId: string): Promise<boolean> {
  const { rows } = await db.query(
    "SELECT 1 FROM processed_events WHERE event_id = $1",
    [eventId]
  );
  return rows.length > 0;
}

async function markEventProcessed(
  eventId: string,
  topic: string
): Promise<void> {
  await db.query(
    `INSERT INTO processed_events (event_id, topic, processed_at)
     VALUES ($1, $2, NOW())
     ON CONFLICT (event_id) DO NOTHING`,
    [eventId, topic]
  );
}
async function handleOrderPlaced(event: OrderPlacedV1): Promise<void> {
  // Business logic: reserve inventory for each item
  await db.transaction(async (trx) => {
    for (const item of event.payload.items) {
      const result = await trx.query<{ quantity: number }>(
        `UPDATE inventory
            SET reserved = reserved + $1,
                available = available - $1
          WHERE product_id = $2
            AND available >= $1
        RETURNING quantity`,
        [item.quantity, item.productId]
      );

      if (result.rows.length === 0) {
        // Insufficient inventory — publish compensating event.
        // Note: the Kafka publish is NOT covered by the DB transaction; if
        // the transaction later rolled back, the event would still go out.
        await publishEvent("inventory.product.reservation-failed", {
          eventId: crypto.randomUUID(),
          eventType: "inventory.reservation.failed",
          eventVersion: 1,
          occurredAt: new Date().toISOString(),
          aggregateId: event.payload.orderId,
          aggregateType: "order",
          payload: {
            orderId: event.payload.orderId,
            productId: item.productId,
            requestedQuantity: item.quantity,
          },
        });
        // Mark processed on the failure path too, otherwise redelivery
        // publishes duplicate reservation-failed events
        await trx.query(
          `INSERT INTO processed_events (event_id, topic, processed_at)
           VALUES ($1, $2, NOW())
           ON CONFLICT (event_id) DO NOTHING`,
          [event.eventId, "orders.order.placed"]
        );
        return; // Stop processing — saga will handle compensation
      }
    }

    // Mark event as processed on `trx`, in the SAME transaction as the
    // reservation, so the reservation and the dedup record commit together
    await trx.query(
      `INSERT INTO processed_events (event_id, topic, processed_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (event_id) DO NOTHING`,
      [event.eventId, "orders.order.placed"]
    );
  });
}
// Message handler with idempotency check
async function processMessage({
  topic,
  message,
}: EachMessagePayload): Promise<void> {
  const rawEvent = JSON.parse(message.value!.toString());
  // Header values arrive as Buffers
  const eventId = message.headers?.["event-id"]?.toString();

  if (!eventId) {
    console.warn("Event missing event-id header, cannot guarantee idempotency");
  } else if (await isEventAlreadyProcessed(eventId)) {
    console.log(`Skipping duplicate event: ${eventId}`);
    return; // Already processed — safe to skip
  }

  switch (rawEvent.eventType) {
    case "order.placed":
      await handleOrderPlaced(rawEvent as OrderPlacedV1);
      break;
    case "order.cancelled":
      await handleOrderCancelled(rawEvent as OrderCancelledV1);
      break;
    default:
      console.warn(`Unknown event type: ${rawEvent.eventType}`);
  }
}
Saga Pattern for Distributed Transactions
// src/sagas/order-fulfillment.saga.ts
// Choreography saga: services react to events and publish compensating events on failure
// Happy path:
// order.placed → inventory.reserved → payment.charged → shipment.created
// Compensation (rollback) path if payment fails:
// payment.failed → inventory.release-reservation (compensating event)
// → order.cancelled
type SagaState = "inventory_reserved" | "payment_charging" | "completed" | "cancelled";
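Out-of-order or duplicate compensation events can corrupt saga state. A hypothetical guard is an explicit transition table over the states above (a "started" state is added here for illustration; none of these names come from the service code):

```typescript
// Illustrative saga-state guard: every legal transition is listed explicitly,
// and anything else is rejected instead of being applied blindly.
type SagaState =
  | "started"
  | "inventory_reserved"
  | "payment_charging"
  | "completed"
  | "cancelled";

const allowedTransitions: Record<SagaState, SagaState[]> = {
  started: ["inventory_reserved", "cancelled"],
  inventory_reserved: ["payment_charging", "cancelled"],
  payment_charging: ["completed", "cancelled"],
  completed: [], // terminal
  cancelled: [], // terminal
};

function transition(current: SagaState, next: SagaState): SagaState {
  if (!allowedTransitions[current].includes(next)) {
    throw new Error(`Invalid saga transition: ${current} -> ${next}`);
  }
  return next;
}
```

A stale `payment.failed` arriving after the saga already completed would then surface as a loud error to investigate, not a silent double compensation.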
async function handleInventoryReservationFailed(
  event: InventoryReservationFailedEvent
): Promise<void> {
  // Compensation: cancel the order
  await db.query(
    "UPDATE orders SET status = 'cancelled', cancelled_reason = 'out_of_stock' WHERE id = $1",
    [event.payload.orderId]
  );

  await publishEvent("orders.order.cancelled", {
    eventId: crypto.randomUUID(),
    eventType: "order.cancelled",
    eventVersion: 1,
    occurredAt: new Date().toISOString(),
    aggregateId: event.payload.orderId,
    aggregateType: "order",
    payload: {
      orderId: event.payload.orderId,
      customerId: event.payload.customerId,
      reason: "out_of_stock",
    },
  } satisfies OrderCancelledV1);
}
async function handlePaymentFailed(event: PaymentFailedEvent): Promise<void> {
  // Compensation step 1: release inventory reservation
  await publishEvent("inventory.reservation.release", {
    eventId: crypto.randomUUID(),
    eventType: "inventory.reservation.release",
    eventVersion: 1,
    occurredAt: new Date().toISOString(),
    aggregateId: event.payload.orderId,
    aggregateType: "order",
    payload: { orderId: event.payload.orderId },
  });

  // Compensation step 2: cancel order
  await publishEvent("orders.order.cancelled", {
    eventId: crypto.randomUUID(),
    eventType: "order.cancelled",
    eventVersion: 1,
    occurredAt: new Date().toISOString(),
    aggregateId: event.payload.orderId,
    aggregateType: "order",
    payload: {
      orderId: event.payload.orderId,
      customerId: event.payload.customerId,
      reason: "payment_failed",
    },
  } satisfies OrderCancelledV1);
}
Consumer Group Rebalancing Safety
// Commit offsets only AFTER successful processing (not before)
await consumer.run({
  autoCommit: false, // Manual offset commits only
  eachMessage: async (payload) => {
    try {
      await processMessage(payload);
      // Commit only after successful processing. Kafka stores the NEXT
      // offset to consume, hence offset + 1.
      await consumer.commitOffsets([
        {
          topic: payload.topic,
          partition: payload.partition,
          offset: (BigInt(payload.message.offset) + 1n).toString(),
        },
      ]);
    } catch (error) {
      console.error("Message processing failed:", error);
      // Don't commit the offset — the message will be redelivered after a
      // retry, restart, or rebalance, so the handler must be idempotent
      throw error; // KafkaJS applies its retry policy before giving up
    }
  },
});
See Also
- Distributed Systems Patterns — CAP theorem, consistency
- Event Sourcing Architecture — events as the system of record
- Background Jobs Architecture — job queues vs event streams
- WebSocket Scaling — real-time delivery from events
Working With Viprasol
Event-driven microservices shift complexity from synchronous APIs to asynchronous event flows. We design Kafka topic schemas, implement idempotent consumers that handle duplicate events gracefully, build saga patterns for distributed transactions, and instrument consumer lag monitoring so teams can detect processing failures before they cascade.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.