Enterprise Features
@DevilsDev/rag-pipeline-utils provides comprehensive enterprise-grade capabilities for mission-critical RAG deployments. This guide covers the advanced features designed for production environments requiring high availability, security, and observability.
Enterprise Architecture Overview
Production-Ready Infrastructure
The enterprise architecture is built on proven patterns and practices (a minimal composition sketch follows the list):
- Dependency Injection Container: IoC pattern for modular, testable code
- SLO Monitoring System: Service Level Objectives with error budgets and alerting
- Semantic Release Automation: Automated versioning and deployment pipelines
- External API Mocking: Deterministic testing with network simulation
- Structured Observability: Comprehensive logging, metrics, and tracing
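How these pieces fit together is easiest to see at the composition root. The sketch below is illustrative only: it reuses the `DependencyContainer` and `SLOMonitor` APIs documented later on this page, and it assumes that `StructuredLogger` and a `createRagPipeline` factory accept the injected services shown here, which this guide does not itself define.
// Illustrative composition root; StructuredLogger and createRagPipeline options
// are assumptions, adapt them to your actual factories.
import { DependencyContainer } from '@DevilsDev/rag-pipeline-utils/enterprise';
import { SLOMonitor } from '@DevilsDev/rag-pipeline-utils/observability';

const container = new DependencyContainer();

// Cross-cutting services first, so everything else can depend on them
container.register('logger', () => new StructuredLogger({ level: 'info' }), { lifecycle: 'singleton' });
container.register('sloMonitor', () => new SLOMonitor({ storage: 'prometheus' }), { lifecycle: 'singleton' });

// The pipeline itself resolves its dependencies from the container
container.register('ragPipeline', (logger, sloMonitor) => createRagPipeline({ logger, sloMonitor }), {
  lifecycle: 'singleton',
  dependencies: ['logger', 'sloMonitor']
});

const pipeline = container.resolve('ragPipeline');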
Enterprise Deployment Model
graph TB
subgraph "Production Environment"
LB[Load Balancer]
API1[RAG API Instance 1]
API2[RAG API Instance 2]
API3[RAG API Instance 3]
end
subgraph "Data Layer"
VDB[(Vector Database)]
Cache[(Redis Cache)]
Metrics[(Metrics Store)]
end
subgraph "Monitoring Stack"
Prometheus[Prometheus]
Grafana[Grafana]
AlertManager[Alert Manager]
end
subgraph "External Services"
OpenAI[OpenAI API]
Pinecone[Pinecone]
Azure[Azure OpenAI]
end
LB --> API1
LB --> API2
LB --> API3
API1 --> VDB
API2 --> VDB
API3 --> VDB
API1 --> Cache
API2 --> Cache
API3 --> Cache
API1 --> OpenAI
API2 --> Pinecone
API3 --> Azure
API1 --> Prometheus
API2 --> Prometheus
API3 --> Prometheus
Prometheus --> Grafana
Prometheus --> AlertManager
Dependency Injection Container
The enterprise IoC container provides a modular, testable architecture with lifecycle management.
Service Registration
import { DependencyContainer } from '@DevilsDev/rag-pipeline-utils/enterprise';
const container = new DependencyContainer();
// Register singleton services
container.register('logger', () => new StructuredLogger({
level: 'info',
correlationId: true,
structured: true
}), { lifecycle: 'singleton' });
// Register factory services with dependencies
container.register('embedder', (logger) => new OpenAIEmbedder({
apiKey: process.env.OPENAI_API_KEY,
logger
}), {
lifecycle: 'factory',
dependencies: ['logger']
});
// Register with interface validation
container.registerInterface('retriever', PineconeRetriever, {
implements: ['RetrieverPlugin'],
dependencies: ['logger', 'metrics']
});
Service Resolution
// Resolve services with automatic dependency injection
const pipeline = container.resolve('ragPipeline');
const logger = container.resolve('logger');
const metrics = container.resolve('metricsCollector');
// Conditional resolution
const embedder = container.resolveConditional('embedder', {
environment: 'production',
feature: 'advanced-embeddings'
});
Lifecycle Management
// Graceful shutdown with dependency cleanup
process.on('SIGTERM', async () => {
await container.shutdown({
timeout: 30000,
graceful: true
});
});
// Health checks for registered services
const healthStatus = await container.healthCheck();
console.log('Service Health:', healthStatus);
SLO Monitoring System
Built-in Service Level Objective (SLO) tracking with error budgets, alerting, and hooks for automated remediation.
SLO Definition
import { SLOMonitor } from '@DevilsDev/rag-pipeline-utils/observability';
const sloMonitor = new SLOMonitor({
storage: 'prometheus',
alerting: {
webhook: process.env.SLACK_WEBHOOK_URL,
channels: ['#rag-alerts', '#engineering']
}
});
// Define availability SLO
sloMonitor.defineSLO('rag-pipeline-availability', {
objective: 0.999, // 99.9% availability
window: '30d',
errorBudget: {
policy: 'burn-rate',
alertThresholds: [2, 5, 10] // 2x, 5x, 10x burn rates
},
indicators: [
{
name: 'http_requests_success_rate',
query: 'sum(rate(http_requests_total{status!~"5.."}[5m])) / sum(rate(http_requests_total[5m]))'
}
]
});
// Define latency SLO
sloMonitor.defineSLO('rag-pipeline-latency', {
objective: 0.95, // 95% of requests under 2s
window: '7d',
threshold: 2000, // 2 seconds
indicators: [
{
name: 'response_time_p95',
query: 'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))'
}
]
});
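The `alertThresholds` above are burn-rate multipliers: a burn rate of N means the error budget (the allowed 1 - objective fraction of requests) is being consumed N times faster than the SLO window permits, so an Nx burn rate exhausts the budget in window / N. A quick, library-independent calculation shows why 2x, 5x, and 10x are common alerting tiers:
// Back-of-the-envelope: how long until the 30-day budget is gone at each burn rate?
function daysToExhaustBudget(windowDays, burnRate) {
  return windowDays / burnRate;
}

[2, 5, 10].forEach((rate) => {
  console.log(`${rate}x burn rate exhausts a 30d budget in ${daysToExhaustBudget(30, rate)} days`);
});
// 2x -> 15 days, 5x -> 6 days, 10x -> 3 days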
Real-Time Monitoring
// Record measurements
sloMonitor.recordMeasurement('rag-pipeline-availability', {
success: true,
timestamp: Date.now(),
metadata: { endpoint: '/query', method: 'POST' }
});
sloMonitor.recordMeasurement('rag-pipeline-latency', {
success: true,
latency: 1500, // 1.5 seconds
timestamp: Date.now()
});
// Get error budget status
const budgetStatus = await sloMonitor.getErrorBudgetStatus('rag-pipeline-availability');
console.log('Error Budget Remaining:', budgetStatus.remaining);
console.log('Current Burn Rate:', budgetStatus.burnRate);
// Set up automated alerting
sloMonitor.on('slo:violation', (event) => {
console.error('SLO Violation:', event);
// Trigger automated remediation
if (event.severity === 'critical') {
triggerAutoScaling();
}
});
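`triggerAutoScaling()` above is left undefined. One hedged possibility, assuming the service runs on the Kubernetes Deployment shown later in this guide and that `kubectl` is available to the process, is to bump the replica count directly; treat this as a placeholder for whatever remediation your platform actually supports:
// Hypothetical remediation hook: scale the 'rag-pipeline' Deployment from the
// Kubernetes manifest later in this guide. Adjust namespace and replica logic as needed.
import { execFile } from 'node:child_process';

function triggerAutoScaling(replicas = 6) {
  execFile('kubectl', ['scale', 'deployment/rag-pipeline', `--replicas=${replicas}`], (err, stdout) => {
    if (err) {
      console.error('Auto-scaling request failed:', err);
      return;
    }
    console.log('Auto-scaling requested:', stdout.trim());
  });
}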
SLO Dashboard
// Generate SLO dashboard data
const dashboardData = await sloMonitor.generateDashboard({
timeRange: '24h',
slos: ['rag-pipeline-availability', 'rag-pipeline-latency'],
includeForecasting: true
});
// Export SLO reports
const report = await sloMonitor.generateReport({
format: 'pdf',
period: 'monthly',
recipients: ['engineering@company.com']
});
External API Mocking Infrastructure
Deterministic testing infrastructure with network simulation for reliable CI/CD pipelines.
Mock Configuration
import { ExternalAPIMocker } from '@DevilsDev/rag-pipeline-utils/testing';
const mocker = new ExternalAPIMocker({
deterministic: true,
seed: 'test-seed-123'
});
// Mock OpenAI API
mocker.mockAPI('openai', {
baseURL: 'https://api.openai.com',
responses: {
'/v1/embeddings': {
success: {
data: [{ embedding: [0.1, 0.2, 0.3] }],
usage: { total_tokens: 10 }
},
errors: {
rate_limit: { status: 429, message: 'Rate limit exceeded' },
invalid_key: { status: 401, message: 'Invalid API key' }
}
},
'/v1/chat/completions': {
success: {
choices: [{ message: { content: 'Mocked response' } }],
usage: { total_tokens: 50 }
}
}
},
latency: { min: 100, max: 500 },
errorRate: 0.05,
rateLimiting: {
requestsPerMinute: 60,
tokensPerMinute: 150000
}
});
// Mock Pinecone API
mocker.mockAPI('pinecone', {
baseURL: 'https://api.pinecone.io',
responses: {
'/vectors/upsert': { success: { upsertedCount: 100 } },
'/query': {
success: {
matches: [
{ id: 'doc1', score: 0.95, metadata: { text: 'Sample text' } }
]
}
}
},
networkConditions: {
bandwidth: '100mbps',
latency: 50,
packetLoss: 0.001
}
});
Test Integration
// Jest setup with mocking
beforeAll(async () => {
await mocker.start();
// Configure pipeline to use mocked endpoints
process.env.OPENAI_BASE_URL = mocker.getEndpoint('openai');
process.env.PINECONE_BASE_URL = mocker.getEndpoint('pinecone');
});
afterAll(async () => {
await mocker.stop();
});
// Deterministic test scenarios
test('handles API rate limiting gracefully', async () => {
// Fail roughly half of requests with rate-limit errors so the retry path is exercised
mocker.setErrorRate('openai', 0.5, 'rate_limit');
const pipeline = createRagPipeline(testConfig);
// Should retry and eventually succeed
const result = await pipeline.query('test query');
expect(result).toBeDefined();
// Verify retry behavior
const metrics = mocker.getMetrics('openai');
expect(metrics.retryCount).toBeGreaterThan(0);
});
// Network condition simulation
test('handles poor network conditions', async () => {
mocker.setNetworkConditions('openai', {
latency: 2000, // 2s latency
packetLoss: 0.1 // 10% packet loss
});
const pipeline = createRagPipeline(testConfig);
const startTime = Date.now();
const result = await pipeline.query('test query');
const duration = Date.now() - startTime;
expect(duration).toBeGreaterThan(2000);
expect(result).toBeDefined();
});
Advanced AI Capabilities
Multi-Modal Processing
Handle text, images, and structured data in unified pipelines:
import { MultiModalProcessor } from '@DevilsDev/rag-pipeline-utils/ai';
const processor = new MultiModalProcessor({
textProcessor: 'openai-embedder',
imageProcessor: 'clip-embedder',
structuredProcessor: 'table-embedder',
fusionStrategy: 'weighted-average'
});
// Process multi-modal document
const result = await processor.process({
modalities: [
{ type: 'text', content: 'Document text content' },
{ type: 'image', content: imageBuffer, metadata: { format: 'png' } },
{ type: 'table', content: csvData, metadata: { headers: true } }
],
fusionWeights: { text: 0.5, image: 0.3, table: 0.2 }
});
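The config alone does not say what `weighted-average` fusion does with the per-modality embeddings. Conceptually it collapses them into a single vector; the minimal sketch below assumes every modality embedding shares the same dimensionality (a real implementation would enforce this or project first) and is not the library's internal code. The weights mirror the `fusionWeights` from the example above.
// Weighted-average fusion sketch; illustrative only, not MultiModalProcessor internals.
// Assumes all vectors share a dimension and the weights sum to 1.
function fuseEmbeddings(embeddings, weights) {
  const dim = embeddings[0].vector.length;
  const fused = new Array(dim).fill(0);
  for (const { type, vector } of embeddings) {
    const w = weights[type] ?? 0;
    for (let i = 0; i < dim; i++) {
      fused[i] += w * vector[i];
    }
  }
  return fused;
}

fuseEmbeddings(
  [
    { type: 'text', vector: [0.1, 0.2] },
    { type: 'image', vector: [0.4, 0.0] },
    { type: 'table', vector: [0.0, 0.6] }
  ],
  { text: 0.5, image: 0.3, table: 0.2 }
);
// roughly [0.17, 0.22]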
Federated Learning
Distributed model training with privacy preservation:
import { FederatedLearningCoordinator } from '@DevilsDev/rag-pipeline-utils/ai';
const coordinator = new FederatedLearningCoordinator({
aggregationStrategy: 'federated-averaging',
privacyBudget: 1.0,
differentialPrivacy: true
});
// Coordinate federated training
const globalModel = await coordinator.coordinateTraining({
participants: federatedNodes,
rounds: 10,
clientSampleRatio: 0.1,
localEpochs: 5
});
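`federated-averaging` (FedAvg) is the step where the coordinator combines client updates without ever seeing raw data: each participant trains locally, and the global model is an average of client parameters weighted by how many samples each client trained on. A framework-agnostic sketch of just that aggregation step, not the coordinator's actual internals:
// FedAvg aggregation sketch: average client parameter vectors weighted by sample count.
function federatedAverage(clientUpdates) {
  const totalSamples = clientUpdates.reduce((sum, c) => sum + c.sampleCount, 0);
  const dim = clientUpdates[0].weights.length;
  const globalWeights = new Array(dim).fill(0);
  for (const { weights, sampleCount } of clientUpdates) {
    const share = sampleCount / totalSamples;
    for (let i = 0; i < dim; i++) {
      globalWeights[i] += share * weights[i];
    }
  }
  return globalWeights;
}
Differential privacy (the `differentialPrivacy` and `privacyBudget` options above) would additionally clip each client update and add calibrated noise before this averaging step.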
Adaptive Retrieval
Dynamic retrieval strategy selection driven by observed performance:
import { AdaptiveRetrievalEngine } from '@DevilsDev/rag-pipeline-utils/ai';
const engine = new AdaptiveRetrievalEngine({
strategies: ['semantic', 'keyword', 'hybrid', 'graph-based'],
adaptationPolicy: 'performance-based',
learningRate: 0.01
});
// Adaptive retrieval with strategy selection
const results = await engine.adaptiveRetrieve(query, {
domain: 'technical-documentation',
userContext: { expertise: 'intermediate' },
performanceRequirements: { maxLatency: 500 }
});
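What `performance-based` adaptation means in practice is easiest to picture as a bandit over the configured strategies: keep a running score per strategy, mostly pick the best, occasionally explore. An epsilon-greedy sketch of that idea follows; the engine's real policy, and how it uses `learningRate`, may differ.
// Epsilon-greedy strategy selection sketch; illustrative, not the engine's internals.
const scores = { semantic: 0, keyword: 0, hybrid: 0, 'graph-based': 0 };
const learningRate = 0.01;
const epsilon = 0.1;

function pickStrategy() {
  const names = Object.keys(scores);
  if (Math.random() < epsilon) {
    return names[Math.floor(Math.random() * names.length)]; // explore
  }
  return names.reduce((best, name) => (scores[name] > scores[best] ? name : best)); // exploit
}

function recordOutcome(strategy, reward) {
  // Exponential moving average of an observed reward (e.g. relevance minus a latency penalty)
  scores[strategy] += learningRate * (reward - scores[strategy]);
}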
Enterprise Security
Authentication & Authorization
import { EnterpriseAuth } from '@DevilsDev/rag-pipeline-utils/enterprise';
const auth = new EnterpriseAuth({
provider: 'azure-ad',
tenantId: process.env.AZURE_TENANT_ID,
clientId: process.env.AZURE_CLIENT_ID,
rbac: {
roles: ['admin', 'user', 'readonly'],
permissions: {
'admin': ['*'],
'user': ['query', 'ingest'],
'readonly': ['query']
}
}
});
// Middleware for authentication
app.use(auth.middleware());
// Role-based access control
app.post('/ingest', auth.requireRole('user'), async (req, res) => {
// Handle ingestion
});
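The role-to-permission map above treats '*' as a wildcard for admins. If you need the same decision outside the Express middleware, a hedged sketch of the check (not the EnterpriseAuth implementation) might look like this:
// Hedged permission check over the RBAC map shown above.
const permissions = {
  admin: ['*'],
  user: ['query', 'ingest'],
  readonly: ['query']
};

function can(role, action) {
  const allowed = permissions[role] ?? [];
  return allowed.includes('*') || allowed.includes(action);
}

can('readonly', 'ingest'); // false
can('admin', 'ingest'); // true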
Audit Logging
import { AuditLogger } from '@DevilsDev/rag-pipeline-utils/enterprise';
const auditLogger = new AuditLogger({
storage: 'elasticsearch',
retention: '7y', // 7 years for compliance
encryption: true,
immutable: true
});
// Log security-sensitive operations
auditLogger.log('document.ingest', {
userId: req.user.id,
documentId: doc.id,
classification: 'confidential',
source: req.ip,
timestamp: new Date().toISOString()
});
Production Deployment
Kubernetes Deployment
# deployment/kubernetes/rag-pipeline-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-pipeline
  labels:
    app: rag-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-pipeline
  template:
    metadata:
      labels:
        app: rag-pipeline
    spec:
      containers:
        - name: rag-pipeline
          image: devilsdev/rag-pipeline-utils:latest
          ports:
            - containerPort: 3000
          env:
            - name: NODE_ENV
              value: "production"
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-secrets
                  key: openai-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 3000
            initialDelaySeconds: 5
            periodSeconds: 5
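The probes above assume the container serves /health and /ready on port 3000. The guide does not mandate a specific HTTP framework, so the Express sketch below is one hedged way to back them, reusing the `container.healthCheck()` call shown earlier (its exact return shape is assumed here):
// Hypothetical probe endpoints backing the livenessProbe/readinessProbe above.
import express from 'express';

const app = express();

// Liveness: the process is up and able to answer at all.
app.get('/health', (req, res) => res.status(200).json({ status: 'ok' }));

// Readiness: downstream dependencies are usable. Assumes container.healthCheck()
// resolves to per-service objects with a boolean `healthy` field.
app.get('/ready', async (req, res) => {
  const health = await container.healthCheck();
  const ready = Object.values(health).every((service) => service.healthy);
  res.status(ready ? 200 : 503).json(health);
});

app.listen(3000);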
Helm Chart Configuration
# deployment/helm/rag-pipeline/values.yaml
replicaCount: 3
image:
  repository: devilsdev/rag-pipeline-utils
  tag: "latest"
  pullPolicy: IfNotPresent
service:
  type: ClusterIP
  port: 80
  targetPort: 3000
ingress:
  enabled: true
  className: "nginx"
  annotations:
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
  hosts:
    - host: rag-api.company.com
      paths:
        - path: /
          pathType: Prefix
  tls:
    - secretName: rag-api-tls
      hosts:
        - rag-api.company.com
monitoring:
  enabled: true
  serviceMonitor:
    enabled: true
    interval: 30s
    scrapeTimeout: 10s
autoscaling:
  enabled: true
  minReplicas: 3
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70
  targetMemoryUtilizationPercentage: 80
This enterprise feature set lets @DevilsDev/rag-pipeline-utils meet the demands of mission-critical production environments while staying flexible and approachable for development and prototyping.