Version: 2.4.0-dev (Next)

API Reference

Comprehensive reference documentation for RAG Pipeline Utils v2.3.1.

Core API

createRagPipeline

Creates a RAG pipeline instance with the specified plugins.

Signature:

function createRagPipeline(config: PipelineConfig): Pipeline;

Parameters:

Parameter         Type                Required  Description
config            PipelineConfig      Yes       Pipeline configuration object
config.registry   PluginRegistry      No        Custom plugin registry (uses default if not provided)
config.loader     Loader | string     No        Document loader plugin or plugin name
config.embedder   Embedder | string   No        Embedding generation plugin or plugin name
config.retriever  Retriever | string  No        Document retrieval plugin or plugin name
config.llm        LLM | string        No        Language model plugin or plugin name
config.reranker   Reranker | string   No        Result reranking plugin or plugin name

Returns: Pipeline - Pipeline instance with query() and ingest() methods

Example:

const { createRagPipeline } = require("@devilsdev/rag-pipeline-utils");

// PDFLoader, OpenAIEmbedder, PineconeRetriever, and OpenAILLM are plugin
// classes assumed to be imported or defined elsewhere in your project.
const pipeline = createRagPipeline({
  loader: new PDFLoader(),
  embedder: new OpenAIEmbedder({ apiKey: process.env.OPENAI_API_KEY }),
  retriever: new PineconeRetriever({ apiKey: process.env.PINECONE_API_KEY }),
  llm: new OpenAILLM({ model: "gpt-4" }),
});

Aliases:

  • createPipeline - Backward compatibility alias

Pipeline Methods

pipeline.query()

Executes a query against the RAG pipeline.

Signature:

async query(
  query: string,
  options?: QueryOptions
): Promise<QueryResult>

Parameters:

Parameter            Type      Required  Description
query                string    Yes       Natural language query
options.topK         number    No        Number of documents to retrieve (default: 3)
options.timeout      number    No        Timeout in milliseconds
options.stream       boolean   No        Enable streaming response (default: false)
options.queryVector  number[]  No        Pre-computed query embedding

Returns: Promise<QueryResult>

interface QueryResult {
  text: string;
  sources: Document[];
  metadata?: Record<string, any>;
}

Example:

const result = await pipeline.query("What is the vacation policy?", {
  topK: 5,
  timeout: 10000,
});

console.log(result.text);
console.log(result.sources);
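
How the library enforces options.timeout is not documented here. A common way to bound an async call is to race it against a timer, sketched below. The helper name withTimeout and its error message are hypothetical and are not part of the package.

```javascript
// Hypothetical helper: rejects if `promise` does not settle within `ms` milliseconds.
function withTimeout(promise, ms, label = "operation") {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${ms} ms`)),
      ms,
    );
  });
  // Whichever promise settles first wins; the timer is always cleared afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```

Note that racing only stops the caller from waiting. It does not cancel the underlying work, which would need something like an AbortSignal.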

Streaming Example:

const stream = await pipeline.query("Explain the benefits", {
  stream: true,
});

// With stream: true, the result is consumed as an async iterable of chunks.
for await (const chunk of stream) {
  if (!chunk.done) {
    process.stdout.write(chunk.token);
  }
}
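
The loop above relies on chunks shaped like StreamChunk (a token string plus a done flag). The sketch below shows that pattern in isolation. fakeTokenStream is a hypothetical stand-in for the stream returned by pipeline.query(), and collectStream is an illustrative helper. Neither is part of the library.

```javascript
// Hypothetical stand-in for a streaming response: yields { token, done } chunks.
async function* fakeTokenStream(tokens) {
  for (const token of tokens) {
    yield { token, done: false };
  }
  // The final chunk signals completion and carries no text.
  yield { token: "", done: true };
}

// Collects streamed tokens into one string, stopping at the done chunk.
async function collectStream(stream) {
  let text = "";
  for await (const chunk of stream) {
    if (chunk.done) break;
    text += chunk.token;
  }
  return text;
}
```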

pipeline.ingest()

Ingests documents into the RAG pipeline.

Signature:

async ingest(
  source: string | string[],
  options?: IngestOptions
): Promise<IngestResult>

Parameters:

Parameter          Type               Required  Description
source             string | string[]  Yes       File path(s) or directory to ingest
options.batchSize  number             No        Batch size for processing (default: 100)
options.timeout    number             No        Timeout per document in milliseconds

Returns: Promise<IngestResult>

Example:

await pipeline.ingest("./documents");
await pipeline.ingest(["file1.pdf", "file2.txt"]);
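
To illustrate what a batchSize setting typically controls, here is a minimal batching sketch. It assumes sources are handled in consecutive groups, one group at a time. The helpers toBatches and ingestInBatches and the handleBatch callback are hypothetical, and the library's internal batching may differ.

```javascript
// Splits a list into consecutive batches of at most batchSize items.
function toBatches(items, batchSize = 100) {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// Processes batches one after another; handleBatch is a hypothetical callback.
async function ingestInBatches(paths, handleBatch, batchSize = 100) {
  let processed = 0;
  for (const batch of toBatches(paths, batchSize)) {
    await handleBatch(batch);
    processed += batch.length;
  }
  return processed;
}
```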

Configuration

loadConfig

Loads and validates RAG pipeline configuration from .ragrc.json.

Signature:

function loadConfig(path?: string): Promise<RagConfig>;

Parameters:

Parameter  Type    Required  Description
path       string  No        Path to config file (default: .ragrc.json)

Returns: Promise<RagConfig>

Example:

const { loadConfig } = require("@devilsdev/rag-pipeline-utils");

const config = await loadConfig("./config/.ragrc.json");

validateRagrc

Validates RAG configuration against schema.

Signature:

function validateRagrc(config: unknown): ValidationResult;

Returns: ValidationResult with valid: boolean and errors: string[]
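
The result shape described above (valid plus a list of error strings) can be illustrated with a small stand-alone validator. This is only a sketch. The actual .ragrc.json schema is not shown in this reference, so treating the five plugin slots as top-level keys is an assumption, and validateConfigSketch is a hypothetical name.

```javascript
// Plugin slots named in this reference; treating them as config keys is an assumption.
const PLUGIN_KEYS = ["loader", "embedder", "retriever", "llm", "reranker"];

// Hypothetical validator: collects problems and returns { valid, errors } instead of throwing.
function validateConfigSketch(config) {
  if (config === null || typeof config !== "object" || Array.isArray(config)) {
    return { valid: false, errors: ["config must be an object"] };
  }
  const errors = [];
  for (const key of PLUGIN_KEYS) {
    const value = config[key];
    if (value === undefined) continue; // every slot is optional in this sketch
    const isName = typeof value === "string";
    const isPlugin = typeof value === "object" && value !== null;
    if (!isName && !isPlugin) {
      errors.push(`${key} must be a plugin name or plugin object`);
    }
  }
  return { valid: errors.length === 0, errors };
}
```

Returning every error at once, rather than failing on the first, lets a caller report all configuration problems in a single pass.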

normalizeConfig

Normalizes and applies defaults to configuration.

Signature:

function normalizeConfig(config: Partial<RagConfig>): RagConfig;

Security

JWTValidator

Enterprise-grade JWT validation with replay protection.

Constructor:

new JWTValidator(options: JWTValidatorOptions)

Options:

Parameter          Type             Required  Description
secret             string           Yes       JWT signing secret
algorithm          string           No        Algorithm (default: 'HS256')
issuer             string           No        Expected issuer
audience           string           No        Expected audience
expiresIn          string | number  No        Token expiration (default: '1h')
strictValidation   boolean          No        Enforce iss/aud validation (default: true)
enableJtiTracking  boolean          No        Enable replay protection (default: false)
jtiCacheSize       number           No        Max tracked JTIs (default: 10000)

Methods:

sign()

Signs a JWT token.

sign(payload: object, options?: SignOptions): string

Example:

const { JWTValidator } = require("@devilsdev/rag-pipeline-utils");

const validator = new JWTValidator({
  secret: process.env.JWT_SECRET,
  algorithm: "HS256",
  issuer: "my-app",
  audience: "api-users",
  strictValidation: true,
  enableJtiTracking: true,
});

const token = validator.sign({
  sub: "user-123",
  role: "admin",
});

verify()

Verifies and decodes a JWT token.

verify(token: string, options?: VerifyOptions): Promise<JWTPayload>

Throws:

  • JsonWebTokenError - Invalid token signature
  • TokenExpiredError - Token has expired
  • Error - Replay attack detected (if enableJtiTracking: true)

Example:

try {
  const payload = await validator.verify(token);
  console.log("User:", payload.sub);
} catch (error) {
  if (error.message.includes("replay")) {
    console.error("Replay attack detected");
  } else {
    // Rethrow signature, expiry, and other failures instead of swallowing them.
    throw error;
  }
}

Security Features:

  • Self-signed tokens are reusable (for refresh flows)
  • External tokens are single-use only (replay protection)
  • Race condition mitigation with optimized check-then-set pattern
  • Algorithm confusion attack prevention
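
Replay protection of this kind is usually built on a bounded record of token IDs (jti claims) that have already been seen. The sketch below shows one way to do that. It is not the library's implementation. The JtiTracker class and its checkAndRecord method are hypothetical, and only the default size of 10000 is taken from the jtiCacheSize option above.

```javascript
// Bounded record of seen token IDs (jti claims). Not the library's implementation.
class JtiTracker {
  constructor(maxSize = 10000) {
    this.maxSize = maxSize;
    this.seen = new Map(); // jti -> expiry timestamp (ms); Map keeps insertion order
  }

  // Returns true on first use of a jti, false if it was already seen (a replay).
  // The check and the write happen in one synchronous call, so no other
  // callback can run between them on Node's single-threaded event loop.
  checkAndRecord(jti, expiresAtMs, nowMs = Date.now()) {
    const existing = this.seen.get(jti);
    if (existing !== undefined && existing > nowMs) {
      return false;
    }
    this.seen.delete(jti); // re-insert so the entry moves to the newest position
    this.seen.set(jti, expiresAtMs);
    if (this.seen.size > this.maxSize) {
      // Evict the oldest entry to keep memory bounded.
      const oldest = this.seen.keys().next().value;
      this.seen.delete(oldest);
    }
    return true;
  }
}
```

The trade-off of a fixed-size cache is that an evicted jti could be replayed before its token expires, so the size should comfortably exceed the number of tokens expected within one expiry window.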

InputSanitizer

Multi-layer input sanitization with path traversal defense.

Constructor:

new InputSanitizer(options?: SanitizerOptions)

Options:

Parameter       Type      Required  Description
throwOnInvalid  boolean   No        Throw on validation failure (default: true)
maxLength       number    No        Maximum input length (default: 10000)
allowedTags     string[]  No        Allowed HTML tags (default: [])

Methods:

sanitize()

Sanitizes general text input against XSS, SQL injection, command injection.

sanitize(input: string): string

Example:

const { InputSanitizer } = require("@devilsdev/rag-pipeline-utils");

const sanitizer = new InputSanitizer({
  throwOnInvalid: true,
  maxLength: 5000,
});

const safe = sanitizer.sanitize(userInput);

sanitizePath()

Sanitizes file paths with iterative URL decoding to prevent path traversal.

sanitizePath(path: string): string

Throws: Error - Path traversal attempt detected (always throws, even with throwOnInvalid: false)

Example:

try {
  const safePath = sanitizer.sanitizePath(userProvidedPath);
  // Use safePath
} catch (error) {
  console.error("Path traversal blocked:", error.message);
}

Protection Against:

  • Standard traversal: ../../../etc/passwd
  • Windows paths: ..\..\windows\system32
  • URL encoded: %2e%2e%2f, %2e%2e%5c
  • Double encoded: %252e%252e%252f
  • Multi-level encoding (up to 5 iterations)
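
The cases above can be handled by decoding repeatedly until the value stops changing, then checking for ".." segments. The sketch below illustrates that idea using the standard decodeURIComponent function. The helpers fullyDecode and assertNoTraversal are hypothetical, and the library's sanitizePath() may apply further checks not shown here.

```javascript
// Decodes percent-encoding repeatedly until the value stops changing.
// Throws if the input is still changing after maxIterations decodes.
function fullyDecode(input, maxIterations = 5) {
  let current = input;
  for (let i = 0; i <= maxIterations; i++) {
    let decoded;
    try {
      decoded = decodeURIComponent(current);
    } catch {
      throw new Error("Malformed percent-encoding in path");
    }
    if (decoded === current) return current; // nothing left to decode
    if (i === maxIterations) break; // a further layer remains past the cap
    current = decoded;
  }
  throw new Error("Too many layers of URL encoding");
}

// Rejects any path that contains a ".." segment once fully decoded.
function assertNoTraversal(rawPath) {
  const decoded = fullyDecode(rawPath);
  // Treat both separators alike, then look for a ".." segment.
  const segments = decoded.replace(/\\/g, "/").split("/");
  if (segments.includes("..")) {
    throw new Error("Path traversal attempt detected");
  }
  return decoded;
}
```

Checking only after decoding is what defeats double-encoded input: %252e%252e%252f becomes %2e%2e%2f on the first pass and ../ on the second.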

AI/ML Capabilities

MultiModalProcessor

Process text, images, audio, and video content with unified embedding pipelines.

Constructor:

new MultiModalProcessor(options: MultiModalOptions)

Methods:

async process(content: MultiModalContent): Promise<ProcessedContent>

Example:

const { MultiModalProcessor } = require("@devilsdev/rag-pipeline-utils");

const processor = new MultiModalProcessor({
  textModel: "text-embedding-ada-002",
  imageModel: "clip-vit-base",
});

const result = await processor.process({
  type: "image",
  data: imageBuffer,
});

AdaptiveRetrievalEngine

Learning-based relevance optimization using reinforcement learning.

Constructor:

new AdaptiveRetrievalEngine(options: AdaptiveRetrievalOptions)

Methods:

async retrieve(query: string, options?: RetrievalOptions): Promise<Document[]>
async learn(feedback: RelevanceFeedback): Promise<void>

Example:

const { AdaptiveRetrievalEngine } = require("@devilsdev/rag-pipeline-utils");

const engine = new AdaptiveRetrievalEngine({
  baseRetriever: myRetriever,
  learningRate: 0.01,
});

const docs = await engine.retrieve("machine learning basics");

// Provide feedback for learning
await engine.learn({
  query: "machine learning basics",
  relevantDocs: [docs[0].id, docs[2].id],
  irrelevantDocs: [docs[3].id],
});

Workflow Engine

DAGEngine

Execute complex RAG workflows as directed acyclic graphs.

Constructor:

new DAGEngine(options?: DAGOptions)

Methods:

addNode()

addNode(id: string, task: TaskFunction): void

addEdge()

addEdge(from: string, to: string): void

execute()

async execute(input?: any): Promise<DAGResult>

Example:

const { DAGEngine } = require("@devilsdev/rag-pipeline-utils");

const dag = new DAGEngine();

dag.addNode("load", async () => await loadDocuments());
dag.addNode("embed", async (docs) => await embedDocuments(docs));
dag.addNode("store", async (embeddings) => await storeEmbeddings(embeddings));

dag.addEdge("load", "embed");
dag.addEdge("embed", "store");

const result = await dag.execute();
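
To make the execution model concrete, the sketch below runs a small graph in dependency order. It is an illustration rather than the DAGEngine source. The MiniDag class is hypothetical, and so are the rules for what each task receives as input and what execute() returns.

```javascript
// Minimal dependency-ordered executor. Not the library's DAGEngine.
class MiniDag {
  constructor() {
    this.tasks = new Map(); // node id -> task function
    this.parents = new Map(); // node id -> ids of upstream nodes
  }

  addNode(id, task) {
    this.tasks.set(id, task);
    this.parents.set(id, []);
  }

  addEdge(from, to) {
    if (!this.tasks.has(from) || !this.tasks.has(to)) {
      throw new Error(`Unknown node in edge ${from} -> ${to}`);
    }
    this.parents.get(to).push(from);
  }

  async execute(input) {
    const results = new Map();
    const pending = new Set(this.tasks.keys());
    while (pending.size > 0) {
      // A node is ready once every upstream node has produced a result.
      const ready = [...pending].filter((id) =>
        this.parents.get(id).every((p) => results.has(p)),
      );
      if (ready.length === 0) throw new Error("Cycle detected in graph");
      // Nodes that are ready at the same time run concurrently.
      await Promise.all(
        ready.map(async (id) => {
          const inputs = this.parents.get(id).map((p) => results.get(p));
          // Assumption: roots get the execute() input, single-parent nodes get
          // that parent's result, multi-parent nodes get an array of results.
          const arg =
            inputs.length === 0 ? input : inputs.length === 1 ? inputs[0] : inputs;
          results.set(id, await this.tasks.get(id)(arg));
          pending.delete(id);
        }),
      );
    }
    return Object.fromEntries(results);
  }
}
```

Because a round with no ready nodes can only happen when the remaining nodes depend on each other, the same loop doubles as cycle detection.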

Observability

metrics

Global metrics collection for monitoring pipeline performance.

Methods:

metrics.counter(name: string, value?: number, tags?: object): void
metrics.gauge(name: string, value: number, tags?: object): void
metrics.histogram(name: string, value: number, tags?: object): void
metrics.timing(name: string, duration: number, tags?: object): void

Example:

const { metrics } = require("@devilsdev/rag-pipeline-utils");

metrics.counter("pipeline.queries");
metrics.timing("retrieval.latency", 150);
metrics.gauge("cache.hit_rate", 0.85);

eventLogger

Structured event logging for audit trails.

Methods:

eventLogger.log(event: Event): void
eventLogger.query(filters: EventFilters): Promise<Event[]>

Example:

const { eventLogger } = require("@devilsdev/rag-pipeline-utils");

eventLogger.log({
  type: "security.jwt_validation",
  level: "warn",
  message: "Token replay detected",
  metadata: { jti: "abc123", userId: "user-456" },
});

Enterprise Features

AuditLogger

Immutable audit logging for compliance requirements.

Constructor:

new AuditLogger(options: AuditLoggerOptions)

Example:

const { AuditLogger } = require("@devilsdev/rag-pipeline-utils");

const auditLogger = new AuditLogger({
  backend: "s3",
  bucket: "compliance-logs",
  encryption: true,
});

await auditLogger.log({
  action: "document.access",
  actor: "user-123",
  resource: "confidential-doc-456",
  timestamp: new Date().toISOString(),
});

DataGovernance

Multi-tenant data isolation and resource quotas.

Constructor:

new DataGovernance(options: DataGovernanceOptions)

Example:

const { DataGovernance } = require("@devilsdev/rag-pipeline-utils");

const governance = new DataGovernance({
  tenantIdField: "orgId",
  quotas: {
    "org-123": { maxDocuments: 10000, maxStorage: "5GB" },
  },
});

Development Tools

HotReloadManager

Enable hot module reloading during development.

Constructor:

new HotReloadManager(options: HotReloadOptions)

Example:

const { createHotReloadManager } = require("@devilsdev/rag-pipeline-utils");

const hotReload = createHotReloadManager({
  watchPaths: ["./plugins/**/*.js"],
  onReload: (module) => console.log("Reloaded:", module),
});

await hotReload.start();

DevServer

Development server with real-time debugging.

Constructor:

new DevServer(options: DevServerOptions)

Example:

const { createDevServer } = require("@devilsdev/rag-pipeline-utils");

const server = createDevServer({
  port: 3000,
  pipeline: myPipeline,
  enableDebugger: true,
});

await server.start();
// Access dashboard at http://localhost:3000

Utilities

logger

Structured logging utility with multiple transport support.

Methods:

logger.debug(message: string, meta?: object): void
logger.info(message: string, meta?: object): void
logger.warn(message: string, meta?: object): void
logger.error(message: string, meta?: object): void

Example:

const { logger } = require("@devilsdev/rag-pipeline-utils");

logger.info("Pipeline initialized", {
  plugins: ["loader", "embedder", "retriever"],
});

Error Handling

createError:

createError(
  code: ErrorCode,
  message: string,
  details?: object
): RagError

wrapError:

wrapError(
  error: Error,
  context: string
): RagError

ERROR_CODES:

  • PLUGIN_NOT_FOUND
  • INVALID_CONFIG
  • VALIDATION_ERROR
  • NETWORK_ERROR
  • TIMEOUT_ERROR
  • SECURITY_ERROR

Example:

const {
  createError,
  wrapError,
  ERROR_CODES,
} = require("@devilsdev/rag-pipeline-utils");

// Create a structured error with a code and details.
throw createError(
  ERROR_CODES.PLUGIN_NOT_FOUND,
  "Embedder plugin not registered",
  { pluginName: "custom-embedder" },
);

// Wrap an unexpected error with context before rethrowing.
try {
  await riskyOperation();
} catch (err) {
  throw wrapError(err, "Pipeline execution");
}
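
The fields carried by RagError are not listed in this reference. As a rough illustration of how a coded error type and a wrapping helper can fit together, here is a self-contained sketch. All names in it (RagErrorSketch, createErrorSketch, wrapErrorSketch) and the "UNKNOWN_ERROR" code are hypothetical.

```javascript
// Hypothetical error type carrying a machine-readable code and structured details.
class RagErrorSketch extends Error {
  constructor(code, message, details = {}, cause) {
    super(message);
    this.name = "RagErrorSketch";
    this.code = code;
    this.details = details;
    if (cause !== undefined) this.cause = cause;
  }
}

function createErrorSketch(code, message, details) {
  return new RagErrorSketch(code, message, details);
}

// Wraps an arbitrary error, keeping the original as `cause` and prefixing context.
function wrapErrorSketch(error, context) {
  if (error instanceof RagErrorSketch) return error; // already structured
  // "UNKNOWN_ERROR" is a placeholder code, not one of the documented ERROR_CODES.
  return new RagErrorSketch("UNKNOWN_ERROR", `${context}: ${error.message}`, {}, error);
}
```

Keeping the original error as cause preserves its stack trace for debugging, while callers branch on the stable code instead of parsing messages.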

Plugin System

pluginRegistry

Global plugin registry for managing pipeline components.

Methods:

register()

register(
  type: PluginType,
  name: string,
  plugin: Plugin
): void

get()

get(
  type: PluginType,
  name: string
): Plugin

list()

list(type?: PluginType): string[]

Plugin Types:

  • 'loader' - Document loaders
  • 'embedder' - Embedding generators
  • 'retriever' - Document retrievers
  • 'llm' - Language models
  • 'reranker' - Result rerankers

Example:

const { pluginRegistry } = require("@devilsdev/rag-pipeline-utils");

pluginRegistry.register("embedder", "my-embedder", new MyEmbedder());

const embedder = pluginRegistry.get("embedder", "my-embedder");

console.log(pluginRegistry.list("embedder"));
// ['my-embedder', 'openai', 'cohere', ...]
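
A registry with this register/get/list surface can be modeled as a map of maps keyed by plugin type and then by name. The sketch below shows that structure. RegistrySketch is a hypothetical class, and its behavior for a missing plugin or for list() without a type is assumed rather than taken from the library.

```javascript
// Hypothetical registry: plugin type -> (plugin name -> plugin instance).
class RegistrySketch {
  constructor() {
    this.plugins = new Map();
  }

  register(type, name, plugin) {
    if (!this.plugins.has(type)) this.plugins.set(type, new Map());
    this.plugins.get(type).set(name, plugin);
  }

  get(type, name) {
    const plugin = this.plugins.get(type)?.get(name);
    if (plugin === undefined) {
      throw new Error(`Plugin not found: ${type}/${name}`);
    }
    return plugin;
  }

  list(type) {
    if (type !== undefined) {
      return [...(this.plugins.get(type)?.keys() ?? [])];
    }
    // Assumption: without a type, every plugin is listed as "type/name".
    const names = [];
    for (const [pluginType, byName] of this.plugins) {
      for (const name of byName.keys()) names.push(`${pluginType}/${name}`);
    }
    return names;
  }
}
```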

Performance

ParallelProcessor

Process documents in parallel with configurable concurrency.

Constructor:

new ParallelProcessor(options: ParallelProcessorOptions)

Methods:

async process<T, R>(
  items: T[],
  handler: (item: T) => Promise<R>
): Promise<R[]>

Example:

const { ParallelProcessor } = require("@devilsdev/rag-pipeline-utils");

const processor = new ParallelProcessor({
  concurrency: 10,
  retryAttempts: 3,
});

const results = await processor.process(
  documents,
  async (doc) => await embedDocument(doc),
);
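
For readers curious how a concurrency cap and retries interact, the following sketch uses a fixed pool of workers that pull items from a shared index. It is an illustration, not the ParallelProcessor source. The function mapWithConcurrency is hypothetical, and reading retryAttempts as "retries after the first try" is an assumption.

```javascript
// Maps items through an async handler with at most `concurrency` calls in flight.
async function mapWithConcurrency(
  items,
  handler,
  { concurrency = 10, retryAttempts = 3 } = {},
) {
  const results = new Array(items.length);
  let next = 0;

  async function runWithRetry(item) {
    let lastError;
    // One initial try plus retryAttempts retries (an assumed interpretation).
    for (let attempt = 0; attempt <= retryAttempts; attempt++) {
      try {
        return await handler(item);
      } catch (err) {
        lastError = err;
      }
    }
    throw lastError;
  }

  async function worker() {
    // Claiming an index is synchronous, so two workers never take the same item.
    while (next < items.length) {
      const index = next++;
      results[index] = await runWithRetry(items[index]);
    }
  }

  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results; // same order as the input, regardless of completion order
}
```

Writing each result at its original index keeps output order stable even though items finish out of order.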

Type Definitions

Plugin Contracts

Loader:

interface Loader {
  load(source: string): Promise<Document[]>;
}

Embedder:

interface Embedder {
  embed(text: string): Promise<number[]>;
  embedBatch?(texts: string[]): Promise<number[][]>;
}

Retriever:

interface RetrieveOptions {
  query?: string;
  queryVector?: number[];
  topK?: number;
}

interface Retriever {
  retrieve(options: RetrieveOptions): Promise<Document[]>;
}
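
As an example of satisfying the Retriever contract, the sketch below ranks stored vectors by cosine similarity against queryVector. It is a toy in-memory implementation. InMemoryRetriever and cosineSimilarity are hypothetical names, and text-only queries are deliberately left unsupported here.

```javascript
// Cosine similarity of two equal-length vectors; 0 if either has zero length.
function cosineSimilarity(a, b) {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Hypothetical retriever over an in-memory list of { document, vector } entries.
class InMemoryRetriever {
  constructor(entries) {
    this.entries = entries;
  }

  // Matches the retrieve(options) shape; only queryVector is supported in this sketch.
  async retrieve({ queryVector, topK = 3 }) {
    if (!queryVector) {
      throw new Error("This sketch only supports queryVector");
    }
    return this.entries
      .map(({ document, vector }) => ({
        ...document,
        score: cosineSimilarity(queryVector, vector),
      }))
      .sort((x, y) => y.score - x.score) // highest similarity first
      .slice(0, topK);
  }
}
```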

LLM:

interface LLM {
  generate(
    query: string,
    context: Document[],
    options?: GenerateOptions,
  ): Promise<string>;

  generateStream?(
    query: string,
    context: Document[],
    options?: GenerateOptions,
  ): AsyncIterable<StreamChunk>;
}

Reranker:

interface Reranker {
  rerank(
    query: string,
    documents: Document[],
    options?: RerankOptions,
  ): Promise<Document[]>;
}

Common Types

Document:

interface Document {
  id: string;
  content: string;
  metadata?: Record<string, any>;
  score?: number;
}

StreamChunk:

interface StreamChunk {
  token: string;
  done: boolean;
  metadata?: Record<string, any>;
}

Version Information

Current version: 2.3.1

Supported Node.js versions:

  • Node.js 18.x
  • Node.js 20.x
  • Node.js 22.x

Further Reading