
Enterprise Features

@DevilsDev/rag-pipeline-utils provides comprehensive enterprise-grade capabilities for mission-critical RAG deployments. This guide covers the advanced features designed for production environments requiring high availability, security, and observability.


Enterprise Architecture Overview

Production-Ready Infrastructure

The enterprise architecture is built on proven patterns and practices; a minimal sketch of how these pieces compose follows the list below:

  • Dependency Injection Container: IoC pattern for modular, testable code
  • SLO Monitoring System: Service Level Objectives with error budgets and alerting
  • Semantic Release Automation: Automated versioning and deployment pipelines
  • External API Mocking: Deterministic testing with network simulation
  • Structured Observability: Comprehensive logging, metrics, and tracing
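
Each capability is covered in depth in the sections that follow. As a quick orientation, here is a minimal composition sketch showing how a few of them might be wired together at startup. It assumes the container and SLO monitor APIs shown later in this guide; the StructuredLogger import path is an assumption.

import { DependencyContainer, StructuredLogger } from "@DevilsDev/rag-pipeline-utils/enterprise"; // StructuredLogger path assumed
import { SLOMonitor } from "@DevilsDev/rag-pipeline-utils/observability";

// Hypothetical bootstrap: compose logging, SLO monitoring, and the container.
const container = new DependencyContainer();

container.register("logger", () => new StructuredLogger({ level: "info" }), {
  lifecycle: "singleton",
});

container.register(
  "sloMonitor",
  () => new SLOMonitor({ storage: "prometheus" }),
  { lifecycle: "singleton" },
);

// Resolve shared services once at startup; the rest of the application pulls
// them from the container instead of constructing them directly.
const logger = container.resolve("logger");
const sloMonitor = container.resolve("sloMonitor");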

Enterprise Deployment Model

graph TB
    subgraph "Production Environment"
        LB[Load Balancer]
        API1[RAG API Instance 1]
        API2[RAG API Instance 2]
        API3[RAG API Instance 3]
    end

    subgraph "Data Layer"
        VDB[(Vector Database)]
        Cache[(Redis Cache)]
        Metrics[(Metrics Store)]
    end

    subgraph "Monitoring Stack"
        Prometheus[Prometheus]
        Grafana[Grafana]
        AlertManager[Alert Manager]
    end

    subgraph "External Services"
        OpenAI[OpenAI API]
        Pinecone[Pinecone]
        Azure[Azure OpenAI]
    end

    LB --> API1
    LB --> API2
    LB --> API3

    API1 --> VDB
    API2 --> VDB
    API3 --> VDB

    API1 --> Cache
    API2 --> Cache
    API3 --> Cache

    API1 --> OpenAI
    API2 --> Pinecone
    API3 --> Azure

    API1 --> Prometheus
    API2 --> Prometheus
    API3 --> Prometheus

    Prometheus --> Grafana
    Prometheus --> AlertManager

Dependency Injection Container

The enterprise IoC container provides a modular, testable architecture with lifecycle management.

Service Registration

import { DependencyContainer } from "@DevilsDev/rag-pipeline-utils/enterprise";

const container = new DependencyContainer();

// Register singleton services
container.register(
  "logger",
  () =>
    new StructuredLogger({
      level: "info",
      correlationId: true,
      structured: true,
    }),
  { lifecycle: "singleton" },
);

// Register factory services with dependencies
container.register(
  "embedder",
  (logger) =>
    new OpenAIEmbedder({
      apiKey: process.env.OPENAI_API_KEY,
      logger,
    }),
  {
    lifecycle: "factory",
    dependencies: ["logger"],
  },
);

// Register with interface validation
container.registerInterface("retriever", PineconeRetriever, {
  implements: ["RetrieverPlugin"],
  dependencies: ["logger", "metrics"],
});

Service Resolution

// Resolve services with automatic dependency injection
const pipeline = container.resolve("ragPipeline");
const logger = container.resolve("logger");
const metrics = container.resolve("metricsCollector");

// Conditional resolution
const embedder = container.resolveConditional("embedder", {
  environment: "production",
  feature: "advanced-embeddings",
});

Lifecycle Management

// Graceful shutdown with dependency cleanup
process.on("SIGTERM", async () => {
  await container.shutdown({
    timeout: 30000,
    graceful: true,
  });
});

// Health checks for registered services
const healthStatus = await container.healthCheck();
console.log("Service Health:", healthStatus);

SLO Monitoring System

Built-in Service Level Objective (SLO) tracking with error budgets, alerting, and automated remediation.

SLO Definition

import { SLOMonitor } from "@DevilsDev/rag-pipeline-utils/observability";

const sloMonitor = new SLOMonitor({
  storage: "prometheus",
  alerting: {
    webhook: process.env.SLACK_WEBHOOK_URL,
    channels: ["#rag-alerts", "#engineering"],
  },
});

// Define availability SLO
sloMonitor.defineSLO("rag-pipeline-availability", {
  objective: 0.999, // 99.9% availability
  window: "30d",
  errorBudget: {
    policy: "burn-rate",
    alertThresholds: [2, 5, 10], // 2x, 5x, 10x burn rates
  },
  indicators: [
    {
      name: "http_requests_success_rate",
      query:
        'sum(rate(http_requests_total{status!~"5.."}[5m])) / sum(rate(http_requests_total[5m]))',
    },
  ],
});

// Define latency SLO
sloMonitor.defineSLO("rag-pipeline-latency", {
  objective: 0.95, // 95% of requests under 2s
  window: "7d",
  threshold: 2000, // 2 seconds
  indicators: [
    {
      name: "response_time_p95",
      query:
        "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
    },
  ],
});
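
For intuition on the burn-rate thresholds: a 99.9% objective over a 30-day window leaves an error budget of 0.1% of that window, roughly 43 minutes of full unavailability. A sustained 2x burn rate would spend that budget in about 15 days, and a 10x burn rate would exhaust it in about 3 days, which is why the higher thresholds should page with increasing urgency.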

Real-Time Monitoring

// Record measurements
sloMonitor.recordMeasurement("rag-pipeline-availability", {
  success: true,
  timestamp: Date.now(),
  metadata: { endpoint: "/query", method: "POST" },
});

sloMonitor.recordMeasurement("rag-pipeline-latency", {
  success: true,
  latency: 1500, // 1.5 seconds
  timestamp: Date.now(),
});

// Get error budget status
const budgetStatus = await sloMonitor.getErrorBudgetStatus(
  "rag-pipeline-availability",
);
console.log("Error Budget Remaining:", budgetStatus.remaining);
console.log("Current Burn Rate:", budgetStatus.burnRate);

// Set up automated alerting
sloMonitor.on("slo:violation", (event) => {
  console.error("SLO Violation:", event);
  // Trigger automated remediation
  if (event.severity === "critical") {
    triggerAutoScaling();
  }
});
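
Note that triggerAutoScaling() above stands in for whatever remediation hook you wire up; it is not part of the library. A minimal sketch, assuming remediation is delegated to an external automation endpoint reached over HTTP (the environment variable and payload shape are hypothetical):

// Hypothetical remediation hook: forward the violation to an automation
// endpoint, for example one that raises replica counts. The URL and payload
// below are illustrative, not part of @DevilsDev/rag-pipeline-utils.
async function triggerAutoScaling(event = {}) {
  await fetch(process.env.REMEDIATION_WEBHOOK_URL, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      action: "scale-up",
      reason: "slo-violation",
      slo: event.slo,
      severity: event.severity,
    }),
  });
}

In practice you would pass the violation through, e.g. triggerAutoScaling(event), so the payload carries the affected SLO.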

SLO Dashboard

// Generate SLO dashboard data
const dashboardData = await sloMonitor.generateDashboard({
  timeRange: "24h",
  slos: ["rag-pipeline-availability", "rag-pipeline-latency"],
  includeForecasting: true,
});

// Export SLO reports
const report = await sloMonitor.generateReport({
  format: "pdf",
  period: "monthly",
  recipients: ["engineering@company.com"],
});

External API Mocking Infrastructure

Deterministic testing infrastructure with network simulation for reliable CI/CD pipelines.

Mock Configuration

import { ExternalAPIMocker } from "@DevilsDev/rag-pipeline-utils/testing";

const mocker = new ExternalAPIMocker({
  deterministic: true,
  seed: "test-seed-123",
});

// Mock OpenAI API
mocker.mockAPI("openai", {
  baseURL: "https://api.openai.com",
  responses: {
    "/v1/embeddings": {
      success: {
        data: [{ embedding: [0.1, 0.2, 0.3] }],
        usage: { total_tokens: 10 },
      },
      errors: {
        rate_limit: { status: 429, message: "Rate limit exceeded" },
        invalid_key: { status: 401, message: "Invalid API key" },
      },
    },
    "/v1/chat/completions": {
      success: {
        choices: [{ message: { content: "Mocked response" } }],
        usage: { total_tokens: 50 },
      },
    },
  },
  latency: { min: 100, max: 500 },
  errorRate: 0.05,
  rateLimiting: {
    requestsPerMinute: 60,
    tokensPerMinute: 150000,
  },
});

// Mock Pinecone API
mocker.mockAPI("pinecone", {
  baseURL: "https://api.pinecone.io",
  responses: {
    "/vectors/upsert": { success: { upsertedCount: 100 } },
    "/query": {
      success: {
        matches: [
          { id: "doc1", score: 0.95, metadata: { text: "Sample text" } },
        ],
      },
    },
  },
  networkConditions: {
    bandwidth: "100mbps",
    latency: 50,
    packetLoss: 0.001,
  },
});

Test Integration

// Jest setup with mocking
beforeAll(async () => {
  await mocker.start();

  // Configure pipeline to use mocked endpoints
  process.env.OPENAI_BASE_URL = mocker.getEndpoint("openai");
  process.env.PINECONE_BASE_URL = mocker.getEndpoint("pinecone");
});

afterAll(async () => {
  await mocker.stop();
});

// Deterministic test scenarios
test("handles API rate limiting gracefully", async () => {
  // Configure mock to return rate limit errors
  mocker.setErrorRate("openai", 1.0, "rate_limit");

  const pipeline = createRagPipeline(testConfig);

  // Should retry and eventually succeed
  const result = await pipeline.query("test query");
  expect(result).toBeDefined();

  // Verify retry behavior
  const metrics = mocker.getMetrics("openai");
  expect(metrics.retryCount).toBeGreaterThan(0);
});

// Network condition simulation
test("handles poor network conditions", async () => {
  mocker.setNetworkConditions("openai", {
    latency: 2000, // 2s latency
    packetLoss: 0.1, // 10% packet loss
  });

  const pipeline = createRagPipeline(testConfig);

  const startTime = Date.now();
  const result = await pipeline.query("test query");
  const duration = Date.now() - startTime;

  expect(duration).toBeGreaterThan(2000);
  expect(result).toBeDefined();
});

Advanced AI Capabilities

Multi-Modal Processing

Handle text, images, and structured data in unified pipelines:

import { MultiModalProcessor } from "@DevilsDev/rag-pipeline-utils/ai";

const processor = new MultiModalProcessor({
  textProcessor: "openai-embedder",
  imageProcessor: "clip-embedder",
  structuredProcessor: "table-embedder",
  fusionStrategy: "weighted-average",
});

// Process multi-modal document
const result = await processor.process({
  modalities: [
    { type: "text", content: "Document text content" },
    { type: "image", content: imageBuffer, metadata: { format: "png" } },
    { type: "table", content: csvData, metadata: { headers: true } },
  ],
  fusionWeights: { text: 0.5, image: 0.3, table: 0.2 },
});

Federated Learning

Distributed model training with privacy preservation:

import { FederatedLearningCoordinator } from "@DevilsDev/rag-pipeline-utils/ai";

const coordinator = new FederatedLearningCoordinator({
  aggregationStrategy: "federated-averaging",
  privacyBudget: 1.0,
  differentialPrivacy: true,
});

// Coordinate federated training
const globalModel = await coordinator.coordinateTraining({
  participants: federatedNodes,
  rounds: 10,
  clientSampleRatio: 0.1,
  localEpochs: 5,
});

Adaptive Retrieval

Dynamic retrieval strategies with performance optimization:

import { AdaptiveRetrievalEngine } from "@DevilsDev/rag-pipeline-utils/ai";

const engine = new AdaptiveRetrievalEngine({
  strategies: ["semantic", "keyword", "hybrid", "graph-based"],
  adaptationPolicy: "performance-based",
  learningRate: 0.01,
});

// Adaptive retrieval with strategy selection
const results = await engine.adaptiveRetrieve(query, {
  domain: "technical-documentation",
  userContext: { expertise: "intermediate" },
  performanceRequirements: { maxLatency: 500 },
});

Enterprise Security

Authentication & Authorization

import { EnterpriseAuth } from "@DevilsDev/rag-pipeline-utils/enterprise";

const auth = new EnterpriseAuth({
  provider: "azure-ad",
  tenantId: process.env.AZURE_TENANT_ID,
  clientId: process.env.AZURE_CLIENT_ID,
  rbac: {
    roles: ["admin", "user", "readonly"],
    permissions: {
      admin: ["*"],
      user: ["query", "ingest"],
      readonly: ["query"],
    },
  },
});

// Middleware for authentication
app.use(auth.middleware());

// Role-based access control
app.post("/ingest", auth.requireRole("user"), async (req, res) => {
  // Handle ingestion
});

Audit Logging

import { AuditLogger } from "@DevilsDev/rag-pipeline-utils/enterprise";

const auditLogger = new AuditLogger({
  storage: "elasticsearch",
  retention: "7y", // 7 years for compliance
  encryption: true,
  immutable: true,
});

// Log security-sensitive operations
auditLogger.log("document.ingest", {
  userId: req.user.id,
  documentId: doc.id,
  classification: "confidential",
  source: req.ip,
  timestamp: new Date().toISOString(),
});

Production Deployment

Kubernetes Deployment

# deployment/kubernetes/rag-pipeline-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-pipeline
  labels:
    app: rag-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-pipeline
  template:
    metadata:
      labels:
        app: rag-pipeline
    spec:
      containers:
        - name: rag-pipeline
          image: devilsdev/rag-pipeline-utils:latest
          ports:
            - containerPort: 3000
          env:
            - name: NODE_ENV
              value: "production"
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-secrets
                  key: openai-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 3000
            initialDelaySeconds: 5
            periodSeconds: 5
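
The probes above assume the service exposes /health and /ready routes on port 3000; this guide does not prescribe those handlers, so the following is a minimal Express sketch. The route names mirror the manifest, container is the DependencyContainer instance from the Dependency Injection section, and the response shapes are assumptions.

import express from "express";

const app = express();

// Liveness: the process is up and able to respond.
app.get("/health", (req, res) => {
  res.status(200).json({ status: "ok" });
});

// Readiness: services registered in the container report healthy.
// container.healthCheck() is the lifecycle helper shown earlier in this guide.
app.get("/ready", async (req, res) => {
  try {
    const status = await container.healthCheck();
    res.status(200).json(status);
  } catch (err) {
    res.status(503).json({ status: "not-ready", error: err.message });
  }
});

app.listen(3000);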

Helm Chart Configuration

# deployment/helm/rag-pipeline/values.yaml
replicaCount: 3

image:
  repository: devilsdev/rag-pipeline-utils
  tag: "latest"
  pullPolicy: IfNotPresent

service:
  type: ClusterIP
  port: 80
  targetPort: 3000

ingress:
  enabled: true
  className: "nginx"
  annotations:
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
  hosts:
    - host: rag-api.company.com
      paths:
        - path: /
          pathType: Prefix
  tls:
    - secretName: rag-api-tls
      hosts:
        - rag-api.company.com

monitoring:
  enabled: true
  serviceMonitor:
    enabled: true
    interval: 30s
    scrapeTimeout: 10s

autoscaling:
  enabled: true
  minReplicas: 3
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70
  targetMemoryUtilizationPercentage: 80

This enterprise feature set enables @DevilsDev/rag-pipeline-utils to meet the demanding requirements of mission-critical production environments while retaining the flexibility and ease of use that make it valuable for development and prototyping.