
AWS SQS and SNS Patterns in 2026: Fan-Out, FIFO Queues, and Message Filtering

Production AWS SQS and SNS patterns: fan-out architecture, FIFO queues with deduplication, message filtering, Lambda trigger configuration, DLQ setup, and Terraform configuration.

Viprasol Tech Team
February 13, 2027
13 min read


SQS and SNS are the backbone of async messaging on AWS. SQS decouples producers from consumers and handles retries, backpressure, and dead-letter queues. SNS broadcasts a single event to multiple subscribers — the fan-out pattern. Together they enable event-driven architectures that scale independently.

This post covers the patterns teams actually need: SNS fan-out to multiple SQS queues, FIFO queues for ordered processing, subscription filter policies to route messages, Lambda trigger configuration, DLQ setup, and Terraform for all of it.


SQS Alone: Decoupling a Single Consumer

// lib/aws/sqs-client.ts
import {
  SQSClient,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
  ChangeMessageVisibilityCommand,
} from "@aws-sdk/client-sqs";

const sqs = new SQSClient({ region: process.env.AWS_REGION });
const QUEUE_URL = process.env.SQS_QUEUE_URL!;

// Producer: send a message
export async function enqueueJob(payload: object, deduplicationId?: string) {
  await sqs.send(new SendMessageCommand({
    QueueUrl: QUEUE_URL,
    MessageBody: JSON.stringify(payload),
    // FIFO queues only (sending these to a standard queue fails the request):
    ...(QUEUE_URL.endsWith(".fifo") && {
      MessageGroupId: "default",               // Messages in same group are ordered
      MessageDeduplicationId: deduplicationId, // Prevents duplicates within 5 min
    }),
    // MessageAttributes for metadata without touching body:
    MessageAttributes: {
      eventType: { DataType: "String", StringValue: "order.created" },
      version: { DataType: "Number", StringValue: "1" },
    },
  }));
}

// Consumer: poll and process
export async function processMessages(handler: (body: unknown) => Promise<void>) {
  while (true) {
    const result = await sqs.send(new ReceiveMessageCommand({
      QueueUrl: QUEUE_URL,
      MaxNumberOfMessages: 10,          // Batch up to 10
      WaitTimeSeconds: 20,              // Long polling — reduces empty receives
      VisibilityTimeout: 30,            // 30s to process before message reappears
      AttributeNames: ["ApproximateReceiveCount"],
    }));

    if (!result.Messages?.length) continue;

    await Promise.allSettled(
      result.Messages.map(async (message) => {
        const receiveCount = parseInt(
          message.Attributes?.ApproximateReceiveCount ?? "1", 10
        );

        try {
          // Parse inside the try: a malformed body counts as a failure too
          const body = JSON.parse(message.Body!);
          await handler(body);
          // Delete on success
          await sqs.send(new DeleteMessageCommand({
            QueueUrl: QUEUE_URL,
            ReceiptHandle: message.ReceiptHandle!,
          }));
        } catch (err) {
          console.error(`Failed (attempt ${receiveCount}):`, err);
          // Don't delete: the message reappears after VisibilityTimeout.
          // After maxReceiveCount failures, SQS moves it to the DLQ automatically.
        }
      })
      })
    );
  }
}
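
The `ChangeMessageVisibilityCommand` imported above is the natural fit for retry backoff: on failure, push the message's visibility out exponentially instead of letting it reappear after the fixed 30 seconds. A minimal sketch (the schedule and cap are assumptions, not SQS defaults):

```typescript
// Backoff schedule for retries: 30s, 60s, 120s, ... capped at 15 minutes.
// Pass the result as VisibilityTimeout to ChangeMessageVisibilityCommand
// in the catch block above before moving to the next message.
export function backoffSeconds(receiveCount: number): number {
  return Math.min(30 * 2 ** (receiveCount - 1), 900);
}
```

SQS caps VisibilityTimeout at 12 hours, so any cap below that is valid; 15 minutes keeps failed messages cycling fast enough to hit maxReceiveCount and reach the DLQ promptly.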

SNS → SQS Fan-Out Pattern

One SNS topic, multiple SQS queues. Each queue serves a different consumer:

# terraform/messaging.tf

# SNS Topic (the event bus)
resource "aws_sns_topic" "orders" {
  name = "${var.name}-${var.environment}-orders"

  # For FIFO:
  # name              = "${var.name}-${var.environment}-orders.fifo"
  # fifo_topic        = true
  # content_based_deduplication = true

  tags = var.common_tags
}

# SQS Queue: fulfillment service
resource "aws_sqs_queue" "fulfillment" {
  name                      = "${var.name}-${var.environment}-fulfillment"
  visibility_timeout_seconds = 60
  message_retention_seconds  = 86400   # 1 day
  receive_wait_time_seconds  = 20      # Long polling

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.fulfillment_dlq.arn
    maxReceiveCount     = 3            # Move to DLQ after 3 failures
  })

  tags = var.common_tags
}

resource "aws_sqs_queue" "fulfillment_dlq" {
  name                      = "${var.name}-${var.environment}-fulfillment-dlq"
  message_retention_seconds = 1209600  # 14 days

  tags = var.common_tags
}

# SQS Queue: notification service
resource "aws_sqs_queue" "notifications" {
  name                       = "${var.name}-${var.environment}-notifications"
  visibility_timeout_seconds = 30
  message_retention_seconds  = 3600    # 1 hour (notifications expire quickly)
  receive_wait_time_seconds  = 20

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.notifications_dlq.arn
    maxReceiveCount     = 2
  })

  tags = var.common_tags
}

resource "aws_sqs_queue" "notifications_dlq" {
  name = "${var.name}-${var.environment}-notifications-dlq"
  tags = var.common_tags
}

# SQS Queue: analytics service (only certain events)
resource "aws_sqs_queue" "analytics" {
  name                       = "${var.name}-${var.environment}-analytics"
  visibility_timeout_seconds = 120     # Analytics jobs take longer
  message_retention_seconds  = 604800  # 7 days

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.analytics_dlq.arn
    maxReceiveCount     = 5
  })

  tags = var.common_tags
}

resource "aws_sqs_queue" "analytics_dlq" {
  name = "${var.name}-${var.environment}-analytics-dlq"
  tags = var.common_tags
}

# Allow SNS to publish to each SQS queue
# (fulfillment shown; repeat for notifications and analytics, or use for_each)
resource "aws_sqs_queue_policy" "fulfillment" {
  queue_url = aws_sqs_queue.fulfillment.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "sns.amazonaws.com" }
      Action    = "sqs:SendMessage"
      Resource  = aws_sqs_queue.fulfillment.arn
      Condition = {
        ArnEquals = { "aws:SourceArn" = aws_sns_topic.orders.arn }
      }
    }]
  })
}

# SNS → SQS subscriptions with message filtering
resource "aws_sns_topic_subscription" "fulfillment" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.fulfillment.arn

  # Filter policy: only order.created, order.updated, and order.cancelled events
  filter_policy = jsonencode({
    eventType = ["order.created", "order.updated", "order.cancelled"]
  })

  # Deliver message as raw (not wrapped in SNS envelope)
  raw_message_delivery = true
}

resource "aws_sns_topic_subscription" "notifications" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.notifications.arn

  # Notifications need all order events
  filter_policy = jsonencode({
    eventType = ["order.created", "order.shipped", "order.delivered", "order.cancelled"]
  })

  raw_message_delivery = true
}

resource "aws_sns_topic_subscription" "analytics" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.analytics.arn

  # Analytics gets EVERYTHING (no filter)
  raw_message_delivery = true
}
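
Filter policy semantics are easy to misread: values listed under one key are OR-ed, separate keys are AND-ed, and a message missing a filtered attribute does not match. A toy matcher for exact-string policies illustrates this (SNS also supports numeric ranges, prefixes, and anything-but matching, which this sketch omits):

```typescript
// Toy SNS filter-policy matcher: exact string values only.
// attrs: the message's attributes; policy: allowed values per key.
export function matchesFilterPolicy(
  attrs: Record<string, string>,
  policy: Record<string, string[]>
): boolean {
  // Every key in the policy must match (AND across keys)...
  return Object.entries(policy).every(([key, allowed]) => {
    const value = attrs[key];
    // ...and within a key, any listed value matches (OR within a key).
    return value !== undefined && allowed.includes(value);
  });
}
```

Against the policies above, an `order.shipped` event matches the notifications subscription but not fulfillment, and analytics (no policy) receives everything.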


SNS Message Filtering with Attributes

// lib/aws/sns-publisher.ts
import { SNSClient, PublishCommand } from "@aws-sdk/client-sns";

const sns = new SNSClient({ region: process.env.AWS_REGION });

interface OrderEvent {
  eventType: "order.created" | "order.updated" | "order.shipped" | "order.cancelled" | "order.delivered";
  orderId: string;
  customerId: string;
  total: number;
  priority?: "standard" | "express";
}

export async function publishOrderEvent(event: OrderEvent) {
  await sns.send(new PublishCommand({
    TopicArn: process.env.SNS_ORDERS_TOPIC_ARN!,
    Message: JSON.stringify(event),
    Subject: event.eventType,
    // Message attributes control filter policy routing
    MessageAttributes: {
      eventType: {
        DataType: "String",
        StringValue: event.eventType,
      },
      priority: {
        DataType: "String",
        StringValue: event.priority ?? "standard",
      },
      totalRange: {
        DataType: "Number",
        StringValue: String(event.total),
      },
    },
  }));
}

FIFO Queues for Ordered Processing

Use FIFO when order matters — e.g., account state machines, inventory adjustments:

// lib/aws/fifo-queue.ts
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { createHash } from "crypto";

const sqs = new SQSClient({ region: process.env.AWS_REGION });

export async function enqueueOrderedEvent({
  queueUrl,
  groupId,          // All messages with same groupId are processed in order
  payload,
  deduplicationId,  // Prevents duplicates within 5-minute window
}: {
  queueUrl: string;
  groupId: string;
  payload: object;
  deduplicationId?: string;
}) {
  const body = JSON.stringify(payload);
  const dedupId = deduplicationId ??
    createHash("sha256").update(body).digest("hex"); // Content-based dedup

  await sqs.send(new SendMessageCommand({
    QueueUrl: queueUrl,          // Must end in .fifo
    MessageBody: body,
    MessageGroupId: groupId,     // e.g., customerId, accountId, orderId
    MessageDeduplicationId: dedupId,
  }));
}

// Example: process subscription state machine in order per customer
await enqueueOrderedEvent({
  queueUrl: process.env.SUBSCRIPTION_FIFO_QUEUE_URL!,
  groupId: `customer-${customerId}`,   // One group per customer
  payload: { event: "subscription.upgraded", customerId, newPlan: "pro" },
  // Use a stable business identifier, never Date.now(): a timestamp makes
  // every send unique and defeats deduplication entirely.
  deduplicationId: `upgrade-${subscriptionId}`,
});
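
MessageGroupId granularity is the main FIFO throughput lever: order is preserved only within a group, and each group is consumed serially, so per-customer groups parallelize while one global group serializes everything. A toy model of that effect (in-memory, purely illustrative):

```typescript
type FifoMsg = { groupId: string; seq: number };

// Toy model: each "round", a consumer fleet can take at most one message
// per group, since order within a group must be preserved.
// Rounds to drain the queue = depth of the deepest group.
export function roundsToDrain(messages: FifoMsg[]): number {
  const depth = new Map<string, number>();
  for (const m of messages) {
    depth.set(m.groupId, (depth.get(m.groupId) ?? 0) + 1);
  }
  return Math.max(0, ...depth.values());
}
```

Six events in one global group take six rounds; the same six spread across three customers drain in two.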


Lambda Trigger with Batch Processing

# Lambda triggered by SQS (fulfillment queue)
resource "aws_lambda_event_source_mapping" "fulfillment" {
  event_source_arn = aws_sqs_queue.fulfillment.arn
  function_name    = aws_lambda_function.fulfillment.arn

  batch_size                         = 10    # Process up to 10 messages at once
  maximum_batching_window_in_seconds = 5     # Wait up to 5s to fill batch

  # Partial batch failure reporting — only delete successfully processed messages
  function_response_types = ["ReportBatchItemFailures"]

  # Concurrency: max 5 concurrent Lambda invocations for this queue
  scaling_config {
    maximum_concurrency = 5
  }
}

// handlers/fulfillment.ts — Lambda handler with partial batch failure
import type { SQSEvent, SQSBatchResponse } from "aws-lambda";

export async function handler(event: SQSEvent): Promise<SQSBatchResponse> {
  const failures: string[] = [];

  await Promise.allSettled(
    event.Records.map(async (record) => {
      try {
        const body = JSON.parse(record.body);
        await processFulfillmentEvent(body);
      } catch (err) {
        console.error(`Failed to process message ${record.messageId}:`, err);
        // Report this message as failed — SQS will NOT delete it
        // (after maxReceiveCount, it goes to DLQ)
        failures.push(record.messageId);
      }
    })
  );

  return {
    batchItemFailures: failures.map((id) => ({ itemIdentifier: id })),
  };
}

// reserveInventory, scheduleShipment, releaseInventory: this service's own logic
async function processFulfillmentEvent(body: {
  eventType: string;
  orderId: string;
}) {
  switch (body.eventType) {
    case "order.created":
      await reserveInventory(body.orderId);
      await scheduleShipment(body.orderId);
      break;
    case "order.cancelled":
      await releaseInventory(body.orderId);
      break;
  }
}
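
The contract behind ReportBatchItemFailures is worth spelling out: SQS deletes every message not listed in batchItemFailures, an empty list acknowledges the whole batch, and a thrown error retries the whole batch. A small helper (names are illustrative; the return shape mirrors aws-lambda's SQSBatchResponse) makes the mapping explicit:

```typescript
// Matches the shape of SQSBatchResponse from aws-lambda.
type BatchResponse = { batchItemFailures: { itemIdentifier: string }[] };

// Build the partial-failure response from per-message outcomes:
// only failed messages are reported; everything else gets deleted.
export function toBatchResponse(
  outcomes: { messageId: string; ok: boolean }[]
): BatchResponse {
  return {
    batchItemFailures: outcomes
      .filter((o) => !o.ok)
      .map((o) => ({ itemIdentifier: o.messageId })),
  };
}
```
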

DLQ Alarm + Reprocessing

# Alert when DLQ has messages
resource "aws_cloudwatch_metric_alarm" "fulfillment_dlq" {
  alarm_name          = "${var.name}-fulfillment-dlq-depth"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 60
  statistic           = "Sum"
  threshold           = 0       # Alert on ANY DLQ message
  alarm_description   = "Fulfillment DLQ has failed messages"
  alarm_actions       = [aws_sns_topic.alerts.arn]

  dimensions = {
    QueueName = aws_sqs_queue.fulfillment_dlq.name
  }
}

// scripts/redrive-dlq.ts — Reprocess DLQ messages after fixing the bug
import {
  SQSClient,
  StartMessageMoveTaskCommand,
  ListMessageMoveTasksCommand,
} from "@aws-sdk/client-sqs";

const sqs = new SQSClient({ region: process.env.AWS_REGION! });

async function redriveFromDLQ(dlqArn: string, targetQueueArn: string) {
  // Start a redrive task (built-in SQS feature; no manual receive/send loop needed)
  const task = await sqs.send(new StartMessageMoveTaskCommand({
    SourceArn: dlqArn,
    DestinationArn: targetQueueArn,
    MaxNumberOfMessagesPerSecond: 100, // Throttle to avoid overwhelming consumer
  }));

  console.log("Redrive task started:", task.TaskHandle);

  // Monitor progress
  while (true) {
    const tasks = await sqs.send(new ListMessageMoveTasksCommand({
      SourceArn: dlqArn,
    }));
    const current = tasks.Results?.find((t) => t.TaskHandle === task.TaskHandle);

    if (!current) break;
    console.log(`Moved: ${current.ApproximateNumberOfMessagesMoved} / ${current.ApproximateNumberOfMessagesToMove}`);

    if (current.Status === "COMPLETED" || current.Status === "FAILED") break;
    await new Promise((r) => setTimeout(r, 5000));
  }
}

Cost Comparison

Scenario                              Monthly Cost (est.)
1M SQS messages (standard)            $0.40
1M SQS messages (FIFO)                $0.50
1M SNS publishes                      $0.50
1M SNS → 3 SQS fan-out                $0.50 SNS + $1.20 SQS = $1.70
Lambda: 1M invocations, 128MB, 1s     $0.20
CloudWatch alarms (DLQ)               $0.10 per alarm

SQS/SNS is extremely cheap — messaging is rarely the cost driver. Lambda execution and DLQ alarm management are more relevant cost considerations.
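The fan-out row's arithmetic generalizes: a publish pays the SNS rate once, and each subscribed queue adds the SQS rate per delivered copy. A quick sketch using the table's illustrative rates (real pricing varies by region and free tier):

```typescript
// Illustrative per-million-request rates from the table above.
const SNS_PER_MILLION = 0.5;
const SQS_STANDARD_PER_MILLION = 0.4;

// Monthly cost of publishing N million events fanned out to queueCount queues.
export function fanOutMonthlyCost(millionsPublished: number, queueCount: number): number {
  const sns = millionsPublished * SNS_PER_MILLION;
  const sqs = millionsPublished * queueCount * SQS_STANDARD_PER_MILLION;
  return Number((sns + sqs).toFixed(2));
}
```

fanOutMonthlyCost(1, 3) reproduces the $1.70 fan-out row.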


Working With Viprasol

We design and implement SQS/SNS messaging architectures for event-driven SaaS products — from simple single-queue setups through multi-service fan-out with FIFO ordering and DLQ monitoring. Our cloud team has shipped messaging systems processing millions of events per day with zero message loss.

What we deliver:

  • SNS fan-out architecture design (topics, subscriptions, filter policies)
  • FIFO queue setup for ordered processing requirements
  • Lambda event source mapping with partial batch failure
  • DLQ CloudWatch alarms and one-click redrive automation
  • Terraform module for the full messaging stack

See our cloud infrastructure services or contact us to design your messaging architecture.
