Architecture
This comprehensive guide outlines the internal architecture of @DevilsDev/rag-pipeline-utils, emphasizing enterprise-grade modularity, plugin design patterns, and SOLID-compliant structure. Understanding this architecture is crucial for extending the system, creating custom plugins, and optimizing performance.
🏗️ Core Design Philosophy
The architecture is built on proven software engineering principles that ensure scalability, maintainability, and extensibility:
SOLID Principles Implementation
- Single Responsibility Principle (SRP): Each component handles one specific domain concern
  - Loaders only handle document ingestion
  - Embedders only convert text to vectors
  - Retrievers only manage vector storage and search
  - LLMs only generate responses
- Open/Closed Principle (OCP): The system is open for extension but closed for modification
  - New plugins can be added without changing core code
  - Plugin interfaces define stable contracts
  - Middleware can be injected without core changes
- Liskov Substitution Principle (LSP): Any plugin can be replaced with another of the same type
  - All PDF loaders implement the same interface
  - The OpenAI embedder can be swapped for the Cohere embedder
  - Vector stores are interchangeable
- Interface Segregation Principle (ISP): Plugins depend only on the interfaces they use
  - Loaders don't depend on LLM interfaces
  - Embedders don't need retriever methods
  - Concerns stay cleanly separated
- Dependency Inversion Principle (DIP): High-level modules don't depend on low-level modules; both depend on abstractions
  - The pipeline depends on plugin abstractions, not implementations
  - Configuration drives dependency injection
  - Mock implementations make testing easy
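To make SRP, LSP, and DIP concrete, here is a minimal sketch under stated assumptions: a hypothetical Embedder interface with two toy implementations (LengthEmbedder, VowelEmbedder) and an Indexer that receives its embedder through the constructor. None of these names come from the library; they only illustrate a high-level module depending on an abstraction and accepting any conforming implementation.

```typescript
// Hypothetical minimal contract (the library's EmbedderPlugin has more members).
interface Embedder {
  embed(texts: string[]): Promise<number[][]>;
}

// Two interchangeable toy implementations of the same contract (LSP).
class LengthEmbedder implements Embedder {
  async embed(texts: string[]): Promise<number[][]> {
    return texts.map((t) => [t.length]);
  }
}

class VowelEmbedder implements Embedder {
  async embed(texts: string[]): Promise<number[][]> {
    return texts.map((t) => [(t.match(/[aeiou]/gi) ?? []).length]);
  }
}

// High-level module: depends only on the Embedder abstraction (DIP).
// The concrete embedder is injected by whoever constructs the Indexer.
class Indexer {
  constructor(private readonly embedder: Embedder) {}

  async index(texts: string[]): Promise<Map<string, number[]>> {
    const vectors = await this.embedder.embed(texts);
    return new Map(texts.map((t, i): [string, number[]] => [t, vectors[i]]));
  }
}

async function main(): Promise<void> {
  const docs = ["alpha", "beta"];
  const byLength = await new Indexer(new LengthEmbedder()).index(docs);
  const byVowels = await new Indexer(new VowelEmbedder()).index(docs);
  console.log(byLength.get("alpha"), byVowels.get("alpha")); // [ 5 ] [ 2 ]
}

main();
```

Swapping LengthEmbedder for VowelEmbedder requires no change to Indexer, and a test can inject a mock the same way.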
Additional Architectural Principles
- Plugin-Based Architecture: Complete modularity with hot-swappable components
- Streaming-First Design: Async flows with real-time, token-by-token output
- Configuration-Driven: Environment-safe config via .ragrc.json files
- Event-Driven Processing: Comprehensive event system for monitoring and hooks
- Fail-Fast Validation: Early error detection with detailed error messages
- Observability Built-In: Comprehensive logging, tracing, and metrics collection
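Fail-fast validation pairs naturally with configuration-driven setup. The sketch below assumes a hypothetical RagConfig shape whose field names mirror the createRagPipeline factory later in this guide (loader, embedder, retriever, llm, useReranker); the real .ragrc.json schema may differ. It collects every problem before throwing, so a single error message lists all of them.

```typescript
// Hypothetical config shape; field names follow the createRagPipeline example.
type RagConfig = {
  loader: string;
  embedder: string;
  retriever: string;
  llm: string;
  useReranker?: boolean;
  reranker?: string;
};

class ConfigValidationError extends Error {
  constructor(public readonly problems: string[]) {
    super(`Invalid configuration:\n- ${problems.join("\n- ")}`);
    this.name = "ConfigValidationError";
  }
}

// Collect every problem first, then fail once with a detailed message.
function validateConfig(raw: unknown): RagConfig {
  if (typeof raw !== "object" || raw === null) {
    throw new ConfigValidationError(["config must be a JSON object"]);
  }
  const cfg = raw as Record<string, unknown>;
  const problems: string[] = [];
  for (const key of ["loader", "embedder", "retriever", "llm"]) {
    const value = cfg[key];
    if (typeof value !== "string" || value === "") {
      problems.push(`"${key}" must be a non-empty plugin name`);
    }
  }
  if (cfg.useReranker !== undefined && typeof cfg.useReranker !== "boolean") {
    problems.push(`"useReranker" must be a boolean`);
  }
  if (problems.length > 0) throw new ConfigValidationError(problems);
  return cfg as RagConfig;
}

// Contents of a .ragrc.json file that forgot the "llm" entry.
const rawConfig = '{"loader":"pdf","embedder":"openai","retriever":"pinecone"}';
try {
  validateConfig(JSON.parse(rawConfig));
} catch (err) {
  console.log((err as ConfigValidationError).problems); // [ '"llm" must be a non-empty plugin name' ]
}
```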
🔧 System Architecture Overview
High-Level Component Diagram
graph TB
subgraph "Application Layer"
CLI[CLI Interface]
API[Programmatic API]
Dashboard[Evaluation Dashboard]
end
subgraph "Core Pipeline"
Factory[Pipeline Factory]
Registry[Plugin Registry]
Pipeline[RAG Pipeline]
Middleware[Middleware Stack]
end
subgraph "Plugin Ecosystem"
Loaders[Document Loaders]
Embedders[Embedding Models]
Retrievers[Vector Stores]
LLMs[Language Models]
Rerankers[Context Rerankers]
end
subgraph "Infrastructure"
Config[Configuration Manager]
Logger[Event Logger]
Metrics[Metrics Collector]
Cache[Caching Layer]
end
CLI --> Factory
API --> Factory
Dashboard --> API
Factory --> Registry
Factory --> Pipeline
Pipeline --> Middleware
Registry --> Loaders
Registry --> Embedders
Registry --> Retrievers
Registry --> LLMs
Registry --> Rerankers
Pipeline --> Config
Pipeline --> Logger
Pipeline --> Metrics
Pipeline --> Cache
Data Flow Architecture
sequenceDiagram
participant U as User
participant P as Pipeline
participant L as Loader
participant E as Embedder
participant R as Retriever
participant LLM as Language Model
participant Re as Reranker
Note over U,Re: Document Ingestion Phase
U->>P: ingest(documents)
P->>L: load(document)
L->>P: chunks[]
P->>E: embed(chunks)
E->>P: embeddings[]
P->>R: store(embeddings)
R->>P: success
Note over U,Re: Query Processing Phase
U->>P: query(prompt)
P->>E: embed(prompt)
E->>P: queryVector
P->>R: search(queryVector)
R->>P: candidates[]
P->>Re: rerank(candidates, prompt)
Re->>P: rankedContext[]
P->>LLM: generate(prompt, context)
LLM->>P: response
P->>U: answer + metadata
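The two phases in the sequence diagram can be traced in a self-contained toy pipeline. This is a sketch, not the library's implementation: MiniPipeline, the letter-frequency embed function, and the blank-line chunking are hypothetical stand-ins, the reranker step is omitted, and the "LLM" simply echoes the retrieved context.

```typescript
// Toy sketch of the ingestion and query phases; not the library's implementation.
type Vector = number[];

// Hypothetical embedder: letter-frequency vector over a-z.
function embed(text: string): Vector {
  const v = new Array<number>(26).fill(0);
  for (const ch of text.toLowerCase()) {
    const i = ch.charCodeAt(0) - 97; // 'a' is char code 97
    if (i >= 0 && i < 26) v[i] += 1;
  }
  return v;
}

// Cosine similarity; returns 0 when either vector is all zeros.
function cosine(a: Vector, b: Vector): number {
  let dot = 0;
  let na = 0;
  let nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return na === 0 || nb === 0 ? 0 : dot / Math.sqrt(na * nb);
}

class MiniPipeline {
  private store: { text: string; vector: Vector }[] = [];

  // Ingestion: split into blank-line-separated chunks, embed, store.
  async ingest(document: string): Promise<void> {
    const chunks = document.split(/\n\s*\n/).map((c) => c.trim()).filter(Boolean);
    for (const text of chunks) this.store.push({ text, vector: embed(text) });
  }

  // Query: embed the prompt, rank stored chunks by similarity, "generate".
  // The rerank step is omitted; the LLM is replaced by an echo of the context.
  async query(prompt: string, topK = 1): Promise<{ answer: string; context: string[] }> {
    const q = embed(prompt);
    const context = [...this.store]
      .sort((x, y) => cosine(q, y.vector) - cosine(q, x.vector))
      .slice(0, topK)
      .map((c) => c.text);
    return { answer: `Based on: ${context.join(" | ")}`, context };
  }
}

const p = new MiniPipeline();
p.ingest("Cats purr.\n\nDogs bark loudly.")
  .then(() => p.query("dogs bark"))
  .then((r) => console.log(r.context)); // [ 'Dogs bark loudly.' ]
```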
🔌 Plugin System Architecture
Plugin Registry Implementation
The PluginRegistry is the heart of the plugin system, managing plugin lifecycle and dependency injection:
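import { EventEmitter } from 'node:events';
// Core plugin registry structure
class PluginRegistry {
private plugins: Map<string, Map<string, Plugin>> = new Map();
private contracts: Map<string, PluginContract> = new Map();
// Publishes lifecycle events such as 'plugin:registered'
private eventEmitter = new EventEmitter();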
register<T extends Plugin>(
type: PluginType,
name: string,
plugin: T,
metadata?: PluginMetadata
): void {
// Validate plugin against contract
this.validateContract(type, plugin);
// Register plugin with metadata
if (!this.plugins.has(type)) {
this.plugins.set(type, new Map());
}
this.plugins.get(type)!.set(name, plugin);
// Emit registration event
this.eventEmitter.emit('plugin:registered', {
type, name, plugin, metadata
});
}
get<T extends Plugin>(type: PluginType, name: string): T {
const typePlugins = this.plugins.get(type);
if (!typePlugins?.has(name)) {
throw new PluginNotFoundError(`Plugin ${type}:${name} not found`);
}
return typePlugins.get(name) as T;
}
list(type?: PluginType): PluginInfo[] {
// Return available plugins with metadata
}
}
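For readers who want something runnable, here is a reduced registry in the same spirit. SimpleRegistry and the CONTRACTS table are hypothetical; contract validation is reduced to checking that the required method names exist, and registration events use Node's built-in EventEmitter.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical contracts: method names each plugin type must expose.
const CONTRACTS: Record<string, string[]> = {
  loader: ["load"],
  embedder: ["embed"],
  retriever: ["store", "search"],
  llm: ["generate"],
};

class SimpleRegistry extends EventEmitter {
  // type -> (name -> plugin instance)
  private plugins = new Map<string, Map<string, object>>();

  register(type: string, name: string, plugin: object): void {
    const required = CONTRACTS[type];
    if (!required) throw new Error(`Unknown plugin type "${type}"`);
    // Contract check: every required member must be a function
    const missing = required.filter(
      (m) => typeof (plugin as Record<string, unknown>)[m] !== "function",
    );
    if (missing.length > 0) {
      throw new Error(`${type}:${name} is missing: ${missing.join(", ")}`);
    }
    if (!this.plugins.has(type)) this.plugins.set(type, new Map());
    this.plugins.get(type)!.set(name, plugin);
    this.emit("plugin:registered", { type, name });
  }

  get<T>(type: string, name: string): T {
    const plugin = this.plugins.get(type)?.get(name);
    if (!plugin) throw new Error(`Plugin ${type}:${name} not found`);
    return plugin as T;
  }

  list(type?: string): { type: string; name: string }[] {
    const types = type ? [type] : [...this.plugins.keys()];
    return types.flatMap((t) =>
      [...(this.plugins.get(t)?.keys() ?? [])].map((name) => ({ type: t, name })),
    );
  }
}

const registry = new SimpleRegistry();
registry.on("plugin:registered", (e) => console.log("registered", e));
registry.register("embedder", "mock", {
  embed: async (texts: string[]) => texts.map(() => [0]),
});
console.log(registry.list()); // [ { type: 'embedder', name: 'mock' } ]
```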
Plugin Contract System
Each plugin type has a strict contract interface that its plugins implement:
// Loader Plugin Contract
interface LoaderPlugin {
readonly metadata: PluginMetadata;
load(source: string | Buffer, options?: LoaderOptions): Promise<Document[]>;
supports(mimeType: string): boolean;
validate(source: string | Buffer): Promise<ValidationResult>;
}
// Embedder Plugin Contract
interface EmbedderPlugin {
readonly metadata: PluginMetadata;
embed(texts: string[], options?: EmbedderOptions): Promise<Embedding[]>;
embedSingle(text: string, options?: EmbedderOptions): Promise<Embedding>;
getDimensions(): number;
getMaxTokens(): number;
}
// Retriever Plugin Contract
interface RetrieverPlugin {
readonly metadata: PluginMetadata;
store(embeddings: EmbeddingDocument[]): Promise<void>;
search(query: Embedding, options?: SearchOptions): Promise<SearchResult[]>;
delete(ids: string[]): Promise<void>;
getStats(): Promise<IndexStats>;
}
// LLM Plugin Contract
interface LLMPlugin {
readonly metadata: PluginMetadata;
generate(prompt: string, context?: string[], options?: LLMOptions): AsyncIterable<string>;
generateSync(prompt: string, context?: string[], options?: LLMOptions): Promise<string>;
getTokenCount(text: string): number;
getMaxContextLength(): number;
}
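The LLMPlugin contract exposes streaming through AsyncIterable<string>. A minimal mock, assuming whitespace splitting as a stand-in for real tokenization and omitting the metadata field, shows how an async generator can satisfy generate() and how generateSync() can be built by draining the stream. EchoLLM is a hypothetical name.

```typescript
// Mock LLM shaped after the LLMPlugin contract (metadata omitted).
// Whitespace splitting stands in for real tokenization.
class EchoLLM {
  async *generate(prompt: string, context: string[] = []): AsyncIterable<string> {
    const text = `Answer to "${prompt}" using ${context.length} chunk(s)`;
    for (const token of text.split(" ")) {
      yield token; // one token at a time, like a streaming API
    }
  }

  // Non-streaming variant built by draining the stream.
  async generateSync(prompt: string, context: string[] = []): Promise<string> {
    const tokens: string[] = [];
    for await (const token of this.generate(prompt, context)) tokens.push(token);
    return tokens.join(" ");
  }

  getTokenCount(text: string): number {
    return text.split(/\s+/).filter(Boolean).length;
  }

  getMaxContextLength(): number {
    return 4096; // illustrative value
  }
}

async function demo(): Promise<void> {
  const llm = new EchoLLM();
  // Render tokens as they arrive
  for await (const token of llm.generate("What is RAG?", ["chunk A"])) {
    process.stdout.write(token + " ");
  }
  process.stdout.write("\n");
  console.log(await llm.generateSync("What is RAG?")); // Answer to "What is RAG?" using 0 chunk(s)
}

demo();
```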
Plugin Metadata System
interface PluginMetadata {
name: string;
version: string;
description: string;
author: string;
license: string;
homepage?: string;
repository?: string;
keywords: string[];
// Plugin-specific metadata
capabilities: string[];
requirements: {
node: string;
dependencies?: Record<string, string>;
environment?: string[];
};
// Performance characteristics
performance: {
throughput?: string;
latency?: string;
memoryUsage?: string;
};
// Configuration schema
configSchema?: JSONSchema;
}
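The requirements.node field suggests a compatibility check at load time. The sketch below assumes a value of the form ">=MAJOR.MINOR.PATCH" (a hypothetical example, not a documented format) and deliberately supports nothing else; ^, ~, or || ranges would need a full semver range parser.

```typescript
// Parse "MAJOR.MINOR.PATCH" with an optional leading "v" (as in process.version).
function parseVersion(v: string): [number, number, number] {
  const m = /^v?(\d+)\.(\d+)\.(\d+)/.exec(v.trim());
  if (!m) throw new Error(`Unparseable version "${v}"`);
  return [Number(m[1]), Number(m[2]), Number(m[3])];
}

// Supports only ">=X.Y.Z"; anything else is rejected rather than guessed at.
function satisfiesMin(requirement: string, actual: string): boolean {
  if (!requirement.startsWith(">=")) {
    throw new Error(`Only ">=" requirements are supported, got "${requirement}"`);
  }
  const want = parseVersion(requirement.slice(2));
  const have = parseVersion(actual);
  for (let i = 0; i < 3; i++) {
    if (have[i] !== want[i]) return have[i] > want[i];
  }
  return true; // equal versions satisfy ">="
}

// Hypothetical metadata value for illustration.
const requirements = { node: ">=18.0.0" };
console.log(satisfiesMin(requirements.node, process.version));
```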
🏭 Pipeline Factory Pattern
The createRagPipeline function implements the Factory pattern to create configured pipeline instances:
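import { EventEmitter } from 'node:events';
export function createRagPipeline(config: PipelineConfig): RagPipeline {
// Initialize plugin registry
const registry = new PluginRegistry();
// Register built-in plugins
registerBuiltinPlugins(registry);
// Load custom plugins from config. The factory is synchronous, so this step
// cannot use `await`; loadCustomPlugins must register plugins synchronously.
if (config.plugins) {
loadCustomPlugins(registry, config.plugins);
}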
// Resolve plugin instances
const loader = registry.get('loader', config.loader);
const embedder = registry.get('embedder', config.embedder);
const retriever = registry.get('retriever', config.retriever);
const llm = registry.get('llm', config.llm);
const reranker = config.useReranker ?
registry.get('reranker', config.reranker || 'default') : null;
// Create middleware stack
const middleware = createMiddlewareStack(config.middleware || []);
// Initialize pipeline with dependencies
return new RagPipeline({
loader,
embedder,
retriever,
llm,
reranker,
middleware,
config: config.pipelineConfig || {},
eventEmitter: new EventEmitter(),
logger: createLogger(config.logging || {}),
metrics: new MetricsCollector(config.metrics || {})
});
}
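The essential factory move is turning names from configuration into instances, with clear errors for unknown names and an optional reranker. This reduced sketch uses hypothetical name-to-factory tables in place of the registry, plus toy retriever and reranker shapes; createMiniPipeline is not a library function.

```typescript
// Hypothetical, reduced plugin shapes.
interface Retriever {
  search(query: string): string[];
}
interface Reranker {
  rerank(query: string, docs: string[]): string[];
}

// Name -> factory tables standing in for the plugin registry.
const retrievers: Record<string, () => Retriever> = {
  memory: () => ({ search: () => ["b doc", "a doc"] }),
};
const rerankers: Record<string, () => Reranker> = {
  default: () => ({ rerank: (_query, docs) => [...docs].sort() }),
};

interface FactoryConfig {
  retriever: string;
  useReranker?: boolean;
  reranker?: string;
}

// Resolve a configured name, listing valid choices when it is unknown.
function resolve<T>(table: Record<string, () => T>, kind: string, name: string): T {
  const make = table[name];
  if (!make) {
    throw new Error(`Unknown ${kind} "${name}"; available: ${Object.keys(table).join(", ")}`);
  }
  return make();
}

function createMiniPipeline(config: FactoryConfig) {
  const retriever = resolve(retrievers, "retriever", config.retriever);
  // As in createRagPipeline: the reranker is optional and falls back to "default".
  const reranker = config.useReranker
    ? resolve(rerankers, "reranker", config.reranker ?? "default")
    : null;
  return {
    search(query: string): string[] {
      const hits = retriever.search(query);
      return reranker ? reranker.rerank(query, hits) : hits;
    },
  };
}

console.log(createMiniPipeline({ retriever: "memory" }).search("x")); // [ 'b doc', 'a doc' ]
console.log(createMiniPipeline({ retriever: "memory", useReranker: true }).search("x")); // [ 'a doc', 'b doc' ]
```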
🔄 Middleware Architecture
The middleware system allows for cross-cutting concerns like retry logic, caching, and monitoring:
// Middleware interface
interface Middleware {
name: string;
priority: number;
beforeLoad?(context: LoadContext): Promise<LoadContext>;
afterLoad?(context: LoadContext, result: Document[]): Promise<Document[]>;
beforeEmbed?(context: EmbedContext): Promise<EmbedContext>;
afterEmbed?(context: EmbedContext, result: Embedding[]): Promise<Embedding[]>;
beforeRetrieve?(context: RetrieveContext): Promise<RetrieveContext>;
afterRetrieve?(context: RetrieveContext, result: SearchResult[]): Promise<SearchResult[]>;
beforeGenerate?(context: GenerateContext): Promise<GenerateContext>;
afterGenerate?(context: GenerateContext, result: string): Promise<string>;
onError?(error: Error, context: any): Promise<void>;
}
// Built-in middleware examples
class RetryMiddleware implements Middleware {
name = 'retry';
priority = 100; // illustrative value
constructor(private options: RetryOptions) {}
async beforeGenerate(context: GenerateContext): Promise<GenerateContext> {
context.retryCount = 0;
context.maxRetries = this.options.maxRetries;
return context;
}
async onError(error: Error, context: any): Promise<void> {
if (context.retryCount < context.maxRetries) {
context.retryCount++;
// Exponential backoff (2s, 4s, 8s, ...) before signalling a retry
await this.delay(Math.pow(2, context.retryCount) * 1000);
throw new RetryableError(error);
}
throw error;
}
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
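import { createHash } from 'node:crypto';
class CachingMiddleware implements Middleware {
name = 'cache';
priority = 10; // illustrative value
constructor(private cache: Cache) {}
async beforeEmbed(context: EmbedContext): Promise<EmbedContext> {
const cacheKey = this.generateCacheKey(context.texts);
const cached = await this.cache.get(cacheKey);
if (cached) {
// Assumes the pipeline skips the embedder when cachedResult is set
context.cachedResult = cached;
}
return context;
}
async afterEmbed(context: EmbedContext, result: Embedding[]): Promise<Embedding[]> {
if (!context.cachedResult) {
const cacheKey = this.generateCacheKey(context.texts);
await this.cache.set(cacheKey, result, { ttl: 3600 });
}
return context.cachedResult || result;
}
// Stable key for a batch of texts: SHA-256 of the JSON-encoded array
private generateCacheKey(texts: string[]): string {
return createHash('sha256').update(JSON.stringify(texts)).digest('hex');
}
}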
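How hooks compose depends on the middleware stack, which this guide does not spell out. The sketch below assumes one plausible convention: before-hooks run in ascending priority, after-hooks in reverse, and a before-hook can short-circuit the embedder by setting cachedResult. The types are trimmed-down local copies, and runEmbedStage and cacheMiddleware are hypothetical names.

```typescript
// Trimmed-down local copies of the guide's types (embed stage only).
type Embedding = number[];
interface EmbedContext {
  texts: string[];
  cachedResult?: Embedding[];
}
interface Middleware {
  name: string;
  priority: number;
  beforeEmbed?(ctx: EmbedContext): Promise<EmbedContext>;
  afterEmbed?(ctx: EmbedContext, result: Embedding[]): Promise<Embedding[]>;
}

// Assumed ordering: before-hooks by ascending priority, after-hooks reversed.
async function runEmbedStage(
  middleware: Middleware[],
  ctx: EmbedContext,
  core: (texts: string[]) => Promise<Embedding[]>,
): Promise<Embedding[]> {
  const ordered = [...middleware].sort((x, y) => x.priority - y.priority);
  for (const m of ordered) {
    if (m.beforeEmbed) ctx = await m.beforeEmbed(ctx);
  }
  // Skip the core embedder entirely when a before-hook supplied a cached result
  let result = ctx.cachedResult ?? (await core(ctx.texts));
  for (const m of [...ordered].reverse()) {
    if (m.afterEmbed) result = await m.afterEmbed(ctx, result);
  }
  return result;
}

// In-memory cache keyed on the JSON-encoded input texts.
function cacheMiddleware(): Middleware {
  const cache = new Map<string, Embedding[]>();
  const key = (texts: string[]) => JSON.stringify(texts);
  return {
    name: "cache",
    priority: 10,
    async beforeEmbed(ctx) {
      const hit = cache.get(key(ctx.texts));
      return hit ? { ...ctx, cachedResult: hit } : ctx;
    },
    async afterEmbed(ctx, result) {
      if (!ctx.cachedResult) cache.set(key(ctx.texts), result);
      return result;
    },
  };
}

// The second call is served from cache, so the core embedder runs once.
let coreCalls = 0;
const core = async (texts: string[]) => {
  coreCalls++;
  return texts.map((t) => [t.length]);
};
const stack = [cacheMiddleware()];
runEmbedStage(stack, { texts: ["hi"] }, core)
  .then(() => runEmbedStage(stack, { texts: ["hi"] }, core))
  .then((r) => console.log(r, coreCalls)); // [ [ 2 ] ] 1
```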
📊 Event System & Observability
The architecture includes a comprehensive event system for monitoring, debugging, and analytics:
// Event types emitted by the system
interface PipelineEvents {
'pipeline:created': { config: PipelineConfig };
'pipeline:destroyed': { pipelineId: string };
'document:loading': { source: string, loader: string };
'document:loaded': { source: string, chunks: number, duration: number };
'document:error': { source: string, error: Error };
'embedding:started': { texts: number, embedder: string };
'embedding:completed': { embeddings: number, duration: number };
'embedding:cached': { texts: number, cacheHit: boolean };
'retrieval:query': { query: string, retriever: string };
'retrieval:results': { results: number, duration: number };
'generation:started': { prompt: string, llm: string };
'generation:token': { token: string, position: number };
'generation:completed': { response: string, duration: number, tokens: number };
'error:handled': { error: Error, context: string, recovered: boolean };
}
// Event-driven metrics collection
class MetricsCollector {
constructor(private config: MetricsConfig) {
this.setupEventListeners();
}
private setupEventListeners(): void {
// Track performance metrics
this.on('embedding:completed', (event) => {
this.histogram('embedding.duration', event.duration);
this.counter('embedding.requests').inc();
});
this.on('generation:completed', (event) => {
this.histogram('generation.duration', event.duration);
this.histogram('generation.tokens', event.tokens);
this.counter('generation.requests').inc();
});
// Track error rates
this.on('error:handled', (event) => {
this.counter('errors.total', {
context: event.context,
recovered: event.recovered.toString()
}).inc();
});
}
}
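The PipelineEvents map can also be enforced at compile time. This sketch wraps Node's EventEmitter in a hypothetical TypedBus class so that event names and payload shapes are checked by TypeScript, then feeds simple counters in the way MetricsCollector does. Only two events from the map are redeclared here.

```typescript
import { EventEmitter } from "node:events";

// Two entries from PipelineEvents, redeclared so the sketch stands alone.
interface Events {
  "embedding:completed": { embeddings: number; duration: number };
  "error:handled": { error: Error; context: string; recovered: boolean };
}

// Hypothetical wrapper: event names and payload types are checked at compile time.
class TypedBus<E extends object> {
  private emitter = new EventEmitter();

  on<K extends keyof E & string>(event: K, listener: (payload: E[K]) => void): void {
    this.emitter.on(event, listener);
  }

  emit<K extends keyof E & string>(event: K, payload: E[K]): void {
    this.emitter.emit(event, payload);
  }
}

// Simple counters fed by listeners, in the spirit of MetricsCollector.
const counters = new Map<string, number>();
const inc = (metric: string): void => {
  counters.set(metric, (counters.get(metric) ?? 0) + 1);
};

const bus = new TypedBus<Events>();
bus.on("embedding:completed", () => inc("embedding.requests"));
bus.on("error:handled", (e) =>
  inc(`errors.total{context=${e.context},recovered=${e.recovered}}`),
);

bus.emit("embedding:completed", { embeddings: 3, duration: 12 });
bus.emit("error:handled", { error: new Error("timeout"), context: "llm", recovered: true });
// bus.emit("embedding:completed", { tokens: 1 }); would fail to compile
console.log(counters);
```

Because EventEmitter delivers events synchronously, the counters are already updated when emit() returns.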
🔒 Security Architecture
Security is built into the architecture at multiple levels:
Plugin Sandboxing
- Plugins run in isolated contexts with limited system access
- API key management through secure configuration
- Input validation and sanitization at plugin boundaries
Configuration Security
- Encrypted storage of sensitive configuration data
- Environment variable injection for secrets
- Configuration schema validation
Network Security
- TLS/SSL enforcement for all external API calls
- Request rate limiting and timeout handling
- Secure credential rotation support
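Environment variable injection for secrets usually means reading keys at startup, failing fast when any are missing, and never logging them in full. A minimal sketch: RAG_LLM_API_KEY is a hypothetical variable name, and the env parameter exists so the function can be exercised without touching process.env.

```typescript
// Read required secrets, failing fast with one message listing all missing names.
function requireEnv(
  names: string[],
  env: Record<string, string | undefined> = process.env,
): Record<string, string> {
  const missing = names.filter((n) => !env[n]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(", ")}`);
  }
  return Object.fromEntries(names.map((n) => [n, env[n] as string]));
}

// Mask a secret for logging, keeping only its last 4 characters.
function redact(secret: string): string {
  return secret.length <= 4
    ? "****"
    : "*".repeat(secret.length - 4) + secret.slice(-4);
}

// Hypothetical variable name; an explicit env object keeps the example self-contained.
const secrets = requireEnv(["RAG_LLM_API_KEY"], { RAG_LLM_API_KEY: "sk-test-123456" });
console.log(redact(secrets.RAG_LLM_API_KEY)); // **********3456
```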
🏢 Enterprise Architecture Components
Dependency Injection Container
The enterprise-grade dependency injection system provides IoC (Inversion of Control) for modular, testable architecture:
// src/core/dependency-injection.js
class DependencyContainer {
private services: Map<string, ServiceDefinition> = new Map();
private instances: Map<string, any> = new Map();
register<T>(name: string, factory: ServiceFactory<T>, options?: ServiceOptions): void {
this.services.set(name, {
factory,
lifecycle: options?.lifecycle || 'singleton',
dependencies: options?.dependencies || []
});
}
resolve<T>(name: string): T {
if (this.instances.has(name)) {
return this.instances.get(name);
}
const service = this.services.get(name);
if (!service) {
throw new ServiceNotFoundError(`Service ${name} not registered`);
}
// Resolve dependencies
const dependencies = service.dependencies.map(dep => this.resolve(dep));
// Create instance
const instance = service.factory(...dependencies);
if (service.lifecycle === 'singleton') {
this.instances.set(name, instance);
}
return instance;
}
}
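One gap in the container above: a circular dependency (a needs b, b needs a) would recurse until the stack overflows. The variant below is a sketch with hypothetical names that tracks the resolution path and reports the cycle, and it also shows singleton versus transient lifecycles.

```typescript
type Lifecycle = "singleton" | "transient";
type Factory = (...deps: any[]) => unknown;

interface Definition {
  factory: Factory;
  lifecycle: Lifecycle;
  dependencies: string[];
}

// Hypothetical container with cycle detection; mirrors DependencyContainer's API shape.
class Container {
  private defs = new Map<string, Definition>();
  private singletons = new Map<string, unknown>();

  register(
    name: string,
    factory: Factory,
    options: { lifecycle?: Lifecycle; dependencies?: string[] } = {},
  ): void {
    this.defs.set(name, {
      factory,
      lifecycle: options.lifecycle ?? "singleton",
      dependencies: options.dependencies ?? [],
    });
  }

  // `path` holds the services currently being resolved, to detect cycles.
  resolve<T>(name: string, path: string[] = []): T {
    if (this.singletons.has(name)) return this.singletons.get(name) as T;
    if (path.includes(name)) {
      throw new Error(`Circular dependency: ${[...path, name].join(" -> ")}`);
    }
    const def = this.defs.get(name);
    if (!def) throw new Error(`Service ${name} not registered`);
    const deps = def.dependencies.map((d) => this.resolve<unknown>(d, [...path, name]));
    const instance = def.factory(...deps);
    if (def.lifecycle === "singleton") this.singletons.set(name, instance);
    return instance as T;
  }
}

const c = new Container();
c.register("config", () => ({ model: "demo" }));
c.register(
  "greeter",
  (cfg: { model: string }) => ({ greet: () => `using ${cfg.model}` }),
  { dependencies: ["config"] },
);
console.log(c.resolve<{ greet(): string }>("greeter").greet()); // using demo
```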
SLO Monitoring System
Built-in Service Level Objective (SLO) tracking with error budgets and alerting:
// src/observability/slo-monitor.js
class SLOMonitor {
private slos: Map<string, SLODefinition> = new Map();
private measurements: TimeSeriesDB = new TimeSeriesDB();
defineSLO(name: string, definition: SLODefinition): void {
this.slos.set(name, {
...definition,
errorBudget: this.calculateErrorBudget(definition)
});
}
recordMeasurement(sloName: string, success: boolean, latency?: number): void {
const measurement = {
timestamp: Date.now(),
success,
latency,
slo: sloName
};
this.measurements.record(measurement);
this.checkSLOViolation(sloName, measurement);
}
getErrorBudgetStatus(sloName: string): ErrorBudgetStatus {
const slo = this.slos.get(sloName);
if (!slo) {
throw new Error(`SLO ${sloName} is not defined`);
}
const recentMeasurements = this.measurements.getRecent(sloName, slo.window);
return {
remaining: this.calculateRemainingBudget(slo, recentMeasurements),
burnRate: this.calculateBurnRate(recentMeasurements),
alerting: this.shouldAlert(slo, recentMeasurements)
};
}
}
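The guide does not give formulas for calculateErrorBudget or calculateBurnRate. One common definition for an availability SLO, sketched below with a hypothetical errorBudget function, is: allowed failures = (1 - target) * total requests, and burn rate = observed error rate / (1 - target), so a sustained burn rate of 1 would exhaust the budget exactly at the end of the window.

```typescript
// Hypothetical helper; one common definition of error budget and burn rate.
interface BudgetStatus {
  allowedFailures: number; // (1 - target) * total requests
  remaining: number; // fraction of budget left; negative once exhausted
  burnRate: number; // observed error rate / allowed error rate
}

function errorBudget(target: number, total: number, failures: number): BudgetStatus {
  if (target <= 0 || target >= 1) throw new Error("target must be in (0, 1)");
  const allowedRate = 1 - target;
  const allowedFailures = allowedRate * total;
  return {
    allowedFailures,
    remaining: total === 0 ? 1 : 1 - failures / allowedFailures,
    burnRate: total === 0 ? 0 : failures / total / allowedRate,
  };
}

// 99.9% target over 10,000 requests with 5 failures: about 10 failures are
// allowed, roughly half the budget remains, and the burn rate is about 0.5.
console.log(errorBudget(0.999, 10_000, 5));
```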
External API Mocking Infrastructure
Deterministic test infrastructure with network simulation for reliable CI/CD:
// __tests__/mocks/external-apis.js
class ExternalAPIMocker {
private mocks: Map<string, MockDefinition> = new Map();
private networkSimulator: NetworkSimulator;
mockAPI(service: string, config: MockConfig): void {
this.mocks.set(service, {
responses: config.responses,
// `??` keeps explicit zeros, e.g. errorRate: 0 for failure-free runs
latency: config.latency ?? { min: 100, max: 500 },
errorRate: config.errorRate ?? 0.05,
rateLimiting: config.rateLimiting
});
}
async simulateRequest(service: string, request: APIRequest): Promise<APIResponse> {
const mock = this.mocks.get(service);
if (!mock) {
throw new Error(`No mock registered for service ${service}`);
}
// Simulate network conditions
await this.networkSimulator.delay(mock.latency);
// Simulate errors
if (Math.random() < mock.errorRate) {
throw new MockNetworkError('Simulated network failure');
}
// Return mock response
return this.generateResponse(mock, request);
}
}
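Math.random() makes simulated failures differ between CI runs. If reproducibility is the goal, a seeded generator can drive the error decision instead. This sketch swaps in mulberry32, a small and widely used 32-bit PRNG, inside a hypothetical DeterministicFaults helper; an explicit error rate of 0 never fails and a rate of 1 always fails.

```typescript
// mulberry32: a small 32-bit seeded PRNG returning floats in [0, 1).
function mulberry32(seed: number): () => number {
  let state = seed >>> 0;
  return () => {
    state = (state + 0x6d2b79f5) >>> 0;
    let t = state;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}

// Hypothetical helper: decides simulated failures from the seeded stream.
class DeterministicFaults {
  private readonly rand: () => number;

  constructor(seed: number, private readonly errorRate: number) {
    this.rand = mulberry32(seed);
  }

  shouldFail(): boolean {
    return this.rand() < this.errorRate;
  }
}

// Same seed, same failure pattern, on every run.
const a = new DeterministicFaults(42, 0.05);
const b = new DeterministicFaults(42, 0.05);
const runA = Array.from({ length: 20 }, () => a.shouldFail());
const runB = Array.from({ length: 20 }, () => b.shouldFail());
console.log(runA.join() === runB.join()); // true
```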
🚀 Advanced AI Architecture
Multi-Modal Processing Engine
Handle text, images, and structured data in unified pipelines:
// src/ai/multimodal/multi-modal-processor.js
class MultiModalProcessor {
private processors: Map<string, ModalityProcessor> = new Map();
async process(input: MultiModalInput): Promise<ProcessedContent> {
const results = await Promise.all(
input.modalities.map(async (modality) => {
const processor = this.processors.get(modality.type);
return processor.process(modality.content, modality.metadata);
})
);
return this.fuseResults(results, input.fusionStrategy);
}
}
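The fusion strategy is left open above. Reciprocal Rank Fusion (RRF) is one common choice for merging ranked lists from different sources; it is shown here as a swapped-in example, not as what fuseResults does. Each item scores the sum of 1 / (k + rank) across lists, with 1-based ranks and k = 60 as a conventional default.

```typescript
// Reciprocal Rank Fusion over ranked lists of item ids (best first).
// score(id) = sum over lists of 1 / (k + rank), rank starting at 1.
function reciprocalRankFusion(rankings: string[][], k = 60): { id: string; score: number }[] {
  const scores = new Map<string, number>();
  for (const list of rankings) {
    list.forEach((id, index) => {
      scores.set(id, (scores.get(id) ?? 0) + 1 / (k + index + 1));
    });
  }
  return [...scores.entries()]
    .map(([id, score]) => ({ id, score }))
    .sort((x, y) => y.score - x.score);
}

// Hypothetical ranked results from two modality processors (e.g. text and image).
const textHits = ["doc-a", "doc-b", "doc-c"];
const imageHits = ["doc-c", "doc-a"];
console.log(reciprocalRankFusion([textHits, imageHits]).map((r) => r.id)); // [ 'doc-a', 'doc-c', 'doc-b' ]
```

Because RRF uses only ranks, it needs no score normalization across modalities whose raw similarity scores are not comparable.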
Federated Learning Coordinator
Distributed model training with privacy-preserving aggregation:
// src/ai/federation/federated-learning-coordinator.js
class FederatedLearningCoordinator {
async coordinateTraining(participants: FederatedNode[]): Promise<GlobalModel> {
// Distribute training tasks
const localUpdates = await this.distributeTraining(participants);
// Aggregate updates with differential privacy
const aggregatedUpdate = await this.aggregateWithPrivacy(localUpdates);
// Update global model
return this.updateGlobalModel(aggregatedUpdate);
}
}
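The aggregation step can be illustrated with federated averaging (FedAvg), shown here as a swapped-in technique since the guide names no algorithm: client updates are averaged with weights proportional to local sample counts, after optional L2 clipping. Clipping bounds each client's influence; the calibrated noise that differential privacy also requires is deliberately omitted from this sketch, and LocalUpdate is a hypothetical shape.

```typescript
// Hypothetical shape: a client's model update (delta) and its sample count.
interface LocalUpdate {
  delta: number[];
  samples: number;
}

// Scale a vector down so its L2 norm is at most maxNorm.
function clip(v: number[], maxNorm: number): number[] {
  const norm = Math.sqrt(v.reduce((s, x) => s + x * x, 0));
  return norm <= maxNorm ? v : v.map((x) => (x * maxNorm) / norm);
}

// FedAvg: sample-weighted mean of (optionally clipped) client updates.
// Differential-privacy noise is NOT added here.
function federatedAverage(updates: LocalUpdate[], maxNorm = Infinity): number[] {
  if (updates.length === 0) throw new Error("no updates to aggregate");
  const dim = updates[0].delta.length;
  const total = updates.reduce((s, u) => s + u.samples, 0);
  if (total <= 0) throw new Error("total sample count must be positive");
  const result = new Array<number>(dim).fill(0);
  for (const u of updates) {
    if (u.delta.length !== dim) throw new Error("dimension mismatch");
    const d = clip(u.delta, maxNorm);
    for (let i = 0; i < dim; i++) result[i] += (d[i] * u.samples) / total;
  }
  return result;
}

console.log(
  federatedAverage([
    { delta: [1, 0], samples: 30 },
    { delta: [0, 1], samples: 10 },
  ]),
); // [ 0.75, 0.25 ]
```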
Adaptive Retrieval Engine
Dynamic retrieval strategies with performance optimization:
// src/ai/retrieval/adaptive-retrieval-engine.js
class AdaptiveRetrievalEngine {
async adaptiveRetrieve(query: Query, context: RetrievalContext): Promise<RetrievalResult> {
// Analyze query complexity
const strategy = await this.selectStrategy(query, context);
// Execute retrieval with chosen strategy
const results = await this.executeStrategy(strategy, query);
// Learn from results for future optimization
await this.updateStrategyPerformance(strategy, results);
return results;
}
}
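The guide does not say how strategy performance feeds back into selection. An epsilon-greedy bandit is one simple rule, sketched below with a hypothetical EpsilonGreedySelector: with probability epsilon it explores a random strategy, otherwise it picks the strategy with the best running mean reward. The reward (for example a relevance score) and the strategy names are assumptions; the random source is injectable so the behavior can be tested.

```typescript
// Hypothetical epsilon-greedy selector over named retrieval strategies.
class EpsilonGreedySelector {
  private stats = new Map<string, { n: number; mean: number }>();

  constructor(
    private readonly strategies: string[],
    private readonly epsilon = 0.1,
    private readonly random: () => number = Math.random,
  ) {
    for (const s of strategies) this.stats.set(s, { n: 0, mean: 0 });
  }

  select(): string {
    // Explore: pick a uniformly random strategy with probability epsilon
    if (this.random() < this.epsilon) {
      return this.strategies[Math.floor(this.random() * this.strategies.length)];
    }
    // Exploit: pick the strategy with the highest running mean reward
    let best = this.strategies[0];
    for (const s of this.strategies) {
      if (this.stats.get(s)!.mean > this.stats.get(best)!.mean) best = s;
    }
    return best;
  }

  // Incremental mean update: mean += (reward - mean) / n
  update(strategy: string, reward: number): void {
    const st = this.stats.get(strategy);
    if (!st) throw new Error(`Unknown strategy ${strategy}`);
    st.n += 1;
    st.mean += (reward - st.mean) / st.n;
  }
}

// epsilon = 0 makes the demo deterministic: always exploit.
const sel = new EpsilonGreedySelector(["dense", "hybrid"], 0);
sel.update("dense", 0.4);
sel.update("hybrid", 0.9);
console.log(sel.select()); // hybrid
```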
🔧 Enhanced Developer Tools
CLI Doctor Diagnostics
Comprehensive system health checking and troubleshooting:
// src/cli/doctor-command.js
class DoctorCommand {
async runDiagnostics(): Promise<DiagnosticReport> {
const checks = [
this.checkNodeVersion(),
this.checkDependencies(),
this.checkConfiguration(),
this.checkPluginHealth(),
this.checkExternalServices(),
this.checkPerformanceBottlenecks()
];
const results = await Promise.allSettled(checks);
return this.generateReport(results);
}
}
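Using Promise.allSettled means one failing check cannot hide the others. The sketch below shows how settled results might become a PASS/FAIL report; runChecks, DiagnosticResult, and the check names are hypothetical. Each check is started inside a then() callback so a check that throws synchronously is still reported rather than aborting the run.

```typescript
// Hypothetical result shape for one diagnostic check.
interface DiagnosticResult {
  check: string;
  ok: boolean;
  detail: string;
}

async function runChecks(
  checks: Record<string, () => Promise<string>>,
): Promise<DiagnosticResult[]> {
  const names = Object.keys(checks);
  // Start each check inside then() so synchronous throws become rejections;
  // allSettled waits for all of them and never rejects itself.
  const settled = await Promise.allSettled(
    names.map((n) => Promise.resolve().then(() => checks[n]())),
  );
  return settled.map((r, i) => ({
    check: names[i],
    ok: r.status === "fulfilled",
    detail:
      r.status === "fulfilled"
        ? r.value
        : r.reason instanceof Error
          ? r.reason.message
          : String(r.reason),
  }));
}

runChecks({
  nodeVersion: async () => `node ${process.version}`,
  configuration: async () => {
    throw new Error(".ragrc.json not found");
  },
}).then((report) => {
  for (const r of report) console.log(`${r.ok ? "PASS" : "FAIL"} ${r.check}: ${r.detail}`);
});
```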
Plugin Marketplace
Certified plugin ecosystem with discovery and installation workflows:
// src/core/plugin-marketplace/marketplace.js
class PluginMarketplace {
async discoverPlugins(criteria: SearchCriteria): Promise<PluginListing[]> {
const plugins = await this.searchRegistry(criteria);
return plugins.filter(plugin => this.meetsCertificationStandards(plugin));
}
async installPlugin(pluginId: string): Promise<InstallationResult> {
// Verify plugin certification
await this.verifyCertification(pluginId);
// Install with security scanning
return this.secureInstall(pluginId);
}
}
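The guide does not describe how certification is verified. One standard building block is checking a downloaded package against a recorded integrity hash in the Subresource Integrity format (sha512-<base64>) that npm writes to lockfiles. This is a sketch with hypothetical function names; where the expected hash comes from, and how that source is trusted, is outside its scope.

```typescript
import { createHash } from "node:crypto";

// Compute an SRI-style integrity string for a package's bytes.
function integrityOf(data: Buffer): string {
  return `sha512-${createHash("sha512").update(data).digest("base64")}`;
}

// Reject the package if its bytes do not match the recorded integrity string.
function verifyIntegrity(data: Buffer, expected: string): void {
  const actual = integrityOf(data);
  if (actual !== expected) {
    throw new Error(`Integrity mismatch: expected ${expected}, got ${actual}`);
  }
}

const pkg = Buffer.from("plugin tarball bytes");
const recorded = integrityOf(pkg); // in practice this comes from a trusted source
verifyIntegrity(pkg, recorded); // passes silently
```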
This enterprise-grade architecture enables @DevilsDev/rag-pipeline-utils to scale from simple prototypes to mission-critical production systems while maintaining flexibility, security, and observability. Continue to Usage for practical implementation examples, or explore Plugins to learn about creating custom components.