Version: 2.3.1 (Latest)

Architecture

This comprehensive guide outlines the internal architecture of @DevilsDev/rag-pipeline-utils, emphasizing enterprise-grade modularity, plugin design patterns, and SOLID-compliant structure. Understanding this architecture is crucial for extending the system, creating custom plugins, and optimizing performance.


Core Design Philosophy

The architecture is built on proven software engineering principles that ensure scalability, maintainability, and extensibility:

SOLID Principles Implementation

  • Single Responsibility Principle (SRP): Each component handles one specific domain concern

    • Loaders only handle document ingestion
    • Embedders only convert text to vectors
    • Retrievers only manage vector storage and search
    • LLMs only generate responses
  • Open/Closed Principle (OCP): System is open for extension but closed for modification

    • New plugins can be added without changing core code
    • Plugin interfaces define stable contracts
    • Middleware can be injected without core changes
  • Liskov Substitution Principle (LSP): Any plugin can be replaced with another of the same type

    • All PDF loaders implement the same interface
    • OpenAI embedder can be swapped with Cohere embedder
    • Vector stores are interchangeable
  • Interface Segregation Principle (ISP): No plugin is forced to depend on interfaces it does not use

    • Loaders don't depend on LLM interfaces
    • Embedders don't need retriever methods
    • Clean separation of concerns
  • Dependency Inversion Principle (DIP): High-level modules don't depend on low-level modules; both depend on abstractions

    • Pipeline depends on plugin abstractions, not implementations
    • Configuration drives dependency injection
    • Easy testing with mock implementations
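The following minimal sketch makes the dependency-inversion and substitution points concrete. `Embedder`, `IngestService`, and `LengthMockEmbedder` are hypothetical names, not part of the library's API, and the interface is a cut-down stand-in for the embedder contract. The service depends only on the abstraction, so a deterministic mock can replace a real embedder in tests without any change to the service.

```typescript
// Hypothetical, cut-down embedder abstraction (the real contract has more methods).
interface Embedder {
  embed(texts: string[]): Promise<number[][]>;
}

// High-level module: depends only on the Embedder abstraction (DIP).
class IngestService {
  constructor(private readonly embedder: Embedder) {}

  async vectorize(texts: string[]): Promise<number[][]> {
    return this.embedder.embed(texts);
  }
}

// Deterministic mock that can stand in for any real embedder (LSP).
class LengthMockEmbedder implements Embedder {
  async embed(texts: string[]): Promise<number[][]> {
    // One-dimensional "vector": the text length. Useful only for tests.
    return texts.map((t) => [t.length]);
  }
}

const service = new IngestService(new LengthMockEmbedder());
```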

Additional Architectural Principles

  • Plugin-Based Architecture: Complete modularity with hot-swappable components
  • Streaming-First Design: Async flows with real-time, token-by-token output
  • Configuration-Driven: Environment-safe config via .ragrc.json files
  • Event-Driven Processing: Comprehensive event system for monitoring and hooks
  • Fail-Fast Validation: Early error detection with detailed error messages
  • Observability Built-In: Comprehensive logging, tracing, and metrics collection
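Fail-fast validation can be sketched as a check that runs before any plugin is resolved and reports every problem at once. The field names mirror the ones `createRagPipeline` reads later on this page (`loader`, `embedder`, `retriever`, `llm`, `useReranker`, `reranker`). The `RagConfig` type, `validateConfig`, and the error class are assumptions for illustration, not the library's actual `.ragrc.json` schema.

```typescript
// Hypothetical shape of the component section of a .ragrc.json file,
// inferred from the fields createRagPipeline reads; not an official schema.
interface RagConfig {
  loader: string;
  embedder: string;
  retriever: string;
  llm: string;
  useReranker?: boolean;
  reranker?: string;
}

class ConfigValidationError extends Error {
  constructor(public readonly problems: string[]) {
    super(`Invalid configuration:\n- ${problems.join("\n- ")}`);
    this.name = "ConfigValidationError";
  }
}

// Collects every problem before throwing, so users see all errors in one run.
function validateConfig(raw: unknown): RagConfig {
  if (typeof raw !== "object" || raw === null) {
    throw new ConfigValidationError(["config must be a JSON object"]);
  }
  const cfg = raw as Record<string, unknown>;
  const problems: string[] = [];

  for (const key of ["loader", "embedder", "retriever", "llm"]) {
    if (typeof cfg[key] !== "string" || cfg[key] === "") {
      problems.push(`"${key}" must be a non-empty string`);
    }
  }
  if (cfg.useReranker !== undefined && typeof cfg.useReranker !== "boolean") {
    problems.push(`"useReranker" must be a boolean`);
  }
  if (cfg.reranker !== undefined && typeof cfg.reranker !== "string") {
    problems.push(`"reranker" must be a string`);
  }

  if (problems.length > 0) throw new ConfigValidationError(problems);
  return cfg as unknown as RagConfig;
}
```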

System Architecture Overview

High-Level Component Diagram

graph TB
subgraph "Application Layer"
CLI[CLI Interface]
API[Programmatic API]
Dashboard[Evaluation Dashboard]
end

subgraph "Core Pipeline"
Factory[Pipeline Factory]
Registry[Plugin Registry]
Pipeline[RAG Pipeline]
Middleware[Middleware Stack]
end

subgraph "Plugin Ecosystem"
Loaders[Document Loaders]
Embedders[Embedding Models]
Retrievers[Vector Stores]
LLMs[Language Models]
Rerankers[Context Rerankers]
end

subgraph "Infrastructure"
Config[Configuration Manager]
Logger[Event Logger]
Metrics[Metrics Collector]
Cache[Caching Layer]
end

CLI --> Factory
API --> Factory
Dashboard --> API

Factory --> Registry
Factory --> Pipeline
Pipeline --> Middleware

Registry --> Loaders
Registry --> Embedders
Registry --> Retrievers
Registry --> LLMs
Registry --> Rerankers

Pipeline --> Config
Pipeline --> Logger
Pipeline --> Metrics
Pipeline --> Cache

Data Flow Architecture

sequenceDiagram
participant U as User
participant P as Pipeline
participant L as Loader
participant E as Embedder
participant R as Retriever
participant LLM as Language Model
participant Re as Reranker

Note over U,Re: Document Ingestion Phase
U->>P: ingest(documents)
P->>L: load(document)
L->>P: chunks[]
P->>E: embed(chunks)
E->>P: embeddings[]
P->>R: store(embeddings)
R->>P: success

Note over U,Re: Query Processing Phase
U->>P: query(prompt)
P->>E: embed(prompt)
E->>P: queryVector
P->>R: search(queryVector)
R->>P: candidates[]
P->>Re: rerank(candidates, prompt)
Re->>P: rankedContext[]
P->>LLM: generate(prompt, context)
LLM->>P: response
P->>U: answer + metadata
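Both phases in the sequence diagram can be traced end to end with a toy, in-memory sketch. Every piece is hypothetical and deliberately simplistic. The "embedder" is a letter-frequency vector, and the "retriever" is an array scanned with cosine similarity. The "LLM" step just echoes the retrieved context. The sketch shows the data handoffs (chunks, vectors, candidates, ranked context), not realistic retrieval quality.

```typescript
type Vector = number[];

interface StoredChunk {
  id: string;
  text: string;
  vector: Vector;
}

// Toy "embedder": 26-dimensional letter-frequency vector (illustration only).
function embed(text: string): Vector {
  const v = new Array<number>(26).fill(0);
  for (const ch of text.toLowerCase()) {
    const i = ch.charCodeAt(0) - 97;
    if (i >= 0 && i < 26) v[i] += 1;
  }
  return v;
}

function cosine(a: Vector, b: Vector): number {
  let dot = 0;
  let na = 0;
  let nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return na === 0 || nb === 0 ? 0 : dot / Math.sqrt(na * nb);
}

class ToyPipeline {
  private store: StoredChunk[] = [];

  // Ingestion phase: load (split into sentences) -> embed -> store.
  ingest(docId: string, text: string): number {
    const chunks = (text.match(/[^.!?]+[.!?]?/g) ?? [])
      .map((c) => c.trim())
      .filter((c) => c !== "");
    chunks.forEach((chunk, i) =>
      this.store.push({ id: `${docId}#${i}`, text: chunk, vector: embed(chunk) }),
    );
    return chunks.length;
  }

  // Query phase: embed prompt -> search -> rank -> "generate".
  query(prompt: string, topK = 2): { answer: string; sources: string[] } {
    const q = embed(prompt);
    const ranked = this.store
      .map((c) => ({ c, score: cosine(q, c.vector) }))
      .sort((x, y) => y.score - x.score)
      .slice(0, topK);
    // Stand-in for the LLM: echo the ranked context.
    return {
      answer: `Context: ${ranked.map((r) => r.c.text).join(" ")}`,
      sources: ranked.map((r) => r.c.id),
    };
  }
}
```

You could replace any of these pieces with a real plugin. That would change the quality of the results but not the shape of the flow.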

Plugin System Architecture

Plugin Registry Implementation

The PluginRegistry is the heart of the plugin system, managing plugin lifecycle and dependency injection:

import { EventEmitter } from "node:events";

// Core plugin registry structure
class PluginRegistry {
  private plugins: Map<string, Map<string, Plugin>> = new Map();
  private metadata: Map<string, PluginMetadata | undefined> = new Map();
  private contracts: Map<string, PluginContract> = new Map();
  private eventEmitter = new EventEmitter();

  register<T extends Plugin>(
    type: PluginType,
    name: string,
    plugin: T,
    metadata?: PluginMetadata,
  ): void {
    // Validate plugin against its type's contract (throws on violation)
    this.validateContract(type, plugin);

    // Register plugin and keep its metadata for list()
    if (!this.plugins.has(type)) {
      this.plugins.set(type, new Map());
    }
    this.plugins.get(type)!.set(name, plugin);
    this.metadata.set(`${type}:${name}`, metadata);

    // Emit registration event
    this.eventEmitter.emit("plugin:registered", {
      type,
      name,
      plugin,
      metadata,
    });
  }

  get<T extends Plugin>(type: PluginType, name: string): T {
    const typePlugins = this.plugins.get(type);
    if (!typePlugins?.has(name)) {
      throw new PluginNotFoundError(`Plugin ${type}:${name} not found`);
    }
    return typePlugins.get(name) as T;
  }

  list(type?: PluginType): PluginInfo[] {
    // Return available plugins with metadata (implementation omitted)
  }
}
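`register()` calls `validateContract()`, but the registry code does not show it. One plausible sketch checks that a plugin exposes every method its type's contract requires. The method lists come from the contract interfaces on this page. The `REQUIRED_METHODS` table, the `validateContract` signature, and `ContractViolationError` are assumptions. A runtime check like this only verifies that the methods exist; it cannot check their signatures.

```typescript
// Hypothetical contract table: required method names per plugin type,
// taken from the contract interfaces described on this page.
type PluginKind = "loader" | "embedder" | "retriever" | "llm";

const REQUIRED_METHODS: Record<PluginKind, string[]> = {
  loader: ["load", "supports", "validate"],
  embedder: ["embed", "embedSingle", "getDimensions", "getMaxTokens"],
  retriever: ["store", "search", "delete", "getStats"],
  llm: ["generate", "generateSync", "getTokenCount", "getMaxContextLength"],
};

class ContractViolationError extends Error {}

// Throws at registration time if any required method is missing.
// Bracket access also finds methods defined on a class prototype.
function validateContract(type: PluginKind, plugin: object): void {
  const missing = REQUIRED_METHODS[type].filter(
    (m) => typeof (plugin as Record<string, unknown>)[m] !== "function",
  );
  if (missing.length > 0) {
    throw new ContractViolationError(
      `${type} plugin is missing: ${missing.join(", ")}`,
    );
  }
}
```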

Plugin Contract System

Each plugin type has a strict contract interface that every plugin of that type must implement:

// Loader Plugin Contract
interface LoaderPlugin {
  readonly metadata: PluginMetadata;

  load(source: string | Buffer, options?: LoaderOptions): Promise<Document[]>;
  supports(mimeType: string): boolean;
  validate(source: string | Buffer): Promise<ValidationResult>;
}

// Embedder Plugin Contract
interface EmbedderPlugin {
  readonly metadata: PluginMetadata;

  embed(texts: string[], options?: EmbedderOptions): Promise<Embedding[]>;
  embedSingle(text: string, options?: EmbedderOptions): Promise<Embedding>;
  getDimensions(): number;
  getMaxTokens(): number;
}

// Retriever Plugin Contract
interface RetrieverPlugin {
  readonly metadata: PluginMetadata;

  store(embeddings: EmbeddingDocument[]): Promise<void>;
  search(query: Embedding, options?: SearchOptions): Promise<SearchResult[]>;
  delete(ids: string[]): Promise<void>;
  getStats(): Promise<IndexStats>;
}

// LLM Plugin Contract
interface LLMPlugin {
  readonly metadata: PluginMetadata;

  generate(
    prompt: string,
    context?: string[],
    options?: LLMOptions,
  ): AsyncIterable<string>;
  generateSync(
    prompt: string,
    context?: string[],
    options?: LLMOptions,
  ): Promise<string>;
  getTokenCount(text: string): number;
  getMaxContextLength(): number;
}
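To illustrate the loader contract, here is a hypothetical plain-text loader that splits its input into fixed-size chunks. The sketch rests on several assumptions:

- `LoadedDocument`, `ValidationResult`, and `LoaderOptions` are simplified local stand-ins for the page's types.
- `metadata` is abbreviated rather than a full `PluginMetadata`.
- A string `source` is treated as the text itself, not as a file path.
- `Uint8Array` replaces `Buffer`. Node's `Buffer` is a `Uint8Array` subclass, so it is still accepted.

```typescript
// Simplified stand-ins for the page's Document / ValidationResult / LoaderOptions.
interface LoadedDocument {
  id: string;
  content: string;
  metadata: Record<string, unknown>;
}
interface ValidationResult {
  valid: boolean;
  errors: string[];
}
interface LoaderOptions {
  chunkSize?: number; // characters per chunk (assumed option)
}

const decode = (source: string | Uint8Array): string =>
  typeof source === "string" ? source : new TextDecoder().decode(source);

// Hypothetical plain-text loader shaped like the LoaderPlugin contract.
class PlainTextLoader {
  readonly metadata = { name: "plain-text-loader", version: "0.1.0" };

  supports(mimeType: string): boolean {
    return mimeType === "text/plain";
  }

  async validate(source: string | Uint8Array): Promise<ValidationResult> {
    const errors = decode(source).trim() === "" ? ["source is empty"] : [];
    return { valid: errors.length === 0, errors };
  }

  async load(
    source: string | Uint8Array,
    options: LoaderOptions = {},
  ): Promise<LoadedDocument[]> {
    const size = options.chunkSize ?? 500;
    if (size <= 0) throw new RangeError("chunkSize must be positive");
    const text = decode(source);
    const docs: LoadedDocument[] = [];
    // Fixed-size character chunks; real loaders often split on structure instead.
    for (let offset = 0; offset < text.length; offset += size) {
      docs.push({
        id: `chunk-${docs.length}`,
        content: text.slice(offset, offset + size),
        metadata: { offset },
      });
    }
    return docs;
  }
}
```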

Plugin Metadata System

interface PluginMetadata {
  name: string;
  version: string;
  description: string;
  author: string;
  license: string;
  homepage?: string;
  repository?: string;
  keywords: string[];

  // Plugin-specific metadata
  capabilities: string[];
  requirements: {
    node: string;
    dependencies?: Record<string, string>;
    environment?: string[];
  };

  // Performance characteristics
  performance: {
    throughput?: string;
    latency?: string;
    memoryUsage?: string;
  };

  // Configuration schema
  configSchema?: JSONSchema;
}
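Metadata is most useful when the system acts on it. This minimal sketch uses a trimmed-down `PluginMetadataLite` type and a hypothetical `example-embedder` plugin. It checks `requirements.environment` against an environment map, and it checks a `>=MAJOR` Node.js requirement against a version string. The helper names are illustrative. Only the `>=N` form is handled; full semver ranges need a proper semver parser.

```typescript
// Trimmed-down subset of PluginMetadata (assumption for this sketch).
interface PluginRequirements {
  node: string;
  environment?: string[];
}
interface PluginMetadataLite {
  name: string;
  version: string;
  capabilities: string[];
  requirements: PluginRequirements;
}

// Example metadata for a hypothetical embedder plugin.
const exampleMetadata: PluginMetadataLite = {
  name: "example-embedder",
  version: "1.0.0",
  capabilities: ["embed", "batch"],
  requirements: { node: ">=18", environment: ["EXAMPLE_API_KEY"] },
};

// Returns required environment variables that are absent or empty.
function missingEnvironment(
  meta: PluginMetadataLite,
  env: Record<string, string | undefined>,
): string[] {
  return (meta.requirements.environment ?? []).filter((key) => !env[key]);
}

// Handles only ">=MAJOR" requirements; anything else is reported as unsatisfied.
function satisfiesNodeRequirement(requirement: string, nodeVersion: string): boolean {
  const req = /^>=\s*(\d+)/.exec(requirement);
  const cur = /^v?(\d+)/.exec(nodeVersion);
  if (!req || !cur) return false;
  return Number(cur[1]) >= Number(req[1]);
}
```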

Pipeline Factory Pattern

The createRagPipeline function implements the Factory pattern to create configured pipeline instances:

import { EventEmitter } from "node:events";

// Async because loading custom plugins is awaited below
export async function createRagPipeline(
  config: PipelineConfig,
): Promise<RagPipeline> {
  // Initialize plugin registry
  const registry = new PluginRegistry();

  // Register built-in plugins
  registerBuiltinPlugins(registry);

  // Load custom plugins from config
  if (config.plugins) {
    await loadCustomPlugins(registry, config.plugins);
  }

  // Resolve plugin instances
  const loader = registry.get("loader", config.loader);
  const embedder = registry.get("embedder", config.embedder);
  const retriever = registry.get("retriever", config.retriever);
  const llm = registry.get("llm", config.llm);
  const reranker = config.useReranker
    ? registry.get("reranker", config.reranker || "default")
    : null;

  // Create middleware stack
  const middleware = createMiddlewareStack(config.middleware || []);

  // Initialize pipeline with dependencies
  return new RagPipeline({
    loader,
    embedder,
    retriever,
    llm,
    reranker,
    middleware,
    config: config.pipelineConfig || {},
    eventEmitter: new EventEmitter(),
    logger: createLogger(config.logging || {}),
    metrics: new MetricsCollector(config.metrics || {}),
  });
}
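Calling `registry.get()` once per component stops at the first missing plugin. A variation in the spirit of fail-fast validation resolves every configured component first and reports all unresolved names in a single error. `resolveComponents`, the `type:name` key format, and `FactoryConfig` are assumptions for this sketch. The optional reranker mirrors the factory's `config.reranker || "default"` fallback.

```typescript
type ComponentType = "loader" | "embedder" | "retriever" | "llm" | "reranker";

interface FactoryConfig {
  loader: string;
  embedder: string;
  retriever: string;
  llm: string;
  useReranker?: boolean;
  reranker?: string;
}

// Resolves all components from a "type:name"-keyed map, collecting every
// missing plugin before throwing. The reranker is null when disabled.
function resolveComponents(
  available: Map<string, unknown>,
  config: FactoryConfig,
): Record<ComponentType, unknown> {
  const wanted: Array<[ComponentType, string | null]> = [
    ["loader", config.loader],
    ["embedder", config.embedder],
    ["retriever", config.retriever],
    ["llm", config.llm],
    ["reranker", config.useReranker ? config.reranker || "default" : null],
  ];

  const missing: string[] = [];
  const resolved = {} as Record<ComponentType, unknown>;
  for (const [type, pluginName] of wanted) {
    if (pluginName === null) {
      resolved[type] = null;
      continue;
    }
    const key = `${type}:${pluginName}`;
    if (!available.has(key)) missing.push(key);
    resolved[type] = available.get(key) ?? null;
  }

  if (missing.length > 0) {
    throw new Error(`Plugins not found: ${missing.join(", ")}`);
  }
  return resolved;
}
```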

Middleware Architecture

The middleware system allows for cross-cutting concerns like retry logic, caching, and monitoring:

// Middleware interface
interface Middleware {
  name: string;
  priority: number;

  beforeLoad?(context: LoadContext): Promise<LoadContext>;
  afterLoad?(context: LoadContext, result: Document[]): Promise<Document[]>;

  beforeEmbed?(context: EmbedContext): Promise<EmbedContext>;
  afterEmbed?(context: EmbedContext, result: Embedding[]): Promise<Embedding[]>;

  beforeRetrieve?(context: RetrieveContext): Promise<RetrieveContext>;
  afterRetrieve?(
    context: RetrieveContext,
    result: SearchResult[],
  ): Promise<SearchResult[]>;

  beforeGenerate?(context: GenerateContext): Promise<GenerateContext>;
  afterGenerate?(context: GenerateContext, result: string): Promise<string>;

  onError?(error: Error, context: any): Promise<void>;
}

// Built-in middleware examples
class RetryMiddleware implements Middleware {
  name = "retry";
  priority = 10; // example value

  constructor(private options: RetryOptions) {}

  async beforeGenerate(context: GenerateContext): Promise<GenerateContext> {
    context.retryCount = 0;
    context.maxRetries = this.options.maxRetries;
    return context;
  }

  async onError(error: Error, context: any): Promise<void> {
    if (context.retryCount < context.maxRetries) {
      context.retryCount++;
      // Exponential backoff: 2 s, 4 s, 8 s, ... for retries 1, 2, 3, ...
      await this.delay(Math.pow(2, context.retryCount) * 1000);
      // Signal that the failed step may be retried
      throw new RetryableError(error);
    }
    // Retries exhausted: propagate the original error
    throw error;
  }

  private delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}

class CachingMiddleware implements Middleware {
  name = "cache";
  priority = 5; // example value

  constructor(private cache: Cache) {}

  async beforeEmbed(context: EmbedContext): Promise<EmbedContext> {
    const cacheKey = this.generateCacheKey(context.texts);
    const cached = await this.cache.get(cacheKey);
    if (cached) {
      // Assumes the pipeline skips the embedder call when cachedResult is set
      context.cachedResult = cached;
    }
    return context;
  }

  async afterEmbed(
    context: EmbedContext,
    result: Embedding[],
  ): Promise<Embedding[]> {
    if (!context.cachedResult) {
      const cacheKey = this.generateCacheKey(context.texts);
      await this.cache.set(cacheKey, result, { ttl: 3600 }); // TTL in seconds
    }
    return context.cachedResult || result;
  }

  private generateCacheKey(texts: string[]): string {
    // Deterministic key for a batch of texts; hash it if keys must stay short
    return `embed:${JSON.stringify(texts)}`;
  }
}
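The page does not show how `createMiddlewareStack` orders and runs the hooks. This sketch assumes a common design with three steps:

- Sort the middleware by ascending `priority` and run the `before*` hooks in that order.
- Call the stage itself.
- Run the `after*` hooks in reverse order. This is an "onion" model: the first middleware in is the last out.

The sketch covers only the embed stage. `EmbedCtx`, `EmbedMiddleware`, and `runEmbedStage` are hypothetical names.

```typescript
// Hypothetical, reduced middleware shape covering only the embed stage.
interface EmbedCtx {
  texts: string[];
  trace: string[];
}
interface EmbedMiddleware {
  name: string;
  priority: number;
  beforeEmbed?(ctx: EmbedCtx): Promise<EmbedCtx>;
  afterEmbed?(ctx: EmbedCtx, result: number[][]): Promise<number[][]>;
}

// Runs "before" hooks by ascending priority, then the stage,
// then "after" hooks in reverse order (onion model).
async function runEmbedStage(
  middleware: EmbedMiddleware[],
  ctx: EmbedCtx,
  stage: (texts: string[]) => Promise<number[][]>,
): Promise<number[][]> {
  const ordered = [...middleware].sort((a, b) => a.priority - b.priority);
  for (const m of ordered) {
    if (m.beforeEmbed) ctx = await m.beforeEmbed(ctx);
  }
  let result = await stage(ctx.texts);
  for (const m of [...ordered].reverse()) {
    if (m.afterEmbed) result = await m.afterEmbed(ctx, result);
  }
  return result;
}
```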

Event System & Observability

The architecture includes a comprehensive event system for monitoring, debugging, and analytics:

// Event types emitted by the system
interface PipelineEvents {
  "pipeline:created": { config: PipelineConfig };
  "pipeline:destroyed": { pipelineId: string };

  "document:loading": { source: string; loader: string };
  "document:loaded": { source: string; chunks: number; duration: number };
  "document:error": { source: string; error: Error };

  "embedding:started": { texts: number; embedder: string };
  "embedding:completed": { embeddings: number; duration: number };
  "embedding:cached": { texts: number; cacheHit: boolean };

  "retrieval:query": { query: string; retriever: string };
  "retrieval:results": { results: number; duration: number };

  "generation:started": { prompt: string; llm: string };
  "generation:token": { token: string; position: number };
  "generation:completed": {
    response: string;
    duration: number;
    tokens: number;
  };

  "error:handled": { error: Error; context: string; recovered: boolean };
}

// Event-driven metrics collection.
// on(), histogram() and counter() are helpers (subscribing to pipeline events
// and wrapping a metrics client) that are omitted here for brevity.
class MetricsCollector {
  constructor(private config: MetricsConfig) {
    this.setupEventListeners();
  }

  private setupEventListeners(): void {
    // Track performance metrics
    this.on("embedding:completed", (event) => {
      this.histogram("embedding.duration", event.duration);
      this.counter("embedding.requests").inc();
    });

    this.on("generation:completed", (event) => {
      this.histogram("generation.duration", event.duration);
      this.histogram("generation.tokens", event.tokens);
      this.counter("generation.requests").inc();
    });

    // Track error rates
    this.on("error:handled", (event) => {
      this.counter("errors.total", {
        context: event.context,
        recovered: event.recovered.toString(),
      }).inc();
    });
  }
}
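An event map like `PipelineEvents` pairs naturally with a typed emitter, so that each listener receives the correct payload type for its event name. This minimal sketch uses a two-event subset of the map and in-memory counters in place of a real metrics backend. `TypedEmitter`, `counters`, and the metric key format are hypothetical.

```typescript
// Two-event subset of the PipelineEvents map above.
interface MiniPipelineEvents {
  "embedding:completed": { embeddings: number; duration: number };
  "error:handled": { error: Error; context: string; recovered: boolean };
}

// Minimal type-safe emitter: listeners get the payload type of their event.
class TypedEmitter<E> {
  private listeners: { [K in keyof E]?: Array<(payload: E[K]) => void> } = {};

  on<K extends keyof E>(event: K, fn: (payload: E[K]) => void): void {
    const list = this.listeners[event] ?? [];
    list.push(fn);
    this.listeners[event] = list;
  }

  emit<K extends keyof E>(event: K, payload: E[K]): void {
    for (const fn of this.listeners[event] ?? []) fn(payload);
  }
}

// In-memory counters standing in for a metrics backend.
const counters = new Map<string, number>();
const inc = (key: string): void => {
  counters.set(key, (counters.get(key) ?? 0) + 1);
};

const bus = new TypedEmitter<MiniPipelineEvents>();
bus.on("embedding:completed", () => inc("embedding.requests"));
// `e` is typed as the "error:handled" payload, so e.recovered is a boolean.
bus.on("error:handled", (e) => inc(`errors.total{recovered=${e.recovered}}`));
```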

Security Architecture

Security is built into the architecture at multiple levels:

Plugin Sandboxing

  • Plugins run in isolated contexts with limited system access
  • API key management through secure configuration
  • Input validation and sanitization at plugin boundaries

Configuration Security

  • Encrypted storage of sensitive configuration data
  • Environment variable injection for secrets
  • Configuration schema validation

Network Security

  • TLS/SSL enforcement for all external API calls
  • Request rate limiting and timeout handling
  • Secure credential rotation support
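Environment variable injection for secrets can be sketched as a pass over the parsed configuration. The pass replaces `${VAR}` placeholders with values from the environment and fails fast when any are missing, so secrets never need to be written into `.ragrc.json`. The placeholder syntax, `injectSecrets`, `resolveConfig`, and `EXAMPLE_API_KEY` are assumptions, not documented library behavior. The environment is passed in as a plain record (for example `process.env` in Node.js), which keeps the functions easy to test.

```typescript
// Hypothetical placeholder convention: "${NAME}" inside any config string.
const PLACEHOLDER = /\$\{([A-Z0-9_]+)\}/g;

// Recursively replaces placeholders, recording names that are not set.
function injectSecrets(
  value: unknown,
  env: Record<string, string | undefined>,
  missing: string[],
): unknown {
  if (typeof value === "string") {
    return value.replace(PLACEHOLDER, (_match, key: string) => {
      const secret = env[key];
      if (secret === undefined) {
        missing.push(key);
        return "";
      }
      return secret;
    });
  }
  if (Array.isArray(value)) {
    return value.map((v) => injectSecrets(v, env, missing));
  }
  if (value !== null && typeof value === "object") {
    return Object.fromEntries(
      Object.entries(value).map(([k, v]) => [k, injectSecrets(v, env, missing)]),
    );
  }
  return value;
}

// Fails fast with every missing variable listed in one error.
function resolveConfig(raw: unknown, env: Record<string, string | undefined>): unknown {
  const missing: string[] = [];
  const resolved = injectSecrets(raw, env, missing);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  return resolved;
}
```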

Enterprise Architecture Components

Dependency Injection Container

The enterprise-grade dependency injection system provides Inversion of Control (IoC) for a modular, testable architecture:

// src/core/dependency-injection.js
class DependencyContainer {
  private services: Map<string, ServiceDefinition> = new Map();
  private instances: Map<string, any> = new Map();

  register<T>(
    name: string,
    factory: ServiceFactory<T>,
    options?: ServiceOptions,
  ): void {
    this.services.set(name, {
      factory,
      lifecycle: options?.lifecycle || "singleton",
      dependencies: options?.dependencies || [],
    });
  }

  resolve<T>(name: string): T {
    if (this.instances.has(name)) {
      return this.instances.get(name);
    }

    const service = this.services.get(name);
    if (!service) {
      throw new ServiceNotFoundError(`Service ${name} not registered`);
    }

    // Resolve dependencies
    const dependencies = service.dependencies.map((dep) => this.resolve(dep));

    // Create instance
    const instance = service.factory(...dependencies);

    if (service.lifecycle === "singleton") {
      this.instances.set(name, instance);
    }

    return instance;
  }
}
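The container above recurses into dependencies without guarding against cycles, so two services that depend on each other would overflow the stack. Below is a hedged sketch of the same idea with cycle detection and explicit singleton and transient lifecycles. `Container` and its option names are illustrative, not the library's `DependencyContainer` API.

```typescript
type Factory<T> = (...deps: any[]) => T;

interface Definition {
  factory: Factory<unknown>;
  lifecycle: "singleton" | "transient";
  dependencies: string[];
}

class Container {
  private defs = new Map<string, Definition>();
  private singletons = new Map<string, unknown>();

  register<T>(
    serviceName: string,
    factory: Factory<T>,
    opts: Partial<Omit<Definition, "factory">> = {},
  ): void {
    this.defs.set(serviceName, {
      factory,
      lifecycle: opts.lifecycle ?? "singleton",
      dependencies: opts.dependencies ?? [],
    });
  }

  // `path` tracks the services currently being resolved, to detect cycles.
  resolve<T>(serviceName: string, path: string[] = []): T {
    if (this.singletons.has(serviceName)) return this.singletons.get(serviceName) as T;
    if (path.includes(serviceName)) {
      throw new Error(`Circular dependency: ${[...path, serviceName].join(" -> ")}`);
    }
    const def = this.defs.get(serviceName);
    if (!def) throw new Error(`Service ${serviceName} not registered`);

    const deps = def.dependencies.map((d) => this.resolve<unknown>(d, [...path, serviceName]));
    const instance = def.factory(...deps) as T;
    if (def.lifecycle === "singleton") this.singletons.set(serviceName, instance);
    return instance;
  }
}
```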

SLO Monitoring System

Built-in Service Level Objective (SLO) tracking with error budgets and alerting:

// src/observability/slo-monitor.js
class SLOMonitor {
  private slos: Map<string, SLODefinition> = new Map();
  private measurements: TimeSeriesDB = new TimeSeriesDB();

  defineSLO(name: string, definition: SLODefinition): void {
    this.slos.set(name, {
      ...definition,
      errorBudget: this.calculateErrorBudget(definition),
    });
  }

  recordMeasurement(sloName: string, success: boolean, latency?: number): void {
    const measurement = {
      timestamp: Date.now(),
      success,
      latency,
      slo: sloName,
    };

    this.measurements.record(measurement);
    this.checkSLOViolation(sloName, measurement);
  }

  getErrorBudgetStatus(sloName: string): ErrorBudgetStatus {
    const slo = this.slos.get(sloName);
    if (!slo) {
      throw new Error(`SLO ${sloName} is not defined`);
    }
    const recentMeasurements = this.measurements.getRecent(sloName, slo.window);

    return {
      remaining: this.calculateRemainingBudget(slo, recentMeasurements),
      burnRate: this.calculateBurnRate(recentMeasurements),
      alerting: this.shouldAlert(slo, recentMeasurements),
    };
  }
}
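The helpers `calculateErrorBudget`, `calculateBurnRate`, and `calculateRemainingBudget` are not shown. Under the standard SRE definitions, they reduce to a few lines of arithmetic:

- The error budget is 1 - target.
- The burn rate is the observed error rate divided by the budgeted error rate.

This is a sketch of those definitions for a request-based availability SLO, not the library's implementation.

```typescript
// Request counts observed in one SLO window.
interface WindowStats {
  total: number;
  failed: number;
}

// Allowed failure fraction, e.g. target 0.999 -> 0.001.
function errorBudget(target: number): number {
  return 1 - target;
}

// Observed error rate divided by the allowed error rate. A sustained burn
// rate of 1 uses up exactly the whole budget by the end of the SLO period.
function burnRate(stats: WindowStats, target: number): number {
  if (stats.total === 0) return 0;
  return stats.failed / stats.total / errorBudget(target);
}

// Fraction of the window's budget still unspent (negative once exhausted).
function remainingBudget(stats: WindowStats, target: number): number {
  if (stats.failed === 0) return 1;
  const allowedFailures = stats.total * errorBudget(target);
  return 1 - stats.failed / allowedFailures;
}
```

For example, a 99.9% target over 30 days allows 0.1% of 43,200 minutes, which is about 43.2 minutes of total unavailability. A sustained burn rate of 2 would exhaust that budget in 15 days.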

External API Mocking Infrastructure

Deterministic test infrastructure with network simulation for reliable CI/CD:

// __tests__/mocks/external-apis.js
class ExternalAPIMocker {
  private mocks: Map<string, MockDefinition> = new Map();

  constructor(private networkSimulator: NetworkSimulator) {}

  mockAPI(service: string, config: MockConfig): void {
    this.mocks.set(service, {
      responses: config.responses,
      // `??` keeps explicit zeros (e.g. errorRate: 0) instead of overriding them
      latency: config.latency ?? { min: 100, max: 500 },
      errorRate: config.errorRate ?? 0.05,
      rateLimiting: config.rateLimiting,
    });
  }

  async simulateRequest(
    service: string,
    request: APIRequest,
  ): Promise<APIResponse> {
    const mock = this.mocks.get(service);
    if (!mock) {
      throw new Error(`No mock registered for ${service}`);
    }

    // Simulate network conditions
    await this.networkSimulator.delay(mock.latency);

    // Simulate errors. Math.random() differs per run; use a seeded RNG
    // when CI runs must fail the same way every time.
    if (Math.random() < mock.errorRate) {
      throw new MockNetworkError("Simulated network failure");
    }

    // Return mock response
    return this.generateResponse(mock, request);
  }
}
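The mocker draws failures from `Math.random()`, which differs on every run. To get the deterministic behavior this section describes, you can seed the random source. The sketch below uses mulberry32, a small, widely used 32-bit PRNG (not cryptographically secure), inside a hypothetical `FaultInjector`. The same seed always yields the same sequence of failures and latencies.

```typescript
// mulberry32: small seeded 32-bit PRNG returning floats in [0, 1).
function mulberry32(seed: number): () => number {
  let state = seed >>> 0;
  return () => {
    state = (state + 0x6d2b79f5) >>> 0;
    let t = state;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}

// Hypothetical fault injector: same seed -> same failure/latency pattern.
class FaultInjector {
  private readonly random: () => number;

  constructor(seed: number) {
    this.random = mulberry32(seed);
  }

  shouldFail(errorRate: number): boolean {
    return this.random() < errorRate;
  }

  // Latency in milliseconds, uniformly drawn from [min, max].
  latency(min: number, max: number): number {
    return Math.round(min + this.random() * (max - min));
  }
}
```

In `simulateRequest`, `Math.random() < mock.errorRate` would become `injector.shouldFail(mock.errorRate)`, with the seed fixed per test. This wiring is an assumption.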

Advanced AI Architecture

Multi-Modal Processing Engine

Handle text, images, and structured data in unified pipelines:

// src/ai/multimodal/multi-modal-processor.js
class MultiModalProcessor {
  private processors: Map<string, ModalityProcessor> = new Map();

  async process(input: MultiModalInput): Promise<ProcessedContent> {
    const results = await Promise.all(
      input.modalities.map(async (modality) => {
        const processor = this.processors.get(modality.type);
        if (!processor) {
          throw new Error(`No processor registered for modality ${modality.type}`);
        }
        return processor.process(modality.content, modality.metadata);
      }),
    );

    return this.fuseResults(results, input.fusionStrategy);
  }
}
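The page does not specify `fuseResults` or the available fusion strategies. One simple option, assumed here, is weighted late fusion, which takes the weighted mean of per-modality vectors. This only makes sense when all modalities are embedded into a shared vector space, as CLIP-style models do for text and images. `ModalityResult` and `fuseWeightedMean` are hypothetical.

```typescript
interface ModalityResult {
  modality: "text" | "image" | "table";
  vector: number[]; // assumed to live in a shared embedding space
}

// Weighted mean of per-modality vectors; unspecified weights default to 1.
function fuseWeightedMean(
  results: ModalityResult[],
  weights: Partial<Record<ModalityResult["modality"], number>> = {},
): number[] {
  if (results.length === 0) throw new Error("nothing to fuse");
  const dim = results[0].vector.length;
  const fused = new Array<number>(dim).fill(0);
  let totalWeight = 0;

  for (const r of results) {
    if (r.vector.length !== dim) {
      throw new Error(`dimension mismatch for ${r.modality}`);
    }
    const w = weights[r.modality] ?? 1;
    totalWeight += w;
    r.vector.forEach((x, i) => {
      fused[i] += w * x;
    });
  }

  if (totalWeight === 0) throw new Error("weights sum to zero");
  return fused.map((x) => x / totalWeight);
}
```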

Federated Learning Coordinator

Distributed model training with privacy-preserving aggregation:

// src/ai/federation/federated-learning-coordinator.js
class FederatedLearningCoordinator {
  async coordinateTraining(
    participants: FederatedNode[],
  ): Promise<GlobalModel> {
    // Distribute training tasks
    const localUpdates = await this.distributeTraining(participants);

    // Aggregate updates with differential privacy
    const aggregatedUpdate = await this.aggregateWithPrivacy(localUpdates);

    // Update global model
    return this.updateGlobalModel(aggregatedUpdate);
  }
}
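The page does not show `aggregateWithPrivacy`. A common pattern, in the style of DP-FedAvg, has three steps:

1. Clip each client update to a maximum L2 norm.
2. Average the clipped updates.
3. Add Gaussian noise scaled to the clipping norm.

The sketch below follows that pattern and takes an injectable random source so it can be tested. The function names and parameters are assumptions. It performs no privacy accounting; choosing the noise multiplier for a target ε is a separate step.

```typescript
type Update = number[];

function l2Norm(v: Update): number {
  return Math.sqrt(v.reduce((s, x) => s + x * x, 0));
}

// Scales an update down so its L2 norm is at most clipNorm.
function clip(v: Update, clipNorm: number): Update {
  const n = l2Norm(v);
  return n <= clipNorm ? [...v] : v.map((x) => (x * clipNorm) / n);
}

// Standard normal sample via the Box-Muller transform.
function gaussian(rand: () => number): number {
  const u1 = 1 - rand(); // in (0, 1], avoids log(0)
  const u2 = rand();
  return Math.sqrt(-2 * Math.log(u1)) * Math.cos(2 * Math.PI * u2);
}

// Clip, average, then add noise with std = noiseMultiplier * clipNorm / n.
function aggregateWithPrivacy(
  updates: Update[],
  clipNorm: number,
  noiseMultiplier: number,
  rand: () => number = Math.random,
): Update {
  if (updates.length === 0) throw new Error("no updates");
  const clipped = updates.map((u) => clip(u, clipNorm));
  const dim = clipped[0].length;
  if (clipped.some((u) => u.length !== dim)) throw new Error("dimension mismatch");

  const n = clipped.length;
  const sigma = (noiseMultiplier * clipNorm) / n;
  return clipped[0].map((_, i) => {
    const mean = clipped.reduce((s, u) => s + u[i], 0) / n;
    return mean + sigma * gaussian(rand);
  });
}
```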

Adaptive Retrieval Engine

Dynamic retrieval strategies with performance optimization:

// src/ai/retrieval/adaptive-retrieval-engine.js
class AdaptiveRetrievalEngine {
  async adaptiveRetrieve(
    query: Query,
    context: RetrievalContext,
  ): Promise<RetrievalResult> {
    // Analyze query complexity
    const strategy = await this.selectStrategy(query, context);

    // Execute retrieval with chosen strategy
    const results = await this.executeStrategy(strategy, query);

    // Learn from results for future optimization
    await this.updateStrategyPerformance(strategy, results);

    return results;
  }
}
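The page does not describe how `selectStrategy` learns from `updateStrategyPerformance`. A simple, well-known technique for this kind of feedback loop is an epsilon-greedy multi-armed bandit. It usually picks the strategy with the best average reward and occasionally picks one at random to keep exploring. It is shown here as a swapped-in technique; the strategy names and the reward signal are hypothetical.

```typescript
// Hypothetical strategy names; the page does not list the engine's strategies.
type Strategy = "dense" | "keyword" | "hybrid";

// Epsilon-greedy selection over retrieval strategies.
class EpsilonGreedySelector {
  private stats = new Map<Strategy, { pulls: number; totalReward: number }>();

  constructor(
    private readonly strategies: Strategy[],
    private readonly epsilon = 0.1,
    private readonly rand: () => number = Math.random,
  ) {
    for (const s of strategies) this.stats.set(s, { pulls: 0, totalReward: 0 });
  }

  select(): Strategy {
    // Explore: pick uniformly at random with probability epsilon.
    if (this.rand() < this.epsilon) {
      return this.strategies[Math.floor(this.rand() * this.strategies.length)];
    }
    // Exploit: best average reward; untried strategies count as Infinity
    // so each one is sampled at least once.
    let best = this.strategies[0];
    let bestAvg = -Infinity;
    for (const s of this.strategies) {
      const { pulls, totalReward } = this.stats.get(s)!;
      const avg = pulls === 0 ? Infinity : totalReward / pulls;
      if (avg > bestAvg) {
        best = s;
        bestAvg = avg;
      }
    }
    return best;
  }

  // Reward might be a relevance score in [0, 1] from evaluation or feedback.
  update(strategy: Strategy, reward: number): void {
    const s = this.stats.get(strategy)!;
    s.pulls += 1;
    s.totalReward += reward;
  }
}
```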

Enhanced Developer Tools

CLI Doctor Diagnostics

Comprehensive system health checking and troubleshooting:

// src/cli/doctor-command.js
class DoctorCommand {
  async runDiagnostics(): Promise<DiagnosticReport> {
    const checks = [
      this.checkNodeVersion(),
      this.checkDependencies(),
      this.checkConfiguration(),
      this.checkPluginHealth(),
      this.checkExternalServices(),
      this.checkPerformanceBottlenecks(),
    ];

    const results = await Promise.allSettled(checks);
    return this.generateReport(results);
  }
}
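Because the diagnostics use `Promise.allSettled`, one failing check cannot hide the others. Below is a sketch of `generateReport` and one check. The names, report shape, and minimum-version logic are illustrative assumptions. The local `Settled` type mirrors the standard shape of `allSettled` results.

```typescript
// Standard shape of Promise.allSettled results, declared locally.
type Settled<T> =
  | { status: "fulfilled"; value: T }
  | { status: "rejected"; reason: unknown };

interface CheckReport {
  passed: number;
  failed: number;
  lines: string[];
}

// One line per check; failures keep their error message.
function generateReport(names: string[], results: Settled<string>[]): CheckReport {
  const lines = results.map((r, i) =>
    r.status === "fulfilled"
      ? `PASS ${names[i]}: ${r.value}`
      : `FAIL ${names[i]}: ${r.reason instanceof Error ? r.reason.message : String(r.reason)}`,
  );
  const failed = results.filter((r) => r.status === "rejected").length;
  return { passed: results.length - failed, failed, lines };
}

// Example check: compares a Node.js version string to a minimum major version.
async function checkNodeVersion(current: string, minimumMajor: number): Promise<string> {
  const major = Number(/^v?(\d+)/.exec(current)?.[1]);
  if (!(major >= minimumMajor)) {
    throw new Error(`Node ${current} found, >= ${minimumMajor} required`);
  }
  return `Node ${current}`;
}
```

At runtime the check would receive `process.version`.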

Plugin Marketplace

Certified plugin ecosystem with discovery and installation workflows:

// src/core/plugin-marketplace/marketplace.js
class PluginMarketplace {
  async discoverPlugins(criteria: SearchCriteria): Promise<PluginListing[]> {
    const plugins = await this.searchRegistry(criteria);
    return plugins.filter((plugin) => this.meetsCertificationStandards(plugin));
  }

  async installPlugin(pluginId: string): Promise<InstallationResult> {
    // Verify plugin certification
    await this.verifyCertification(pluginId);

    // Install with security scanning
    return this.secureInstall(pluginId);
  }
}
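The page does not describe certification and secure installation further. One common building block for them, sketched here as an assumption, is to pin an integrity digest for each published artifact and verify the downloaded bytes against it before installing. The digest uses the `sha256-<base64>` Subresource Integrity format; npm lockfiles record the same style in their `integrity` field, typically with SHA-512. `computeIntegrity` and `verifyIntegrity` are hypothetical helpers built on Node's `node:crypto`.

```typescript
import { createHash } from "node:crypto";

// Subresource-Integrity-style digest: "sha256-<base64 of SHA-256>".
function computeIntegrity(content: Uint8Array | string): string {
  return `sha256-${createHash("sha256").update(content).digest("base64")}`;
}

// True only if the downloaded bytes match the pinned digest exactly.
function verifyIntegrity(content: Uint8Array | string, expected: string): boolean {
  return computeIntegrity(content) === expected;
}
```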

This enterprise-grade architecture enables @DevilsDev/rag-pipeline-utils to scale from simple prototypes to mission-critical production systems while maintaining flexibility, security, and observability. Continue to Usage for practical implementation examples, or explore Plugins to learn about creating custom components.