
AWS SQS Message Processing: Consumer Workers, Visibility Timeout, DLQ, and Idempotency

Build reliable SQS message processing workers in Node.js. Covers long polling consumer loops, visibility timeout management, dead letter queues after max retries, idempotency keys to prevent duplicate processing, batch deletion, and Terraform setup.

Viprasol Tech Team
May 23, 2027
13 min read

SQS decouples work from the request/response cycle — heavy operations (sending emails, processing images, running reports) move off the hot path into reliable queues. The challenge isn't publishing to SQS (one SDK call) — it's building the consumer correctly: handling visibility timeout, not double-processing messages, routing failures to a DLQ, and making processors idempotent so retries don't cause side effects.

Terraform: SQS Queue with DLQ

# terraform/sqs.tf

# Dead Letter Queue: receives messages after maxReceiveCount failures
resource "aws_sqs_queue" "email_dlq" {
  name                      = "${var.app_name}-email-dlq"
  message_retention_seconds = 1209600  # 14 days (maximum)
  tags                      = local.tags
}

# Main queue with DLQ redrive policy
resource "aws_sqs_queue" "email" {
  name                       = "${var.app_name}-email"
  visibility_timeout_seconds = 300     # 5 minutes: must be > max processing time
  message_retention_seconds  = 86400   # 1 day
  receive_wait_time_seconds  = 20      # Long polling (saves cost, reduces empty responses)
  delay_seconds              = 0       # No initial delay

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.email_dlq.arn
    maxReceiveCount     = 3   # Move to DLQ after 3 failed attempts
  })

  tags = local.tags
}

# Allow ECS task role to send/receive/delete from this queue
resource "aws_sqs_queue_policy" "email" {
  queue_url = aws_sqs_queue.email.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { AWS = var.ecs_task_role_arn }
      Action    = [
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ChangeMessageVisibility",
      ]
      Resource  = aws_sqs_queue.email.arn
    }]
  })
}

# CloudWatch alarm: alert when DLQ has messages
resource "aws_cloudwatch_metric_alarm" "dlq_messages" {
  alarm_name          = "${var.app_name}-email-dlq-not-empty"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 60
  statistic           = "Sum"
  threshold           = 0
  alarm_actions       = [var.sns_alert_topic_arn]

  dimensions = {
    QueueName = aws_sqs_queue.email_dlq.name
  }
}

SQS Client and Message Types

// lib/queue/sqs-client.ts
import { SQSClient } from "@aws-sdk/client-sqs";

export const sqs = new SQSClient({ region: process.env.AWS_REGION });

// Typed message envelope
export interface QueueMessage<T = unknown> {
  type:         string;   // e.g. "send_invoice_email"
  payload:      T;
  idempotencyKey: string; // Unique per logical operation — prevents double processing
  traceId?:     string;
  createdAt:    string;
}
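JSON.parse returns data of unknown shape, so a consumer shouldn't trust a message body blindly. A minimal runtime guard for the envelope might look like the sketch below — this is our own addition, not an SDK feature, and the local Envelope alias simply mirrors QueueMessage for self-containment:

```typescript
// Hypothetical guard (not part of the AWS SDK): checks that a parsed SQS
// body has the QueueMessage envelope shape before the worker trusts it.
type Envelope = {
  type: string;
  payload: unknown;
  idempotencyKey: string;
  createdAt: string;
  traceId?: string;
};

function isQueueMessage(value: unknown): value is Envelope {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.type === "string" &&
    "payload" in v &&
    typeof v.idempotencyKey === "string" &&
    typeof v.createdAt === "string"
  );
}
```

A consumer can then reject malformed bodies explicitly (and let them age into the DLQ) instead of failing with an opaque TypeError deep inside a handler.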


Publishing to SQS

// lib/queue/producer.ts
import { SendMessageCommand } from "@aws-sdk/client-sqs";
import { sqs, type QueueMessage } from "./sqs-client";
import { nanoid } from "nanoid";

export async function enqueue<T>(
  queueUrl: string,
  type:     string,
  payload:  T,
  options?: {
    idempotencyKey?: string;
    delaySeconds?:   number;   // 0–900
    groupId?:        string;   // FIFO queues only
  }
): Promise<string> {
  const message: QueueMessage<T> = {
    type,
    payload,
    idempotencyKey: options?.idempotencyKey ?? nanoid(),
    createdAt:      new Date().toISOString(),
  };

  const result = await sqs.send(
    new SendMessageCommand({
      QueueUrl:               queueUrl,
      MessageBody:            JSON.stringify(message),
      DelaySeconds:           options?.delaySeconds ?? 0,
      MessageGroupId:         options?.groupId,  // FIFO only
      // MessageDeduplicationId: options?.idempotencyKey,  // FIFO only
    })
  );

  return result.MessageId!;
}

// Usage
await enqueue(
  process.env.SQS_EMAIL_QUEUE_URL!,
  "send_invoice_email",
  { invoiceId: "inv_123", recipientEmail: "client@example.com" },
  { idempotencyKey: `invoice:inv_123:send_email` }
);
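SendMessageCommand is one request per message. For bulk enqueues, SQS also offers SendMessageBatchCommand, which accepts up to 10 entries per call and divides the per-request cost by the batch size. A sketch of the chunking step — the chunk helper is our own, and the actual send is shown as a comment to keep the example self-contained:

```typescript
// Split an arbitrary list into groups of at most `size` items.
// SQS batch APIs (SendMessageBatch, DeleteMessageBatch) cap entries at 10.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// For each group of <=10 serialized bodies, one batch request would be sent:
//
//   for (const group of chunk(bodies, 10)) {
//     await sqs.send(new SendMessageBatchCommand({
//       QueueUrl: queueUrl,
//       Entries:  group.map((body, i) => ({ Id: String(i), MessageBody: body })),
//     }));
//   }
```

Note that a batch call can partially fail; the response's Failed array should be checked and those entries re-sent.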

Consumer Worker

// workers/email-consumer.ts
import {
  ReceiveMessageCommand,
  DeleteMessageCommand,
  DeleteMessageBatchCommand,
  ChangeMessageVisibilityCommand,
  type Message,
} from "@aws-sdk/client-sqs";
import { sqs, type QueueMessage } from "@/lib/queue/sqs-client";
import { prisma } from "@/lib/prisma";
import { processEmailMessage } from "./handlers/email";

const QUEUE_URL       = process.env.SQS_EMAIL_QUEUE_URL!;
const MAX_MESSAGES    = 10;      // SQS max batch size
const VISIBILITY_TIMEOUT = 300; // Must match queue setting

let isShuttingDown = false;

// Graceful shutdown
process.on("SIGTERM", () => {
  console.log("[worker] Received SIGTERM — finishing current batch then shutting down");
  isShuttingDown = true;
});

export async function startEmailWorker(): Promise<void> {
  console.log("[worker] Email worker started");

  while (!isShuttingDown) {
    try {
      await processBatch();
    } catch (err) {
      // A failed receive or delete shouldn't kill the worker: log, pause, retry
      console.error("[worker] Batch error:", err);
      await new Promise((resolve) => setTimeout(resolve, 5_000));
    }
  }

  console.log("[worker] Email worker shut down cleanly");
}
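A fixed pause between failed polls works, but if SQS is throttling you, repeated retries at a constant rate keep adding load. Exponential backoff with full jitter is the usual way to space retries out; a sketch of the delay calculation (backoffMs is our own name, not an SDK helper):

```typescript
// Hypothetical backoff helper: exponential delay with full jitter.
// attempt is 1-based; the exponential curve is capped at maxMs, and the
// actual delay is drawn uniformly from [0, cap) to de-synchronize workers.
function backoffMs(attempt: number, baseMs = 500, maxMs = 30_000): number {
  const cap = Math.min(maxMs, baseMs * 2 ** (attempt - 1));
  return Math.floor(Math.random() * cap);
}
```

The worker loop would sleep for backoffMs(consecutiveFailures) after each failed batch and reset the counter on success.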

async function processBatch(): Promise<void> {
  // Long poll: waits up to 20s for messages (reduces empty receives and cost)
  const response = await sqs.send(
    new ReceiveMessageCommand({
      QueueUrl:            QUEUE_URL,
      MaxNumberOfMessages: MAX_MESSAGES,
      WaitTimeSeconds:     20,        // Long polling
      AttributeNames:      ["ApproximateReceiveCount"],
      MessageAttributeNames: ["All"],
    })
  );

  const messages = response.Messages ?? [];
  if (messages.length === 0) return;

  console.log(`[worker] Processing ${messages.length} messages`);

  // Process messages concurrently (but not more than batch size)
  const results = await Promise.allSettled(
    messages.map((msg) => processMessage(msg))
  );

  // Batch delete successfully processed messages
  const toDelete = messages.filter((_, i) => results[i].status === "fulfilled");
  if (toDelete.length > 0) {
    await sqs.send(
      new DeleteMessageBatchCommand({
        QueueUrl: QUEUE_URL,
        Entries:  toDelete.map((msg) => ({
          Id:            msg.MessageId!,
          ReceiptHandle: msg.ReceiptHandle!,
        })),
      })
    );
  }

  // Log failures (message returns to queue after visibility timeout)
  results.forEach((result, i) => {
    if (result.status === "rejected") {
      console.error(`[worker] Message ${messages[i].MessageId} failed:`, result.reason);
    }
  });
}

async function processMessage(sqsMessage: Message): Promise<void> {
  const body    = JSON.parse(sqsMessage.Body!) as QueueMessage;
  const receiveCount = parseInt(
    sqsMessage.Attributes?.ApproximateReceiveCount ?? "1",
    10
  );

  // Check idempotency: has this message been processed before?
  // (Check-then-act leaves a small race under concurrent delivery; the
  // PRIMARY KEY on idempotency_key is the hard backstop for the insert below.)
  const alreadyProcessed = await prisma.processedMessage.findUnique({
    where: { idempotencyKey: body.idempotencyKey },
  });

  if (alreadyProcessed) {
    console.log(`[worker] Skipping duplicate: ${body.idempotencyKey}`);
    return; // Resolve normally so the batch loop deletes it from the queue
  }

  // Extend visibility timeout for long-running jobs
  // If processing takes >2 minutes, extend to avoid re-delivery
  const heartbeat = setInterval(() => {
    sqs
      .send(
        new ChangeMessageVisibilityCommand({
          QueueUrl:          QUEUE_URL,
          ReceiptHandle:     sqsMessage.ReceiptHandle!,
          VisibilityTimeout: VISIBILITY_TIMEOUT,  // Reset the clock
        })
      )
      // An async setInterval callback would surface a transient SQS error
      // as an unhandled rejection, so catch and log instead
      .catch((err) => console.error("[worker] Heartbeat failed:", err));
  }, 60_000); // Extend every 60 seconds

  try {
    // Route to the appropriate handler
    switch (body.type) {
      case "send_invoice_email":
        await processEmailMessage(body.payload);
        break;
      default:
        throw new Error(`Unknown message type: ${body.type}`);
    }

    // Mark as processed (idempotency record)
    await prisma.processedMessage.create({
      data: {
        idempotencyKey: body.idempotencyKey,
        type:           body.type,
        processedAt:    new Date(),
      },
    });
  } catch (err) {
    console.error(
      `[worker] Processing failed (attempt ${receiveCount}):`,
      body.type, err
    );

    // If this is the last attempt, log to DB before DLQ
    if (receiveCount >= 3) {
      await prisma.failedMessage.create({
        data: {
          idempotencyKey: body.idempotencyKey,
          type:           body.type,
          payload:        body.payload as object,
          error:          (err as Error).message,
          receiveCount,
        },
      });
    }

    throw err; // Re-throw: message stays in queue for retry
  } finally {
    clearInterval(heartbeat);
  }
}
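On failure the re-thrown error leaves the message invisible until the full 5-minute visibility timeout expires, so every retry waits 5 minutes regardless of the attempt number. For graduated retries, the worker could instead set the failed message's visibility based on its receive count before re-throwing. A sketch — retryDelaySeconds and its schedule are our own, not an SQS feature:

```typescript
// Hypothetical graduated-retry schedule, keyed off ApproximateReceiveCount.
// Attempt 1 retries after 30s, attempt 2 after 120s, attempt 3+ after 300s
// (capped at the queue's visibility timeout).
function retryDelaySeconds(receiveCount: number): number {
  const schedule = [30, 120, 300];
  return schedule[Math.min(receiveCount, schedule.length) - 1];
}

// In the catch block, before `throw err`:
//
//   await sqs.send(new ChangeMessageVisibilityCommand({
//     QueueUrl:          QUEUE_URL,
//     ReceiptHandle:     sqsMessage.ReceiptHandle!,
//     VisibilityTimeout: retryDelaySeconds(receiveCount),
//   }));
```

This keeps transient failures (a flaky SMTP connection, say) from stalling a message for a full timeout window while still backing off on repeated failures.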


Idempotency Schema

-- Prevent double-processing after retries or duplicate messages
CREATE TABLE processed_messages (
  idempotency_key TEXT PRIMARY KEY,
  type            TEXT NOT NULL,
  processed_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Expire old idempotency records after 7 days (cron job or pg_partman)
-- Retention only needs to exceed SQS message retention (1 day)
CREATE INDEX idx_processed_messages_time ON processed_messages(processed_at);

CREATE TABLE failed_messages (
  id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  idempotency_key TEXT NOT NULL,
  type            TEXT NOT NULL,
  payload         JSONB NOT NULL,
  error           TEXT NOT NULL,
  receive_count   INTEGER NOT NULL,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
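The comment above mentions expiring idempotency records after 7 days. A sketch of that cleanup job — the cutoff computation is pure, and the Prisma call is shown as a comment since it needs a live database:

```typescript
// Hypothetical retention cleanup for processed_messages.
// Returns the timestamp before which records are safe to delete: anything
// older than SQS message retention can never be re-delivered.
function retentionCutoff(now: Date, days: number): Date {
  return new Date(now.getTime() - days * 24 * 60 * 60 * 1000);
}

// Run daily (cron, ECS scheduled task, pg_cron, etc.):
//
//   await prisma.processedMessage.deleteMany({
//     where: { processedAt: { lt: retentionCutoff(new Date(), 7) } },
//   });
```

The idx_processed_messages_time index above exists precisely so this range delete doesn't scan the whole table.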

DLQ Reprocessing

// scripts/reprocess-dlq.ts — manually reprocess DLQ messages after fixing the bug
import { ReceiveMessageCommand, SendMessageCommand, DeleteMessageCommand } from "@aws-sdk/client-sqs";
import { sqs } from "@/lib/queue/sqs-client";

async function reprocessDLQ() {
  const dlqUrl   = process.env.SQS_EMAIL_DLQ_URL!;
  const mainUrl  = process.env.SQS_EMAIL_QUEUE_URL!;
  let processed  = 0;

  while (true) {
    const res = await sqs.send(
      new ReceiveMessageCommand({ QueueUrl: dlqUrl, MaxNumberOfMessages: 10, WaitTimeSeconds: 5 })
    );

    const messages = res.Messages ?? [];
    if (messages.length === 0) break;

    for (const msg of messages) {
      // Re-send to main queue
      await sqs.send(
        new SendMessageCommand({ QueueUrl: mainUrl, MessageBody: msg.Body! })
      );
      // Delete from DLQ
      await sqs.send(
        new DeleteMessageCommand({ QueueUrl: dlqUrl, ReceiptHandle: msg.ReceiptHandle! })
      );
      processed++;
    }
  }

  console.log(`Reprocessed ${processed} DLQ messages`);
}

reprocessDLQ().catch(console.error);

Cost Estimates

Component                        Cost
SQS standard queue               $0.40 per 1M requests
SQS FIFO queue                   $0.50 per 1M requests
Long polling (20s)               Reduces empty receives ~95%
Data transfer (within region)    Free
CloudWatch alarm                 ~$0.10/alarm/month
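A rough back-of-envelope using the standard-queue price above. This is a simplification (30-day month, one ReceiveMessage call per poll interval, ignoring per-message send/delete requests):

```typescript
// Back-of-envelope SQS request cost for one always-on worker.
// With 20s long polling, an idle worker makes one ReceiveMessage call
// every 20 seconds; message sends and deletes add to this with volume.
const PRICE_PER_MILLION = 0.4; // standard queue, from the table above

function monthlyPollCost(pollIntervalSeconds: number): number {
  const requestsPerMonth = (30 * 24 * 3600) / pollIntervalSeconds;
  return (requestsPerMonth / 1_000_000) * PRICE_PER_MILLION;
}
```

At 20s polling, an idle worker costs about five cents a month in receive calls; actual message volume dominates the bill in practice.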

Working With Viprasol

The difference between a brittle queue consumer and a production-grade one is visibility timeout management, idempotency, and DLQ alerting. Our team builds SQS workers with long-polling batch receive, concurrent message processing with Promise.allSettled, heartbeat visibility extension for long jobs, idempotency keys checked against PostgreSQL before processing, and DLQ CloudWatch alarms that page on first failure.

What we deliver:

  • SQS Terraform: main queue (5m visibility, 20s long poll, DLQ after 3 attempts) + DLQ (14d retention) + CloudWatch alarm
  • enqueue() helper: typed QueueMessage envelope with idempotency key and traceId
  • startEmailWorker: long-poll loop, concurrent Promise.allSettled, batch delete on success
  • processMessage: idempotency check, heartbeat setInterval 60s, DLQ pre-record on last attempt
  • processed_messages + failed_messages schema
  • reprocess-dlq.ts script: receive DLQ → send to main → delete from DLQ

Talk to our team about your async job processing architecture →

Or explore our cloud infrastructure services.

About the Author

Viprasol Tech Team

Custom Software Development Specialists

The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.

MT4/MT5 EA Development · AI Agent Systems · SaaS Development · Algorithmic Trading
