Skip to main content
Version: 2.3.1 (Latest)

API Reference

Comprehensive reference documentation for RAG Pipeline Utils v2.3.1.

Core API

createRagPipeline

Creates a RAG pipeline instance with the specified plugins.

Behavior: Factory function that initializes a complete RAG pipeline by composing loader, embedder, retriever, and LLM components. Supports both plugin instances and string references to registered plugins.

Signature:

function createRagPipeline(config: PipelineConfig): Pipeline;

Parameters:

ParameterTypeRequiredDescription
configPipelineConfigYesPipeline configuration object
config.registryPluginRegistryNoCustom plugin registry (uses default if not provided)
config.loaderLoader | stringNoDocument loader plugin or plugin name
config.embedderEmbedder | stringNoEmbedding generation plugin or plugin name
config.retrieverRetriever | stringNoDocument retrieval plugin or plugin name
config.llmLLM | stringNoLanguage model plugin or plugin name
config.rerankerReranker | stringNoResult reranking plugin or plugin name

Returns: Pipeline - Pipeline instance with run() and cleanup() methods

Example:

const { createRagPipeline } = require("@devilsdev/rag-pipeline-utils");

const pipeline = createRagPipeline({
embedder: new OpenAIEmbedder({ apiKey: process.env.OPENAI_API_KEY }),
retriever: new PineconeRetriever({ apiKey: process.env.PINECONE_API_KEY }),
llm: new OpenAILLM({ model: "gpt-4" }),
});

Aliases:

  • createPipeline - Backward compatibility alias (deprecated, use createRagPipeline instead)

Pipeline Methods

pipeline.run()

Executes the RAG pipeline with the provided query.

Behavior: Processes queries through the complete RAG flow: embeds query (if embedder provided), retrieves relevant documents using the retriever, optionally reranks results, and generates a response using the LLM. Supports both standard and streaming response modes.

Signature:

async run(options: RunOptions): Promise<RunResult>

Parameters:

ParameterTypeRequiredDescription
options.querystringNoNatural language query text
options.queryVectornumber[]NoPre-computed query embedding vector
options.topKnumberNoNumber of documents to retrieve (default: 3)
options.timeoutnumberNoTimeout in milliseconds
options.streambooleanNoEnable streaming response (default: false)

Note: Either query or queryVector must be provided.

Returns: Promise<RunResult>

interface RunResult {
success: boolean;
query: string;
results: Document[];
error?: string; // Only present when success is false
}

Example:

const result = await pipeline.run({
query: "What is the vacation policy?",
options: { topK: 5, timeout: 10000 },
});

if (result.success) {
console.log("Query:", result.query);
console.log("Results:", result.results);
}

Streaming Example:

const stream = await pipeline.run({
query: "Explain the benefits",
options: { stream: true },
});

// Stream is an async generator
for await (const chunk of stream) {
if (!chunk.done) {
process.stdout.write(chunk.token);
}
}

Configuration

loadConfig

Loads and validates RAG pipeline configuration from .ragrc.json.

Signature:

function loadConfig(path?: string): Promise<RagConfig>;

Parameters:

ParameterTypeRequiredDescription
pathstringNoPath to config file (default: .ragrc.json)

Returns: Promise<RagConfig>

Example:

const { loadConfig } = require("@devilsdev/rag-pipeline-utils");

const config = await loadConfig("./config/.ragrc.json");

validateRagrc

Validates RAG configuration against schema.

Signature:

function validateRagrc(config: unknown): ValidationResult;

Returns: ValidationResult with valid: boolean and errors: string[]

normalizeConfig

Normalizes and applies defaults to configuration.

Signature:

function normalizeConfig(config: Partial<RagConfig>): RagConfig;

Security

Not Available in Public API

The security utilities JWTValidator and InputSanitizer exist in the codebase but are not exported in the public API (src/index.js). They are available only through CLI commands or by directly requiring internal modules. If you need these features, please open an issue requesting they be added to the public API.

JWTValidator

Enterprise-grade JWT validation with replay protection (CLI/Internal use only).

Behavior: Provides cryptographically secure JWT signing and verification with built-in replay attack detection, algorithm confusion prevention, and race condition mitigation. Supports both self-signed (reusable) and external (single-use) token validation.

Constructor:

new JWTValidator(options: JWTValidatorOptions)

Options:

ParameterTypeRequiredDescription
secretstringYesJWT signing secret
algorithmstringNoAlgorithm (default: 'HS256')
issuerstringNoExpected issuer
audiencestringNoExpected audience
expiresInstring | numberNoToken expiration (default: '1h')
strictValidationbooleanNoEnforce iss/aud validation (default: true)
enableJtiTrackingbooleanNoEnable replay protection (default: false)
jtiCacheSizenumberNoMax tracked JTIs (default: 10000)

Methods:

sign()

Signs a JWT token.

sign(payload: object, options?: SignOptions): string

Example:

const { JWTValidator } = require("@devilsdev/rag-pipeline-utils");

const validator = new JWTValidator({
secret: process.env.JWT_SECRET,
algorithm: "HS256",
issuer: "my-app",
audience: "api-users",
strictValidation: true,
enableJtiTracking: true, // Replay protection
});

const token = validator.sign({
sub: "user-123",
role: "admin",
});

verify()

Verifies and decodes a JWT token.

verify(token: string, options?: VerifyOptions): Promise<JWTPayload>

Throws:

  • JsonWebTokenError - Invalid token signature
  • TokenExpiredError - Token has expired
  • Error - Replay attack detected (if enableJtiTracking: true)

Example:

try {
const payload = await validator.verify(token);
console.log("User:", payload.sub);
} catch (error) {
if (error.message.includes("replay")) {
console.error("Replay attack detected");
}
}

Security Features:

  • Self-signed tokens are reusable (for refresh flows)
  • External tokens are single-use only (replay protection)
  • Race condition mitigation with optimized check-then-set pattern
  • Algorithm confusion attack prevention

InputSanitizer

Multi-layer input sanitization with path traversal defense (CLI/Internal use only).

Behavior: Protects against XSS, SQL injection, command injection, and path traversal attacks through multi-layer validation. Uses iterative URL decoding (up to 5 iterations) to detect sophisticated encoding-based attacks.

Constructor:

new InputSanitizer(options?: SanitizerOptions)

Options:

ParameterTypeRequiredDescription
throwOnInvalidbooleanNoThrow on validation failure (default: true)
maxLengthnumberNoMaximum input length (default: 10000)
allowedTagsstring[]NoAllowed HTML tags (default: [])

Methods:

sanitize()

Sanitizes general text input against XSS, SQL injection, command injection.

sanitize(input: string): string

Example:

const { InputSanitizer } = require("@devilsdev/rag-pipeline-utils");

const sanitizer = new InputSanitizer({
throwOnInvalid: true,
maxLength: 5000,
});

const safe = sanitizer.sanitize(userInput);

sanitizePath()

Sanitizes file paths with iterative URL decoding to prevent path traversal.

sanitizePath(path: string): string

Throws: Error - Path traversal attempt detected (always throws, even with throwOnInvalid: false)

Example:

try {
const safePath = sanitizer.sanitizePath(userProvidedPath);
// Use safePath
} catch (error) {
console.error("Path traversal blocked:", error.message);
}

Protection Against:

  • Standard traversal: ../../../etc/passwd
  • Windows paths: ..\\..\\windows\\system32
  • URL encoded: %2e%2e%2f, %2e%2e%5c
  • Double encoded: %252e%252e%252f
  • Multi-level encoding (up to 5 iterations)

AI/ML Capabilities

MultiModalProcessor

Process text, images, audio, and video content with unified embedding pipelines.

Behavior: Handles multi-modal content (text, images, audio, video) through unified embedding generation. Automatically selects appropriate models based on content type and normalizes embeddings for cross-modal retrieval.

Constructor:

new MultiModalProcessor(options: MultiModalOptions)

Methods:

async process(content: MultiModalContent): Promise<ProcessedContent>

Example:

const { MultiModalProcessor } = require("@devilsdev/rag-pipeline-utils");

const processor = new MultiModalProcessor({
textModel: "text-embedding-ada-002",
imageModel: "clip-vit-base",
});

const result = await processor.process({
type: "image",
data: imageBuffer,
});

AdaptiveRetrievalEngine

Learning-based relevance optimization using reinforcement learning.

Constructor:

new AdaptiveRetrievalEngine(options: AdaptiveRetrievalOptions)

Methods:

async retrieve(query: string, options?: RetrievalOptions): Promise<Document[]>
async learn(feedback: RelevanceFeedback): Promise<void>

Example:

const { AdaptiveRetrievalEngine } = require("@devilsdev/rag-pipeline-utils");

const engine = new AdaptiveRetrievalEngine({
baseRetriever: myRetriever,
learningRate: 0.01,
});

const docs = await engine.retrieve("machine learning basics");

// Provide feedback for learning
await engine.learn({
query: "machine learning basics",
relevantDocs: [docs[0].id, docs[2].id],
irrelevantDocs: [docs[3].id],
});

Workflow Engine

DAGEngine

Execute complex RAG workflows as directed acyclic graphs.

Behavior: Orchestrates complex multi-step workflows as directed acyclic graphs with automatic dependency resolution, parallel execution of independent tasks, and comprehensive error handling with retry logic.

Constructor:

new DAGEngine(options?: DAGOptions)

Methods:

addNode()

addNode(id: string, task: TaskFunction): void

addEdge()

addEdge(from: string, to: string): void

execute()

async execute(input?: any): Promise<DAGResult>

Example:

const { DAGEngine } = require("@devilsdev/rag-pipeline-utils");

const dag = new DAGEngine();

dag.addNode("load", async () => await loadDocuments());
dag.addNode("embed", async (docs) => await embedDocuments(docs));
dag.addNode("store", async (embeddings) => await storeEmbeddings(embeddings));

dag.addEdge("load", "embed");
dag.addEdge("embed", "store");

const result = await dag.execute();

Observability

metrics

Global metrics collection for monitoring pipeline performance.

Methods:

metrics.counter(name: string, value?: number, tags?: object): void
metrics.gauge(name: string, value: number, tags?: object): void
metrics.histogram(name: string, value: number, tags?: object): void
metrics.timing(name: string, duration: number, tags?: object): void

Example:

const { metrics } = require("@devilsdev/rag-pipeline-utils");

metrics.counter("pipeline.queries");
metrics.timing("retrieval.latency", 150);
metrics.gauge("cache.hit_rate", 0.85);

eventLogger

Structured event logging for audit trails.

Methods:

eventLogger.log(event: Event): void
eventLogger.query(filters: EventFilters): Promise<Event[]>

Example:

const { eventLogger } = require("@devilsdev/rag-pipeline-utils");

eventLogger.log({
type: "security.jwt_validation",
level: "warn",
message: "Token replay detected",
metadata: { jti: "abc123", userId: "user-456" },
});

Enterprise Features

AuditLogger

Immutable audit logging for compliance requirements.

Constructor:

new AuditLogger(options: AuditLoggerOptions)

Example:

const { AuditLogger } = require("@devilsdev/rag-pipeline-utils");

const auditLogger = new AuditLogger({
backend: "s3",
bucket: "compliance-logs",
encryption: true,
});

await auditLogger.log({
action: "document.access",
actor: "user-123",
resource: "confidential-doc-456",
timestamp: new Date().toISOString(),
});

DataGovernance

Multi-tenant data isolation and resource quotas.

Constructor:

new DataGovernance(options: DataGovernanceOptions)

Example:

const { DataGovernance } = require("@devilsdev/rag-pipeline-utils");

const governance = new DataGovernance({
tenantIdField: "orgId",
quotas: {
"org-123": { maxDocuments: 10000, maxStorage: "5GB" },
},
});

Development Tools

HotReloadManager

Enable hot module reloading during development.

Constructor:

new HotReloadManager(options: HotReloadOptions)

Example:

const { createHotReloadManager } = require("@devilsdev/rag-pipeline-utils");

const hotReload = createHotReloadManager({
watchPaths: ["./plugins/**/*.js"],
onReload: (module) => console.log("Reloaded:", module),
});

await hotReload.start();

DevServer

Development server with real-time debugging.

Constructor:

new DevServer(options: DevServerOptions)

Example:

const { createDevServer } = require("@devilsdev/rag-pipeline-utils");

const server = createDevServer({
port: 3000,
pipeline: myPipeline,
enableDebugger: true,
});

await server.start();
// Access dashboard at http://localhost:3000

Utilities

logger

Structured logging utility with multiple transport support.

Methods:

logger.debug(message: string, meta?: object): void
logger.info(message: string, meta?: object): void
logger.warn(message: string, meta?: object): void
logger.error(message: string, meta?: object): void

Example:

const { logger } = require("@devilsdev/rag-pipeline-utils");

logger.info("Pipeline initialized", {
plugins: ["loader", "embedder", "retriever"],
});

Error Handling

createError:

createError(
code: ErrorCode,
message: string,
details?: object
): RagError

wrapError:

wrapError(
error: Error,
context: string
): RagError

ERROR_CODES:

  • PLUGIN_NOT_FOUND
  • INVALID_CONFIG
  • VALIDATION_ERROR
  • NETWORK_ERROR
  • TIMEOUT_ERROR
  • SECURITY_ERROR

Example:

const {
createError,
wrapError,
ERROR_CODES,
} = require("@devilsdev/rag-pipeline-utils");

throw createError(
ERROR_CODES.PLUGIN_NOT_FOUND,
"Embedder plugin not registered",
{ pluginName: "custom-embedder" },
);

try {
await riskyOperation();
} catch (err) {
throw wrapError(err, "Pipeline execution");
}

Plugin System

pluginRegistry

Global plugin registry for managing pipeline components.

Methods:

register()

register(
type: PluginType,
name: string,
plugin: Plugin
): void

get()

get(
type: PluginType,
name: string
): Plugin

list()

list(type?: PluginType): string[]

Plugin Types:

  • 'loader' - Document loaders
  • 'embedder' - Embedding generators
  • 'retriever' - Document retrievers
  • 'llm' - Language models
  • 'reranker' - Result rerankers

Example:

const { pluginRegistry } = require("@devilsdev/rag-pipeline-utils");

pluginRegistry.register("embedder", "my-embedder", new MyEmbedder());

const embedder = pluginRegistry.get("embedder", "my-embedder");

console.log(pluginRegistry.list("embedder"));
// ['my-embedder', 'openai', 'cohere', ...]

Performance

ParallelProcessor

Process documents in parallel with configurable concurrency.

Constructor:

new ParallelProcessor(options: ParallelProcessorOptions)

Methods:

async process(
items: T[],
handler: (item: T) => Promise<R>
): Promise<R[]>

Example:

const { ParallelProcessor } = require("@devilsdev/rag-pipeline-utils");

const processor = new ParallelProcessor({
concurrency: 10,
retryAttempts: 3,
});

const results = await processor.process(
documents,
async (doc) => await embedDocument(doc),
);

Type Definitions

Plugin Contracts

Loader:

interface Loader {
load(source: string): Promise<Document[]>;
}

Embedder:

interface Embedder {
embed(text: string): Promise<number[]>;
embedBatch?(texts: string[]): Promise<number[][]>;
}

Retriever:

interface Retriever {
retrieve(options: RetrieveOptions): Promise<Document[]>;

interface RetrieveOptions {
query?: string;
queryVector?: number[];
topK?: number;
}
}

LLM:

interface LLM {
generate(
query: string,
context: Document[],
options?: GenerateOptions,
): Promise<string>;

generateStream?(
query: string,
context: Document[],
options?: GenerateOptions,
): AsyncIterable<StreamChunk>;
}

Reranker:

interface Reranker {
rerank(
query: string,
documents: Document[],
options?: RerankOptions,
): Promise<Document[]>;
}

Common Types

Document:

interface Document {
id: string;
content: string;
metadata?: Record<string, any>;
score?: number;
}

StreamChunk:

interface StreamChunk {
token: string;
done: boolean;
metadata?: Record<string, any>;
}

Version Information

Current version: 2.3.1

Supported Node.js versions:

  • Node.js 18.x
  • Node.js 20.x
  • Node.js 22.x

Further Reading