Skip to main content
Version: 2.4.0-dev (Next)

Observability & Monitoring

Comprehensive observability is essential for production RAG systems. @DevilsDev/rag-pipeline-utils provides built-in monitoring, logging, tracing, and alerting capabilities designed for enterprise environments.


Monitoring Architecture

Multi-Layer Observability Stack

graph TB
subgraph "Application Layer"
RAG[RAG Pipeline]
Plugins[Plugins]
CLI[CLI Tools]
end

subgraph "Observability Layer"
Logger[Structured Logger]
Metrics[Metrics Collector]
Tracer[Distributed Tracer]
SLO[SLO Monitor]
end

subgraph "Storage & Analytics"
Prometheus[Prometheus]
Elasticsearch[Elasticsearch]
Jaeger[Jaeger]
Grafana[Grafana]
end

subgraph "Alerting"
AlertManager[Alert Manager]
PagerDuty[PagerDuty]
Slack[Slack]
end

RAG --> Logger
RAG --> Metrics
RAG --> Tracer
RAG --> SLO

Plugins --> Logger
Plugins --> Metrics
CLI --> Logger

Logger --> Elasticsearch
Metrics --> Prometheus
Tracer --> Jaeger
SLO --> Prometheus

Prometheus --> Grafana
Elasticsearch --> Grafana
Jaeger --> Grafana

Prometheus --> AlertManager
AlertManager --> PagerDuty
AlertManager --> Slack

Structured Logging

Logger Configuration

import { StructuredLogger } from "@DevilsDev/rag-pipeline-utils/observability";

// Shared application logger: JSON-formatted entries fanned out to three outputs
// (console, a rotating file, and Elasticsearch), each with its own level.
const logger = new StructuredLogger({
level: "info",
format: "json",
correlationId: true,
sanitization: {
enabled: true,
// Names of sensitive fields to sanitize. NOTE(review): how patterns are
// matched (exact key, substring, regex) is defined by StructuredLogger — confirm.
patterns: ["password", "apiKey", "token"],
},
outputs: [
{
type: "console",
// NOTE(review): "debug" is more verbose than the top-level "info" above;
// confirm whether the per-output level overrides or is capped by it.
level: "debug",
},
{
type: "file",
path: "/var/log/rag-pipeline/app.log",
level: "info",
rotation: {
maxSize: "100MB",
maxFiles: 10,
},
},
{
type: "elasticsearch",
host: "elasticsearch.monitoring.svc.cluster.local",
index: "rag-pipeline-logs",
level: "warn",
},
],
});

Contextual Logging

// Give every incoming request its own correlation ID and a child logger that
// carries that ID together with the request and user identifiers.
app.use((req, res, next) => {
  const correlationId = generateCorrelationId();
  req.correlationId = correlationId;

  const requestContext = {
    correlationId,
    requestId: req.id,
    userId: req.user?.id,
  };
  req.logger = logger.child(requestContext);

  next();
});

// Pipeline operation logging
class RagPipeline {
  /**
   * Executes a query and emits structured lifecycle events through
   * `this.logger`: "query.started" before execution, then either
   * "query.completed" or "query.failed". All events share one operation ID.
   *
   * @param {string} prompt - Query text; passed through sanitizePrompt before logging.
   * @param {object} [options={}] - Query options; passed through sanitizeOptions before logging.
   * @returns {Promise<object>} Whatever `this.executeQuery` resolves with.
   * @throws Rethrows, unchanged, any error raised while executing or logging completion.
   */
  async query(prompt, options = {}) {
    const operationId = generateOperationId();
    const startedAt = Date.now();
    const elapsedMs = () => Date.now() - startedAt;

    const startEvent = {
      operationId,
      prompt: sanitizePrompt(prompt),
      options: sanitizeOptions(options),
      timestamp: new Date().toISOString(),
    };
    this.logger.info("query.started", startEvent);

    try {
      const result = await this.executeQuery(prompt, options);

      const completionEvent = {
        operationId,
        duration: elapsedMs(),
        resultCount: result.sources?.length || 0,
        confidence: result.confidence,
        tokensUsed: result.usage?.totalTokens,
      };
      this.logger.info("query.completed", completionEvent);

      return result;
    } catch (error) {
      const duration = elapsedMs();
      const { name, message, stack, code } = error;

      this.logger.error("query.failed", {
        operationId,
        duration,
        error: { name, message, stack, code },
      });
      throw error;
    }
  }
}

Security & Compliance Logging

// Audit logging for security events.
// Child of the application logger; `component` and `audit` are bound once here
// so they do not have to be repeated on every audit call.
const auditLogger = logger.child({
component: "security",
audit: true,
});

// Authentication events: who logged in, by which method/provider, and from where.
auditLogger.info("auth.login", {
userId: user.id,
method: "oauth2",
provider: "azure-ad",
sourceIp: req.ip,
userAgent: req.get("User-Agent"),
success: true,
});

// Data access events: which resource was touched, the action taken, and the
// document's classification.
auditLogger.info("data.access", {
userId: user.id,
resource: "document",
resourceId: doc.id,
action: "read",
classification: doc.classification,
authorized: true,
});

// Configuration changes, logged at "warn" with a from/to pair per changed key.
auditLogger.warn("config.changed", {
userId: admin.id,
// NOTE(review): this `component` key collides with the "security" value bound
// on the child logger above; which one ends up in the entry depends on the
// logger's merge order — confirm.
component: "pipeline",
changes: {
"embedder.model": {
from: "text-embedding-ada-002",
to: "text-embedding-3-large",
},
"retriever.topK": { from: 5, to: 10 },
},
});

Metrics Collection

Performance Metrics

import { MetricsCollector } from "@DevilsDev/rag-pipeline-utils/observability";

// Metrics collector shared by the metric definitions below. The dashboards in
// this document query these metrics as `rag_pipeline_<name>`
// (e.g. rag_pipeline_requests_total), i.e. with the `prefix` configured here.
const metrics = new MetricsCollector({
prefix: "rag_pipeline",
// Labels given once at collector level. NOTE(review): presumably attached to
// every exported metric — confirm against MetricsCollector.
labels: {
service: "rag-api",
version: process.env.APP_VERSION,
environment: process.env.NODE_ENV,
},
exporters: [
{
type: "prometheus",
port: 9090,
path: "/metrics",
},
{
type: "statsd",
host: "statsd.monitoring.svc.cluster.local",
port: 8125,
},
],
});

// Request metrics
// Histogram of request durations; bucket boundaries are in seconds.
const requestDuration = metrics.histogram("request_duration_seconds", {
help: "Request duration in seconds",
buckets: [0.1, 0.5, 1, 2, 5, 10],
});

// Request counter; the `status` label is what the error-rate queries filter on.
const requestCount = metrics.counter("requests_total", {
help: "Total number of requests",
labelNames: ["method", "endpoint", "status"],
});

// Pipeline-specific metrics
// Embedding latency histogram, broken down by provider and model.
const embeddingDuration = metrics.histogram("embedding_duration_seconds", {
help: "Embedding generation duration",
buckets: [0.1, 0.5, 1, 2, 5],
labelNames: ["provider", "model"],
});

// Gauge holding the latest retrieval accuracy score per retriever and query type.
const retrievalAccuracy = metrics.gauge("retrieval_accuracy", {
help: "Retrieval accuracy score",
labelNames: ["retriever", "query_type"],
});

// Running total of tokens consumed, per provider, model and operation.
const tokenUsage = metrics.counter("tokens_used_total", {
help: "Total tokens consumed",
labelNames: ["provider", "model", "operation"],
});

Business Metrics

// User engagement metrics
// NOTE(review): the "Daily Active Users" dashboard panel in this document groups
// this counter by `user_id`, which is not among the label names declared here.
const queryCount = metrics.counter("queries_total", {
help: "Total queries processed",
labelNames: ["user_type", "domain"],
});

// Histogram of satisfaction ratings with one bucket boundary per value 1-5.
const userSatisfaction = metrics.histogram("user_satisfaction_score", {
help: "User satisfaction ratings",
buckets: [1, 2, 3, 4, 5],
});

// Cost tracking
// Cumulative API spend in USD, per provider and service.
const apiCosts = metrics.counter("api_costs_usd", {
help: "API costs in USD",
labelNames: ["provider", "service"],
});

// Data quality metrics
// Gauge of document age in hours, per source and category.
const documentFreshness = metrics.gauge("document_freshness_hours", {
help: "Age of documents in hours",
labelNames: ["source", "category"],
});

Distributed Tracing

Trace Configuration

import { DistributedTracer } from "@DevilsDev/rag-pipeline-utils/observability";

// Tracer for the "rag-pipeline" service, configured with two exporters
// (a Jaeger collector and a Zipkin endpoint) and probabilistic sampling.
const tracer = new DistributedTracer({
serviceName: "rag-pipeline",
version: process.env.APP_VERSION,
environment: process.env.NODE_ENV,
exporters: [
{
type: "jaeger",
endpoint:
"http://jaeger-collector.monitoring.svc.cluster.local:14268/api/traces",
},
{
type: "zipkin",
endpoint: "http://zipkin.monitoring.svc.cluster.local:9411/api/v2/spans",
},
],
// Only a fraction of traces is sampled; raise `rate` when debugging.
sampling: {
type: "probabilistic",
rate: 0.1, // 10% sampling rate
},
});

Pipeline Tracing

class RagPipeline {
  /**
   * Answers a prompt in three traced stages (embed, retrieve, generate), all
   * recorded as children of a single "rag.query" span.
   *
   * Every span that is started is also finished, including when a stage
   * throws, so failed queries do not leave child spans open.
   *
   * @param {string} prompt - Query text; its length is recorded on the root span.
   * @param {object} [options={}] - Query options. `type`, `userId` and `topK`
   *   are recorded as tags; the whole object is forwarded to the retriever.
   * @returns {Promise<object>} The response produced by `this.llm.generate`.
   * @throws Rethrows, unchanged, any error raised by a stage.
   */
  async query(prompt, options = {}) {
    const span = tracer.startSpan("rag.query", {
      tags: {
        "query.length": prompt.length,
        "query.type": options.type || "general",
        "user.id": options.userId,
      },
    });

    // Runs one stage inside its own child span. The span is finished in
    // `finally` so a throwing stage cannot leak it, and is flagged as failed
    // before the error propagates to the root handler below.
    const traceStage = async (name, stage) => {
      const stageSpan = tracer.startSpan(name, { parent: span });
      try {
        return await stage(stageSpan);
      } catch (error) {
        stageSpan.setTag("error", true);
        throw error;
      } finally {
        stageSpan.finish();
      }
    };

    try {
      // Trace embedding generation
      const queryEmbedding = await traceStage("rag.embed", async (stageSpan) => {
        const embedding = await this.embedder.embed(prompt);
        stageSpan.setTag("embedding.dimensions", embedding.length);
        return embedding;
      });

      // Trace retrieval
      const context = await traceStage("rag.retrieve", async (stageSpan) => {
        const results = await this.retriever.search(queryEmbedding, options);
        stageSpan.setTag("retrieval.results", results.length);
        stageSpan.setTag("retrieval.topK", options.topK || 5);
        return results;
      });

      // Trace LLM generation
      const response = await traceStage("rag.generate", async (stageSpan) => {
        const generated = await this.llm.generate(prompt, context);
        // Usage data may be absent; a missing tag must not fail the query.
        stageSpan.setTag("generation.tokens", generated.usage?.totalTokens);
        stageSpan.setTag("generation.model", this.llm.model);
        return generated;
      });

      span.setTag("query.success", true);
      return response;
    } catch (error) {
      span.setTag("query.success", false);
      span.setTag("error.name", error.name);
      span.setTag("error.message", error.message);
      throw error;
    } finally {
      span.finish();
    }
  }
}

Cross-Service Tracing

// Extract the caller's trace context from the incoming headers and expose it
// on the request so downstream spans can join the same trace.
app.use((req, res, next) => {
  req.traceContext = tracer.extract(req.headers);
  next();
});

// External API calls with trace propagation
class OpenAIEmbedder {
  /**
   * Requests embeddings from the OpenAI embeddings endpoint inside an
   * "openai.embed" span, injecting the span's trace context into the
   * outgoing request headers.
   *
   * `fetch` resolves to a Response object, so the JSON body is parsed
   * explicitly before `usage` / `data` are read, and a non-2xx status is
   * turned into an error instead of being returned as if it were a result.
   *
   * @param {string|string[]} texts - Input forwarded as the request's `input` field.
   * @returns {Promise<*>} The `data` field of the parsed response body.
   * @throws {Error} If the request fails, the response status is not 2xx,
   *   or the body cannot be parsed as JSON.
   */
  async embed(texts) {
    const span = tracer.startSpan("openai.embed", {
      parent: this.traceContext,
    });

    const headers = {};
    tracer.inject(span, headers);

    try {
      const response = await fetch("https://api.openai.com/v1/embeddings", {
        method: "POST",
        headers: {
          ...headers,
          Authorization: `Bearer ${this.apiKey}`,
          "Content-Type": "application/json",
        },
        body: JSON.stringify({
          input: texts,
          model: this.model,
        }),
      });

      span.setTag("openai.model", this.model);
      // Record the status before checking it so failed calls are tagged too.
      span.setTag("http.status_code", response.status);

      if (!response.ok) {
        throw new Error(
          `OpenAI embeddings request failed with status ${response.status}`,
        );
      }

      const payload = await response.json();
      span.setTag("openai.tokens", payload.usage?.total_tokens);

      return payload.data;
    } catch (error) {
      span.setTag("error", true);
      span.log({ error: error.message });
      throw error;
    } finally {
      span.finish();
    }
  }
}

Alerting & Incident Response

Alert Configuration

import { AlertManager } from "@DevilsDev/rag-pipeline-utils/observability";

// Alert manager with three notification channels. Each channel lists the
// severities it is configured for. NOTE(review): presumably an alert is
// delivered to every channel whose `severity` list contains its severity —
// confirm against AlertManager.
const alertManager = new AlertManager({
channels: [
{
name: "slack-critical",
type: "slack",
// Webhook URL is read from the environment rather than hard-coded.
webhook: process.env.SLACK_WEBHOOK_CRITICAL,
severity: ["critical", "high"],
},
{
name: "pagerduty",
type: "pagerduty",
integrationKey: process.env.PAGERDUTY_INTEGRATION_KEY,
severity: ["critical"],
},
{
name: "email-team",
type: "email",
recipients: ["engineering@company.com"],
severity: ["high", "medium"],
},
],
});

// Define alert rules.
// Metric names carry the "rag_pipeline" prefix configured on the metrics
// collector, matching the names the Grafana dashboards in this document query.

// Fires when more than 5% of requests are answered with a 5xx status.
// The condition is a ratio (5xx rate / total rate); a bare rate() would be an
// absolute errors-per-second figure and could not be compared to a percentage.
alertManager.defineAlert("high-error-rate", {
  condition:
    'sum(rate(rag_pipeline_requests_total{status=~"5.."}[5m])) / sum(rate(rag_pipeline_requests_total[5m])) > 0.05',
  duration: "2m",
  severity: "high",
  summary: "High error rate detected",
  description: "Error rate is above 5% for 2 minutes",
  runbook: "https://wiki.company.com/runbooks/high-error-rate",
});

// Fires when p95 embedding latency stays above 2 seconds.
// Buckets are summed by `le` first so the quantile is computed across all
// provider/model label combinations rather than once per series.
alertManager.defineAlert("embedding-latency", {
  condition:
    "histogram_quantile(0.95, sum by (le) (rate(rag_pipeline_embedding_duration_seconds_bucket[5m]))) > 2",
  duration: "5m",
  severity: "medium",
  summary: "High embedding latency",
  description: "95th percentile embedding latency is above 2 seconds",
});

// NOTE(review): `openai_quota_remaining` is not defined by any metric in this
// document; the threshold assumes it is exported as a 0-1 fraction — confirm.
alertManager.defineAlert("api-quota-exhausted", {
  condition: "openai_quota_remaining < 0.1",
  duration: "1m",
  severity: "critical",
  summary: "OpenAI API quota nearly exhausted",
  description: "Less than 10% of OpenAI quota remaining",
});

Automated Incident Response

// Incident response automation

// Mitigation steps applied, in order, to critical alerts only: scale out,
// open the circuit breaker for external APIs, and fail over the embedder
// when the alert is about API quota.
async function applyCriticalMitigations(alert) {
  await scaleResources({
    service: "rag-pipeline",
    replicas: Math.min(currentReplicas * 2, maxReplicas),
  });

  await enableCircuitBreaker("external-apis");

  if (alert.name === "api-quota-exhausted") {
    await switchToBackupProvider("embedder");
  }
}

// Every triggered alert gets an incident ticket, regardless of severity;
// the ticket ID is written back onto the alert.
alertManager.on("alert:triggered", async (alert) => {
  if (alert.severity === "critical") {
    await applyCriticalMitigations(alert);
  }

  const ticket = {
    title: alert.summary,
    description: alert.description,
    severity: alert.severity,
    assignee: getOnCallEngineer(),
    labels: ["rag-pipeline", "production"],
  };
  const incident = await createIncident(ticket);

  alert.incidentId = incident.id;
});

Grafana Dashboards

Operational Dashboard

{
"dashboard": {
"title": "RAG Pipeline Operations",
"panels": [
{
"title": "Request Rate",
"type": "stat",
"targets": [
{
"expr": "sum(rate(rag_pipeline_requests_total[5m]))",
"legendFormat": "Requests/sec"
}
]
},
{
"title": "Response Time",
"type": "timeseries",
"targets": [
{
"expr": "histogram_quantile(0.50, rate(rag_pipeline_request_duration_seconds_bucket[5m]))",
"legendFormat": "p50"
},
{
"expr": "histogram_quantile(0.95, rate(rag_pipeline_request_duration_seconds_bucket[5m]))",
"legendFormat": "p95"
},
{
"expr": "histogram_quantile(0.99, rate(rag_pipeline_request_duration_seconds_bucket[5m]))",
"legendFormat": "p99"
}
]
},
{
"title": "Error Rate",
"type": "timeseries",
"targets": [
{
"expr": "sum(rate(rag_pipeline_requests_total{status=~\"5..\"}[5m])) / sum(rate(rag_pipeline_requests_total[5m]))",
"legendFormat": "Error Rate"
}
]
},
{
"title": "SLO Status",
"type": "table",
"targets": [
{
"expr": "slo_error_budget_remaining",
"format": "table"
}
]
}
]
}
}

Business Metrics Dashboard

{
"dashboard": {
"title": "RAG Pipeline Business Metrics",
"panels": [
{
"title": "Daily Active Users",
"type": "stat",
"targets": [
{
"expr": "count(count by (user_id) (increase(rag_pipeline_queries_total[24h]) > 0))",
"legendFormat": "DAU"
}
]
},
{
"title": "Query Volume by Domain",
"type": "piechart",
"targets": [
{
"expr": "sum by (domain) (rate(rag_pipeline_queries_total[24h]))",
"legendFormat": "{{domain}}"
}
]
},
{
"title": "API Costs",
"type": "timeseries",
"targets": [
{
"expr": "sum by (provider) (rate(rag_pipeline_api_costs_usd[1h]))",
"legendFormat": "{{provider}}"
}
]
},
{
"title": "User Satisfaction",
"type": "gauge",
"targets": [
{
"expr": "sum(rate(rag_pipeline_user_satisfaction_score_sum[24h])) / sum(rate(rag_pipeline_user_satisfaction_score_count[24h]))",
"legendFormat": "Average Rating"
}
]
}
]
}
}

Health Checks & Readiness Probes

Health Check Implementation

import { HealthChecker } from "@DevilsDev/rag-pipeline-utils/observability";

// Health checker with three dependency checks. Each check has its own timeout
// (in milliseconds, presumably — confirm) and a `critical` flag.
// NOTE(review): how `critical: false` affects the overall status is decided by
// HealthChecker — confirm.
const healthChecker = new HealthChecker({
checks: [
{
name: "database",
check: async () => {
const result = await db.ping();
// Status is hard-coded: this check reports "healthy" whenever ping resolves.
// NOTE(review): assumes a failed ping rejects — confirm.
return { status: "healthy", latency: result.latency };
},
timeout: 5000,
critical: true,
},
{
name: "vector-store",
check: async () => {
const result = await vectorStore.healthCheck();
// Unlike the other checks, the status is taken from the store's own report.
return { status: result.status, indexCount: result.indexCount };
},
timeout: 3000,
critical: true,
},
{
name: "openai-api",
check: async () => {
const result = await openaiClient.healthCheck();
// Status is hard-coded here as well; only the remaining quota is passed through.
return { status: "healthy", quotaRemaining: result.quota };
},
timeout: 10000,
critical: false,
},
],
});

// Express health endpoints.
// Both handlers catch failures of the checker itself: Express 4 does not
// handle rejections from async route handlers, so without the catch a throwing
// checker would leave the probe request hanging until it times out.

// Liveness: 200 when the aggregated status is "healthy", 503 otherwise.
app.get("/health", async (req, res) => {
  try {
    const health = await healthChecker.check();
    res.status(health.status === "healthy" ? 200 : 503).json(health);
  } catch (error) {
    res.status(503).json({ status: "unhealthy", error: error.message });
  }
});

// Readiness: 200 when the checker reports ready, 503 otherwise.
app.get("/ready", async (req, res) => {
  try {
    const readiness = await healthChecker.checkReadiness();
    res.status(readiness.ready ? 200 : 503).json(readiness);
  } catch (error) {
    res.status(503).json({ ready: false, error: error.message });
  }
});

This comprehensive observability framework ensures your RAG pipeline operates reliably in production with full visibility into performance, errors, and business metrics. Continue to Security for information about securing your deployment.