Redis Advanced Patterns: Lua Scripts, Streams, Sorted Sets, and Distributed Locks
Master advanced Redis patterns: atomic Lua scripts, Redis Streams for event sourcing, sorted sets for leaderboards, and Redlock for distributed locks in production Node.js apps.
Redis ships as a cache. It earns its place in production as a coordination engine. Rate limiters, leaderboards, event streams, distributed locks: every one of these patterns lives in Redis at scale because it combines in-memory speed with single-threaded atomicity. This post covers the four patterns our team reaches for most, with production-tested TypeScript code you can drop into your stack.
Why Redis Beats a DB for These Patterns
A PostgreSQL rate limiter requires a SELECT … FOR UPDATE, an UPDATE, and a commit: three round-trips under lock. A Redis Lua script runs as a single atomic operation. For anything that needs sub-millisecond coordination across multiple app servers, Redis wins on architecture, not just speed.
The four patterns covered here are:
- Lua scripts: atomic multi-key operations with no race conditions
- Redis Streams: append-only log with consumer groups
- Sorted sets: real-time leaderboards and time-series indexes
- Redlock: distributed mutual exclusion
1. Lua Scripts for Atomic Operations
The Problem with Non-Atomic Rate Limiting
// WRONG: race condition between GET and INCR
async function rateLimitBad(key: string, limit: number): Promise<boolean> {
const current = await redis.get(key);
if (current && parseInt(current) >= limit) return false;
await redis.incr(key);
await redis.expire(key, 60);
return true;
}
// Two requests can both pass the GET check before either runs INCR
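The race can be reproduced without a Redis server. The sketch below replays the same GET-then-INCR sequence against an in-memory stand-in; `FakeStore`, `rateLimitRacy`, and `demoRace` are hypothetical names introduced only for this illustration. With a limit of 1, both concurrent callers read "no count yet" before either increments, so both are allowed.

```typescript
// In-memory stand-in for the two Redis calls used above (hypothetical helper).
class FakeStore {
  private counts = new Map<string, number>();

  async get(key: string): Promise<string | null> {
    const v = this.counts.get(key);
    return v === undefined ? null : String(v);
  }

  async incr(key: string): Promise<number> {
    const next = (this.counts.get(key) ?? 0) + 1;
    this.counts.set(key, next);
    return next;
  }
}

// Same check-then-act shape as rateLimitBad, minus the EXPIRE call.
async function rateLimitRacy(store: FakeStore, key: string, limit: number): Promise<boolean> {
  const current = await store.get(key);
  if (current !== null && parseInt(current, 10) >= limit) return false;
  await store.incr(key);
  return true;
}

// Two concurrent requests against a limit of 1.
async function demoRace(): Promise<boolean[]> {
  const store = new FakeStore();
  return Promise.all([
    rateLimitRacy(store, 'rl:demo', 1),
    rateLimitRacy(store, 'rl:demo', 1),
  ]);
}
```

Calling `demoRace()` resolves to `[true, true]`: the limit of 1 was exceeded because the check and the increment are separate steps.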
Lua Script: Sliding Window Rate Limiter
// src/lib/redis/rate-limit.ts
import { createClient } from 'redis';
const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();
// Lua script loaded once at startup; called via EVALSHA
const SLIDING_WINDOW_SCRIPT = `
local key = KEYS[1]
local window = tonumber(ARGV[1]) -- window in seconds
local limit = tonumber(ARGV[2])
local now = tonumber(ARGV[3]) -- current epoch ms
-- Remove timestamps outside the window
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
-- Count requests in window
local count = redis.call('ZCARD', key)
if count >= limit then
return {0, count, window} -- blocked, current count, retry-after
end
-- Add current request
redis.call('ZADD', key, now, now .. '-' .. math.random(1, 1000000))
redis.call('EXPIRE', key, window + 1)
return {1, count + 1, 0} -- allowed, new count, 0 retry-after
`;
let scriptSha: string;
async function loadScript(): Promise<void> {
scriptSha = await redis.scriptLoad(SLIDING_WINDOW_SCRIPT);
}
export interface RateLimitResult {
allowed: boolean;
count: number;
retryAfter: number; // seconds
}
export async function checkRateLimit(
identifier: string, // e.g., `rate:api:${userId}` or `rate:ip:${ip}`
windowSec: number,
limit: number
): Promise<RateLimitResult> {
const key = `rl:${identifier}`;
const now = Date.now();
try {
const [allowed, count, retryAfter] = await redis.evalSha(
scriptSha,
{ keys: [key], arguments: [String(windowSec), String(limit), String(now)] }
) as [number, number, number];
return { allowed: allowed === 1, count, retryAfter };
} catch (err: any) {
// NOSCRIPT: script evicted from cache, reload and retry
if (err.message?.includes('NOSCRIPT')) {
await loadScript();
return checkRateLimit(identifier, windowSec, limit);
}
throw err;
}
}
// Initialize on app start
await loadScript();
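For unit tests it can help to have the script's decision logic as a pure function. The following is a minimal model of the prune, count, reject-or-record sequence, under the assumption that timestamps are kept oldest first. `slidingWindowDecision` and `WindowDecision` are hypothetical names. One deliberate difference: the script returns the full window as retry-after, while this model computes the time until the oldest entry leaves the window.

```typescript
interface WindowDecision {
  allowed: boolean;
  count: number;        // requests counted in the window after this call
  retryAfterMs: number; // 0 when allowed
  timestamps: number[]; // surviving timestamps, oldest first
}

// Pure model of the script: prune, count, then either reject or record.
function slidingWindowDecision(
  timestamps: number[], // epoch ms of earlier accepted requests, oldest first
  nowMs: number,
  windowSec: number,
  limit: number
): WindowDecision {
  const cutoff = nowMs - windowSec * 1000;
  // ZREMRANGEBYSCORE key 0 cutoff removes scores <= cutoff (inclusive).
  const live = timestamps.filter((t) => t > cutoff);
  if (live.length >= limit) {
    // The oldest surviving entry is the first to leave the window.
    const oldest = live[0] ?? nowMs;
    const retryAfterMs = oldest + windowSec * 1000 - nowMs;
    return { allowed: false, count: live.length, retryAfterMs, timestamps: live };
  }
  const next = [...live, nowMs];
  return { allowed: true, count: next.length, retryAfterMs: 0, timestamps: next };
}
```

Feeding each call's `timestamps` into the next one mimics successive requests against the same key.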
Express Middleware Using the Rate Limiter
// src/middleware/rate-limit.middleware.ts
import { Request, Response, NextFunction } from 'express';
import { checkRateLimit } from '../lib/redis/rate-limit';
export function rateLimitMiddleware(
windowSec: number,
limit: number,
keyFn: (req: Request) => string = (req) => req.ip ?? 'unknown'
) {
return async (req: Request, res: Response, next: NextFunction) => {
try {
const identifier = keyFn(req);
const result = await checkRateLimit(identifier, windowSec, limit);
res.setHeader('X-RateLimit-Limit', limit);
res.setHeader('X-RateLimit-Remaining', Math.max(0, limit - result.count));
res.setHeader('X-RateLimit-Window', windowSec);
if (!result.allowed) {
res.setHeader('Retry-After', result.retryAfter);
return res.status(429).json({
error: 'Too Many Requests',
retryAfter: result.retryAfter,
});
}
next();
} catch (err) {
// Express 4 does not catch rejections from async handlers; forward them
next(err);
}
};
}
// Usage:
// app.use('/api/', rateLimitMiddleware(60, 100)); // 100 req/min per IP
// app.use('/api/auth/login', rateLimitMiddleware(900, 5, req => req.body.email));
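The login example keys on `req.body.email`, which is undefined when the body is missing or malformed, so every such request would share one bucket. A possible guard is sketched below. `loginRateLimitKey` and `KeyableRequest` are hypothetical names, and lowercasing the email is an assumption about how your accounts are normalized.

```typescript
// Minimal structural view of the request fields the key function reads.
interface KeyableRequest {
  ip?: string;
  body?: unknown;
}

// Hypothetical key function for login routes: prefer a normalized email,
// fall back to the client IP so a missing body never yields a shared key.
function loginRateLimitKey(req: KeyableRequest): string {
  const body = req.body;
  if (typeof body === 'object' && body !== null && 'email' in body) {
    const email = (body as { email?: unknown }).email;
    if (typeof email === 'string' && email.trim() !== '') {
      return `login:email:${email.trim().toLowerCase()}`;
    }
  }
  return `login:ip:${req.ip ?? 'unknown'}`;
}
```

It could then be passed as the third argument, for example `rateLimitMiddleware(900, 5, loginRateLimitKey)`.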
Token Bucket for Burst Allowance
-- token-bucket.lua
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2]) -- tokens per second
local requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4]) -- epoch seconds (float)
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now
-- Calculate tokens added since last refill
local elapsed = now - last_refill
local new_tokens = math.min(capacity, tokens + elapsed * refill_rate)
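if new_tokens < requested then
-- Calculate wait time in seconds
local wait = (requested - new_tokens) / refill_rate
-- Redis truncates Lua numbers to integers in replies, so fractional values are returned as strings
return {0, tostring(new_tokens), tostring(wait)}
end
-- Deduct tokens (HSET accepts multiple field/value pairs since Redis 4.0; HMSET is deprecated)
redis.call('HSET', key, 'tokens', new_tokens - requested, 'last_refill', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 10)
return {1, tostring(new_tokens - requested), '0'}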
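The refill arithmetic is easier to check in isolation. Below is a minimal TypeScript model of the same token-bucket decision, intended as a test aid rather than a replacement for the Lua script. `tokenBucketDecision`, `BucketState`, and `BucketDecision` are hypothetical names.

```typescript
interface BucketState {
  tokens: number;
  lastRefill: number; // epoch seconds
}

interface BucketDecision {
  allowed: boolean;
  tokens: number;     // tokens left after the decision
  waitSec: number;    // 0 when allowed
  state: BucketState; // state to persist
}

function tokenBucketDecision(
  state: BucketState | null, // null models a key that does not exist yet
  capacity: number,
  refillRate: number, // tokens per second
  requested: number,
  nowSec: number
): BucketDecision {
  const tokens = state?.tokens ?? capacity;
  const lastRefill = state?.lastRefill ?? nowSec;
  // Refill for the elapsed time, capped at capacity.
  const available = Math.min(capacity, tokens + (nowSec - lastRefill) * refillRate);
  if (available < requested) {
    const waitSec = (requested - available) / refillRate;
    // Like the script, a rejected call leaves the stored state untouched.
    return { allowed: false, tokens: available, waitSec, state: state ?? { tokens, lastRefill } };
  }
  const remaining = available - requested;
  return { allowed: true, tokens: remaining, waitSec: 0, state: { tokens: remaining, lastRefill: nowSec } };
}
```

With a capacity of 10 and a refill rate of 2 tokens per second, draining the bucket and asking for 3 tokens one second later leaves 2 available, so the caller is told to wait 0.5 seconds.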
2. Redis Streams for Event Sourcing
Redis Streams (XADD/XREAD/XACK) give you a persistent, ordered event log with consumer group semantics, like Kafka but without the JVM.
Publishing Events
// src/lib/redis/streams.ts
import { createClient, RedisClientType } from 'redis';
const redis: RedisClientType = createClient({ url: process.env.REDIS_URL });
await redis.connect();
export interface StreamEvent {
eventType: string;
aggregateId: string;
aggregateType: string;
payload: Record<string, unknown>;
occurredAt?: string;
}
export async function publishEvent(
stream: string,
event: StreamEvent
): Promise<string> {
const fields: Record<string, string> = {
eventType: event.eventType,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
payload: JSON.stringify(event.payload),
occurredAt: event.occurredAt ?? new Date().toISOString(),
};
// '*' = auto-generated ID (timestamp-sequence)
const id = await redis.xAdd(stream, '*', fields);
return id; // e.g., "1728345600000-0"
}
// Keep streams bounded: trim to roughly the last 10,000 events ('~' allows approximate trimming)
export async function publishEventBounded(
stream: string,
event: StreamEvent,
maxLen: number = 10_000
): Promise<string> {
const fields: Record<string, string> = {
eventType: event.eventType,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
payload: JSON.stringify(event.payload),
occurredAt: event.occurredAt ?? new Date().toISOString(),
};
return redis.xAdd(stream, '*', fields, {
TRIM: { strategy: 'MAXLEN', strategyModifier: '~', threshold: maxLen },
});
}
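Auto-generated entry IDs such as "1728345600000-0" carry the insert time in milliseconds plus a per-millisecond sequence number, which makes them usable as cursors and for rough latency measurements. A small parsing sketch follows; `parseStreamId` and `compareStreamIds` are hypothetical helpers, and the use of `Number` assumes the sequence part stays well below 2^53.

```typescript
interface StreamId {
  ms: number;  // milliseconds part (server clock at insert time for '*' IDs)
  seq: number; // sequence number within the same millisecond
}

function parseStreamId(id: string): StreamId {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) throw new Error(`Not a stream entry ID: ${id}`);
  return { ms: Number(match[1]), seq: Number(match[2]) };
}

// Orders two IDs the way the stream does: by time, then by sequence.
function compareStreamIds(a: string, b: string): number {
  const x = parseStreamId(a);
  const y = parseStreamId(b);
  return x.ms !== y.ms ? x.ms - y.ms : x.seq - y.seq;
}
```

For example, `new Date(parseStreamId(id).ms)` gives the time the server assigned to the entry.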
Consumer Groups for Parallel Processing
// src/lib/redis/consumer.ts
import { createClient } from 'redis';
const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();
export interface ConsumerConfig {
stream: string;
group: string;
consumer: string; // unique per pod, e.g., `worker-${process.env.POD_NAME}`
batchSize: number;
blockMs: number; // 0 = block forever, >0 = timeout
}
export type MessageHandler = (
id: string,
fields: Record<string, string>
) => Promise<void>;
export async function ensureConsumerGroup(
stream: string,
group: string
): Promise<void> {
try {
// '$' = only new messages; '0' = all messages from beginning
await redis.xGroupCreate(stream, group, '$', { MKSTREAM: true });
} catch (err: any) {
if (!err.message?.includes('BUSYGROUP')) throw err;
// Group already exists, which is fine
}
}
export async function startConsumer(
config: ConsumerConfig,
handler: MessageHandler,
signal?: AbortSignal
): Promise<void> {
await ensureConsumerGroup(config.stream, config.group);
while (!signal?.aborted) {
const messages = await redis.xReadGroup(
config.group,
config.consumer,
[{ key: config.stream, id: '>' }], // '>' = undelivered messages
{ COUNT: config.batchSize, BLOCK: config.blockMs }
);
if (!messages || messages.length === 0) continue;
for (const { name, messages: msgs } of messages) {
for (const { id, message } of msgs) {
try {
await handler(id, message as Record<string, string>);
// ACK only after successful processing
await redis.xAck(config.stream, config.group, id);
} catch (err) {
console.error(`Failed to process message ${id}:`, err);
// Message stays in PEL (pending entry list) for retry
// Monitor with XPENDING for dead-letter handling
}
}
}
}
}
// Reclaim stale messages from crashed consumers
export async function reclaimStalePendingMessages(
stream: string,
group: string,
consumer: string,
minIdleMs: number = 30_000
): Promise<number> {
const pending = await redis.xAutoClaim(
stream,
group,
consumer,
minIdleMs,
'0-0', // start from beginning
{ COUNT: 100 }
);
return pending.messages.length;
}
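The consumer above leaves failed messages in the pending entry list and mentions dead-letter handling without showing it. One way to decide what to do with pending entries is sketched here as a pure function. `PendingEntry` is an assumed shape for one XPENDING summary row, not the exact reply type of any client, and `planPending` is a hypothetical helper.

```typescript
// Assumed shape of one pending-entry summary; adapt the field names to
// whatever your client returns for XPENDING.
interface PendingEntry {
  id: string;
  idleMs: number;        // time since last delivery
  deliveryCount: number; // how many times the entry has been delivered
}

interface PendingPlan {
  retry: string[];      // IDs worth claiming and retrying
  deadLetter: string[]; // IDs that have exhausted their attempts
}

function planPending(
  entries: PendingEntry[],
  minIdleMs: number,
  maxDeliveries: number
): PendingPlan {
  const plan: PendingPlan = { retry: [], deadLetter: [] };
  for (const e of entries) {
    if (e.idleMs < minIdleMs) continue; // probably still being processed
    if (e.deliveryCount >= maxDeliveries) plan.deadLetter.push(e.id);
    else plan.retry.push(e.id);
  }
  return plan;
}
```

Entries in `deadLetter` would typically be copied to a separate stream and then acknowledged, so they stop being redelivered.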
Order Processing with Streams
// src/services/order.service.ts
import { publishEvent } from '../lib/redis/streams';
import { startConsumer } from '../lib/redis/consumer';
import { db } from '../lib/db';
// Publisher: place an order and emit event
export async function placeOrder(
userId: string,
items: Array<{ productId: string; quantity: number; price: number }>
): Promise<{ orderId: string; eventId: string }> {
const total = items.reduce((sum, i) => sum + i.price * i.quantity, 0);
const order = await db.order.create({
data: { userId, status: 'pending', total, items: { create: items } },
});
const eventId = await publishEvent('orders', {
eventType: 'order.placed',
aggregateId: order.id,
aggregateType: 'order',
payload: { userId, total, itemCount: items.length },
});
return { orderId: order.id, eventId };
}
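// Consumer: inventory service processes order events
// Not awaited: startConsumer loops until aborted, and a top-level await here
// would stall every module that imports placeOrder from this file
const controller = new AbortController();
void startConsumer(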
{
stream: 'orders',
group: 'inventory-service',
consumer: `inventory-${process.env.POD_NAME ?? 'local'}`,
batchSize: 10,
blockMs: 5000,
},
async (id, fields) => {
if (fields.eventType !== 'order.placed') return;
const payload = JSON.parse(fields.payload) as {
userId: string;
total: number;
itemCount: number;
};
console.log(`Processing order ${fields.aggregateId} for user ${payload.userId}`);
await db.inventoryReservation.create({
data: {
orderId: fields.aggregateId,
status: 'reserved',
reservedAt: new Date(),
},
});
},
controller.signal
);
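The handler above trusts `JSON.parse(...) as {...}`, which does no runtime checking. Since stream fields are plain strings written by other services, a defensive decoder may be worth the few extra lines. This is a sketch; `decodeOrderPlaced` and `OrderPlacedPayload` are hypothetical names mirroring the payload published by `placeOrder`.

```typescript
interface OrderPlacedPayload {
  userId: string;
  total: number;
  itemCount: number;
}

// Returns null instead of throwing so the caller can decide whether to
// acknowledge and skip the entry or route it elsewhere.
function decodeOrderPlaced(fields: Record<string, string>): OrderPlacedPayload | null {
  if (fields.eventType !== 'order.placed') return null;
  let raw: unknown;
  try {
    raw = JSON.parse(fields.payload ?? '');
  } catch {
    return null; // missing or malformed JSON
  }
  if (typeof raw !== 'object' || raw === null) return null;
  const p = raw as Record<string, unknown>;
  if (typeof p.userId !== 'string') return null;
  if (typeof p.total !== 'number' || !Number.isFinite(p.total)) return null;
  if (typeof p.itemCount !== 'number' || !Number.isInteger(p.itemCount)) return null;
  return { userId: p.userId, total: p.total, itemCount: p.itemCount };
}
```

Returning `null` for undecodable entries keeps a single bad message from being retried forever.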
3. Sorted Sets for Leaderboards and Time Windows
Real-Time Leaderboard
// src/lib/redis/leaderboard.ts
import { createClient } from 'redis';
const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();
const LEADERBOARD_KEY = 'lb:global';
const USER_METADATA_PREFIX = 'lb:user:';
export interface LeaderboardEntry {
userId: string;
score: number;
rank: number;
username: string;
avatar?: string;
}
export async function updateScore(
userId: string,
scoreDelta: number,
metadata: { username: string; avatar?: string }
): Promise<number> {
// Store metadata in a hash (TTL sync with leaderboard)
await redis.hSet(`${USER_METADATA_PREFIX}${userId}`, {
username: metadata.username,
...(metadata.avatar ? { avatar: metadata.avatar } : {}),
});
// ZINCRBY is atomic, so concurrent updates are safe
const newScore = await redis.zIncrBy(LEADERBOARD_KEY, scoreDelta, userId);
return newScore;
}
export async function getTopN(n: number): Promise<LeaderboardEntry[]> {
// ZRANGE with REV returns members ordered highest-to-lowest score
const members = await redis.zRangeWithScores(
LEADERBOARD_KEY,
0,
n - 1,
{ REV: true }
);
const entries: LeaderboardEntry[] = [];
for (let i = 0; i < members.length; i++) {
const { value: userId, score } = members[i];
const meta = await redis.hGetAll(`${USER_METADATA_PREFIX}${userId}`);
entries.push({
userId,
score,
rank: i + 1,
username: meta.username ?? 'Unknown',
avatar: meta.avatar,
});
}
return entries;
}
export async function getUserRank(userId: string): Promise<{
rank: number;
score: number;
surrounding: LeaderboardEntry[];
} | null> {
const [rank, score] = await Promise.all([
redis.zRevRank(LEADERBOARD_KEY, userId),
redis.zScore(LEADERBOARD_KEY, userId),
]);
if (rank === null || score === null) return null;
// Get 2 above and 2 below for context
const start = Math.max(0, rank - 2);
const stop = rank + 2;
const surrounding = await redis.zRangeWithScores(
LEADERBOARD_KEY,
start,
stop,
{ REV: true }
);
const entries: LeaderboardEntry[] = await Promise.all(
surrounding.map(async ({ value: uid, score: s }, idx) => {
const meta = await redis.hGetAll(`${USER_METADATA_PREFIX}${uid}`);
return {
userId: uid,
score: s,
rank: start + idx + 1,
username: meta.username ?? 'Unknown',
avatar: meta.avatar,
};
})
);
return { rank: rank + 1, score, surrounding: entries };
}
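Both `getTopN` and `getUserRank` assign ranks by position, so two users with the same score receive different ranks. If tied users should share a rank, the positions can be post-processed. The sketch below uses competition ranking (1, 2, 2, 4); `assignCompetitionRanks` is a hypothetical helper, and it assumes the page does not start in the middle of a tie.

```typescript
interface ScoredMember {
  value: string;
  score: number;
}

interface RankedMember extends ScoredMember {
  rank: number;
}

// Expects members sorted highest score first, as ZRANGE ... REV returns them.
// firstRank is the 1-based rank of members[0].
function assignCompetitionRanks(members: ScoredMember[], firstRank = 1): RankedMember[] {
  const ranked: RankedMember[] = [];
  members.forEach((m, i) => {
    const prev = ranked[i - 1];
    // Equal score to the previous member: share its rank.
    const rank = prev !== undefined && prev.score === m.score ? prev.rank : firstRank + i;
    ranked.push({ ...m, rank });
  });
  return ranked;
}
```

The output of `zRangeWithScores` already has the `{ value, score }` shape this function expects.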
Time-Series Index for Recent Activity
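// src/lib/redis/activity-feed.ts
import { createClient } from 'redis';
const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();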
const FEED_PREFIX = 'feed:user:';
const GLOBAL_FEED = 'feed:global';
const MAX_FEED_SIZE = 500;
export interface ActivityEvent {
type: 'post' | 'comment' | 'like' | 'follow';
actorId: string;
targetId: string;
metadata?: Record<string, string>;
}
export async function recordActivity(event: ActivityEvent): Promise<void> {
const score = Date.now(); // Unix ms as score for time ordering
const member = JSON.stringify({
type: event.type,
actorId: event.actorId,
targetId: event.targetId,
metadata: event.metadata,
});
const userFeedKey = `${FEED_PREFIX}${event.actorId}`;
await Promise.all([
// User's personal feed
redis.zAdd(userFeedKey, { score, value: member }),
// Global activity feed
redis.zAdd(GLOBAL_FEED, { score, value: member }),
]);
// Trim to last MAX_FEED_SIZE events
await Promise.all([
redis.zRemRangeByRank(userFeedKey, 0, -(MAX_FEED_SIZE + 1)),
redis.zRemRangeByRank(GLOBAL_FEED, 0, -(MAX_FEED_SIZE + 1)),
]);
}
export async function getRecentActivity(
feedKey: string,
limit: number = 20,
beforeMs?: number
): Promise<ActivityEvent[]> {
// ZRANGEBYSCORE has no REV option; use ZRANGE ... BYSCORE REV (Redis 6.2+).
// With REV the upper bound comes first; '(' makes it exclusive.
const max = beforeMs !== undefined ? `(${beforeMs}` : '+inf';
const members = await redis.zRange(feedKey, max, '-inf', {
BY: 'SCORE',
REV: true,
LIMIT: { offset: 0, count: limit },
});
return members.map((value) => JSON.parse(value) as ActivityEvent);
}
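The trim call `zRemRangeByRank(key, 0, -(MAX_FEED_SIZE + 1))` is easy to get wrong by one. The following in-memory model shows how the negative stop rank resolves and why exactly the newest `maxSize` entries survive. `resolveRank` and `trimToNewest` are hypothetical helpers operating on a plain array sorted by ascending score.

```typescript
// Resolves a possibly negative rank the way sorted-set commands do:
// -1 is the last element, -2 the one before it, and so on.
function resolveRank(rank: number, length: number): number {
  return rank < 0 ? length + rank : rank;
}

// Models ZREMRANGEBYRANK key 0 -(maxSize + 1) on an array sorted by
// ascending score (oldest first) and returns what is left.
function trimToNewest<T>(sortedAsc: T[], maxSize: number): T[] {
  const stop = resolveRank(-(maxSize + 1), sortedAsc.length);
  if (stop < 0) return sortedAsc.slice(); // at most maxSize entries: nothing to remove
  return sortedAsc.slice(stop + 1);       // ranks 0..stop are removed
}
```

With five entries and a maximum of three, the stop rank resolves to 1, so ranks 0 and 1 (the two oldest) are removed.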
4. Redlock: Distributed Mutual Exclusion
Single-node Redis locks (SET key value NX PX ttl) fail silently when the Redis instance restarts mid-lock. Redlock takes a quorum across N independent Redis nodes: a lock requires acknowledgment from ⌊N/2⌋+1 nodes.
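The quorum rule and the validity window come down to two small formulas. They are sketched here as standalone functions; `quorumFor` and `lockValidityMs` are hypothetical names, and the drift allowance of 1% of the TTL plus 2 ms is taken from the implementation in this post.

```typescript
// Smallest number of nodes that forms a majority.
function quorumFor(nodeCount: number): number {
  return Math.floor(nodeCount / 2) + 1;
}

// Remaining time the lock can be trusted after acquisition took elapsedMs.
function lockValidityMs(ttlMs: number, elapsedMs: number): number {
  const driftMs = Math.floor(ttlMs * 0.01) + 2; // clock drift allowance
  return ttlMs - elapsedMs - driftMs;
}
```

With 3 nodes the quorum is 2, so one node may be down. A 10,000 ms lock that took 35 ms to acquire is considered valid for 10,000 - 35 - 102 = 9,863 ms; a result at or below zero means the lock must be treated as not acquired.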
Redlock Implementation (3-node quorum)
// src/lib/redis/redlock.ts
import { createClient, RedisClientType } from 'redis';
import crypto from 'crypto';
// Three independent Redis nodes (not a cluster; truly separate instances)
const nodes: RedisClientType[] = [
createClient({ url: process.env.REDIS_LOCK_URL_1 }),
createClient({ url: process.env.REDIS_LOCK_URL_2 }),
createClient({ url: process.env.REDIS_LOCK_URL_3 }),
];
await Promise.all(nodes.map((n) => n.connect()));
const LOCK_RELEASE_SCRIPT = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
export interface Lock {
key: string;
value: string; // random token for ownership proof
acquiredAt: number;
ttlMs: number;
}
export async function acquireLock(
resource: string,
ttlMs: number,
retries: number = 3,
retryDelayMs: number = 200
): Promise<Lock | null> {
const key = `lock:${resource}`;
const value = crypto.randomBytes(16).toString('hex');
const quorum = Math.floor(nodes.length / 2) + 1;
const drift = Math.floor(ttlMs * 0.01) + 2; // clock drift allowance
for (let attempt = 0; attempt <= retries; attempt++) {
const start = Date.now();
let acquired = 0;
await Promise.all(
nodes.map(async (node) => {
try {
const result = await node.set(key, value, {
NX: true,
PX: ttlMs,
});
if (result === 'OK') acquired++;
} catch {
// Node unavailable: don't count it
}
})
);
const elapsed = Date.now() - start;
const validityMs = ttlMs - elapsed - drift;
if (acquired >= quorum && validityMs > 0) {
return { key, value, acquiredAt: start, ttlMs: validityMs };
}
// Failed to get quorum: release on every node, including any we did acquire
await releaseLock({ key, value, acquiredAt: start, ttlMs });
if (attempt < retries) {
const jitter = Math.floor(Math.random() * retryDelayMs);
await new Promise((r) => setTimeout(r, retryDelayMs + jitter));
}
}
return null; // Could not acquire lock
}
export async function releaseLock(lock: Lock): Promise<void> {
await Promise.allSettled(
nodes.map((node) =>
node.eval(LOCK_RELEASE_SCRIPT, { keys: [lock.key], arguments: [lock.value] })
)
);
}
export async function withLock<T>(
resource: string,
ttlMs: number,
fn: () => Promise<T>
): Promise<T> {
const lock = await acquireLock(resource, ttlMs);
if (!lock) throw new Error(`Could not acquire lock on ${resource}`);
try {
return await fn();
} finally {
await releaseLock(lock);
}
}
// Usage:
// const result = await withLock('invoice:generate:user-123', 30_000, async () => {
// return generateAndStoreInvoice(userId);
// });
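`withLock` does not stop `fn` from running past the lock's validity window, at which point another process may acquire the same lock. One mitigation is to check the remaining validity just before the irreversible step. The sketch below assumes the critical section has access to the lock's `acquiredAt` and `ttlMs` values, which `withLock` as written does not expose. `remainingLockMs` and `assertLockHeld` are hypothetical helpers.

```typescript
interface HeldLock {
  acquiredAt: number; // epoch ms when acquisition started
  ttlMs: number;      // validity window returned by acquireLock
}

function remainingLockMs(lock: HeldLock, nowMs: number): number {
  return lock.acquiredAt + lock.ttlMs - nowMs;
}

// Throws if no more than safetyMarginMs of validity is left.
function assertLockHeld(lock: HeldLock, nowMs: number, safetyMarginMs = 0): void {
  if (remainingLockMs(lock, nowMs) <= safetyMarginMs) {
    throw new Error('Lock validity expired before the critical section finished');
  }
}
```

A check like `assertLockHeld(lock, Date.now(), 100)` narrows the unsafe window but does not close it; a database-level constraint or version check remains the final safeguard.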
Safe Inventory Deduction with Redlock
// src/services/inventory.service.ts
import { withLock } from '../lib/redis/redlock';
import { db } from '../lib/db';
export async function reserveStock(
productId: string,
quantity: number,
orderId: string
): Promise<{ success: boolean; remaining: number }> {
return withLock(`inventory:${productId}`, 10_000, async () => {
const product = await db.product.findUniqueOrThrow({
where: { id: productId },
select: { stockLevel: true },
});
if (product.stockLevel < quantity) {
return { success: false, remaining: product.stockLevel };
}
await db.$transaction([
db.product.update({
where: { id: productId },
data: { stockLevel: { decrement: quantity } },
}),
db.stockReservation.create({
data: { productId, orderId, quantity, status: 'reserved' },
}),
]);
return { success: true, remaining: product.stockLevel - quantity };
});
}
Production Redis Configuration
redis.conf Hardening
# /etc/redis/redis.conf
# Persistence: AOF + RDB for durability
save 900 1
save 300 10
save 60 10000
appendonly yes
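# everysec balances durability against performance
appendfsync everysec
# Keep fsyncing during rewrites: safer on high-write systems
# (redis.conf does not support trailing comments on directive lines)
no-appendfsync-on-rewrite no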
# Memory management
maxmemory 4gb
# Evict LRU keys when memory is full (for cache nodes)
# Use noeviction for queue/stream nodes
maxmemory-policy allkeys-lru
# Connection limits
maxclients 10000
tcp-backlog 511
timeout 300
tcp-keepalive 60
# Disable dangerous commands in production
rename-command FLUSHALL ""
rename-command FLUSHDB ""
rename-command DEBUG ""
rename-command CONFIG "CONFIG-PROD-ONLY"
# Cluster mode (if applicable)
# cluster-enabled yes
# cluster-config-file nodes.conf
# cluster-node-timeout 15000
Connection Pool with ioredis
// src/lib/redis/client.ts: production client setup
import { Redis, Cluster } from 'ioredis';
function createRedisClient(): Redis | Cluster {
if (process.env.REDIS_CLUSTER_NODES) {
const nodes = process.env.REDIS_CLUSTER_NODES.split(',').map((addr) => {
const [host, port] = addr.split(':');
return { host, port: parseInt(port, 10) };
});
return new Cluster(nodes, {
redisOptions: {
password: process.env.REDIS_PASSWORD,
tls: process.env.NODE_ENV === 'production' ? {} : undefined,
connectTimeout: 5000,
commandTimeout: 3000,
maxRetriesPerRequest: 3,
},
scaleReads: 'slave', // reads from replicas
clusterRetryStrategy: (times) => Math.min(times * 100, 3000),
});
}
return new Redis({
host: process.env.REDIS_HOST ?? '127.0.0.1',
port: parseInt(process.env.REDIS_PORT ?? '6379', 10),
password: process.env.REDIS_PASSWORD,
db: parseInt(process.env.REDIS_DB ?? '0', 10),
maxRetriesPerRequest: 3,
enableReadyCheck: true,
lazyConnect: false,
reconnectOnError: (err) => {
// Force reconnect on READONLY errors (failover scenarios)
return err.message.includes('READONLY');
},
});
}
export const redis = createRedisClient();
redis.on('error', (err) => console.error('Redis error:', err));
redis.on('connect', () => console.info('Redis connected'));
redis.on('reconnecting', () => console.warn('Redis reconnecting'));
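The inline parsing of `REDIS_CLUSTER_NODES` above yields `NaN` ports or empty hosts when the variable is malformed, and the failure only surfaces at connect time. A stricter parser might look like the following. `parseClusterNodes` is a hypothetical helper; defaulting a missing port to 6379 is an assumption, and IPv6 literals are not handled.

```typescript
interface NodeAddress {
  host: string;
  port: number;
}

function parseClusterNodes(raw: string): NodeAddress[] {
  return raw
    .split(',')
    .map((part) => part.trim())
    .filter((part) => part !== '') // tolerate trailing commas
    .map((part) => {
      const idx = part.lastIndexOf(':');
      const host = idx === -1 ? part : part.slice(0, idx);
      const port = idx === -1 ? 6379 : Number(part.slice(idx + 1));
      if (host === '' || !Number.isInteger(port) || port < 1 || port > 65535) {
        throw new Error(`Invalid cluster node address: ${part}`);
      }
      return { host, port };
    });
}
```

Failing fast at startup makes a typo in the environment variable visible immediately instead of as a connection timeout.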
Cost and Infrastructure Reference
| Setup | Monthly Cost (AWS ElastiCache) | Use Case |
|---|---|---|
| cache.t4g.micro (1 node) | ~$14/mo | Dev / small apps |
| cache.r7g.large (1 node, 13GB) | ~$185/mo | Single AZ production |
| cache.r7g.large (1 primary + 2 replicas) | ~$555/mo | Multi-AZ HA |
| cache.r7g.2xlarge cluster (6 shards) | ~$2,600/mo | High-throughput distributed |
| Self-managed on EC2 m7g.xlarge | ~$140/mo (no SLA) | Cost-optimized with ops burden |
Typical client project sizing for SaaS applications:
- Startup (< 100K MAU): single r7g.large node with Multi-AZ replica, $370/mo
- Growth (100K-1M MAU): 3-shard cluster, $780/mo
- Scale (1M+ MAU): 6-shard cluster + read replicas per AZ, $2,800-5,000/mo
Key Rules Our Team Follows
- Never use KEYS * in production: it is O(N) and blocks all other commands. Use SCAN with a cursor.
- Lua scripts for atomicity, not performance: the speedup is a bonus; correctness is the reason.
- Set TTL on every key: memory leaks from orphaned keys are a common production incident.
- Monitor SLOWLOG GET 25: any command >10ms in Redis is a red flag.
- Separate Redis instances for caching vs. queuing: allkeys-lru eviction is catastrophic on a queue node.
- Test reconnection logic: Redis failover is routine; your app must survive it gracefully.
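As an illustration of the first rule, a cursor-based SCAN loop can be written against a very small client interface. `ScanClient` is an assumed shape made up for this sketch, not the signature of any particular library, so a thin adapter around your client's SCAN call is needed. `scanKeys` is a hypothetical helper.

```typescript
// Minimal view of a client's SCAN call. The shape is an assumption made for
// this sketch; map it onto the client you actually use.
interface ScanClient {
  scan(cursor: string, match: string, count: number): Promise<{ cursor: string; keys: string[] }>;
}

async function scanKeys(client: ScanClient, match: string, count = 100): Promise<string[]> {
  const found = new Set<string>(); // SCAN may return a key more than once
  let cursor = '0';
  do {
    const page = await client.scan(cursor, match, count);
    page.keys.forEach((k) => found.add(k));
    cursor = page.cursor;
  } while (cursor !== '0'); // the server signals completion with cursor 0
  return Array.from(found);
}
```

Each SCAN call does a bounded amount of work, so other commands can run between iterations; the trade-off is that keys may appear more than once and the result is not a point-in-time snapshot.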
See Also
- OpenTelemetry for Node.js: Auto-Instrumentation to Custom Spans
- Caching Strategies: From Browser to Edge CDN
- Distributed Systems Patterns Every Engineer Should Know
- WebSocket Scaling: Redis Pub/Sub and Sticky Sessions
- Background Jobs Architecture with BullMQ
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.