AWS Bedrock RAG in 2026: Knowledge Bases, Embedding Pipeline, and Retrieval-Augmented Generation
Build a RAG system with AWS Bedrock: Knowledge Bases with S3 and OpenSearch, embedding pipeline, retrieval queries, Claude integration, and Terraform configuration.
AWS Bedrock RAG in 2026: Knowledge Bases, Embedding Pipeline, and Retrieval-Augmented Generation
AWS Bedrock's Knowledge Bases provide fully managed RAG โ you point it at an S3 bucket, it handles chunking, embedding, and vector storage in OpenSearch Serverless or Aurora pgvector, and exposes a single API for retrieval. No vector database to manage, no embedding model to deploy.
This post covers Bedrock Knowledge Bases setup, the ingestion pipeline (S3 โ embed โ vector store), retrieval with RetrieveAndGenerate, custom RAG with Retrieve + InvokeModel for more control, streaming responses, and Terraform configuration.
Architecture
Documents (S3)
โ Ingestion job
Chunks + Embeddings (Titan Embeddings v2)
โ
Vector Store (OpenSearch Serverless or Aurora pgvector)
โ Retrieval
Top-K relevant chunks
โ Augmented prompt
Claude claude-sonnet-4-6 (or other Bedrock model)
โ
Answer with citations
Terraform: Knowledge Base Setup
# terraform/bedrock-kb.tf
# S3 bucket for knowledge base documents
resource "aws_s3_bucket" "kb_documents" {
bucket = "${var.name}-${var.environment}-kb-documents"
tags = var.common_tags
}
resource "aws_s3_bucket_versioning" "kb_documents" {
bucket = aws_s3_bucket.kb_documents.id
versioning_configuration { status = "Enabled" }
}
# OpenSearch Serverless collection (vector store)
resource "aws_opensearchserverless_collection" "kb" {
name = "${var.name}-${var.environment}-kb"
type = "VECTORSEARCH"
tags = var.common_tags
depends_on = [
aws_opensearchserverless_security_policy.encryption,
aws_opensearchserverless_security_policy.network,
aws_opensearchserverless_access_policy.kb,
]
}
resource "aws_opensearchserverless_security_policy" "encryption" {
name = "${var.name}-${var.environment}-kb-enc"
type = "encryption"
policy = jsonencode({
Rules = [{ ResourceType = "collection", Resource = ["collection/${var.name}-${var.environment}-kb"] }]
AWSOwnedKey = true
})
}
resource "aws_opensearchserverless_security_policy" "network" {
name = "${var.name}-${var.environment}-kb-net"
type = "network"
policy = jsonencode([{
Rules = [
{ ResourceType = "dashboard", Resource = ["collection/${var.name}-${var.environment}-kb"] },
{ ResourceType = "collection", Resource = ["collection/${var.name}-${var.environment}-kb"] }
]
AllowFromPublic = true
}])
}
resource "aws_opensearchserverless_access_policy" "kb" {
name = "${var.name}-${var.environment}-kb-access"
type = "data"
policy = jsonencode([{
Rules = [
{
ResourceType = "index"
Resource = ["index/${var.name}-${var.environment}-kb/*"]
Permission = ["aoss:*"]
},
{
ResourceType = "collection"
Resource = ["collection/${var.name}-${var.environment}-kb"]
Permission = ["aoss:*"]
}
]
Principal = [
aws_iam_role.bedrock_kb.arn,
"arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
]
}])
}
# IAM role for Bedrock Knowledge Base
resource "aws_iam_role" "bedrock_kb" {
name = "${var.name}-${var.environment}-bedrock-kb"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "bedrock.amazonaws.com" }
Action = "sts:AssumeRole"
Condition = {
StringEquals = { "aws:SourceAccount" = data.aws_caller_identity.current.account_id }
}
}]
})
}
resource "aws_iam_role_policy" "bedrock_kb" {
name = "kb-permissions"
role = aws_iam_role.bedrock_kb.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = ["s3:GetObject", "s3:ListBucket"]
Resource = [
aws_s3_bucket.kb_documents.arn,
"${aws_s3_bucket.kb_documents.arn}/*"
]
},
{
Effect = "Allow"
Action = ["aoss:APIAccessAll"]
Resource = aws_opensearchserverless_collection.kb.arn
},
{
Effect = "Allow"
Action = ["bedrock:InvokeModel"]
Resource = "arn:aws:bedrock:${var.region}::foundation-model/amazon.titan-embed-text-v2:0"
}
]
})
}
# Bedrock Knowledge Base
resource "aws_bedrockagent_knowledge_base" "main" {
name = "${var.name}-${var.environment}-kb"
role_arn = aws_iam_role.bedrock_kb.arn
knowledge_base_configuration {
type = "VECTOR"
vector_knowledge_base_configuration {
embedding_model_arn = "arn:aws:bedrock:${var.region}::foundation-model/amazon.titan-embed-text-v2:0"
}
}
storage_configuration {
type = "OPENSEARCH_SERVERLESS"
opensearch_serverless_configuration {
collection_arn = aws_opensearchserverless_collection.kb.arn
vector_index_name = "bedrock-kb-index"
field_mapping {
vector_field = "embedding"
text_field = "AMAZON_BEDROCK_TEXT_CHUNK"
metadata_field = "AMAZON_BEDROCK_METADATA"
}
}
}
tags = var.common_tags
}
# Data source: S3 documents
resource "aws_bedrockagent_data_source" "documents" {
knowledge_base_id = aws_bedrockagent_knowledge_base.main.id
name = "documents"
data_source_configuration {
type = "S3"
s3_configuration {
bucket_arn = aws_s3_bucket.kb_documents.arn
}
}
vector_ingestion_configuration {
chunking_configuration {
chunking_strategy = "FIXED_SIZE"
fixed_size_chunking_configuration {
max_tokens = 512
overlap_percentage = 20
}
}
}
}
๐ค AI Is Not the Future โ It Is Right Now
Businesses using AI automation cut manual work by 60โ80%. We build production-ready AI systems โ RAG pipelines, LLM integrations, custom ML models, and AI agent workflows.
- LLM integration (OpenAI, Anthropic, Gemini, local models)
- RAG systems that answer from your own data
- AI agents that take real actions โ not just chat
- Custom ML models for prediction, classification, detection
Ingestion: Upload Documents and Sync
// lib/bedrock/knowledge-base.ts
import {
BedrockAgentClient,
StartIngestionJobCommand,
GetIngestionJobCommand,
} from "@aws-sdk/client-bedrock-agent";
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { readFileSync } from "fs";
const agent = new BedrockAgentClient({ region: process.env.AWS_REGION });
const s3 = new S3Client({ region: process.env.AWS_REGION });
const KB_ID = process.env.BEDROCK_KB_ID!;
const DS_ID = process.env.BEDROCK_DS_ID!;
const KB_S3_BUCKET = process.env.KB_S3_BUCKET!;
// Upload a document to S3 (Knowledge Base will ingest on next sync)
export async function uploadDocument(
key: string,
content: string | Buffer,
contentType: "text/plain" | "text/markdown" | "application/pdf" = "text/plain"
) {
await s3.send(new PutObjectCommand({
Bucket: KB_S3_BUCKET,
Key: key,
Body: content,
ContentType: contentType,
Metadata: {
// Metadata available in retrieval results
source: key,
uploadedAt: new Date().toISOString(),
},
}));
}
// Trigger ingestion job (processes new/changed S3 documents)
export async function syncKnowledgeBase(): Promise<string> {
const { ingestionJob } = await agent.send(new StartIngestionJobCommand({
knowledgeBaseId: KB_ID,
dataSourceId: DS_ID,
}));
return ingestionJob!.ingestionJobId!;
}
// Wait for ingestion to complete
export async function waitForIngestion(jobId: string, timeoutMs = 300_000): Promise<void> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const { ingestionJob } = await agent.send(new GetIngestionJobCommand({
knowledgeBaseId: KB_ID,
dataSourceId: DS_ID,
ingestionJobId: jobId,
}));
const status = ingestionJob?.status;
if (status === "COMPLETE") return;
if (status === "FAILED") throw new Error(`Ingestion failed: ${ingestionJob?.failureReasons}`);
await new Promise((r) => setTimeout(r, 5000));
}
throw new Error("Ingestion timed out");
}
RetrieveAndGenerate: One-Call RAG
// lib/bedrock/rag.ts
import {
BedrockAgentRuntimeClient,
RetrieveAndGenerateCommand,
RetrieveCommand,
type RetrieveAndGenerateCommandInput,
} from "@aws-sdk/client-bedrock-agent-runtime";
import {
BedrockRuntimeClient,
InvokeModelWithResponseStreamCommand,
} from "@aws-sdk/client-bedrock-runtime";
const agentRuntime = new BedrockAgentRuntimeClient({ region: process.env.AWS_REGION });
const bedrockRuntime = new BedrockRuntimeClient({ region: process.env.AWS_REGION });
const KB_ID = process.env.BEDROCK_KB_ID!;
export interface RAGResult {
answer: string;
citations: Array<{
text: string;
source: string;
score: number;
}>;
}
// Simple RAG: retrieve + generate in one API call
export async function retrieveAndGenerate(
query: string,
options: { maxResults?: number; systemPrompt?: string } = {}
): Promise<RAGResult> {
const input: RetrieveAndGenerateCommandInput = {
input: { text: query },
retrieveAndGenerateConfiguration: {
type: "KNOWLEDGE_BASE",
knowledgeBaseConfiguration: {
knowledgeBaseId: KB_ID,
modelArn: "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-sonnet-4-6-v1:0",
retrievalConfiguration: {
vectorSearchConfiguration: {
numberOfResults: options.maxResults ?? 5,
},
},
generationConfiguration: {
promptTemplate: {
textPromptTemplate: options.systemPrompt
? `${options.systemPrompt}\n\n$search_results$\n\nQuestion: $query$\n\nAnswer based only on the provided context.`
: undefined,
},
inferenceConfig: {
textInferenceConfig: {
maxTokens: 1024,
temperature: 0.1, // Low temperature for factual retrieval
},
},
},
},
},
};
const response = await agentRuntime.send(new RetrieveAndGenerateCommand(input));
const answer = response.output?.text ?? "";
const citations = (response.citations ?? []).flatMap((citation) =>
(citation.retrievedReferences ?? []).map((ref) => ({
text: ref.content?.text ?? "",
source: (ref.location?.s3Location?.uri ?? "").split("/").pop() ?? "",
score: 0, // Score not available in RetrieveAndGenerate response
}))
);
return { answer, citations };
}
โก Your Competitors Are Already Using AI โ Are You?
We build AI systems that actually work in production โ not demos that die in a Colab notebook. From data pipeline to deployed model to real business outcomes.
- AI agent systems that run autonomously โ not just chatbots
- Integrates with your existing tools (CRM, ERP, Slack, etc.)
- Explainable outputs โ know why the model decided what it did
- Free AI opportunity audit for your business
Custom RAG: Retrieve + Stream
For more control โ retrieve chunks, build your own prompt, stream the response:
// lib/bedrock/custom-rag.ts
export async function* retrieveAndStreamAnswer(
query: string,
systemPrompt: string,
options: { maxResults?: number } = {}
): AsyncGenerator<string> {
// Step 1: Retrieve relevant chunks
const { retrievalResults } = await agentRuntime.send(new RetrieveCommand({
knowledgeBaseId: KB_ID,
retrievalQuery: { text: query },
retrievalConfiguration: {
vectorSearchConfiguration: {
numberOfResults: options.maxResults ?? 5,
overrideSearchType: "HYBRID", // Semantic + keyword search
},
},
}));
// Filter low-relevance results
const relevantChunks = (retrievalResults ?? [])
.filter((r) => (r.score ?? 0) > 0.5)
.map((r) => ({
text: r.content?.text ?? "",
source: r.location?.s3Location?.uri?.split("/").pop() ?? "unknown",
score: r.score ?? 0,
}));
if (relevantChunks.length === 0) {
yield "I don't have information about that in my knowledge base.";
return;
}
// Step 2: Build augmented prompt
const context = relevantChunks
.map((c, i) => `[${i + 1}] Source: ${c.source}\n${c.text}`)
.join("\n\n---\n\n");
const userMessage = `Context from knowledge base:\n\n${context}\n\n---\n\nQuestion: ${query}`;
// Step 3: Stream response from Claude
const stream = await bedrockRuntime.send(new InvokeModelWithResponseStreamCommand({
modelId: "anthropic.claude-sonnet-4-6-v1:0",
contentType: "application/json",
accept: "application/json",
body: JSON.stringify({
anthropic_version: "bedrock-2023-05-31",
max_tokens: 1024,
system: systemPrompt,
messages: [{ role: "user", content: userMessage }],
}),
}));
// Stream text chunks
for await (const chunk of stream.body ?? []) {
if (chunk.chunk?.bytes) {
const parsed = JSON.parse(new TextDecoder().decode(chunk.chunk.bytes));
if (parsed.type === "content_block_delta" && parsed.delta?.type === "text_delta") {
yield parsed.delta.text;
}
}
}
}
API Route: Streaming RAG Endpoint
// app/api/ai/ask/route.ts
import { NextRequest } from "next/server";
import { getWorkspaceContext } from "@/lib/auth/workspace-context";
import { retrieveAndStreamAnswer } from "@/lib/bedrock/custom-rag";
export async function POST(req: NextRequest) {
const ctx = await getWorkspaceContext();
if (!ctx) return new Response("Unauthorized", { status: 401 });
const { query } = await req.json();
if (!query?.trim()) return new Response("Query required", { status: 400 });
const systemPrompt = `You are a helpful assistant for ${ctx.workspace.name}.
Answer questions based only on the provided context.
If the context doesn't contain the answer, say so clearly.
Cite your sources using [1], [2], etc.`;
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
try {
for await (const chunk of retrieveAndStreamAnswer(query, systemPrompt)) {
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ text: chunk })}\n\n`));
}
controller.enqueue(encoder.encode("data: [DONE]\n\n"));
} catch (err) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ error: "Generation failed" })}\n\n`)
);
} finally {
controller.close();
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
Cost Estimation
| Component | Cost |
|---|---|
| Titan Embeddings v2 | $0.00002/1K tokens (ingestion) |
| Claude claude-sonnet-4-6 | $3/M input tokens, $15/M output tokens |
| OpenSearch Serverless | ~$0.24/OCU-hour (min 2 OCU = ~$350/month) |
| Aurora pgvector (alternative) | db.t3.medium ~$60/month |
| S3 storage | $0.023/GB/month |
For small-medium RAG workloads (<100K documents), Aurora pgvector is significantly cheaper than OpenSearch Serverless. Use OpenSearch Serverless for millions of documents or when you need full-text hybrid search.
See Also
- OpenAI Function Calling โ Function calling for structured RAG outputs
- AWS Lambda Container โ Running RAG in Lambda containers
- AWS Secrets Manager โ Storing Bedrock credentials
- AWS Step Functions โ Orchestrating multi-step RAG pipelines
Working With Viprasol
We build RAG systems on AWS Bedrock for SaaS products โ from internal documentation search through customer-facing AI assistants. Our AI team has shipped Bedrock Knowledge Base integrations with streaming responses, citation tracking, and relevance filtering.
What we deliver:
- Bedrock Knowledge Base setup with S3 data source and OpenSearch/pgvector
- Document ingestion pipeline with sync automation
- Custom RAG with retrieve + stream for full control
- Streaming SSE endpoint for real-time response display
- Cost analysis: OpenSearch Serverless vs Aurora pgvector for your document volume
See our AI/ML services or contact us to build your RAG system on AWS Bedrock.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.
Want to Implement AI in Your Business?
From chatbots to predictive models โ harness the power of AI with a team that delivers.
Free consultation โข No commitment โข Response within 24 hours
Ready to automate your business with AI agents?
We build custom multi-agent AI systems that handle sales, support, ops, and content โ across Telegram, WhatsApp, Slack, and 20+ other platforms. We run our own business on these systems.