AWS Bedrock RAG

End-to-end wiring for a Bedrock + OpenSearch Serverless + CloudWatch RAG stack.

What you need

  • An OpenSearch Serverless collection with a k-NN index that stores per-call embeddings and a timestamp field.
  • IAM credentials with read access to that collection and cloudwatch:PutMetricData.
  • pip install 'ragdrift-py[opensearch,aws]'
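If the index does not exist yet, it needs a knn_vector field for the embeddings and a date field for the timestamps. A minimal mapping sketch follows; the vector dimension (1024 here) is an assumption and must match your embedding model's output size:

```python
# Sketch of a k-NN index mapping compatible with the adapter below.
# The dimension value is an assumption; set it to your model's output size.
index_body = {
    "settings": {"index": {"knn": True}},
    "mappings": {
        "properties": {
            "embedding": {
                "type": "knn_vector",
                "dimension": 1024,  # must equal the embedding model's output size
            },
            "@timestamp": {"type": "date"},
        }
    },
}
# Create it once against your collection, e.g.:
# os_client.indices.create(index="rag-prod-embeddings", body=index_body)
```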

The loop

import os
import boto3
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection

from ragdrift import EmbeddingDrift, DriftReport
from ragdrift.adapters import OpenSearchAdapter
from ragdrift.adapters.opensearch import OpenSearchWindow
from ragdrift.exporters import CloudWatchExporter

# OpenSearch Serverless only accepts SigV4-signed requests (service "aoss"),
# so the client needs signing auth wired in.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, os.environ.get("AWS_REGION", "us-east-1"), "aoss")

os_client = OpenSearch(
    hosts=[os.environ["RAGDRIFT_OS_HOST"]],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=30,
)
adapter = OpenSearchAdapter(
    client=os_client,
    index="rag-prod-embeddings",
    embedding_field="embedding",
    timestamp_field="@timestamp",
)
baseline_emb, current_emb = adapter.fetch_pair(
    baseline=OpenSearchWindow("2026-04-01T00:00:00Z", "2026-04-08T00:00:00Z"),
    current=OpenSearchWindow("2026-05-01T00:00:00Z", "2026-05-08T00:00:00Z"),
    sample_size=2000,
)

score = EmbeddingDrift(threshold=0.05).detect(baseline_emb, current_emb)
report = DriftReport([score], baseline_emb.shape[0], current_emb.shape[0])

cw = boto3.client("cloudwatch", region_name="us-east-1")
CloudWatchExporter(
    client=cw,
    namespace="rag/drift",
    dimensions=[{"Name": "service", "Value": "ask-anything"}],
).record(report)

Schedule it

Run hourly via EventBridge -> Lambda, or as a sidecar on your ingestion service. The Rust core takes single-digit milliseconds for 2000-vector samples, so this is not a meaningful tax on your pipeline.
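A scheduled run wants windows computed relative to "now" rather than the hard-coded dates above. A minimal sketch, assuming a week-long current window compared against the same week one month back (the rolling_windows helper is hypothetical, not part of ragdrift):

```python
from datetime import datetime, timedelta, timezone

def rolling_windows(now=None, window_days=7, lag_days=30):
    """Return (baseline_start, baseline_end, current_start, current_end) as
    ISO-8601 strings: the trailing window_days vs. the same span lag_days ago.
    Hypothetical helper, not part of ragdrift."""
    now = now or datetime.now(timezone.utc)
    current_end = now
    current_start = now - timedelta(days=window_days)
    baseline_end = current_end - timedelta(days=lag_days)
    baseline_start = current_start - timedelta(days=lag_days)
    iso = lambda d: d.strftime("%Y-%m-%dT%H:%M:%SZ")
    return iso(baseline_start), iso(baseline_end), iso(current_start), iso(current_end)
```

Feed the four strings into the two OpenSearchWindow arguments of fetch_pair inside your Lambda handler.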

Alarming

The exporter publishes RagDrift_embedding (raw score) and RagDrift_embedding_exceeded (0/1) under each configured dimension set. Alarm on the raw score with your own threshold, or on _exceeded == 1 to honor the threshold set in code.
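The _exceeded alarm can be expressed as arguments to CloudWatch's put_metric_alarm. A sketch follows; the alarm name, period, and dimensions are assumptions to adjust for your stack:

```python
# Sketch: alarm that fires when RagDrift_embedding_exceeded flips to 1.
# Create it with boto3.client("cloudwatch").put_metric_alarm(**alarm_kwargs).
# AlarmName, Period, and Dimensions are assumptions; match them to your setup.
alarm_kwargs = {
    "AlarmName": "ask-anything-embedding-drift",
    "Namespace": "rag/drift",
    "MetricName": "RagDrift_embedding_exceeded",
    "Dimensions": [{"Name": "service", "Value": "ask-anything"}],
    "Statistic": "Maximum",        # any breach within the period trips the alarm
    "Period": 3600,                # matches the hourly schedule above
    "EvaluationPeriods": 1,
    "Threshold": 0,
    "ComparisonOperator": "GreaterThanThreshold",  # fires when _exceeded == 1
    "TreatMissingData": "notBreaching",  # a skipped run should not page anyone
}
```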