AWS SQS Message Processing: Consumer Workers, Visibility Timeout, DLQ, and Idempotency
Build reliable SQS message processing workers in Node.js. Covers long polling consumer loops, visibility timeout management, dead letter queues after max retries, idempotency keys to prevent duplicate processing, batch deletion, and Terraform setup.
SQS decouples work from the request/response cycle — heavy operations (sending emails, processing images, running reports) move off the hot path into reliable queues. The challenge isn't publishing to SQS (one SDK call) — it's building the consumer correctly: handling visibility timeout, not double-processing messages, routing failures to a DLQ, and making processors idempotent so retries don't cause side effects.
Terraform: SQS Queue with DLQ
# terraform/sqs.tf

# Dead Letter Queue: receives messages after maxReceiveCount failures
resource "aws_sqs_queue" "email_dlq" {
  name                      = "${var.app_name}-email-dlq"
  message_retention_seconds = 1209600 # 14 days (maximum)
  tags                      = local.tags
}

# Main queue with DLQ redrive policy
resource "aws_sqs_queue" "email" {
  name                       = "${var.app_name}-email"
  visibility_timeout_seconds = 300   # 5 minutes: must be > max processing time
  message_retention_seconds  = 86400 # 1 day
  receive_wait_time_seconds  = 20    # Long polling (saves cost, reduces empty responses)
  delay_seconds              = 0     # No initial delay

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.email_dlq.arn
    maxReceiveCount     = 3 # Move to DLQ after 3 failed attempts
  })

  tags = local.tags
}
# Allow ECS task role to send/receive/delete from this queue
resource "aws_sqs_queue_policy" "email" {
  queue_url = aws_sqs_queue.email.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { AWS = var.ecs_task_role_arn }
      Action = [
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ChangeMessageVisibility",
      ]
      Resource = aws_sqs_queue.email.arn
    }]
  })
}
# CloudWatch alarm: alert when DLQ has messages
resource "aws_cloudwatch_metric_alarm" "dlq_messages" {
  alarm_name          = "${var.app_name}-email-dlq-not-empty"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 60
  statistic           = "Sum"
  threshold           = 0
  alarm_actions       = [var.sns_alert_topic_arn]

  dimensions = {
    QueueName = aws_sqs_queue.email_dlq.name
  }
}
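The application needs the queue URLs at runtime. One way to wire them into the ECS task definition is via Terraform outputs; a minimal sketch, with output names that are assumptions rather than part of the original config:

```hcl
# terraform/outputs.tf — hypothetical output names; feed these into the ECS
# task environment as SQS_EMAIL_QUEUE_URL / SQS_EMAIL_DLQ_URL
output "email_queue_url" {
  value = aws_sqs_queue.email.url
}

output "email_dlq_url" {
  value = aws_sqs_queue.email_dlq.url
}
```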
SQS Client and Message Types
// lib/queue/sqs-client.ts
import { SQSClient } from "@aws-sdk/client-sqs";

export const sqs = new SQSClient({ region: process.env.AWS_REGION });

// Typed message envelope
export interface QueueMessage<T = unknown> {
  type: string; // e.g. "send_invoice_email"
  payload: T;
  idempotencyKey: string; // Unique per logical operation — prevents double processing
  traceId?: string;
  createdAt: string;
}
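SQS delivers bodies as plain strings, so it can pay to validate the parsed envelope before routing it to a handler. A minimal sketch of a runtime guard (a hypothetical helper, not part of the original module; the interface is repeated here to keep the example self-contained):

```typescript
// Hypothetical standalone guard; QueueMessage mirrors the envelope above.
interface QueueMessage<T = unknown> {
  type: string;
  payload: T;
  idempotencyKey: string;
  traceId?: string;
  createdAt: string;
}

// Narrow an unknown parsed body to the QueueMessage envelope shape.
function isQueueMessage(value: unknown): value is QueueMessage {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.type === "string" &&
    typeof v.idempotencyKey === "string" &&
    typeof v.createdAt === "string" &&
    "payload" in v
  );
}
```

A consumer can call this right after `JSON.parse` and route unparseable or malformed bodies straight to the failure path instead of throwing deep inside a handler.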
Publishing to SQS
// lib/queue/producer.ts
import { SendMessageCommand } from "@aws-sdk/client-sqs";
import { nanoid } from "nanoid";
import { sqs, type QueueMessage } from "./sqs-client";

export async function enqueue<T>(
  queueUrl: string,
  type: string,
  payload: T,
  options?: {
    idempotencyKey?: string;
    delaySeconds?: number; // 0–900
    groupId?: string; // FIFO queues only
  }
): Promise<string> {
  const message: QueueMessage<T> = {
    type,
    payload,
    idempotencyKey: options?.idempotencyKey ?? nanoid(),
    createdAt: new Date().toISOString(),
  };

  const result = await sqs.send(
    new SendMessageCommand({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify(message),
      DelaySeconds: options?.delaySeconds ?? 0,
      MessageGroupId: options?.groupId, // FIFO only
      // MessageDeduplicationId: options?.idempotencyKey, // FIFO only
    })
  );

  return result.MessageId!;
}

// Usage
await enqueue(
  process.env.SQS_EMAIL_QUEUE_URL!,
  "send_invoice_email",
  { invoiceId: "inv_123", recipientEmail: "client@example.com" },
  { idempotencyKey: "invoice:inv_123:send_email" }
);
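When producing in bulk, SendMessageBatch accepts at most 10 entries per call, so messages need to be chunked first. A sketch of a generic chunk helper (hypothetical, not part of the original producer) that a SendMessageBatchCommand loop could build on:

```typescript
// Hypothetical helper: split an array into SQS-sized batches.
// SendMessageBatch (and DeleteMessageBatch) accept at most 10 entries per call.
const SQS_MAX_BATCH = 10;

function chunk<T>(items: T[], size: number = SQS_MAX_BATCH): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```

Each resulting batch would map to one SendMessageBatchCommand; note that batch calls can partially fail, so the `Failed` array in each response should be checked.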
Consumer Worker
// workers/email-consumer.ts
import {
  ReceiveMessageCommand,
  DeleteMessageBatchCommand,
  ChangeMessageVisibilityCommand,
  type Message,
} from "@aws-sdk/client-sqs";
import { sqs, type QueueMessage } from "@/lib/queue/sqs-client";
import { prisma } from "@/lib/prisma";
import { processEmailMessage } from "./handlers/email";

const QUEUE_URL = process.env.SQS_EMAIL_QUEUE_URL!;
const MAX_MESSAGES = 10; // SQS max batch size
const VISIBILITY_TIMEOUT = 300; // Must match the queue's visibility_timeout_seconds
const MAX_RECEIVE_COUNT = 3; // Must match the queue's maxReceiveCount

let isShuttingDown = false;

// Graceful shutdown
process.on("SIGTERM", () => {
  console.log("[worker] Received SIGTERM — finishing current batch then shutting down");
  isShuttingDown = true;
});

export async function startEmailWorker(): Promise<void> {
  console.log("[worker] Email worker started");
  while (!isShuttingDown) {
    try {
      await processBatch();
    } catch (err) {
      // A failed receive (network blip, throttling) should not kill the loop
      console.error("[worker] Batch receive failed:", err);
      await new Promise((resolve) => setTimeout(resolve, 5_000));
    }
  }
  console.log("[worker] Email worker shut down cleanly");
}

async function processBatch(): Promise<void> {
  // Long poll: waits up to 20s for messages (reduces empty receives and cost)
  const response = await sqs.send(
    new ReceiveMessageCommand({
      QueueUrl: QUEUE_URL,
      MaxNumberOfMessages: MAX_MESSAGES,
      WaitTimeSeconds: 20, // Long polling
      AttributeNames: ["ApproximateReceiveCount"],
      MessageAttributeNames: ["All"],
    })
  );

  const messages = response.Messages ?? [];
  if (messages.length === 0) return;

  console.log(`[worker] Processing ${messages.length} messages`);

  // Process messages concurrently (but never more than the batch size)
  const results = await Promise.allSettled(
    messages.map((msg) => processMessage(msg))
  );

  // Batch delete successfully processed messages
  const toDelete = messages.filter((_, i) => results[i].status === "fulfilled");
  if (toDelete.length > 0) {
    const deleteResult = await sqs.send(
      new DeleteMessageBatchCommand({
        QueueUrl: QUEUE_URL,
        Entries: toDelete.map((msg) => ({
          Id: msg.MessageId!,
          ReceiptHandle: msg.ReceiptHandle!,
        })),
      })
    );
    // Batch deletes can partially fail; those messages will simply be redelivered
    if (deleteResult.Failed?.length) {
      console.error("[worker] Failed to delete:", deleteResult.Failed);
    }
  }

  // Log failures (message returns to queue after visibility timeout)
  results.forEach((result, i) => {
    if (result.status === "rejected") {
      console.error(`[worker] Message ${messages[i].MessageId} failed:`, result.reason);
    }
  });
}

async function processMessage(sqsMessage: Message): Promise<void> {
  const body = JSON.parse(sqsMessage.Body!) as QueueMessage;
  const receiveCount = parseInt(
    sqsMessage.Attributes?.ApproximateReceiveCount ?? "1",
    10
  );

  // Check idempotency: has this message been processed before?
  // (The primary key on idempotencyKey also guards against concurrent duplicates.)
  const alreadyProcessed = await prisma.processedMessage.findUnique({
    where: { idempotencyKey: body.idempotencyKey },
  });
  if (alreadyProcessed) {
    console.log(`[worker] Skipping duplicate: ${body.idempotencyKey}`);
    return; // Resolves normally, so the message gets batch-deleted
  }

  // Extend visibility timeout for long-running jobs.
  // If processing outlasts the timeout, extend it to avoid re-delivery.
  const heartbeat = setInterval(() => {
    sqs
      .send(
        new ChangeMessageVisibilityCommand({
          QueueUrl: QUEUE_URL,
          ReceiptHandle: sqsMessage.ReceiptHandle!,
          VisibilityTimeout: VISIBILITY_TIMEOUT, // Reset the clock
        })
      )
      .catch((err) => {
        // A failed heartbeat must not crash the process with an unhandled rejection
        console.error("[worker] Heartbeat failed:", err);
      });
  }, 60_000); // Extend every 60 seconds

  try {
    // Route to the appropriate handler
    switch (body.type) {
      case "send_invoice_email":
        await processEmailMessage(body.payload);
        break;
      default:
        throw new Error(`Unknown message type: ${body.type}`);
    }

    // Mark as processed (idempotency record)
    await prisma.processedMessage.create({
      data: {
        idempotencyKey: body.idempotencyKey,
        type: body.type,
        processedAt: new Date(),
      },
    });
  } catch (err) {
    console.error(
      `[worker] Processing failed (attempt ${receiveCount}):`,
      body.type,
      err
    );
    // If this is the last attempt, record the failure before SQS moves it to the DLQ
    if (receiveCount >= MAX_RECEIVE_COUNT) {
      await prisma.failedMessage.create({
        data: {
          idempotencyKey: body.idempotencyKey,
          type: body.type,
          payload: body.payload as object,
          error: (err as Error).message,
          receiveCount,
        },
      });
    }
    throw err; // Re-throw: message stays in queue for retry
  } finally {
    clearInterval(heartbeat);
  }
}
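When the receive call itself fails repeatedly, a fixed sleep between retries is crude; exponential backoff with a cap is the usual refinement. A sketch of the calculation (a hypothetical helper, not part of the worker above) that the poll loop could use between failed receives:

```typescript
// Hypothetical helper: capped exponential backoff for retrying failed
// ReceiveMessage calls. attempt is 0-based.
function backoffMs(attempt: number, baseMs = 1_000, maxMs = 30_000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}
```

Usage would look like `await new Promise((r) => setTimeout(r, backoffMs(attempt)))`, resetting `attempt` to 0 after the first successful receive. Adding jitter on top is a common further refinement when many workers poll the same queue.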
Idempotency Schema
-- Prevent double-processing after retries or duplicate messages
CREATE TABLE processed_messages (
  idempotency_key TEXT PRIMARY KEY,
  type            TEXT NOT NULL,
  processed_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Expire old idempotency records after 7 days (cron job or pg_partman)
-- Retention only needs to exceed SQS message retention (1 day)
CREATE INDEX idx_processed_messages_time ON processed_messages(processed_at);

CREATE TABLE failed_messages (
  id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  idempotency_key TEXT NOT NULL,
  type            TEXT NOT NULL,
  payload         JSONB NOT NULL,
  error           TEXT NOT NULL,
  receive_count   INTEGER NOT NULL,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
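The 7-day expiry mentioned in the comment above can be a single scheduled statement; a sketch, assuming a daily job via pg_cron or an external scheduler (the schedule itself is not part of the original schema):

```sql
-- Run daily: drop idempotency records old enough that the original
-- SQS message can no longer be redelivered (retention is 1 day).
DELETE FROM processed_messages
WHERE processed_at < NOW() - INTERVAL '7 days';
```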
DLQ Reprocessing
// scripts/reprocess-dlq.ts — manually reprocess DLQ messages after fixing the bug
import {
  ReceiveMessageCommand,
  SendMessageCommand,
  DeleteMessageCommand,
} from "@aws-sdk/client-sqs";
import { sqs } from "@/lib/queue/sqs-client";

async function reprocessDLQ() {
  const dlqUrl = process.env.SQS_EMAIL_DLQ_URL!;
  const mainUrl = process.env.SQS_EMAIL_QUEUE_URL!;
  let processed = 0;

  while (true) {
    const res = await sqs.send(
      new ReceiveMessageCommand({
        QueueUrl: dlqUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 5,
      })
    );
    const messages = res.Messages ?? [];
    if (messages.length === 0) break;

    for (const msg of messages) {
      // Re-send to main queue
      await sqs.send(
        new SendMessageCommand({ QueueUrl: mainUrl, MessageBody: msg.Body! })
      );
      // Delete from DLQ only after the re-send succeeded
      await sqs.send(
        new DeleteMessageCommand({ QueueUrl: dlqUrl, ReceiptHandle: msg.ReceiptHandle! })
      );
      processed++;
    }
  }

  console.log(`Reprocessed ${processed} DLQ messages`);
}

reprocessDLQ().catch(console.error);
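Redriving everything blindly can loop poison messages straight back into the DLQ. One guard is to filter before re-sending; a sketch of such a filter (a hypothetical addition to the script above, assuming the QueueMessage envelope from earlier):

```typescript
// Hypothetical pre-redrive filter: only requeue message types the worker
// actually handles; everything else stays in the DLQ for manual inspection.
const KNOWN_TYPES = new Set(["send_invoice_email"]);

function shouldRequeue(body: string): boolean {
  try {
    const parsed = JSON.parse(body) as { type?: unknown };
    return typeof parsed.type === "string" && KNOWN_TYPES.has(parsed.type);
  } catch {
    return false; // Unparseable body: leave it in the DLQ
  }
}
```

The loop would call `shouldRequeue(msg.Body!)` before the SendMessageCommand and skip (but not delete) anything that fails the check.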
Cost Estimates
| Component | Cost |
|---|---|
| SQS standard queue | $0.40 per 1M requests |
| SQS FIFO queue | $0.50 per 1M requests |
| Long polling (20s) | No extra charge; fewer empty receives means fewer billable requests |
| Data transfer (within region) | Free |
| CloudWatch alarm | ~$0.10/alarm/month |
See Also
- AWS SQS SNS Patterns
- AWS SQS FIFO Queues
- AWS Lambda Scheduled Events
- AWS EventBridge Patterns
- SaaS Webhook System
Working With Viprasol
The difference between a brittle queue consumer and a production-grade one is visibility timeout management, idempotency, and DLQ alerting. Our team builds SQS workers with long-polling batch receive, concurrent message processing with Promise.allSettled, heartbeat visibility extension for long jobs, idempotency keys checked against PostgreSQL before processing, and DLQ CloudWatch alarms that page on first failure.
What we deliver:
- SQS Terraform: main queue (5m visibility, 20s long poll, DLQ after 3 attempts) + DLQ (14d retention) + CloudWatch alarm
- enqueue() helper: typed QueueMessage envelope with idempotency key and traceId
- startEmailWorker: long-poll loop, concurrent Promise.allSettled, batch delete on success
- processMessage: idempotency check, heartbeat setInterval every 60s, DLQ pre-record on last attempt
- processed_messages + failed_messages schema
- reprocess-dlq.ts script: receive from DLQ → send to main → delete from DLQ
Talk to our team about your async job processing architecture →
Or explore our cloud infrastructure services.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.