
AWS OpenSearch Analytics: Index Mappings, Aggregations, and Dashboards

Build analytics pipelines with AWS OpenSearch Service. Covers cluster setup with Terraform, index mapping design, aggregation queries for metrics, real-time log ingestion, and OpenSearch Dashboards.

Viprasol Tech Team
March 14, 2027
13 min read

AWS OpenSearch Service (Amazon's managed offering of OpenSearch, its Elasticsearch fork) handles the use cases where PostgreSQL full-text search runs out of runway: datasets above 10M documents, faceted search with many filter combinations, log analytics at scale, and real-time dashboards that aggregate across millions of events. It's the right tool when you need sub-second aggregations over hundreds of millions of records.

This guide covers cluster provisioning with Terraform, index mapping design, the aggregation queries that power analytics dashboards, and log ingestion patterns.

When to Use OpenSearch vs PostgreSQL

| Factor | PostgreSQL FTS | OpenSearch |
| --- | --- | --- |
| Row count | Up to ~10M efficiently | 100M+ no problem |
| Faceted search | Slow beyond 3–4 facets | Fast with term aggregations |
| Log/event analytics | Difficult (no time-series optimization) | Native with date histograms |
| Relevance ranking | Basic (ts_rank) | Full BM25 + learning-to-rank |
| Infrastructure | Already have it | New cluster to manage |
| Cost | Included in DB cost | $50–500+/month additional |
| Full-text + relational joins | Excellent | Poor (denormalize data) |

Use PostgreSQL first. Add OpenSearch when you hit its limits.

Terraform: OpenSearch Cluster

# opensearch/main.tf

locals {
  domain_name = "${var.project}-opensearch"
}

data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

# Dedicated master nodes for production stability
resource "aws_opensearch_domain" "main" {
  domain_name    = local.domain_name
  engine_version = "OpenSearch_2.15"

  cluster_config {
    # Data nodes
    instance_count = var.environment == "production" ? 3 : 1
    instance_type  = var.environment == "production" ? "r6g.large.search" : "t3.small.search"

    # Dedicated masters (production only — prevents split brain)
    dedicated_master_enabled = var.environment == "production"
    dedicated_master_count   = 3
    dedicated_master_type    = "m6g.large.search"

    # Multi-AZ
    zone_awareness_enabled = var.environment == "production"
    dynamic "zone_awareness_config" {
      for_each = var.environment == "production" ? [1] : []
      content {
        availability_zone_count = 3
      }
    }
  }

  ebs_options {
    ebs_enabled = true
    volume_type = "gp3"
    volume_size = var.environment == "production" ? 100 : 20  # GB per node
    throughput  = 250   # MB/s (gp3)
    iops        = 3000
  }

  # Encryption at rest
  encrypt_at_rest {
    enabled    = true
    kms_key_id = aws_kms_key.opensearch.arn
  }

  # Encryption in transit
  node_to_node_encryption {
    enabled = true
  }

  # HTTPS only
  domain_endpoint_options {
    enforce_https                   = true
    tls_security_policy             = "Policy-Min-TLS-1-2-2019-07"
    custom_endpoint_enabled         = var.environment == "production"
    custom_endpoint                 = var.environment == "production" ? "search.${var.domain}" : null
    custom_endpoint_certificate_arn = var.environment == "production" ? var.acm_cert_arn : null
  }

  # Fine-grained access control
  advanced_security_options {
    enabled                        = true
    anonymous_auth_enabled         = false
    internal_user_database_enabled = true

    master_user_options {
      master_user_name     = var.opensearch_master_user
      master_user_password = var.opensearch_master_password
    }
  }

  # VPC deployment (recommended for production)
  vpc_options {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.opensearch.id]
  }

  # Auto-Tune for JVM and queue tuning (not supported on burstable t2/t3 instances)
  auto_tune_options {
    desired_state       = var.environment == "production" ? "ENABLED" : "DISABLED"
    rollback_on_disable = "NO_ROLLBACK"

    maintenance_schedule {
      start_at                       = "2027-03-15T01:00:00Z"
      cron_expression_for_recurrence = "cron(0 1 ? * SUN *)"
      duration {
        value = 2
        unit  = "HOURS"
      }
    }
  }

  # Slow log publishing
  log_publishing_options {
    log_type                 = "INDEX_SLOW_LOGS"
    cloudwatch_log_group_arn = "${aws_cloudwatch_log_group.opensearch_slow.arn}:*"
    enabled                  = true
  }

  log_publishing_options {
    log_type                 = "SEARCH_SLOW_LOGS"
    cloudwatch_log_group_arn = "${aws_cloudwatch_log_group.opensearch_slow.arn}:*"
    enabled                  = true
  }

  tags = var.common_tags
}

# Access policy
resource "aws_opensearch_domain_policy" "main" {
  domain_name = aws_opensearch_domain.main.domain_name

  access_policies = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect    = "Allow"
        Principal = { AWS = aws_iam_role.app_opensearch.arn }
        Action    = "es:*"
        Resource  = "${aws_opensearch_domain.main.arn}/*"
      }
    ]
  })
}

resource "aws_security_group" "opensearch" {
  name   = "${var.project}-opensearch-sg"
  vpc_id = var.vpc_id

  ingress {
    from_port       = 443
    to_port         = 443
    protocol        = "tcp"
    security_groups = [var.app_security_group_id]
    description     = "HTTPS from application"
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

☁️ Is Your Cloud Costing Too Much?

Most teams overspend 30–40% on cloud — wrong instance types, no reserved pricing, bloated storage. We audit, right-size, and automate your infrastructure.

  • AWS, GCP, Azure certified engineers
  • Infrastructure as Code (Terraform, CDK)
  • Docker, Kubernetes, GitHub Actions CI/CD
  • Typical audit recovers $500–$3,000/month in savings

Index Mapping Design

// lib/opensearch/mappings.ts
import { Client } from "@opensearch-project/opensearch";

const client = new Client({
  node: process.env.OPENSEARCH_URL!,
  auth: {
    username: process.env.OPENSEARCH_USER!,
    password: process.env.OPENSEARCH_PASSWORD!,
  },
  ssl: { rejectUnauthorized: true },
});

// Events index — for analytics and dashboards
const eventsMapping = {
  settings: {
    number_of_shards: 3,
    number_of_replicas: 1,
    // Index State Management (ISM), OpenSearch's replacement for ES ILM.
    // The lifecycle policy attaches by index pattern (ism_template); only
    // the rollover alias is configured on the index itself.
    "plugins.index_state_management.rollover_alias": "events",
    refresh_interval: "5s",   // Trade off freshness for throughput
    "index.codec": "best_compression",
  },
  mappings: {
    dynamic: "strict" as const, // Reject unknown fields to prevent mapping explosion
    properties: {
      // Event identification
      event_id: { type: "keyword" as const },
      event_type: { type: "keyword" as const },
      timestamp: { type: "date" as const, format: "strict_date_optional_time||epoch_millis" },

      // Actor
      user_id: { type: "keyword" as const },
      workspace_id: { type: "keyword" as const },
      session_id: { type: "keyword" as const },

      // Request context
      ip_address: { type: "ip" as const },
      user_agent: { type: "keyword" as const, index: false }, // store but don't index
      country: { type: "keyword" as const },
      device_type: { type: "keyword" as const },

      // Business data
      plan: { type: "keyword" as const },
      entity_type: { type: "keyword" as const },
      entity_id: { type: "keyword" as const },

      // Metrics
      duration_ms: { type: "integer" as const },
      status_code: { type: "short" as const },
      bytes_sent: { type: "long" as const },
      amount_cents: { type: "long" as const }, // mapped explicitly so it can be aggregated

      // Full-text searchable fields
      description: {
        type: "text" as const,
        analyzer: "english",
        fields: {
          keyword: { type: "keyword" as const, ignore_above: 256 },
        },
      },

      // Free-form metadata: stored for retrieval but never indexed.
      // Anything you need to filter or aggregate on (e.g. a payment
      // amount) must be promoted to an explicitly mapped field.
      metadata: {
        type: "object" as const,
        enabled: false, // store only, no indexing of sub-fields
      },
    },
  },
};

// Products index — for search
const productsMapping = {
  settings: {
    number_of_shards: 2,
    number_of_replicas: 1,
    analysis: {
      analyzer: {
        product_analyzer: {
          type: "custom",
          tokenizer: "standard",
          filter: ["lowercase", "asciifolding", "product_synonyms", "autocomplete_filter"],
        },
        autocomplete_analyzer: {
          type: "custom",
          tokenizer: "standard",
          filter: ["lowercase", "asciifolding", "autocomplete_filter"],
        },
      },
      filter: {
        autocomplete_filter: {
          type: "edge_ngram",
          min_gram: 2,
          max_gram: 20,
        },
        product_synonyms: {
          type: "synonym",
          synonyms: [
            "tv, television",
            "laptop, notebook",
            "mobile, phone, smartphone",
          ],
        },
      },
    },
  },
  mappings: {
    properties: {
      product_id: { type: "keyword" as const },
      name: {
        type: "text" as const,
        analyzer: "product_analyzer",
        search_analyzer: "standard",
        fields: {
          keyword: { type: "keyword" as const },
          autocomplete: { type: "text" as const, analyzer: "autocomplete_analyzer", search_analyzer: "standard" },
        },
      },
      description: { type: "text" as const, analyzer: "english" },
      category: { type: "keyword" as const },
      tags: { type: "keyword" as const },
      price: { type: "scaled_float" as const, scaling_factor: 100 },
      stock: { type: "integer" as const },
      rating: { type: "half_float" as const },
      review_count: { type: "integer" as const },
      brand: { type: "keyword" as const },
      is_active: { type: "boolean" as const },
      created_at: { type: "date" as const },
      updated_at: { type: "date" as const },
    },
  },
};

export async function createIndices() {
  // Create index template for time-series events (ISM rollover)
  await client.indices.putIndexTemplate({
    name: "events-template",
    body: {
      index_patterns: ["events-*"],
      priority: 100,
      template: eventsMapping,
    },
  });

  // Create initial index and alias
  await client.indices.create({
    index: "events-000001",
    body: {
      aliases: { events: { is_write_index: true } },
    },
  });

  // Create products index
  await client.indices.create({
    index: "products",
    body: productsMapping,
  });
}
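With dynamic: "strict", OpenSearch rejects any document containing an unmapped field instead of silently widening the mapping, so a client-side guard before indexing is worth having. A sketch (the field list mirrors the events mapping above; sanitizeEventDoc is a helper name we introduce here, not part of any library):

```typescript
// Allowed top-level fields, mirroring the strict events mapping above.
const EVENT_FIELDS = new Set([
  "event_id", "event_type", "timestamp", "user_id", "workspace_id",
  "session_id", "ip_address", "user_agent", "country", "device_type",
  "plan", "entity_type", "entity_id", "duration_ms", "status_code",
  "bytes_sent", "description", "metadata", "amount_cents",
]);

// Drop keys the strict mapping would reject, and drop undefined values
// so they aren't serialized into the document.
export function sanitizeEventDoc(
  doc: Record<string, unknown>
): Record<string, unknown> {
  return Object.fromEntries(
    Object.entries(doc).filter(
      ([key, value]) => EVENT_FIELDS.has(key) && value !== undefined
    )
  );
}
```

Running this before every index call turns a would-be strict_dynamic_mapping_exception into a silently dropped field, which is usually the right trade for analytics events.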

Ingesting Events

// lib/opensearch/ingest.ts
import { Client } from "@opensearch-project/opensearch";

const client = new Client({
  node: process.env.OPENSEARCH_URL!,
  auth: {
    username: process.env.OPENSEARCH_USER!,
    password: process.env.OPENSEARCH_PASSWORD!,
  },
});

interface AnalyticsEvent {
  eventId: string;
  eventType: string;
  timestamp: Date;
  userId?: string;
  workspaceId?: string;
  sessionId?: string;
  ipAddress?: string;
  country?: string;
  deviceType?: string;
  entityType?: string;
  entityId?: string;
  durationMs?: number;
  statusCode?: number;
  metadata?: Record<string, unknown>;
}

// Buffer events and bulk-index for throughput
const eventBuffer: AnalyticsEvent[] = [];
let flushTimer: ReturnType<typeof setTimeout> | null = null;

export function trackEvent(event: AnalyticsEvent): void {
  eventBuffer.push(event);

  // Flush if buffer is large enough
  if (eventBuffer.length >= 100) {
    void flush();
  } else if (!flushTimer) {
    // Flush after 5 seconds even if buffer isn't full
    flushTimer = setTimeout(() => void flush(), 5000);
  }
}

async function flush(): Promise<void> {
  if (flushTimer) {
    clearTimeout(flushTimer);
    flushTimer = null;
  }

  if (eventBuffer.length === 0) return;

  const events = eventBuffer.splice(0, eventBuffer.length);

  // Build bulk request body
  const body = events.flatMap((event) => [
    { index: { _index: "events", _id: event.eventId } },
    {
      event_id: event.eventId,
      event_type: event.eventType,
      timestamp: event.timestamp.toISOString(),
      user_id: event.userId,
      workspace_id: event.workspaceId,
      session_id: event.sessionId,
      ip_address: event.ipAddress,
      country: event.country,
      device_type: event.deviceType,
      entity_type: event.entityType,
      entity_id: event.entityId,
      duration_ms: event.durationMs,
      status_code: event.statusCode,
    },
  ]);

  try {
    const response = await client.bulk({ body, index: "events" });

    if (response.body.errors) {
      const failed = response.body.items.filter(
        (item: any) => item.index?.error
      );
      console.error(`Bulk index: ${failed.length} documents failed:`, failed[0]?.index?.error);
    }
  } catch (err) {
    console.error("OpenSearch bulk index error:", err);
    // Re-queue events for retry (simplified — use a real queue in production)
    eventBuffer.unshift(...events);
  }
}
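The error handling in flush() can be sharpened: 429s (executions rejected under load) are worth retrying, while 400-class mapping errors will fail forever and should go to a dead-letter path instead. A sketch of the partitioning logic, relying on the fact that bulk response items come back in the same order as the docs sent (partitionBulkFailures is our own helper name):

```typescript
interface BulkItemResult {
  index?: { _id?: string; status: number; error?: { type: string; reason: string } };
}

// Given the bulk response items (parallel to the docs sent), split failed
// docs into those worth retrying (HTTP 429/503) and permanent failures.
export function partitionBulkFailures<T>(
  items: BulkItemResult[],
  docs: T[]
): { retryable: T[]; permanent: T[] } {
  const retryable: T[] = [];
  const permanent: T[] = [];
  items.forEach((item, i) => {
    const result = item.index;
    if (!result?.error) return; // this document was indexed fine
    if (result.status === 429 || result.status === 503) retryable.push(docs[i]);
    else permanent.push(docs[i]);
  });
  return { retryable, permanent };
}
```

Only the retryable bucket should be re-queued; unshifting permanent failures back into the buffer (as the simplified code above does) would retry them forever.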

⚙️ DevOps Done Right — Zero Downtime, Full Automation

Ship faster without breaking things. We build CI/CD pipelines, monitoring stacks, and auto-scaling infrastructure that your team can actually maintain.

  • Staging + production environments with feature flags
  • Automated security scanning in the pipeline
  • Uptime monitoring + alerting + runbook automation
  • On-call support handover docs included

Analytics Aggregation Queries

// lib/opensearch/analytics.ts
import { Client } from "@opensearch-project/opensearch";

const client = new Client({
  node: process.env.OPENSEARCH_URL!,
  auth: {
    username: process.env.OPENSEARCH_USER!,
    password: process.env.OPENSEARCH_PASSWORD!,
  },
});

// Dashboard: active users over time with event breakdown
export async function getDailyActiveUsers(params: {
  workspaceId: string;
  from: string;
  to: string;
  interval: "day" | "week" | "month";
}) {
  const response = await client.search({
    index: "events",
    body: {
      size: 0,  // Don't return documents, only aggregations
      query: {
        bool: {
          filter: [
            { term: { workspace_id: params.workspaceId } },
            { range: { timestamp: { gte: params.from, lte: params.to } } },
          ],
        },
      },
      aggs: {
        over_time: {
          date_histogram: {
            field: "timestamp",
            calendar_interval: params.interval,
            time_zone: "UTC",
          },
          aggs: {
            unique_users: {
              cardinality: { field: "user_id", precision_threshold: 1000 },
            },
            by_event_type: {
              terms: { field: "event_type", size: 10 },
            },
          },
        },
      },
    },
  });

  const buckets = response.body.aggregations.over_time.buckets;
  return buckets.map((b: any) => ({
    date: b.key_as_string,
    uniqueUsers: b.unique_users.value,
    events: b.doc_count,
    byType: Object.fromEntries(
      b.by_event_type.buckets.map((t: any) => [t.key, t.doc_count])
    ),
  }));
}

// Funnel analysis: conversion steps
export async function getFunnelConversion(params: {
  workspaceId: string;
  steps: string[];   // event_type values in order
  from: string;
  to: string;
}) {
  // Run parallel cardinality queries for each step
  const stepQueries = params.steps.map((step) =>
    client.search({
      index: "events",
      body: {
        size: 0,
        query: {
          bool: {
            filter: [
              { term: { workspace_id: params.workspaceId } },
              { term: { event_type: step } },
              { range: { timestamp: { gte: params.from, lte: params.to } } },
            ],
          },
        },
        aggs: {
          unique_users: { cardinality: { field: "user_id" } },
        },
      },
    })
  );

  const results = await Promise.all(stepQueries);
  const counts = results.map(
    (r) => r.body.aggregations.unique_users.value as number
  );

  return params.steps.map((step, i) => ({
    step,
    users: counts[i],
    conversionFromPrevious: i === 0 ? 100 : Math.round((counts[i] / counts[i - 1]) * 100),
    conversionFromFirst: Math.round((counts[i] / counts[0]) * 100),
  }));
}

// Revenue analytics with percentiles
export async function getRevenueMetrics(params: {
  workspaceId: string;
  from: string;
  to: string;
}) {
  const response = await client.search({
    index: "events",
    body: {
      size: 0,
      query: {
        bool: {
          filter: [
            { term: { workspace_id: params.workspaceId } },
            { term: { event_type: "payment_completed" } },
            { range: { timestamp: { gte: params.from, lte: params.to } } },
          ],
        },
      },
      aggs: {
        // amount_cents must be an explicitly mapped numeric field; sub-fields
        // of a disabled metadata object cannot be aggregated.
        total_revenue: { sum: { field: "amount_cents" } },
        avg_order_value: { avg: { field: "amount_cents" } },
        order_percentiles: {
          percentiles: {
            field: "amount_cents",
            percents: [25, 50, 75, 90, 95, 99],
          },
        },
        by_plan: {
          terms: { field: "plan", size: 10 },
          aggs: {
            revenue: { sum: { field: "amount_cents" } },
            count: { value_count: { field: "event_id" } },
          },
        },
        over_time: {
          date_histogram: {
            field: "timestamp",
            calendar_interval: "day",
          },
          aggs: {
            daily_revenue: { sum: { field: "amount_cents" } },
          },
        },
      },
    },
  });

  const aggs = response.body.aggregations;
  return {
    totalRevenue: aggs.total_revenue.value / 100,
    avgOrderValue: aggs.avg_order_value.value / 100,
    percentiles: Object.fromEntries(
      Object.entries(aggs.order_percentiles.values).map(([k, v]) => [
        k,
        (v as number) / 100,
      ])
    ),
    byPlan: aggs.by_plan.buckets.map((b: any) => ({
      plan: b.key,
      revenue: b.revenue.value / 100,
      orders: b.count.value,
    })),
    timeSeries: aggs.over_time.buckets.map((b: any) => ({
      date: b.key_as_string,
      revenue: b.daily_revenue.value / 100,
    })),
  };
}

// Full-text search with facets
export async function searchProducts(params: {
  query: string;
  filters: {
    category?: string[];
    brand?: string[];
    priceMin?: number;
    priceMax?: number;
    minRating?: number;
    inStock?: boolean;
  };
  sort?: "relevance" | "price_asc" | "price_desc" | "rating";
  page: number;
  pageSize: number;
}) {
  const must: any[] = [];
  const filter: any[] = [{ term: { is_active: true } }];

  if (params.query) {
    must.push({
      multi_match: {
        query: params.query,
        fields: ["name^3", "name.autocomplete^2", "description", "brand^2", "tags^1.5"],
        type: "best_fields",
        fuzziness: "AUTO",
        prefix_length: 2,
      },
    });
  }

  if (params.filters.category?.length) {
    filter.push({ terms: { category: params.filters.category } });
  }
  if (params.filters.brand?.length) {
    filter.push({ terms: { brand: params.filters.brand } });
  }
  if (params.filters.priceMin !== undefined || params.filters.priceMax !== undefined) {
    filter.push({
      range: {
        price: {
          ...(params.filters.priceMin !== undefined && { gte: params.filters.priceMin }),
          ...(params.filters.priceMax !== undefined && { lte: params.filters.priceMax }),
        },
      },
    });
  }
  if (params.filters.minRating) {
    filter.push({ range: { rating: { gte: params.filters.minRating } } });
  }
  if (params.filters.inStock) {
    filter.push({ range: { stock: { gt: 0 } } });
  }

  const sortConfig: Record<string, any> = {
    relevance: ["_score"],
    price_asc: [{ price: "asc" }],
    price_desc: [{ price: "desc" }],
    rating: [{ rating: "desc" }, { review_count: "desc" }],
  };

  const response = await client.search({
    index: "products",
    body: {
      from: (params.page - 1) * params.pageSize,
      size: params.pageSize,
      query: { bool: { must: must.length ? must : [{ match_all: {} }], filter } },
      sort: sortConfig[params.sort ?? "relevance"],
      // Aggregations for facets sidebar
      aggs: {
        categories: { terms: { field: "category", size: 20 } },
        brands: { terms: { field: "brand", size: 20 } },
        price_ranges: {
          range: {
            field: "price",
            ranges: [
              { to: 25 },
              { from: 25, to: 50 },
              { from: 50, to: 100 },
              { from: 100, to: 200 },
              { from: 200 },
            ],
          },
        },
        avg_price: { avg: { field: "price" } },
      },
      highlight: {
        fields: {
          name: { number_of_fragments: 1 },
          description: { number_of_fragments: 2, fragment_size: 150 },
        },
        pre_tags: ["<mark>"],
        post_tags: ["</mark>"],
      },
    },
  });

  return {
    total: response.body.hits.total.value,
    products: response.body.hits.hits.map((h: any) => ({
      ...h._source,
      _score: h._score,
      highlights: h.highlight,
    })),
    facets: {
      categories: response.body.aggregations.categories.buckets,
      brands: response.body.aggregations.brands.buckets,
      priceRanges: response.body.aggregations.price_ranges.buckets,
    },
  };
}
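One caveat with the from/size pagination used above: OpenSearch rejects requests where from + size exceeds index.max_result_window (10,000 by default), and hits.total.value is itself capped at 10,000 unless track_total_hits is set. A small guard (illustrative, assuming the default window; clampPage is our own helper name) keeps deep page links from erroring:

```typescript
const MAX_RESULT_WINDOW = 10_000; // OpenSearch default index.max_result_window

// Clamp a requested page so that from + size stays inside the window.
export function clampPage(page: number, pageSize: number): number {
  const maxPage = Math.floor(MAX_RESULT_WINDOW / pageSize);
  return Math.min(Math.max(1, page), Math.max(1, maxPage));
}
```

For genuinely deep pagination (exports, crawlers), switch to search_after with a deterministic sort tiebreaker rather than raising the window.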

Index State Management (ISM)

For time-series data (events, logs), use Index State Management, OpenSearch's replacement for Elasticsearch's proprietary ILM, to roll indices over and eventually delete them automatically. Policies live under the _plugins/_ism REST API; the JS client has no dedicated ISM namespace, so we issue a raw transport request:

// lib/opensearch/ism.ts

export async function createEventsISMPolicy() {
  await client.transport.request({
    method: "PUT",
    path: "/_plugins/_ism/policies/events-policy",
    body: {
      policy: {
        description: "Roll over daily, tier down, delete after 90 days",
        default_state: "hot",
        states: [
          {
            name: "hot",
            actions: [
              {
                rollover: {
                  min_index_age: "1d",            // Roll over daily...
                  min_primary_shard_size: "10gb", // ...or at 10 GB per primary shard
                },
              },
            ],
            transitions: [
              { state_name: "warm", conditions: { min_index_age: "7d" } },
            ],
          },
          {
            name: "warm",                         // After 7 days
            actions: [
              { force_merge: { max_num_segments: 1 } },
              { index_priority: { priority: 50 } },
            ],
            transitions: [
              { state_name: "cold", conditions: { min_index_age: "30d" } },
            ],
          },
          {
            name: "cold",                         // After 30 days
            actions: [{ index_priority: { priority: 0 } }],
            transitions: [
              { state_name: "delete", conditions: { min_index_age: "90d" } },
            ],
          },
          {
            name: "delete",                       // After 90 days
            actions: [{ delete: {} }],
            transitions: [],
          },
        ],
        // Attach automatically to new indices matching the pattern
        ism_template: [{ index_patterns: ["events-*"], priority: 100 }],
      },
    },
  });
}
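The phase boundaries (hot until day 7, warm until day 30, cold until day 90) are easy to get wrong when editing the policy. A tiny pure helper, useful in unit tests against your policy JSON, mirrors them (illustrative, our own code):

```typescript
type LifecyclePhase = "hot" | "warm" | "cold" | "delete";

// Mirror of the policy above: hot < 7d, warm < 30d, cold < 90d, then delete.
export function phaseForAge(ageDays: number): LifecyclePhase {
  if (ageDays < 7) return "hot";
  if (ageDays < 30) return "warm";
  if (ageDays < 90) return "cold";
  return "delete";
}
```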

Cost Estimates

| Cluster Size | Instance Type | Storage | Monthly Cost |
| --- | --- | --- | --- |
| Dev/staging | 1× t3.small.search | 20 GB | ~$25/month |
| Small production | 2× t3.medium.search | 50 GB × 2 | ~$150/month |
| Medium production | 3× r6g.large.search | 100 GB × 3 | ~$500/month |
| Large production | 3× r6g.2xlarge.search + 3 masters | 500 GB × 3 | ~$2,000/month |

Cost reduction tips: Enable UltraWarm storage for infrequently queried data ($0.024/GB vs $0.135/GB for hot storage). Use ILM to move old data to cold/delete automatically.
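The per-GB rates above make the savings easy to estimate. An illustrative helper (rates as quoted above; actual pricing varies by region and over time):

```typescript
const HOT_PER_GB = 0.135;  // USD/GB-month, hot storage (figure quoted above)
const WARM_PER_GB = 0.024; // USD/GB-month, UltraWarm (figure quoted above)

// Monthly storage cost if `warmGb` of `totalGb` sits in UltraWarm.
export function monthlyStorageCost(totalGb: number, warmGb: number): number {
  const hotGb = totalGb - warmGb;
  return +(hotGb * HOT_PER_GB + warmGb * WARM_PER_GB).toFixed(2);
}
```

Moving 250 GB of a 300 GB dataset to UltraWarm drops storage cost from about $40.50 to about $12.75 a month.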

Cost and Timeline Estimates

| Scope | Team | Timeline | Cost Range |
| --- | --- | --- | --- |
| Basic cluster + index setup | 1 dev | 1–2 days | $400–800 |
| Search with facets and highlighting | 1 dev | 3–5 days | $1,000–2,000 |
| Analytics pipeline + dashboards | 1–2 devs | 1–2 weeks | $3,000–6,000 |
| Full observability stack (logs, metrics, traces) | 2–3 devs | 3–5 weeks | $8,000–18,000 |

Working With Viprasol

OpenSearch unlocks analytics that PostgreSQL can't deliver at scale — but getting there requires careful index design, query optimization, and cost management. Our team has built OpenSearch-backed search and analytics systems for e-commerce platforms, SaaS products, and log analytics pipelines.

What we deliver:

  • Terraform-managed OpenSearch cluster with VPC, encryption, and ISM lifecycle policies
  • Index mapping design optimized for your query patterns
  • Aggregation queries for dashboards and funnel analytics
  • Product search with facets, autocomplete, and relevance tuning
  • PostgreSQL → OpenSearch sync pipeline

Talk to our team about search and analytics architecture →

Or explore our cloud infrastructure services.

About the Author

Viprasol Tech Team

Custom Software Development Specialists

The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.

Tags: MT4/MT5 EA Development · AI Agent Systems · SaaS Development · Algorithmic Trading
