Architecture
This comprehensive guide outlines the internal architecture of @DevilsDev/rag-pipeline-utils, emphasizing enterprise-grade modularity, plugin design patterns, and SOLID-compliant structure. Understanding this architecture is crucial for extending the system, creating custom plugins, and optimizing performance.
Core Design Philosophy
The architecture is built on proven software engineering principles that ensure scalability, maintainability, and extensibility:
SOLID Principles Implementation
-
Single Responsibility Principle (SRP): Each component handles one specific domain concern
- Loaders only handle document ingestion
- Embedders only convert text to vectors
- Retrievers only manage vector storage and search
- LLMs only generate responses
-
Open/Closed Principle (OCP): System is open for extension but closed for modification
- New plugins can be added without changing core code
- Plugin interfaces define stable contracts
- Middleware can be injected without core changes
-
Liskov Substitution Principle (LSP): Any plugin can be replaced with another of the same type
- All PDF loaders implement the same interface
- OpenAI embedder can be swapped with Cohere embedder
- Vector stores are interchangeable
-
Interface Segregation Principle (ISP): Plugins only depend on interfaces they use
- Loaders don't depend on LLM interfaces
- Embedders don't need retriever methods
- Clean separation of concerns
-
Dependency Inversion Principle (DIP): High-level modules don't depend on low-level modules
- Pipeline depends on plugin abstractions, not implementations
- Configuration drives dependency injection
- Easy testing with mock implementations
Additional Architectural Principles
- Plugin-Based Architecture: Complete modularity with hot-swappable components
- Streaming-First Design: Async flows with real-time, token-by-token output
- Configuration-Driven: Environment-safe config via
.ragrc.jsonfiles - Event-Driven Processing: Comprehensive event system for monitoring and hooks
- Fail-Fast Validation: Early error detection with detailed error messages
- Observability Built-In: Comprehensive logging, tracing, and metrics collection
System Architecture Overview
High-Level Component Diagram
graph TB
subgraph "Application Layer"
CLI[CLI Interface]
API[Programmatic API]
Dashboard[Evaluation Dashboard]
end
subgraph "Core Pipeline"
Factory[Pipeline Factory]
Registry[Plugin Registry]
Pipeline[RAG Pipeline]
Middleware[Middleware Stack]
end
subgraph "Plugin Ecosystem"
Loaders[Document Loaders]
Embedders[Embedding Models]
Retrievers[Vector Stores]
LLMs[Language Models]
Rerankers[Context Rerankers]
end
subgraph "Infrastructure"
Config[Configuration Manager]
Logger[Event Logger]
Metrics[Metrics Collector]
Cache[Caching Layer]
end
CLI --> Factory
API --> Factory
Dashboard --> API
Factory --> Registry
Factory --> Pipeline
Pipeline --> Middleware
Registry --> Loaders
Registry --> Embedders
Registry --> Retrievers
Registry --> LLMs
Registry --> Rerankers
Pipeline --> Config
Pipeline --> Logger
Pipeline --> Metrics
Pipeline --> Cache
Data Flow Architecture
sequenceDiagram
participant U as User
participant P as Pipeline
participant L as Loader
participant E as Embedder
participant R as Retriever
participant LLM as Language Model
participant Re as Reranker
Note over U,Re: Document Ingestion Phase
U->>P: ingest(documents)
P->>L: load(document)
L->>P: chunks[]
P->>E: embed(chunks)
E->>P: embeddings[]
P->>R: store(embeddings)
R->>P: success
Note over U,Re: Query Processing Phase
U->>P: query(prompt)
P->>E: embed(prompt)
E->>P: queryVector
P->>R: search(queryVector)
R->>P: candidates[]
P->>Re: rerank(candidates, prompt)
Re->>P: rankedContext[]
P->>LLM: generate(prompt, context)
LLM->>P: response
P->>U: answer + metadata
Plugin System Architecture
Plugin Registry Implementation
The PluginRegistry is the heart of the plugin system, managing plugin lifecycle and dependency injection:
// Core plugin registry structure
class PluginRegistry {
private plugins: Map<string, Map<string, Plugin>> = new Map();
private contracts: Map<string, PluginContract> = new Map();
register<T extends Plugin>(
type: PluginType,
name: string,
plugin: T,
metadata?: PluginMetadata,
): void {
// Validate plugin against contract
this.validateContract(type, plugin);
// Register plugin with metadata
if (!this.plugins.has(type)) {
this.plugins.set(type, new Map());
}
this.plugins.get(type)!.set(name, plugin);
// Emit registration event
this.eventEmitter.emit("plugin:registered", {
type,
name,
plugin,
metadata,
});
}
get<T extends Plugin>(type: PluginType, name: string): T {
const typePlugins = this.plugins.get(type);
if (!typePlugins?.has(name)) {
throw new PluginNotFoundError(`Plugin ${type}:${name} not found`);
}
return typePlugins.get(name) as T;
}
list(type?: PluginType): PluginInfo[] {
// Return available plugins with metadata
}
}
Plugin Contract System
Each plugin type implements a strict contract interface:
// Loader Plugin Contract
interface LoaderPlugin {
readonly metadata: PluginMetadata;
load(source: string | Buffer, options?: LoaderOptions): Promise<Document[]>;
supports(mimeType: string): boolean;
validate(source: string | Buffer): Promise<ValidationResult>;
}
// Embedder Plugin Contract
interface EmbedderPlugin {
readonly metadata: PluginMetadata;
embed(texts: string[], options?: EmbedderOptions): Promise<Embedding[]>;
embedSingle(text: string, options?: EmbedderOptions): Promise<Embedding>;
getDimensions(): number;
getMaxTokens(): number;
}
// Retriever Plugin Contract
interface RetrieverPlugin {
readonly metadata: PluginMetadata;
store(embeddings: EmbeddingDocument[]): Promise<void>;
search(query: Embedding, options?: SearchOptions): Promise<SearchResult[]>;
delete(ids: string[]): Promise<void>;
getStats(): Promise<IndexStats>;
}
// LLM Plugin Contract
interface LLMPlugin {
readonly metadata: PluginMetadata;
generate(
prompt: string,
context?: string[],
options?: LLMOptions,
): AsyncIterable<string>;
generateSync(
prompt: string,
context?: string[],
options?: LLMOptions,
): Promise<string>;
getTokenCount(text: string): number;
getMaxContextLength(): number;
}
Plugin Metadata System
interface PluginMetadata {
name: string;
version: string;
description: string;
author: string;
license: string;
homepage?: string;
repository?: string;
keywords: string[];
// Plugin-specific metadata
capabilities: string[];
requirements: {
node: string;
dependencies?: Record<string, string>;
environment?: string[];
};
// Performance characteristics
performance: {
throughput?: string;
latency?: string;
memoryUsage?: string;
};
// Configuration schema
configSchema?: JSONSchema;
}
Pipeline Factory Pattern
The createRagPipeline function implements the Factory pattern to create configured pipeline instances:
export function createRagPipeline(config: PipelineConfig): RagPipeline {
// Initialize plugin registry
const registry = new PluginRegistry();
// Register built-in plugins
registerBuiltinPlugins(registry);
// Load custom plugins from config
if (config.plugins) {
await loadCustomPlugins(registry, config.plugins);
}
// Resolve plugin instances
const loader = registry.get("loader", config.loader);
const embedder = registry.get("embedder", config.embedder);
const retriever = registry.get("retriever", config.retriever);
const llm = registry.get("llm", config.llm);
const reranker = config.useReranker
? registry.get("reranker", config.reranker || "default")
: null;
// Create middleware stack
const middleware = createMiddlewareStack(config.middleware || []);
// Initialize pipeline with dependencies
return new RagPipeline({
loader,
embedder,
retriever,
llm,
reranker,
middleware,
config: config.pipelineConfig || {},
eventEmitter: new EventEmitter(),
logger: createLogger(config.logging || {}),
metrics: new MetricsCollector(config.metrics || {}),
});
}
Middleware Architecture
The middleware system allows for cross-cutting concerns like retry logic, caching, and monitoring:
// Middleware interface
interface Middleware {
name: string;
priority: number;
beforeLoad?(context: LoadContext): Promise<LoadContext>;
afterLoad?(context: LoadContext, result: Document[]): Promise<Document[]>;
beforeEmbed?(context: EmbedContext): Promise<EmbedContext>;
afterEmbed?(context: EmbedContext, result: Embedding[]): Promise<Embedding[]>;
beforeRetrieve?(context: RetrieveContext): Promise<RetrieveContext>;
afterRetrieve?(
context: RetrieveContext,
result: SearchResult[],
): Promise<SearchResult[]>;
beforeGenerate?(context: GenerateContext): Promise<GenerateContext>;
afterGenerate?(context: GenerateContext, result: string): Promise<string>;
onError?(error: Error, context: any): Promise<void>;
}
// Built-in middleware examples
class RetryMiddleware implements Middleware {
constructor(private options: RetryOptions) {}
async beforeGenerate(context: GenerateContext): Promise<GenerateContext> {
context.retryCount = 0;
context.maxRetries = this.options.maxRetries;
return context;
}
async onError(error: Error, context: any): Promise<void> {
if (context.retryCount < context.maxRetries) {
context.retryCount++;
// Implement exponential backoff
await this.delay(Math.pow(2, context.retryCount) * 1000);
throw new RetryableError(error);
}
throw error;
}
}
class CachingMiddleware implements Middleware {
constructor(private cache: Cache) {}
async beforeEmbed(context: EmbedContext): Promise<EmbedContext> {
const cacheKey = this.generateCacheKey(context.texts);
const cached = await this.cache.get(cacheKey);
if (cached) {
context.cachedResult = cached;
}
return context;
}
async afterEmbed(
context: EmbedContext,
result: Embedding[],
): Promise<Embedding[]> {
if (!context.cachedResult) {
const cacheKey = this.generateCacheKey(context.texts);
await this.cache.set(cacheKey, result, { ttl: 3600 });
}
return context.cachedResult || result;
}
}
Event System & Observability
The architecture includes a comprehensive event system for monitoring, debugging, and analytics:
// Event types emitted by the system
interface PipelineEvents {
"pipeline:created": { config: PipelineConfig };
"pipeline:destroyed": { pipelineId: string };
"document:loading": { source: string; loader: string };
"document:loaded": { source: string; chunks: number; duration: number };
"document:error": { source: string; error: Error };
"embedding:started": { texts: number; embedder: string };
"embedding:completed": { embeddings: number; duration: number };
"embedding:cached": { texts: number; cacheHit: boolean };
"retrieval:query": { query: string; retriever: string };
"retrieval:results": { results: number; duration: number };
"generation:started": { prompt: string; llm: string };
"generation:token": { token: string; position: number };
"generation:completed": {
response: string;
duration: number;
tokens: number;
};
"error:handled": { error: Error; context: string; recovered: boolean };
}
// Event-driven metrics collection
class MetricsCollector {
constructor(private config: MetricsConfig) {
this.setupEventListeners();
}
private setupEventListeners(): void {
// Track performance metrics
this.on("embedding:completed", (event) => {
this.histogram("embedding.duration", event.duration);
this.counter("embedding.requests").inc();
});
this.on("generation:completed", (event) => {
this.histogram("generation.duration", event.duration);
this.histogram("generation.tokens", event.tokens);
this.counter("generation.requests").inc();
});
// Track error rates
this.on("error:handled", (event) => {
this.counter("errors.total", {
context: event.context,
recovered: event.recovered.toString(),
}).inc();
});
}
}
Security Architecture
Security is built into the architecture at multiple levels:
Plugin Sandboxing
- Plugins run in isolated contexts with limited system access
- API key management through secure configuration
- Input validation and sanitization at plugin boundaries
Configuration Security
- Encrypted storage of sensitive configuration data
- Environment variable injection for secrets
- Configuration schema validation
Network Security
- TLS/SSL enforcement for all external API calls
- Request rate limiting and timeout handling
- Secure credential rotation support
Enterprise Architecture Components
Dependency Injection Container
The enterprise-grade dependency injection system provides IoC (Inversion of Control) for modular, testable architecture:
// src/core/dependency-injection.js
class DependencyContainer {
private services: Map<string, ServiceDefinition> = new Map();
private instances: Map<string, any> = new Map();
register<T>(
name: string,
factory: ServiceFactory<T>,
options?: ServiceOptions,
): void {
this.services.set(name, {
factory,
lifecycle: options?.lifecycle || "singleton",
dependencies: options?.dependencies || [],
});
}
resolve<T>(name: string): T {
if (this.instances.has(name)) {
return this.instances.get(name);
}
const service = this.services.get(name);
if (!service) {
throw new ServiceNotFoundError(`Service ${name} not registered`);
}
// Resolve dependencies
const dependencies = service.dependencies.map((dep) => this.resolve(dep));
// Create instance
const instance = service.factory(...dependencies);
if (service.lifecycle === "singleton") {
this.instances.set(name, instance);
}
return instance;
}
}
SLO Monitoring System
Built-in Service Level Objectives tracking with error budgets and alerting:
// src/observability/slo-monitor.js
class SLOMonitor {
private slos: Map<string, SLODefinition> = new Map();
private measurements: TimeSeriesDB = new TimeSeriesDB();
defineSLO(name: string, definition: SLODefinition): void {
this.slos.set(name, {
...definition,
errorBudget: this.calculateErrorBudget(definition),
});
}
recordMeasurement(sloName: string, success: boolean, latency?: number): void {
const measurement = {
timestamp: Date.now(),
success,
latency,
slo: sloName,
};
this.measurements.record(measurement);
this.checkSLOViolation(sloName, measurement);
}
getErrorBudgetStatus(sloName: string): ErrorBudgetStatus {
const slo = this.slos.get(sloName);
const recentMeasurements = this.measurements.getRecent(sloName, slo.window);
return {
remaining: this.calculateRemainingBudget(slo, recentMeasurements),
burnRate: this.calculateBurnRate(recentMeasurements),
alerting: this.shouldAlert(slo, recentMeasurements),
};
}
}
External API Mocking Infrastructure
Deterministic test infrastructure with network simulation for reliable CI/CD:
// __tests__/mocks/external-apis.js
class ExternalAPIMocker {
private mocks: Map<string, MockDefinition> = new Map();
private networkSimulator: NetworkSimulator;
mockAPI(service: string, config: MockConfig): void {
this.mocks.set(service, {
responses: config.responses,
latency: config.latency || { min: 100, max: 500 },
errorRate: config.errorRate || 0.05,
rateLimiting: config.rateLimiting,
});
}
async simulateRequest(
service: string,
request: APIRequest,
): Promise<APIResponse> {
const mock = this.mocks.get(service);
// Simulate network conditions
await this.networkSimulator.delay(mock.latency);
// Simulate errors
if (Math.random() < mock.errorRate) {
throw new MockNetworkError("Simulated network failure");
}
// Return mock response
return this.generateResponse(mock, request);
}
}
Advanced AI Architecture
Multi-Modal Processing Engine
Handle text, images, and structured data in unified pipelines:
// src/ai/multimodal/multi-modal-processor.js
class MultiModalProcessor {
private processors: Map<string, ModalityProcessor> = new Map();
async process(input: MultiModalInput): Promise<ProcessedContent> {
const results = await Promise.all(
input.modalities.map(async (modality) => {
const processor = this.processors.get(modality.type);
return processor.process(modality.content, modality.metadata);
}),
);
return this.fuseResults(results, input.fusionStrategy);
}
}
Federated Learning Coordinator
Distributed model training with privacy-preserving aggregation:
// src/ai/federation/federated-learning-coordinator.js
class FederatedLearningCoordinator {
async coordinateTraining(
participants: FederatedNode[],
): Promise<GlobalModel> {
// Distribute training tasks
const localUpdates = await this.distributeTraining(participants);
// Aggregate updates with differential privacy
const aggregatedUpdate = await this.aggregateWithPrivacy(localUpdates);
// Update global model
return this.updateGlobalModel(aggregatedUpdate);
}
}
Adaptive Retrieval Engine
Dynamic retrieval strategies with performance optimization:
// src/ai/retrieval/adaptive-retrieval-engine.js
class AdaptiveRetrievalEngine {
async adaptiveRetrieve(
query: Query,
context: RetrievalContext,
): Promise<RetrievalResult> {
// Analyze query complexity
const strategy = await this.selectStrategy(query, context);
// Execute retrieval with chosen strategy
const results = await this.executeStrategy(strategy, query);
// Learn from results for future optimization
await this.updateStrategyPerformance(strategy, results);
return results;
}
}
Enhanced Developer Tools
CLI Doctor Diagnostics
Comprehensive system health checking and troubleshooting:
// src/cli/doctor-command.js
class DoctorCommand {
async runDiagnostics(): Promise<DiagnosticReport> {
const checks = [
this.checkNodeVersion(),
this.checkDependencies(),
this.checkConfiguration(),
this.checkPluginHealth(),
this.checkExternalServices(),
this.checkPerformanceBottlenecks(),
];
const results = await Promise.allSettled(checks);
return this.generateReport(results);
}
}
Plugin Marketplace
Certified plugin ecosystem with discovery and installation workflows:
// src/core/plugin-marketplace/marketplace.js
class PluginMarketplace {
async discoverPlugins(criteria: SearchCriteria): Promise<PluginListing[]> {
const plugins = await this.searchRegistry(criteria);
return plugins.filter((plugin) => this.meetsCertificationStandards(plugin));
}
async installPlugin(pluginId: string): Promise<InstallationResult> {
// Verify plugin certification
await this.verifyCertification(pluginId);
// Install with security scanning
return this.secureInstall(pluginId);
}
}
This enterprise-grade architecture enables @DevilsDev/rag-pipeline-utils to scale from simple prototypes to mission-critical production systems while maintaining flexibility, security, and observability. Continue to Usage for practical implementation examples, or explore Plugins to learn about creating custom components.