AWS OpenSearch Analytics: Index Mappings, Aggregations, and Dashboards
Build analytics pipelines with AWS OpenSearch Service. Covers cluster setup with Terraform, index mapping design, aggregation queries for metrics, real-time log ingestion, and OpenSearch Dashboards.
Amazon OpenSearch Service (AWS's managed deployment of OpenSearch, the Elasticsearch fork) handles use cases where PostgreSQL full-text search runs out of runway: datasets above 10M documents, faceted search with many filter combinations, log analytics at scale, and real-time dashboards that aggregate across millions of events. It's the right tool when you need sub-second aggregations over hundreds of millions of records.
This guide covers cluster provisioning with Terraform, index mapping design, the aggregation queries that power analytics dashboards, and log ingestion patterns.
When OpenSearch vs PostgreSQL
| Factor | PostgreSQL FTS | OpenSearch |
|---|---|---|
| Row count | Up to ~10M efficiently | 100M+ no problem |
| Faceted search | Slow beyond 3–4 facets | Fast with term aggregations |
| Log/event analytics | Difficult (no time-series opt) | Native with date histograms |
| Relevance ranking | Basic (ts_rank) | Full BM25 + learning-to-rank |
| Infrastructure | Already have it | New cluster to manage |
| Cost | Included in DB cost | $50–500+/month additional |
| Full-text + relational joins | Excellent | Poor (denormalize data) |
Use PostgreSQL first. Add OpenSearch when you hit its limits.
Terraform: OpenSearch Cluster
# opensearch/main.tf
locals {
domain_name = "${var.project}-opensearch"
}
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
# Dedicated master nodes for production stability
resource "aws_opensearch_domain" "main" {
domain_name = local.domain_name
engine_version = "OpenSearch_2.15"
cluster_config {
# Data nodes
instance_count = var.environment == "production" ? 3 : 1
instance_type = var.environment == "production" ? "r6g.large.search" : "t3.small.search"
# Dedicated masters (production only — prevents split brain)
dedicated_master_enabled = var.environment == "production"
dedicated_master_count = 3
dedicated_master_type = "m6g.large.search"
# Multi-AZ
zone_awareness_enabled = var.environment == "production"
# Only emit the AZ config when zone awareness is actually enabled
dynamic "zone_awareness_config" {
for_each = var.environment == "production" ? [1] : []
content {
availability_zone_count = 3
}
}
}
ebs_options {
ebs_enabled = true
volume_type = "gp3"
volume_size = var.environment == "production" ? 100 : 20 # GB per node
throughput = 250 # MB/s (gp3)
iops = 3000
}
# Encryption at rest
encrypt_at_rest {
enabled = true
kms_key_id = aws_kms_key.opensearch.arn
}
# Encryption in transit
node_to_node_encryption {
enabled = true
}
# HTTPS only
domain_endpoint_options {
enforce_https = true
tls_security_policy = "Policy-Min-TLS-1-2-2019-07"
custom_endpoint_enabled = var.environment == "production"
custom_endpoint = var.environment == "production" ? "search.${var.domain}" : null
custom_endpoint_certificate_arn = var.environment == "production" ? var.acm_cert_arn : null
}
# Fine-grained access control
advanced_security_options {
enabled = true
anonymous_auth_enabled = false
internal_user_database_enabled = true
master_user_options {
master_user_name = var.opensearch_master_user
master_user_password = var.opensearch_master_password
}
}
# VPC deployment (recommended for production)
vpc_options {
subnet_ids = var.private_subnet_ids
security_group_ids = [aws_security_group.opensearch.id]
}
# Auto-tune for memory optimization
auto_tune_options {
desired_state = "ENABLED"
rollback_on_disable = "NO_ROLLBACK"
maintenance_schedule {
start_at = "2027-03-15T01:00:00Z"
cron_expression_for_recurrence = "cron(0 1 ? * SUN *)"
duration {
value = 2
unit = "HOURS"
}
}
}
# Slow log publishing
log_publishing_options {
log_type = "INDEX_SLOW_LOGS"
cloudwatch_log_group_arn = "${aws_cloudwatch_log_group.opensearch_slow.arn}:*"
enabled = true
}
log_publishing_options {
log_type = "SEARCH_SLOW_LOGS"
cloudwatch_log_group_arn = "${aws_cloudwatch_log_group.opensearch_slow.arn}:*"
enabled = true
}
tags = var.common_tags
}
# Access policy
resource "aws_opensearch_domain_policy" "main" {
domain_name = aws_opensearch_domain.main.domain_name
access_policies = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = { AWS = aws_iam_role.app_opensearch.arn }
Action = "es:*"
Resource = "${aws_opensearch_domain.main.arn}/*"
}
]
})
}
resource "aws_security_group" "opensearch" {
name = "${var.project}-opensearch-sg"
vpc_id = var.vpc_id
ingress {
from_port = 443
to_port = 443
protocol = "tcp"
security_groups = [var.app_security_group_id]
description = "HTTPS from application"
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
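With fine-grained access control enabled above, every application request must carry the internal user's credentials. A minimal sketch of fail-fast config validation before constructing the client — the variable names are this guide's convention, not anything the SDK mandates:

```typescript
// Reads and validates the connection settings the OpenSearch client needs.
// Failing fast at startup beats a cryptic 401 from the cluster at request time.
const REQUIRED_VARS = ["OPENSEARCH_URL", "OPENSEARCH_USER", "OPENSEARCH_PASSWORD"] as const;

export function readOpenSearchEnv(env: Record<string, string | undefined>): {
  node: string;
  username: string;
  password: string;
} {
  const missing = REQUIRED_VARS.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing OpenSearch env vars: ${missing.join(", ")}`);
  }
  return {
    node: env.OPENSEARCH_URL!,
    username: env.OPENSEARCH_USER!,
    password: env.OPENSEARCH_PASSWORD!,
  };
}
```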
Index Mapping Design
// lib/opensearch/mappings.ts
import { Client } from "@opensearch-project/opensearch";
const client = new Client({
node: process.env.OPENSEARCH_URL!,
auth: {
username: process.env.OPENSEARCH_USER!,
password: process.env.OPENSEARCH_PASSWORD!,
},
ssl: { rejectUnauthorized: true },
});
// Events index — for analytics and dashboards
const eventsMapping = {
settings: {
number_of_shards: 3,
number_of_replicas: 1,
// Index State Management (ISM): hot → warm → cold → delete
// (OpenSearch uses ISM, not Elasticsearch's index.lifecycle.* settings)
"plugins.index_state_management.rollover_alias": "events",
refresh_interval: "5s", // Trade off freshness for throughput
"index.codec": "best_compression",
},
mappings: {
dynamic: "strict" as const, // Reject unknown fields to prevent mapping explosion
properties: {
// Event identification
event_id: { type: "keyword" as const },
event_type: { type: "keyword" as const },
timestamp: { type: "date" as const, format: "strict_date_optional_time||epoch_millis" },
// Actor
user_id: { type: "keyword" as const },
workspace_id: { type: "keyword" as const },
session_id: { type: "keyword" as const },
// Request context
ip_address: { type: "ip" as const },
user_agent: { type: "keyword" as const, index: false }, // store but don't index
country: { type: "keyword" as const },
device_type: { type: "keyword" as const },
// Business data
plan: { type: "keyword" as const },
entity_type: { type: "keyword" as const },
entity_id: { type: "keyword" as const },
// Metrics
duration_ms: { type: "integer" as const },
status_code: { type: "short" as const },
bytes_sent: { type: "long" as const },
// Full-text searchable fields
description: {
type: "text" as const,
analyzer: "english",
fields: {
keyword: { type: "keyword" as const, ignore_above: 256 },
},
},
// Flexible sub-document — map only the sub-fields you aggregate on
metadata: {
type: "object" as const,
dynamic: false, // Unknown sub-fields stay in _source but are not indexed
properties: {
// Mapped explicitly so revenue aggregations on metadata.amount_cents work
amount_cents: { type: "long" as const },
},
},
},
},
};
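Because the mapping uses `dynamic: "strict"`, OpenSearch rejects any document that contains an unmapped field. A small client-side guard can catch these before the bulk request ever goes out; the field list mirrors the mapping above and must be kept in sync by hand (this helper is an addition of mine, not part of the client library):

```typescript
// Top-level fields permitted by the strict events mapping above.
const EVENT_FIELDS = new Set([
  "event_id", "event_type", "timestamp",
  "user_id", "workspace_id", "session_id",
  "ip_address", "user_agent", "country", "device_type",
  "plan", "entity_type", "entity_id",
  "duration_ms", "status_code", "bytes_sent",
  "description", "metadata",
]);

// Returns the keys a strict mapping would reject; a non-empty result means
// the whole document would fail with a strict_dynamic_mapping_exception.
export function unknownEventFields(doc: Record<string, unknown>): string[] {
  return Object.keys(doc).filter((key) => !EVENT_FIELDS.has(key));
}
```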
// Products index — for search
const productsMapping = {
settings: {
number_of_shards: 2,
number_of_replicas: 1,
analysis: {
analyzer: {
product_analyzer: {
type: "custom",
tokenizer: "standard",
// Edge n-grams belong on the dedicated autocomplete subfield; keeping them
// out of the main analyzer avoids noisy partial matches in relevance scoring
filter: ["lowercase", "asciifolding", "product_synonyms"],
},
autocomplete_analyzer: {
type: "custom",
tokenizer: "standard",
filter: ["lowercase", "asciifolding", "autocomplete_filter"],
},
},
filter: {
autocomplete_filter: {
type: "edge_ngram",
min_gram: 2,
max_gram: 20,
},
product_synonyms: {
type: "synonym",
synonyms: [
"tv, television",
"laptop, notebook",
"mobile, phone, smartphone",
],
},
},
},
},
mappings: {
properties: {
product_id: { type: "keyword" as const },
name: {
type: "text" as const,
analyzer: "product_analyzer",
search_analyzer: "standard",
fields: {
keyword: { type: "keyword" as const },
autocomplete: { type: "text" as const, analyzer: "autocomplete_analyzer", search_analyzer: "standard" },
},
},
description: { type: "text" as const, analyzer: "english" },
category: { type: "keyword" as const },
tags: { type: "keyword" as const },
price: { type: "scaled_float" as const, scaling_factor: 100 },
stock: { type: "integer" as const },
rating: { type: "half_float" as const },
review_count: { type: "integer" as const },
brand: { type: "keyword" as const },
is_active: { type: "boolean" as const },
created_at: { type: "date" as const },
updated_at: { type: "date" as const },
},
},
};
export async function createIndices() {
// Create index template for time-series events (ILM rollover)
await client.indices.putIndexTemplate({
name: "events-template",
body: {
index_patterns: ["events-*"],
priority: 100,
template: eventsMapping,
},
});
// Create initial index and alias
await client.indices.create({
index: "events-000001",
body: {
aliases: { events: { is_write_index: true } },
},
});
// Create products index
await client.indices.create({
index: "products",
body: productsMapping,
});
}
Ingesting Events
// lib/opensearch/ingest.ts
import { Client } from "@opensearch-project/opensearch";
// Fine-grained access control is enabled, so every request needs credentials
const client = new Client({
node: process.env.OPENSEARCH_URL!,
auth: {
username: process.env.OPENSEARCH_USER!,
password: process.env.OPENSEARCH_PASSWORD!,
},
});
interface AnalyticsEvent {
eventId: string;
eventType: string;
timestamp: Date;
userId?: string;
workspaceId?: string;
sessionId?: string;
ipAddress?: string;
country?: string;
deviceType?: string;
entityType?: string;
entityId?: string;
durationMs?: number;
statusCode?: number;
metadata?: Record<string, unknown>;
}
// Buffer events and bulk-index for throughput
const eventBuffer: AnalyticsEvent[] = [];
let flushTimer: ReturnType<typeof setTimeout> | null = null;
export function trackEvent(event: AnalyticsEvent): void {
eventBuffer.push(event);
// Flush if buffer is large enough
if (eventBuffer.length >= 100) {
void flush();
} else if (!flushTimer) {
// Flush after 5 seconds even if buffer isn't full
flushTimer = setTimeout(() => void flush(), 5000);
}
}
async function flush(): Promise<void> {
if (flushTimer) {
clearTimeout(flushTimer);
flushTimer = null;
}
if (eventBuffer.length === 0) return;
const events = eventBuffer.splice(0, eventBuffer.length);
// Build bulk request body
const body = events.flatMap((event) => [
{ index: { _index: "events", _id: event.eventId } },
{
event_id: event.eventId,
event_type: event.eventType,
timestamp: event.timestamp.toISOString(),
user_id: event.userId,
workspace_id: event.workspaceId,
session_id: event.sessionId,
ip_address: event.ipAddress,
country: event.country,
device_type: event.deviceType,
entity_type: event.entityType,
entity_id: event.entityId,
duration_ms: event.durationMs,
status_code: event.statusCode,
},
]);
try {
const response = await client.bulk({ body, index: "events" });
if (response.body.errors) {
const failed = response.body.items.filter(
(item: any) => item.index?.error
);
console.error(`Bulk index: ${failed.length} documents failed:`, failed[0]?.index?.error);
}
} catch (err) {
console.error("OpenSearch bulk index error:", err);
// Re-queue events for retry (simplified — use a real queue in production)
eventBuffer.unshift(...events);
}
}
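The catch block above re-queues everything, but a bulk request can also partially fail: the HTTP call succeeds while individual items are rejected. Only some of those are worth retrying — 429s from back-pressure and 5xx node errors, not 400-class mapping rejections that will fail forever. A sketch of that split (the types here are simplified from the real bulk response shape):

```typescript
interface BulkItemResult {
  index?: { _id?: string; status?: number; error?: { type: string; reason?: string } };
}

// Partitions failed bulk items by document id: transient failures (429/5xx)
// go back on the queue; permanent ones (e.g. mapping rejections) get logged.
export function partitionBulkFailures(items: BulkItemResult[]): {
  retryable: string[];
  permanent: string[];
} {
  const retryable: string[] = [];
  const permanent: string[] = [];
  for (const item of items) {
    const result = item.index;
    if (!result || !result.error) continue; // item succeeded
    const id = result._id ?? "unknown";
    const status = result.status ?? 0;
    if (status === 429 || status >= 500) retryable.push(id);
    else permanent.push(id);
  }
  return { retryable, permanent };
}
```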
Analytics Aggregation Queries
// lib/opensearch/analytics.ts
// Dashboard: active users over time with event breakdown
export async function getDailyActiveUsers(params: {
workspaceId: string;
from: string;
to: string;
interval: "day" | "week" | "month";
}) {
const response = await client.search({
index: "events",
body: {
size: 0, // Don't return documents, only aggregations
query: {
bool: {
filter: [
{ term: { workspace_id: params.workspaceId } },
{ range: { timestamp: { gte: params.from, lte: params.to } } },
],
},
},
aggs: {
over_time: {
date_histogram: {
field: "timestamp",
calendar_interval: params.interval,
time_zone: "UTC",
min_doc_count: 0, // Emit empty buckets so charts show zero-activity gaps
extended_bounds: { min: params.from, max: params.to },
},
aggs: {
unique_users: {
cardinality: { field: "user_id", precision_threshold: 1000 },
},
by_event_type: {
terms: { field: "event_type", size: 10 },
},
},
},
},
},
});
const buckets = response.body.aggregations.over_time.buckets;
return buckets.map((b: any) => ({
date: b.key_as_string,
uniqueUsers: b.unique_users.value,
events: b.doc_count,
byType: Object.fromEntries(
b.by_event_type.buckets.map((t: any) => [t.key, t.doc_count])
),
}));
}
// Funnel analysis: conversion steps
export async function getFunnelConversion(params: {
workspaceId: string;
steps: string[]; // event_type values in order
from: string;
to: string;
}) {
// Run parallel cardinality queries for each step
const stepQueries = params.steps.map((step) =>
client.search({
index: "events",
body: {
size: 0,
query: {
bool: {
filter: [
{ term: { workspace_id: params.workspaceId } },
{ term: { event_type: step } },
{ range: { timestamp: { gte: params.from, lte: params.to } } },
],
},
},
aggs: {
unique_users: { cardinality: { field: "user_id" } },
},
},
})
);
const results = await Promise.all(stepQueries);
const counts = results.map(
(r) => r.body.aggregations.unique_users.value as number
);
// Note: each step is counted independently — this approximates a funnel
// rather than enforcing that users completed the previous steps first
return params.steps.map((step, i) => ({
step,
users: counts[i],
// Guard against division by zero when an earlier step has no users
conversionFromPrevious:
i === 0 ? 100 : counts[i - 1] > 0 ? Math.round((counts[i] / counts[i - 1]) * 100) : 0,
conversionFromFirst: counts[0] > 0 ? Math.round((counts[i] / counts[0]) * 100) : 0,
}));
}
// Revenue analytics with percentiles
export async function getRevenueMetrics(params: {
workspaceId: string;
from: string;
to: string;
}) {
const response = await client.search({
index: "events",
body: {
size: 0,
query: {
bool: {
filter: [
{ term: { workspace_id: params.workspaceId } },
{ term: { event_type: "payment_completed" } },
{ range: { timestamp: { gte: params.from, lte: params.to } } },
],
},
},
aggs: {
total_revenue: { sum: { field: "metadata.amount_cents" } },
avg_order_value: { avg: { field: "metadata.amount_cents" } },
order_percentiles: {
percentiles: {
field: "metadata.amount_cents",
percents: [25, 50, 75, 90, 95, 99],
},
},
by_plan: {
terms: { field: "plan", size: 10 },
aggs: {
revenue: { sum: { field: "metadata.amount_cents" } },
count: { value_count: { field: "event_id" } },
},
},
over_time: {
date_histogram: {
field: "timestamp",
calendar_interval: "day",
},
aggs: {
daily_revenue: { sum: { field: "metadata.amount_cents" } },
},
},
},
},
});
const aggs = response.body.aggregations;
return {
totalRevenue: aggs.total_revenue.value / 100,
avgOrderValue: aggs.avg_order_value.value / 100,
percentiles: Object.fromEntries(
Object.entries(aggs.order_percentiles.values).map(([k, v]) => [
k,
(v as number) / 100,
])
),
byPlan: aggs.by_plan.buckets.map((b: any) => ({
plan: b.key,
revenue: b.revenue.value / 100,
orders: b.count.value,
})),
timeSeries: aggs.over_time.buckets.map((b: any) => ({
date: b.key_as_string,
revenue: b.daily_revenue.value / 100,
})),
};
}
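The repeated `/ 100` conversions above deserve a named helper, not least because `avg` returns `null` when no payments match the query, and the original code only lands on zero through JavaScript's `null / 100 === 0` coercion. A tiny function makes that intent explicit:

```typescript
// Converts an aggregation value in cents to dollars, treating the null that
// avg (and some other metrics) return for an empty result set as zero revenue.
export function centsToDollars(cents: number | null | undefined): number {
  return (cents ?? 0) / 100;
}
```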
// Full-text search with facets
export async function searchProducts(params: {
query: string;
filters: {
category?: string[];
brand?: string[];
priceMin?: number;
priceMax?: number;
minRating?: number;
inStock?: boolean;
};
sort?: "relevance" | "price_asc" | "price_desc" | "rating";
page: number;
pageSize: number;
}) {
const must: any[] = [];
const filter: any[] = [{ term: { is_active: true } }];
if (params.query) {
must.push({
multi_match: {
query: params.query,
fields: ["name^3", "name.autocomplete^2", "description", "brand^2", "tags^1.5"],
type: "best_fields",
fuzziness: "AUTO",
prefix_length: 2,
},
});
}
if (params.filters.category?.length) {
filter.push({ terms: { category: params.filters.category } });
}
if (params.filters.brand?.length) {
filter.push({ terms: { brand: params.filters.brand } });
}
if (params.filters.priceMin !== undefined || params.filters.priceMax !== undefined) {
filter.push({
range: {
price: {
...(params.filters.priceMin !== undefined && { gte: params.filters.priceMin }),
...(params.filters.priceMax !== undefined && { lte: params.filters.priceMax }),
},
},
});
}
if (params.filters.minRating) {
filter.push({ range: { rating: { gte: params.filters.minRating } } });
}
if (params.filters.inStock) {
filter.push({ range: { stock: { gt: 0 } } });
}
const sortConfig: Record<string, any> = {
relevance: ["_score"],
price_asc: [{ price: "asc" }],
price_desc: [{ price: "desc" }],
rating: [{ rating: "desc" }, { review_count: "desc" }],
};
const response = await client.search({
index: "products",
body: {
from: (params.page - 1) * params.pageSize,
size: params.pageSize,
query: { bool: { must: must.length ? must : [{ match_all: {} }], filter } },
sort: sortConfig[params.sort ?? "relevance"],
// Aggregations for facets sidebar
aggs: {
categories: { terms: { field: "category", size: 20 } },
brands: { terms: { field: "brand", size: 20 } },
price_ranges: {
range: {
field: "price",
ranges: [
{ to: 25 },
{ from: 25, to: 50 },
{ from: 50, to: 100 },
{ from: 100, to: 200 },
{ from: 200 },
],
},
},
avg_price: { avg: { field: "price" } },
},
highlight: {
fields: {
name: { number_of_fragments: 1 },
description: { number_of_fragments: 2, fragment_size: 150 },
},
pre_tags: ["<mark>"],
post_tags: ["</mark>"],
},
},
});
return {
total: response.body.hits.total.value,
products: response.body.hits.hits.map((h: any) => ({
...h._source,
_score: h._score,
highlights: h.highlight,
})),
facets: {
categories: response.body.aggregations.categories.buckets,
brands: response.body.aggregations.brands.buckets,
priceRanges: response.body.aggregations.price_ranges.buckets,
},
};
}
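The filter construction inside `searchProducts` is the part most worth unit-testing, and it needs no cluster if pulled into a pure function. A sketch of that extraction (a refactoring suggestion, not code from the original):

```typescript
interface ProductFilters {
  category?: string[];
  brand?: string[];
  priceMin?: number;
  priceMax?: number;
  minRating?: number;
  inStock?: boolean;
}

// Builds the bool-filter clause array used by searchProducts. Filter context
// clauses are cacheable and do not affect relevance scores.
export function buildProductFilters(filters: ProductFilters): object[] {
  const clauses: object[] = [{ term: { is_active: true } }];
  if (filters.category?.length) clauses.push({ terms: { category: filters.category } });
  if (filters.brand?.length) clauses.push({ terms: { brand: filters.brand } });
  if (filters.priceMin !== undefined || filters.priceMax !== undefined) {
    clauses.push({
      range: {
        price: {
          ...(filters.priceMin !== undefined && { gte: filters.priceMin }),
          ...(filters.priceMax !== undefined && { lte: filters.priceMax }),
        },
      },
    });
  }
  if (filters.minRating) clauses.push({ range: { rating: { gte: filters.minRating } } });
  if (filters.inStock) clauses.push({ range: { stock: { gt: 0 } } });
  return clauses;
}
```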
Index State Management (ISM)
For time-series data (events, logs), use Index State Management — OpenSearch's counterpart to Elasticsearch's ILM — to roll over and eventually delete indices automatically:
// lib/opensearch/ism.ts
// OpenSearch exposes ISM through the _plugins/_ism API, not Elasticsearch's
// ILM endpoints; the JS client has no typed helper for it, so we go through
// the transport layer directly.
export async function createEventsISMPolicy() {
await client.transport.request({
method: "PUT",
path: "/_plugins/_ism/policies/events-policy",
body: {
policy: {
description: "Daily rollover, tiered priorities, delete after 90 days",
default_state: "hot",
states: [
{
name: "hot",
actions: [
{ index_priority: { priority: 100 } },
{
rollover: {
min_index_age: "1d", // Roll over daily...
min_primary_shard_size: "10gb", // ...or when a primary shard hits 10 GB
},
},
],
transitions: [{ state_name: "warm", conditions: { min_index_age: "7d" } }],
},
{
name: "warm", // After 7 days: compact and deprioritize
actions: [
{ shrink: { num_new_shards: 1 } },
{ force_merge: { max_num_segments: 1 } },
{ index_priority: { priority: 50 } },
],
transitions: [{ state_name: "cold", conditions: { min_index_age: "30d" } }],
},
{
name: "cold", // After 30 days: lowest recovery priority
actions: [{ index_priority: { priority: 0 } }],
transitions: [{ state_name: "delete", conditions: { min_index_age: "90d" } }],
},
{
name: "delete", // After 90 days: drop the index
actions: [{ delete: {} }],
},
],
// Attach the policy automatically to new event indices
ism_template: [{ index_patterns: ["events-*"], priority: 100 }],
},
},
});
}
Cost Estimates
| Cluster Size | Instance Type | Storage | Monthly Cost |
|---|---|---|---|
| Dev/staging | 1× t3.small.search | 20GB | ~$25/month |
| Small production | 2× t3.medium.search | 50GB × 2 | ~$150/month |
| Medium production | 3× r6g.large.search | 100GB × 3 | ~$500/month |
| Large production | 3× r6g.2xlarge.search + 3 masters | 500GB × 3 | ~$2,000/month |
Cost reduction tips: Enable UltraWarm storage for infrequently queried data ($0.024/GB vs $0.135/GB for hot storage). Use ILM to move old data to cold/delete automatically.
Cost and Timeline Estimates
| Scope | Team | Timeline | Cost Range |
|---|---|---|---|
| Basic cluster + index setup | 1 dev | 1–2 days | $400–800 |
| Search with facets and highlighting | 1 dev | 3–5 days | $1,000–2,000 |
| Analytics pipeline + dashboards | 1–2 devs | 1–2 weeks | $3,000–6,000 |
| Full observability stack (logs, metrics, traces) | 2–3 devs | 3–5 weeks | $8,000–18,000 |
See Also
- PostgreSQL Full-Text Search
- AWS CloudWatch Observability Setup
- SaaS Activity Feed Architecture
- AWS EventBridge Event-Driven Architecture
- PostgreSQL Window Functions for Analytics
Working With Viprasol
OpenSearch unlocks analytics that PostgreSQL can't deliver at scale — but getting there requires careful index design, query optimization, and cost management. Our team has built OpenSearch-backed search and analytics systems for e-commerce platforms, SaaS products, and log analytics pipelines.
What we deliver:
- Terraform-managed OpenSearch cluster with VPC, encryption, and ILM
- Index mapping design optimized for your query patterns
- Aggregation queries for dashboards and funnel analytics
- Product search with facets, autocomplete, and relevance tuning
- PostgreSQL → OpenSearch sync pipeline
Talk to our team about search and analytics architecture →
Or explore our cloud infrastructure services.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.