API Reference
Comprehensive reference documentation for RAG Pipeline Utils v2.3.1.
Core API
createRagPipeline
Creates a RAG pipeline instance with the specified plugins.
Signature:
function createRagPipeline(config: PipelineConfig): Pipeline;
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| config | PipelineConfig | Yes | Pipeline configuration object |
| config.registry | PluginRegistry | No | Custom plugin registry (uses default if not provided) |
| config.loader | Loader \| string | No | Document loader plugin or plugin name |
| config.embedder | Embedder \| string | No | Embedding generation plugin or plugin name |
| config.retriever | Retriever \| string | No | Document retrieval plugin or plugin name |
| config.llm | LLM \| string | No | Language model plugin or plugin name |
| config.reranker | Reranker \| string | No | Result reranking plugin or plugin name |
Returns: Pipeline - Pipeline instance with query() and ingest() methods
Example:
const { createRagPipeline } = require("@devilsdev/rag-pipeline-utils");
// PDFLoader, OpenAIEmbedder, PineconeRetriever, and OpenAILLM are plugin
// classes assumed to be imported or defined elsewhere in your project.
const pipeline = createRagPipeline({
  loader: new PDFLoader(),
  embedder: new OpenAIEmbedder({ apiKey: process.env.OPENAI_API_KEY }),
  retriever: new PineconeRetriever({ apiKey: process.env.PINECONE_API_KEY }),
  llm: new OpenAILLM({ model: "gpt-4" }),
});
Aliases:
- createPipeline - Backward compatibility alias
Pipeline Methods
pipeline.query()
Executes a query against the RAG pipeline.
Signature:
async query(
query: string,
options?: QueryOptions
): Promise<QueryResult>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| query | string | Yes | Natural language query |
| options.topK | number | No | Number of documents to retrieve (default: 3) |
| options.timeout | number | No | Timeout in milliseconds |
| options.stream | boolean | No | Enable streaming response (default: false) |
| options.queryVector | number[] | No | Pre-computed query embedding |
Returns: Promise<QueryResult>
interface QueryResult {
text: string;
sources: Document[];
metadata?: Record<string, any>;
}
Example:
const result = await pipeline.query("What is the vacation policy?", {
topK: 5,
timeout: 10000,
});
console.log(result.text);
console.log(result.sources);
Streaming Example:
const stream = await pipeline.query("Explain the benefits", {
stream: true,
});
for await (const chunk of stream) {
if (!chunk.done) {
process.stdout.write(chunk.token);
}
}
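When the full response text is needed rather than token-by-token output, the chunks can be accumulated. The sketch below assumes the stream yields objects shaped like the StreamChunk type in this reference (token, done). collectStream and fakeStream are hypothetical helpers written for illustration, not part of the package.

```javascript
// Hypothetical helper: accumulates tokens from an async iterable of
// StreamChunk-like objects ({ token, done }) into a single string.
async function collectStream(stream) {
  let text = "";
  for await (const chunk of stream) {
    if (chunk.done) break; // stop at the terminal chunk
    text += chunk.token;
  }
  return text;
}

// Stand-in for a streaming pipeline.query() result (assumption: the real
// stream is an async iterable with the same chunk shape).
async function* fakeStream() {
  yield { token: "Hello", done: false };
  yield { token: ", world", done: false };
  yield { token: "", done: true };
}

collectStream(fakeStream()).then((text) => console.log(text)); // Hello, world
```

With a real pipeline, the value returned by query() with stream: true would take the place of fakeStream().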
pipeline.ingest()
Ingests documents into the RAG pipeline.
Signature:
async ingest(
source: string | string[],
options?: IngestOptions
): Promise<IngestResult>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| source | string \| string[] | Yes | File path(s) or directory to ingest |
| options.batchSize | number | No | Batch size for processing (default: 100) |
| options.timeout | number | No | Timeout per document in milliseconds |
Returns: Promise<IngestResult>
Example:
await pipeline.ingest("./documents");
await pipeline.ingest(["file1.pdf", "file2.txt"]);
Configuration
loadConfig
Loads and validates RAG pipeline configuration from .ragrc.json.
Signature:
function loadConfig(path?: string): Promise<RagConfig>;
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| path | string | No | Path to config file (default: .ragrc.json) |
Returns: Promise<RagConfig>
Example:
const { loadConfig } = require("@devilsdev/rag-pipeline-utils");
const config = await loadConfig("./config/.ragrc.json");
validateRagrc
Validates RAG configuration against schema.
Signature:
function validateRagrc(config: unknown): ValidationResult;
Returns: ValidationResult with valid: boolean and errors: string[]
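One way to act on a ValidationResult is to fail fast with a readable message. The sketch below relies only on the documented shape (valid: boolean, errors: string[]). assertValidConfig is a hypothetical helper, and the hand-written result objects stand in for real validateRagrc() output.

```javascript
// Hypothetical helper: turns a ValidationResult-shaped object
// ({ valid: boolean, errors: string[] }) into a thrown Error when invalid.
function assertValidConfig(result) {
  if (result.valid) return;
  const details = result.errors.map((e) => `  - ${e}`).join("\n");
  throw new Error(`Invalid RAG configuration:\n${details}`);
}

// Hand-written results stand in for validateRagrc() output.
assertValidConfig({ valid: true, errors: [] }); // passes silently

try {
  assertValidConfig({ valid: false, errors: ["example error message"] });
} catch (err) {
  console.error(err.message);
}
```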
normalizeConfig
Normalizes and applies defaults to configuration.
Signature:
function normalizeConfig(config: Partial<RagConfig>): RagConfig;
Security
JWTValidator
Enterprise-grade JWT validation with replay protection.
Constructor:
new JWTValidator(options: JWTValidatorOptions)
Options:
| Parameter | Type | Required | Description |
|---|---|---|---|
| secret | string | Yes | JWT signing secret |
| algorithm | string | No | Algorithm (default: 'HS256') |
| issuer | string | No | Expected issuer |
| audience | string | No | Expected audience |
| expiresIn | string \| number | No | Token expiration (default: '1h') |
| strictValidation | boolean | No | Enforce iss/aud validation (default: true) |
| enableJtiTracking | boolean | No | Enable replay protection (default: false) |
| jtiCacheSize | number | No | Max tracked JTIs (default: 10000) |
Methods:
sign()
Signs a JWT token.
sign(payload: object, options?: SignOptions): string
Example:
const { JWTValidator } = require("@devilsdev/rag-pipeline-utils");
const validator = new JWTValidator({
secret: process.env.JWT_SECRET,
algorithm: "HS256",
issuer: "my-app",
audience: "api-users",
strictValidation: true,
enableJtiTracking: true,
});
const token = validator.sign({
sub: "user-123",
role: "admin",
});
verify()
Verifies and decodes a JWT token.
verify(token: string, options?: VerifyOptions): Promise<JWTPayload>
Throws:
- JsonWebTokenError - Invalid token signature
- TokenExpiredError - Token has expired
- Error - Replay attack detected (if enableJtiTracking: true)
Example:
try {
  const payload = await validator.verify(token);
  console.log("User:", payload.sub);
} catch (error) {
  if (error.message.includes("replay")) {
    console.error("Replay attack detected");
  } else {
    // Invalid signature, expired token, or another failure: do not swallow it.
    throw error;
  }
}
Security Features:
- Self-signed tokens are reusable (for refresh flows)
- External tokens are single-use only (replay protection)
- Race condition mitigation with optimized check-then-set pattern
- Algorithm confusion attack prevention
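To illustrate the idea behind JTI tracking and the check-then-set pattern, here is a minimal sketch of a bounded cache of seen token IDs. It is not the library's implementation: the JtiCache class, its method name, and the oldest-first eviction policy are all assumptions made for this example.

```javascript
// Illustrative only: a bounded set of seen JWT IDs (jti claims).
// Assumption: oldest entries are evicted first once maxSize is reached.
class JtiCache {
  constructor(maxSize = 10000) {
    this.maxSize = maxSize;
    this.seen = new Set(); // a Set iterates in insertion order
  }

  // Returns true the first time a jti is seen, false on any replay.
  // The check and the insert happen in one synchronous step, so no other
  // callback can run between them on Node's single-threaded event loop.
  checkAndRecord(jti) {
    if (this.seen.has(jti)) return false;
    if (this.seen.size >= this.maxSize) {
      const oldest = this.seen.values().next().value;
      this.seen.delete(oldest);
    }
    this.seen.add(jti);
    return true;
  }
}

const cache = new JtiCache(2);
console.log(cache.checkAndRecord("a")); // true
console.log(cache.checkAndRecord("a")); // false (replay)
```

A bounded cache trades memory for coverage: once an ID is evicted, a replay of that token is no longer detected by the cache alone, so the cache size should be chosen with token lifetime in mind.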
InputSanitizer
Multi-layer input sanitization with path traversal defense.
Constructor:
new InputSanitizer(options?: SanitizerOptions)
Options:
| Parameter | Type | Required | Description |
|---|---|---|---|
| throwOnInvalid | boolean | No | Throw on validation failure (default: true) |
| maxLength | number | No | Maximum input length (default: 10000) |
| allowedTags | string[] | No | Allowed HTML tags (default: []) |
Methods:
sanitize()
Sanitizes general text input against XSS, SQL injection, and command injection.
sanitize(input: string): string
Example:
const { InputSanitizer } = require("@devilsdev/rag-pipeline-utils");
const sanitizer = new InputSanitizer({
throwOnInvalid: true,
maxLength: 5000,
});
const safe = sanitizer.sanitize(userInput);
sanitizePath()
Sanitizes file paths with iterative URL decoding to prevent path traversal.
sanitizePath(path: string): string
Throws: Error - Path traversal attempt detected (always throws, even with throwOnInvalid: false)
Example:
try {
const safePath = sanitizer.sanitizePath(userProvidedPath);
// Use safePath
} catch (error) {
console.error("Path traversal blocked:", error.message);
}
Protection Against:
- Standard traversal: ../../../etc/passwd
- Windows paths: ..\\..\\windows\\system32
- URL encoded: %2e%2e%2f, %2e%2e%5c
- Double encoded: %252e%252e%252f
- Multi-level encoding (up to 5 iterations)
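The encoded variants listed above show why a single decoding pass is not enough. As a rough sketch of the general technique (not the code inside sanitizePath), the example below decodes repeatedly until the value stops changing or a cap of 5 passes is reached, then checks for ".." segments. decodeIteratively and hasTraversal are hypothetical names.

```javascript
// Illustrative only: repeatedly URL-decode until the value stops changing
// or the iteration cap is reached.
function decodeIteratively(input, maxIterations = 5) {
  let current = input;
  for (let i = 0; i < maxIterations; i++) {
    let decoded;
    try {
      decoded = decodeURIComponent(current);
    } catch {
      break; // malformed escape sequence: stop decoding
    }
    if (decoded === current) break; // nothing left to decode
    current = decoded;
  }
  return current;
}

// Flags any ".." segment, treating both "/" and "\" as separators.
function hasTraversal(input) {
  const decoded = decodeIteratively(input);
  return decoded.split(/[\\\/]+/).includes("..");
}

console.log(hasTraversal("%252e%252e%252fetc/passwd")); // true
console.log(hasTraversal("docs/report.pdf")); // false
```

A production sanitizer would typically also resolve the path against an allowed base directory. This sketch covers only the decoding step.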
AI/ML Capabilities
MultiModalProcessor
Process text, images, audio, and video content with unified embedding pipelines.
Constructor:
new MultiModalProcessor(options: MultiModalOptions)
Methods:
async process(content: MultiModalContent): Promise<ProcessedContent>
Example:
const { MultiModalProcessor } = require("@devilsdev/rag-pipeline-utils");
const processor = new MultiModalProcessor({
textModel: "text-embedding-ada-002",
imageModel: "clip-vit-base",
});
const result = await processor.process({
type: "image",
data: imageBuffer,
});
AdaptiveRetrievalEngine
Learning-based relevance optimization using reinforcement learning.
Constructor:
new AdaptiveRetrievalEngine(options: AdaptiveRetrievalOptions)
Methods:
async retrieve(query: string, options?: RetrievalOptions): Promise<Document[]>
async learn(feedback: RelevanceFeedback): Promise<void>
Example:
const { AdaptiveRetrievalEngine } = require("@devilsdev/rag-pipeline-utils");
const engine = new AdaptiveRetrievalEngine({
baseRetriever: myRetriever,
learningRate: 0.01,
});
const docs = await engine.retrieve("machine learning basics");
// Provide feedback for learning
await engine.learn({
query: "machine learning basics",
relevantDocs: [docs[0].id, docs[2].id],
irrelevantDocs: [docs[3].id],
});
Workflow Engine
DAGEngine
Execute complex RAG workflows as directed acyclic graphs.
Constructor:
new DAGEngine(options?: DAGOptions)
Methods:
addNode()
addNode(id: string, task: TaskFunction): void
addEdge()
addEdge(from: string, to: string): void
execute()
async execute(input?: any): Promise<DAGResult>
Example:
const { DAGEngine } = require("@devilsdev/rag-pipeline-utils");
const dag = new DAGEngine();
dag.addNode("load", async () => await loadDocuments());
dag.addNode("embed", async (docs) => await embedDocuments(docs));
dag.addNode("store", async (embeddings) => await storeEmbeddings(embeddings));
dag.addEdge("load", "embed");
dag.addEdge("embed", "store");
const result = await dag.execute();
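Executing a DAG requires running each node only after its dependencies have finished. The sketch below shows one common way to derive such an order, Kahn's algorithm. It illustrates the concept and is not taken from DAGEngine. topologicalOrder is a hypothetical function, and edges are passed as [from, to] pairs.

```javascript
// Illustrative only: derive a valid execution order for a DAG with
// Kahn's algorithm. Throws if the edges contain a cycle.
function topologicalOrder(nodes, edges) {
  const inDegree = new Map(nodes.map((id) => [id, 0]));
  const children = new Map(nodes.map((id) => [id, []]));
  for (const [from, to] of edges) {
    children.get(from).push(to);
    inDegree.set(to, inDegree.get(to) + 1);
  }

  // Start with nodes that have no dependencies.
  const ready = nodes.filter((id) => inDegree.get(id) === 0);
  const order = [];
  while (ready.length > 0) {
    const id = ready.shift();
    order.push(id);
    for (const child of children.get(id)) {
      inDegree.set(child, inDegree.get(child) - 1);
      if (inDegree.get(child) === 0) ready.push(child);
    }
  }

  if (order.length !== nodes.length) {
    throw new Error("Cycle detected: graph is not a DAG");
  }
  return order;
}

console.log(
  topologicalOrder(
    ["store", "load", "embed"],
    [["load", "embed"], ["embed", "store"]],
  ),
); // [ 'load', 'embed', 'store' ]
```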
Observability
metrics
Global metrics collection for monitoring pipeline performance.
Methods:
metrics.counter(name: string, value?: number, tags?: object): void
metrics.gauge(name: string, value: number, tags?: object): void
metrics.histogram(name: string, value: number, tags?: object): void
metrics.timing(name: string, duration: number, tags?: object): void
Example:
const { metrics } = require("@devilsdev/rag-pipeline-utils");
metrics.counter("pipeline.queries");
metrics.timing("retrieval.latency", 150);
metrics.gauge("cache.hit_rate", 0.85);
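Timing values usually come from wrapping an operation. The sketch below shows a hypothetical timed helper that measures an async function and reports the duration through any object exposing timing(name, duration, tags). It assumes durations are expressed in milliseconds, which this reference does not state explicitly, and uses a stand-in object in place of the real metrics export.

```javascript
// Hypothetical helper: measures an async function and reports the elapsed
// time (assumed to be milliseconds) via metricsLike.timing().
async function timed(metricsLike, name, fn, tags = {}) {
  const start = process.hrtime.bigint();
  try {
    return await fn();
  } finally {
    // Runs on success and on failure, so failed calls are timed too.
    const ms = Number(process.hrtime.bigint() - start) / 1e6;
    metricsLike.timing(name, ms, tags);
  }
}

// Stand-in for the metrics export; records calls instead of emitting them.
const recorded = [];
const fakeMetrics = {
  timing: (name, ms, tags) => recorded.push({ name, ms, tags }),
};

timed(fakeMetrics, "retrieval.latency", async () => "done").then((value) => {
  console.log(value, recorded[0].name);
});
```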
eventLogger
Structured event logging for audit trails.
Methods:
eventLogger.log(event: Event): void
eventLogger.query(filters: EventFilters): Promise<Event[]>
Example:
const { eventLogger } = require("@devilsdev/rag-pipeline-utils");
eventLogger.log({
type: "security.jwt_validation",
level: "warn",
message: "Token replay detected",
metadata: { jti: "abc123", userId: "user-456" },
});
Enterprise Features
AuditLogger
Immutable audit logging for compliance requirements.
Constructor:
new AuditLogger(options: AuditLoggerOptions)
Example:
const { AuditLogger } = require("@devilsdev/rag-pipeline-utils");
const auditLogger = new AuditLogger({
backend: "s3",
bucket: "compliance-logs",
encryption: true,
});
await auditLogger.log({
action: "document.access",
actor: "user-123",
resource: "confidential-doc-456",
timestamp: new Date().toISOString(),
});
DataGovernance
Multi-tenant data isolation and resource quotas.
Constructor:
new DataGovernance(options: DataGovernanceOptions)
Example:
const { DataGovernance } = require("@devilsdev/rag-pipeline-utils");
const governance = new DataGovernance({
tenantIdField: "orgId",
quotas: {
"org-123": { maxDocuments: 10000, maxStorage: "5GB" },
},
});
Development Tools
HotReloadManager
Enable hot module reloading during development.
Constructor:
new HotReloadManager(options: HotReloadOptions)
Example:
const { createHotReloadManager } = require("@devilsdev/rag-pipeline-utils");
const hotReload = createHotReloadManager({
watchPaths: ["./plugins/**/*.js"],
onReload: (module) => console.log("Reloaded:", module),
});
await hotReload.start();
DevServer
Development server with real-time debugging.
Constructor:
new DevServer(options: DevServerOptions)
Example:
const { createDevServer } = require("@devilsdev/rag-pipeline-utils");
const server = createDevServer({
port: 3000,
pipeline: myPipeline,
enableDebugger: true,
});
await server.start();
// Access dashboard at http://localhost:3000
Utilities
logger
Structured logging utility with multiple transport support.
Methods:
logger.debug(message: string, meta?: object): void
logger.info(message: string, meta?: object): void
logger.warn(message: string, meta?: object): void
logger.error(message: string, meta?: object): void
Example:
const { logger } = require("@devilsdev/rag-pipeline-utils");
logger.info("Pipeline initialized", {
plugins: ["loader", "embedder", "retriever"],
});
Error Handling
createError:
createError(
code: ErrorCode,
message: string,
details?: object
): RagError
wrapError:
wrapError(
error: Error,
context: string
): RagError
ERROR_CODES:
- PLUGIN_NOT_FOUND
- INVALID_CONFIG
- VALIDATION_ERROR
- NETWORK_ERROR
- TIMEOUT_ERROR
- SECURITY_ERROR
Example:
const {
createError,
wrapError,
ERROR_CODES,
} = require("@devilsdev/rag-pipeline-utils");
throw createError(
ERROR_CODES.PLUGIN_NOT_FOUND,
"Embedder plugin not registered",
{ pluginName: "custom-embedder" },
);
try {
await riskyOperation();
} catch (err) {
throw wrapError(err, "Pipeline execution");
}
Plugin System
pluginRegistry
Global plugin registry for managing pipeline components.
Methods:
register()
register(
type: PluginType,
name: string,
plugin: Plugin
): void
get()
get(
type: PluginType,
name: string
): Plugin
list()
list(type?: PluginType): string[]
Plugin Types:
- 'loader' - Document loaders
- 'embedder' - Embedding generators
- 'retriever' - Document retrievers
- 'llm' - Language models
- 'reranker' - Result rerankers
Example:
const { pluginRegistry } = require("@devilsdev/rag-pipeline-utils");
pluginRegistry.register("embedder", "my-embedder", new MyEmbedder());
const embedder = pluginRegistry.get("embedder", "my-embedder");
console.log(pluginRegistry.list("embedder"));
// ['my-embedder', 'openai', 'cohere', ...]
Performance
ParallelProcessor
Process documents in parallel with configurable concurrency.
Constructor:
new ParallelProcessor(options: ParallelProcessorOptions)
Methods:
async process<T, R>(
  items: T[],
  handler: (item: T) => Promise<R>
): Promise<R[]>
Example:
const { ParallelProcessor } = require("@devilsdev/rag-pipeline-utils");
const processor = new ParallelProcessor({
concurrency: 10,
retryAttempts: 3,
});
const results = await processor.process(
documents,
async (doc) => await embedDocument(doc),
);
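For intuition about what a concurrency limit means, the following sketch runs a handler over a list with a fixed number of workers. It is a generic pattern, not the internals of ParallelProcessor. mapWithConcurrency is a hypothetical function, and retries are deliberately left out.

```javascript
// Illustrative only: run `handler` over `items` with at most
// `concurrency` calls in flight; results keep the input order.
async function mapWithConcurrency(items, handler, concurrency = 10) {
  const results = new Array(items.length);
  let next = 0;

  // Each worker claims the next unprocessed index until none remain.
  async function worker() {
    while (next < items.length) {
      const index = next++;
      results[index] = await handler(items[index]);
    }
  }

  const workerCount = Math.min(concurrency, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

mapWithConcurrency([1, 2, 3, 4], async (n) => n * 2, 2).then((out) =>
  console.log(out),
); // [ 2, 4, 6, 8 ]
```

Because a rejected handler rejects the whole call here, a retry or error-collection policy would need to be layered on top.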
Type Definitions
Plugin Contracts
Loader:
interface Loader {
load(source: string): Promise<Document[]>;
}
Embedder:
interface Embedder {
embed(text: string): Promise<number[]>;
embedBatch?(texts: string[]): Promise<number[][]>;
}
Retriever:
interface RetrieveOptions {
  query?: string;
  queryVector?: number[];
  topK?: number;
}
interface Retriever {
  retrieve(options: RetrieveOptions): Promise<Document[]>;
}
LLM:
interface LLM {
generate(
query: string,
context: Document[],
options?: GenerateOptions,
): Promise<string>;
generateStream?(
query: string,
context: Document[],
options?: GenerateOptions,
): AsyncIterable<StreamChunk>;
}
Reranker:
interface Reranker {
rerank(
query: string,
documents: Document[],
options?: RerankOptions,
): Promise<Document[]>;
}
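To make the contracts above concrete, here is a minimal sketch of two toy plugins: an embedder that counts letters and an in-memory retriever that ranks by cosine similarity. CharCountEmbedder, InMemoryRetriever, its add() method, and cosine are hypothetical names for illustration. Only embed, embedBatch, and retrieve come from the contracts, and the toy embedding is not meant for real retrieval quality.

```javascript
// Hypothetical toy plugin following the Embedder contract.
class CharCountEmbedder {
  // Maps text to a 26-dimensional vector of letter counts (a-z).
  async embed(text) {
    const vector = new Array(26).fill(0);
    for (const ch of text.toLowerCase()) {
      const i = ch.charCodeAt(0) - 97; // 'a' -> 0
      if (i >= 0 && i < 26) vector[i] += 1;
    }
    return vector;
  }

  async embedBatch(texts) {
    return Promise.all(texts.map((t) => this.embed(t)));
  }
}

// Cosine similarity; returns 0 when either vector has zero length.
function cosine(a, b) {
  let dot = 0;
  let na = 0;
  let nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return na && nb ? dot / Math.sqrt(na * nb) : 0;
}

// Hypothetical toy plugin following the Retriever contract.
class InMemoryRetriever {
  constructor() {
    this.entries = []; // each entry: { doc, vector }
  }

  // Not part of the contract: a helper for loading documents.
  add(doc, vector) {
    this.entries.push({ doc, vector });
  }

  // Ranks stored documents by cosine similarity to queryVector.
  async retrieve({ queryVector, topK = 3 }) {
    return this.entries
      .map(({ doc, vector }) => ({ ...doc, score: cosine(queryVector, vector) }))
      .sort((x, y) => y.score - x.score)
      .slice(0, topK);
  }
}
```

Instances like these could then be passed as config.embedder and config.retriever, since those fields are documented as accepting plugin objects.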
Common Types
Document:
interface Document {
id: string;
content: string;
metadata?: Record<string, any>;
score?: number;
}
StreamChunk:
interface StreamChunk {
token: string;
done: boolean;
metadata?: Record<string, any>;
}
Version Information
Current version: 2.3.1
Supported Node.js versions:
- Node.js 18.x
- Node.js 20.x
- Node.js 22.x