AWS SQS Worker Pattern in 2026: Consumer Workers, Dead-Letter Queues, and Terraform
Build production AWS SQS worker patterns: consumer workers with visibility timeout, dead-letter queues, FIFO queues, batch processing, poison pill handling, and complete Terraform setup.
SQS is the backbone of async processing on AWS—email sending, report generation, webhook delivery, image resizing, payment reconciliation. When it's configured correctly, it gives you at-least-once delivery, automatic scaling, and dead-letter queues for poison pills. When it's configured badly, you get silent failures, infinite retry loops, and customers complaining that their reports never arrived.
This post covers the production SQS setup: queue configuration with visibility timeout tuning, a TypeScript consumer worker with graceful shutdown, batch processing, dead-letter queue (DLQ) handling, FIFO queues for ordered processing, and the Terraform module that manages it all.
SQS Concepts That Matter for Production
Visibility Timeout: After a consumer receives a message, it becomes invisible to other consumers for the visibility timeout duration. If the consumer doesn't delete it before timeout expires, the message reappears and can be reprocessed. Set this longer than your worst-case processing time.
Dead-Letter Queue (DLQ): After a message fails maxReceiveCount times, SQS moves it to the DLQ. Without a DLQ, failed messages retry forever.
Long Polling: Instead of returning an empty response immediately when the queue is empty, long polling waits up to 20 seconds for a message to arrive. On a quiet queue this can cut empty-receive API calls, and their cost, by an order of magnitude.
FIFO vs Standard: Standard queues offer nearly unlimited throughput but may deliver messages out of order or more than once. FIFO queues guarantee ordering within a message group and exactly-once processing, but default throughput is 300 messages/second per API action (3,000/second with batching); high-throughput mode raises these limits further.
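The standard/FIFO split shows up concretely in the send parameters: `MessageGroupId` and `MessageDeduplicationId` are FIFO-only and are rejected by standard queues. A minimal sketch of a helper that derives the right shape from the `.fifo` URL suffix (the helper name and interface are assumptions, not part of the SDK):

```typescript
// Hypothetical helper: build SendMessage input for either queue type.
// FIFO queues require MessageGroupId; standard queues reject it.
interface SendParams {
  QueueUrl: string;
  MessageBody: string;
  MessageGroupId?: string;
  MessageDeduplicationId?: string;
}

export function buildSendParams(
  queueUrl: string,
  body: string,
  groupId: string,
  dedupId: string
): SendParams {
  const base: SendParams = { QueueUrl: queueUrl, MessageBody: body };
  if (queueUrl.endsWith(".fifo")) {
    // Ordering is guaranteed only within a message group; distinct
    // groups can be consumed in parallel.
    return { ...base, MessageGroupId: groupId, MessageDeduplicationId: dedupId };
  }
  return base;
}
```

Picking a group ID is the key design decision: one group per user (or per entity) preserves per-user order while still letting unrelated users process in parallel.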
Terraform Module
Variables
# modules/sqs-worker/variables.tf
variable "name" {
description = "Queue name (without environment suffix)"
type = string
}
variable "environment" {
type = string
}
variable "fifo" {
description = "Use FIFO queue for ordered, exactly-once delivery"
type = bool
default = false
}
variable "visibility_timeout_seconds" {
description = "Max time to process one message (set >= your p99 processing time)"
type = number
default = 300 # 5 minutes
}
variable "message_retention_seconds" {
description = "How long messages stay in queue before expiry"
type = number
default = 86400 # 24 hours
}
variable "max_receive_count" {
description = "Number of receive attempts before moving to DLQ"
type = number
default = 3
}
variable "dlq_retention_seconds" {
description = "How long DLQ messages are retained (for investigation)"
type = number
default = 1209600 # 14 days
}
variable "batch_size" {
description = "Max messages per batch (Lambda trigger)"
type = number
default = 10
}
variable "sns_alert_arn" {
description = "SNS topic ARN for DLQ alarms"
type = string
default = ""
}
variable "tags" {
type = map(string)
default = {}
}
Main module
# modules/sqs-worker/main.tf
locals {
suffix = var.fifo ? ".fifo" : ""
full_name = "${var.name}-${var.environment}${local.suffix}"
dlq_name = "${var.name}-${var.environment}-dlq${local.suffix}"
common_tags = merge(var.tags, {
Module = "sqs-worker"
Environment = var.environment
ManagedBy = "terraform"
})
}
# ─── Dead-Letter Queue ─────────────────────────────────────────────────────────
resource "aws_sqs_queue" "dlq" {
name = local.dlq_name
fifo_queue = var.fifo
content_based_deduplication = var.fifo
message_retention_seconds = var.dlq_retention_seconds
# Encrypt at rest
sqs_managed_sse_enabled = true
tags = local.common_tags
}
# ─── Main Queue ────────────────────────────────────────────────────────────────
resource "aws_sqs_queue" "main" {
name = local.full_name
fifo_queue = var.fifo
content_based_deduplication = var.fifo
visibility_timeout_seconds = var.visibility_timeout_seconds
message_retention_seconds = var.message_retention_seconds
receive_wait_time_seconds = 20 # Long polling (always enable)
max_message_size = 262144 # 256 KB
sqs_managed_sse_enabled = true
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.dlq.arn
maxReceiveCount = var.max_receive_count
})
tags = local.common_tags
}
# ─── Queue Policy (allow SNS publish if needed) ────────────────────────────────
resource "aws_sqs_queue_policy" "main" {
queue_url = aws_sqs_queue.main.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = { Service = "sns.amazonaws.com" }
Action = "sqs:SendMessage"
Resource = aws_sqs_queue.main.arn
Condition = {
ArnLike = {
"aws:SourceArn" = "arn:aws:sns:*:${data.aws_caller_identity.current.account_id}:*"
}
}
}
]
})
}
# ─── CloudWatch Alarms ─────────────────────────────────────────────────────────
resource "aws_cloudwatch_metric_alarm" "dlq_messages" {
count = var.sns_alert_arn != "" ? 1 : 0
alarm_name = "${local.full_name}-dlq-messages"
alarm_description = "Messages in DLQ — investigate failures"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 60
statistic = "Sum"
threshold = 0
treat_missing_data = "notBreaching"
dimensions = {
QueueName = aws_sqs_queue.dlq.name
}
alarm_actions = [var.sns_alert_arn]
tags = local.common_tags
}
resource "aws_cloudwatch_metric_alarm" "queue_depth_high" {
count = var.sns_alert_arn != "" ? 1 : 0
alarm_name = "${local.full_name}-queue-depth"
alarm_description = "Queue backlog growing — workers may be overloaded"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 3
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 300
statistic = "Average"
threshold = 1000
treat_missing_data = "notBreaching"
dimensions = {
QueueName = aws_sqs_queue.main.name
}
alarm_actions = [var.sns_alert_arn]
tags = local.common_tags
}
data "aws_caller_identity" "current" {}
Outputs
# modules/sqs-worker/outputs.tf
output "queue_url" {
value = aws_sqs_queue.main.id
}
output "queue_arn" {
value = aws_sqs_queue.main.arn
}
output "dlq_url" {
value = aws_sqs_queue.dlq.id
}
output "dlq_arn" {
value = aws_sqs_queue.dlq.arn
}
Usage
module "email_queue" {
source = "../../modules/sqs-worker"
name = "email-send"
environment = "prod"
visibility_timeout_seconds = 60 # Emails should process in <1 min
max_receive_count = 5 # Retry 5x before DLQ
sns_alert_arn = aws_sns_topic.alerts.arn
tags = { Project = "myapp" }
}
module "report_queue" {
source = "../../modules/sqs-worker"
name = "report-generate"
environment = "prod"
visibility_timeout_seconds = 900 # Reports can take 15 min
max_receive_count = 3
sns_alert_arn = aws_sns_topic.alerts.arn
}
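The module creates queues but not the permissions to consume them. A sketch of the IAM policy a worker role would need, assuming the `email_queue` module instance above (resource and role names are placeholders):

```hcl
# Sketch: consumer permissions for the worker role (names assumed).
# The worker needs receive/delete/change-visibility on the main queue;
# grant DLQ access only to the replay tooling, not to every worker.
resource "aws_iam_policy" "email_worker_sqs" {
  name = "email-worker-sqs-prod"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Action = [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueAttributes",
      ]
      Resource = module.email_queue.queue_arn
    }]
  })
}
```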
TypeScript Consumer Worker
// workers/sqs-consumer.ts
import {
SQSClient,
ReceiveMessageCommand,
DeleteMessageCommand,
ChangeMessageVisibilityCommand,
Message,
} from "@aws-sdk/client-sqs";
const sqs = new SQSClient({ region: process.env.AWS_REGION! });
interface WorkerOptions<T> {
queueUrl: string;
handler: (payload: T, message: Message) => Promise<void>;
batchSize?: number; // 1–10 messages per poll
visibilityTimeoutSeconds?: number;
heartbeatIntervalSeconds?: number; // Extend visibility while processing
concurrency?: number; // Parallel message processing
parseMessage?: (body: string) => T;
}
export class SQSWorker<T = unknown> {
private running = false;
private activeJobs = 0;
constructor(private options: WorkerOptions<T>) {
this.options = {
batchSize: 10,
visibilityTimeoutSeconds: 300,
heartbeatIntervalSeconds: 60,
concurrency: 5,
parseMessage: (body) => JSON.parse(body) as T,
...options,
};
}
async start(): Promise<void> {
this.running = true;
console.log(`SQS Worker started: ${this.options.queueUrl}`);
// Handle graceful shutdown
process.on("SIGTERM", () => this.stop());
process.on("SIGINT", () => this.stop());
while (this.running) {
await this.poll();
}
// Wait for active jobs to finish
while (this.activeJobs > 0) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
console.log("SQS Worker stopped cleanly");
}
async stop(): Promise<void> {
console.log("Stopping SQS Worker...");
this.running = false;
}
private async poll(): Promise<void> {
// Wait if at concurrency limit
while (this.activeJobs >= this.options.concurrency!) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
let messages: Message[] = [];
try {
const response = await sqs.send(
new ReceiveMessageCommand({
QueueUrl: this.options.queueUrl,
MaxNumberOfMessages: Math.min(
this.options.batchSize!,
this.options.concurrency! - this.activeJobs
),
WaitTimeSeconds: 20, // Long polling
MessageAttributeNames: ["All"],
AttributeNames: ["ApproximateReceiveCount"],
})
);
messages = response.Messages ?? [];
} catch (err) {
console.error("SQS receive error:", err);
await new Promise((resolve) => setTimeout(resolve, 5000));
return;
}
// Kick off processing without awaiting the whole batch, so the next
// poll can start while these run; the activeJobs gate above caps
// parallelism. processMessage catches its own errors, so
// fire-and-forget is safe here.
for (const message of messages) {
void this.processMessage(message);
}
}
private async processMessage(message: Message): Promise<void> {
this.activeJobs++;
const receiveCount = parseInt(
message.Attributes?.ApproximateReceiveCount ?? "1",
10
);
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
try {
// Extend visibility timeout while processing (heartbeat)
heartbeatTimer = setInterval(async () => {
try {
await sqs.send(
new ChangeMessageVisibilityCommand({
QueueUrl: this.options.queueUrl,
ReceiptHandle: message.ReceiptHandle!,
VisibilityTimeout: this.options.visibilityTimeoutSeconds!,
})
);
} catch (err) {
console.warn("Failed to extend visibility timeout:", err);
}
}, this.options.heartbeatIntervalSeconds! * 1000);
const payload = this.options.parseMessage!(message.Body!);
await this.options.handler(payload, message);
// Success — delete the message
await sqs.send(
new DeleteMessageCommand({
QueueUrl: this.options.queueUrl,
ReceiptHandle: message.ReceiptHandle!,
})
);
} catch (err) {
console.error(
`Message processing failed (attempt ${receiveCount}):`,
err,
{ messageId: message.MessageId }
);
// Don't delete — let SQS visibility timeout expire so it retries
// After maxReceiveCount attempts, SQS moves it to DLQ automatically
} finally {
if (heartbeatTimer) clearInterval(heartbeatTimer);
this.activeJobs--;
}
}
}
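On failure the worker above simply lets the full visibility timeout expire before SQS redelivers. A common refinement is per-attempt backoff: in the catch block, call `ChangeMessageVisibility` with a delay derived from `ApproximateReceiveCount`, so early retries happen quickly and later ones back off. A sketch of the delay calculation (the base and cap values are assumptions to tune):

```typescript
// Sketch: exponential backoff for failed messages. The catch block in
// processMessage could pass this to ChangeMessageVisibilityCommand as
// VisibilityTimeout instead of waiting out the configured timeout.
export function backoffSeconds(
  receiveCount: number,
  baseSeconds = 10,
  capSeconds = 900 // stay well under the 12-hour SQS maximum
): number {
  // attempt 1 -> 10s, attempt 2 -> 20s, attempt 3 -> 40s, ... capped
  const delay = baseSeconds * 2 ** (receiveCount - 1);
  return Math.min(delay, capSeconds);
}
```

Keep the cap below `message_retention_seconds`, or messages can expire mid-backoff before reaching the DLQ.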
Message Types and Handlers
// workers/handlers/email.handler.ts
import { SQSWorker } from "@/workers/sqs-consumer";
import { sendEmail } from "@/lib/email";
import { db } from "@/lib/db"; // query-builder instance (Kysely-style API assumed)
import { Message } from "@aws-sdk/client-sqs";
interface EmailJobPayload {
jobId: string;
to: string;
template: string;
variables: Record<string, string>;
userId?: string;
}
export async function handleEmailJob(
payload: EmailJobPayload,
message: Message
): Promise<void> {
const receiveCount = parseInt(
message.Attributes?.ApproximateReceiveCount ?? "1"
);
console.log(`Processing email job ${payload.jobId} (attempt ${receiveCount})`);
// Idempotency: check if already sent
const alreadySent = await db
.selectFrom("email_send_log")
.select(["id"])
.where("job_id", "=", payload.jobId)
.executeTakeFirst();
if (alreadySent) {
console.log(`Email job ${payload.jobId} already sent — skipping`);
return; // Return success so the message gets deleted
}
await sendEmail({
to: payload.to,
template: payload.template,
variables: payload.variables,
});
// Record successful send
await db
.insertInto("email_send_log")
.values({ job_id: payload.jobId, sent_at: new Date(), to: payload.to })
.execute();
console.log(`Email sent for job ${payload.jobId}`);
}
// Start the worker
const worker = new SQSWorker<EmailJobPayload>({
queueUrl: process.env.EMAIL_QUEUE_URL!,
handler: handleEmailJob,
batchSize: 10,
concurrency: 10, // 10 parallel email sends
visibilityTimeoutSeconds: 60,
heartbeatIntervalSeconds: 30,
});
worker.start().catch(console.error);
Sending Messages (Producer)
// lib/queues/producers.ts
import { SQSClient, SendMessageCommand, SendMessageBatchCommand } from "@aws-sdk/client-sqs";
import { randomUUID } from "crypto";
const sqs = new SQSClient({ region: process.env.AWS_REGION! });
export async function enqueueEmail(payload: {
to: string;
template: string;
variables: Record<string, string>;
userId?: string;
}): Promise<string> {
const jobId = randomUUID();
const isFifo = process.env.EMAIL_QUEUE_URL!.endsWith(".fifo");
await sqs.send(
new SendMessageCommand({
QueueUrl: process.env.EMAIL_QUEUE_URL!,
MessageBody: JSON.stringify({ jobId, ...payload }),
// MessageGroupId / MessageDeduplicationId are FIFO-only parameters;
// a standard queue rejects them with an InvalidParameterValue error
...(isFifo && {
MessageGroupId: payload.userId ?? "default", // FIFO: order per user
MessageDeduplicationId: jobId, // FIFO: prevent duplicates
}),
MessageAttributes: {
template: {
DataType: "String",
StringValue: payload.template,
},
},
})
);
return jobId;
}
// Batch sending (up to 10 messages per API call)
export async function enqueueBatchEmails(
payloads: Array<{ to: string; template: string; variables: Record<string, string> }>
): Promise<void> {
// SQS batch limit: 10 messages
const chunks = [];
for (let i = 0; i < payloads.length; i += 10) {
chunks.push(payloads.slice(i, i + 10));
}
for (const chunk of chunks) {
const response = await sqs.send(
new SendMessageBatchCommand({
QueueUrl: process.env.EMAIL_QUEUE_URL!,
Entries: chunk.map((payload, i) => ({
Id: String(i),
MessageBody: JSON.stringify({ jobId: randomUUID(), ...payload }),
})),
})
);
if (response.Failed && response.Failed.length > 0) {
console.error("Failed to enqueue some messages:", response.Failed);
throw new Error(`${response.Failed.length} messages failed to enqueue`);
}
}
}
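Producers can also schedule work for later with SQS's per-message `DelaySeconds`, which accepts 0 to 900 seconds (longer delays need EventBridge Scheduler or Step Functions, and FIFO queues only support a queue-level delay, not per-message). A sketch of a clamping helper (the helper name is an assumption):

```typescript
// Hypothetical helper: attach a delayed-delivery window to send params.
// SQS rejects DelaySeconds outside 0-900, so clamp rather than fail.
export function withDelay(
  params: { QueueUrl: string; MessageBody: string },
  delaySeconds: number
): { QueueUrl: string; MessageBody: string; DelaySeconds: number } {
  return {
    ...params,
    DelaySeconds: Math.max(0, Math.min(900, Math.floor(delaySeconds))),
  };
}
```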
DLQ Processor: Investigate and Reprocess
// workers/dlq-processor.ts
import {
SQSClient,
ReceiveMessageCommand,
DeleteMessageCommand,
SendMessageCommand,
} from "@aws-sdk/client-sqs";
const sqs = new SQSClient({ region: process.env.AWS_REGION! });
/**
* Inspect DLQ messages and optionally replay them to the main queue.
* Run this manually or on a schedule after investigating failures.
*/
export async function processDLQ(
dlqUrl: string,
mainQueueUrl: string,
options: {
limit?: number;
filter?: (payload: unknown) => boolean;
dryRun?: boolean;
} = {}
) {
const { limit = 100, filter, dryRun = false } = options;
let processed = 0;
let replayed = 0;
let discarded = 0;
while (processed < limit) {
const response = await sqs.send(
new ReceiveMessageCommand({
QueueUrl: dlqUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 5,
AttributeNames: ["All"],
})
);
const messages = response.Messages ?? [];
if (messages.length === 0) break;
for (const message of messages) {
processed++;
let payload: unknown;
try {
payload = JSON.parse(message.Body!);
} catch {
payload = message.Body;
}
console.log(`DLQ message ${message.MessageId}:`, payload);
const shouldReplay = !filter || filter(payload);
if (!dryRun) {
if (shouldReplay) {
// Send back to main queue for retry
await sqs.send(
new SendMessageCommand({
QueueUrl: mainQueueUrl,
MessageBody: message.Body!,
})
);
replayed++;
} else {
discarded++;
}
// Remove from DLQ
await sqs.send(
new DeleteMessageCommand({
QueueUrl: dlqUrl,
ReceiptHandle: message.ReceiptHandle!,
})
);
}
}
}
console.log(`DLQ processing complete: ${processed} inspected, ${replayed} replayed, ${discarded} discarded`);
return { processed, replayed, discarded };
}
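In practice the `filter` option is where the judgment lives: replay messages that failed for transient reasons, discard malformed bodies rather than sending poison pills back to the main queue. A sketch of a filter for the email queue (the predicate and the commented-out wiring use placeholder env var names):

```typescript
// Sketch: a replay filter for processDLQ. Replay only well-formed
// email jobs; anything else gets discarded (after logging above).
export function isRetryableEmailJob(payload: unknown): boolean {
  if (typeof payload !== "object" || payload === null) return false;
  const p = payload as Record<string, unknown>;
  return typeof p.jobId === "string" && typeof p.to === "string";
}

// How it would be wired up (run dryRun: true first to inspect):
// await processDLQ(process.env.EMAIL_DLQ_URL!, process.env.EMAIL_QUEUE_URL!, {
//   dryRun: true,
//   filter: isRetryableEmailJob,
// });
```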
Lambda Integration (Serverless Consumer)
For event-driven processing without long-running workers:
// lambda/sqs-handler.ts
import type { SQSEvent, SQSBatchResponse, SQSBatchItemFailure } from "aws-lambda";
export async function handler(event: SQSEvent): Promise<SQSBatchResponse> {
const itemFailures: SQSBatchItemFailure[] = [];
await Promise.allSettled(
event.Records.map(async (record) => {
try {
const payload = JSON.parse(record.body);
await processMessage(payload); // your app-specific handler (not shown)
} catch (err) {
console.error(`Failed to process ${record.messageId}:`, err);
// Report as failure — SQS will retry only this message
itemFailures.push({ itemIdentifier: record.messageId });
}
})
);
// Return partial batch failure report
// SQS retries only the failed messages, not the whole batch
return { batchItemFailures: itemFailures };
}
Terraform Lambda event source mapping:
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
event_source_arn = module.email_queue.queue_arn
function_name = aws_lambda_function.worker.arn
batch_size = 10
maximum_batching_window_in_seconds = 5 # Wait up to 5s to fill a batch
function_response_types = ["ReportBatchItemFailures"]
scaling_config {
maximum_concurrency = 50 # Limit Lambda concurrency to protect DB
}
}
Cost and Timeline Estimates
| Component | Timeline | Cost (USD) |
|---|---|---|
| SQS Terraform + DLQ + alarms | 0.5–1 day | $400–$800 |
| TypeScript worker with heartbeat | 1–2 days | $800–$1,600 |
| Handler + idempotency + DLQ processor | 1–2 days | $800–$1,600 |
| Lambda integration + batch failure | 0.5–1 day | $400–$800 |
| Full async processing system | 1 week | $3,500–$5,500 |
AWS SQS pricing (2026, us-east-1): the first 1M requests/month are free, then roughly $0.40 per million requests for standard queues and $0.50 per million for FIFO. Note that one message typically costs about three requests (send, receive, delete), so 10M messages/month on a standard queue is ~30M requests ≈ $12/month. Still negligible at most scales.
See Also
- AWS EventBridge — Event routing and fan-out patterns
- AWS Lambda Layers — Packaging shared worker dependencies
- AWS CloudWatch Observability — Monitoring queue depth and DLQ alerts
- Stripe Webhook Handling — Idempotent processing patterns
Working With Viprasol
We build async processing infrastructure on AWS for SaaS products handling everything from transactional emails to large-scale data pipelines. Our cloud team has designed SQS worker systems processing millions of messages per day with sub-1% failure rates.
What we deliver:
- SQS queue configuration with proper visibility timeout and DLQ setup
- TypeScript consumer workers with graceful shutdown and heartbeating
- Idempotent message handler patterns
- DLQ monitoring and reprocessing tooling
- Lambda integration for serverless consumers
See our cloud infrastructure services or contact us to discuss your async processing requirements.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.