Performance Optimization Guide
This guide helps you tune @DevilsDev/rag-pipeline-utils for maximum throughput, minimal latency, and efficient resource use, covering optimization strategies for every component, from embedding generation to vector retrieval and LLM inference.
🚀 Performance Overview
Key Performance Metrics
- Throughput: Documents processed per second
- Latency: Time from query to response
- Memory Usage: RAM consumption during processing
- Token Efficiency: Cost optimization for LLM usage
- Concurrent Processing: Parallel operation capabilities
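As a concrete illustration of how these metrics are derived, here is a minimal sketch that computes throughput, latency percentiles, and memory usage from raw per-query timings; the function and field names are illustrative, not part of the library API:
// Derive headline metrics from raw per-query timings (illustrative helper, not a library API)
function summarizeRun(durationsMs, totalDocuments, wallClockMs) {
  const sorted = [...durationsMs].sort((a, b) => a - b);
  const pick = p => sorted[Math.min(sorted.length - 1, Math.max(0, Math.ceil(p * sorted.length) - 1))];
  return {
    throughputDocsPerSec: totalDocuments / (wallClockMs / 1000), // documents processed per second
    latencyP50Ms: pick(0.5),                                     // median query-to-response time
    latencyP95Ms: pick(0.95),                                    // tail latency worth alerting on
    heapUsedMB: process.memoryUsage().heapUsed / 1024 / 1024     // current RAM consumption
  };
}
// Example: 1,000 documents processed in 20 s, with per-query latencies collected in `timings`
// console.log(summarizeRun(timings, 1000, 20000));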
Performance Monitoring
# Enable performance monitoring
rag-pipeline query "test" --benchmark --stats
# Comprehensive performance analysis
rag-pipeline benchmark --comprehensive --output perf-report.json
# Real-time monitoring
rag-pipeline monitor --metrics throughput,latency,memory --interval 5s
📊 Component-Specific Optimization
1. Embedding Performance
Batch Processing Optimization:
import { createRagPipeline } from '@DevilsDev/rag-pipeline-utils';
const pipeline = createRagPipeline({
embedder: {
name: 'openai',
config: {
batchSize: 100, // Process 100 texts at once
maxConcurrency: 5, // 5 parallel API calls
timeout: 30000, // 30 second timeout
retryAttempts: 3, // Retry failed requests
cacheEnabled: true, // Cache embeddings
cacheSize: 10000 // Cache up to 10k embeddings
}
}
});
Parallel Embedding Strategy:
// Custom parallel embedding implementation. Semaphore, createBatches(), and processBatch()
// are simple project-level helpers not shown here; BaseEmbedder is the embedder base class being extended.
import { LRUCache } from 'lru-cache';
class OptimizedEmbedder extends BaseEmbedder {
constructor(options = {}) {
super();
this.concurrencyLimit = options.concurrency || 5;
this.batchSize = options.batchSize || 50;
this.cache = new LRUCache({ max: options.cacheSize || 5000 });
}
async embedBatch(texts, options = {}) {
// Filter cached texts
const uncachedTexts = texts.filter(text => !this.cache.has(text));
if (uncachedTexts.length === 0) {
return texts.map(text => this.cache.get(text));
}
// Create batches for parallel processing
const batches = this.createBatches(uncachedTexts, this.batchSize);
// Process batches with concurrency limit
const results = await this.processBatchesParallel(batches, {
concurrency: this.concurrencyLimit,
retryOnFailure: true
});
// Flatten per-batch results into one embedding per uncached text, then cache them
const embeddings = results.flat();
embeddings.forEach((embedding, index) => {
this.cache.set(uncachedTexts[index], embedding);
});
return texts.map(text => this.cache.get(text));
}
async processBatchesParallel(batches, options) {
const semaphore = new Semaphore(options.concurrency);
return Promise.all(
batches.map(async (batch) => {
await semaphore.acquire();
try {
return await this.processBatch(batch);
} finally {
semaphore.release();
}
})
);
}
}
Embedding Benchmarks:
# Benchmark different embedding strategies
rag-pipeline benchmark embedder \
--texts 1000 \
--batch-sizes 10,50,100,200 \
--concurrency-levels 1,3,5,10 \
--output embedding-benchmark.json
2. Vector Retrieval Optimization
Index Configuration:
const pipeline = createRagPipeline({
retriever: {
name: 'pinecone',
config: {
indexType: 'approximateSearch', // Faster but less precise
dimensions: 1536,
metric: 'cosine',
pods: 1,
replicas: 1,
podType: 'p1.x1', // Optimize for your workload
// Query optimization
topK: 5, // Limit results
includeMetadata: false, // Reduce payload size
includeValues: false, // Reduce payload size
// Connection pooling
maxConnections: 10,
keepAlive: true,
timeout: 5000
}
}
});
Query Optimization:
// Optimized retrieval with filtering. Assumes `this` is bound to a pipeline-like object
// that exposes `embedder` and `retriever` (for example, attach this function as a method).
async function optimizedRetrieve(query, options = {}) {
const queryVector = await this.embedder.embed(query);
return await this.retriever.retrieve(queryVector, {
topK: options.topK || 5,
filter: options.filter, // Pre-filter to reduce search space
includeMetadata: options.includeMetadata || false,
namespace: options.namespace, // Use namespaces for isolation
// Performance optimizations
approximateSearch: true, // Trade accuracy for speed
searchTimeout: 2000, // 2 second timeout
maxRetries: 2
});
}
Vector Store Benchmarks:
# Compare vector store performance
rag-pipeline benchmark retriever \
--stores pinecone,chroma,weaviate \
--queries 500 \
--concurrent 10 \
--metrics latency,throughput,accuracy
3. LLM Generation Optimization
Model Selection Strategy:
// Choose optimal model for use case
const modelConfigs = {
'fast-responses': {
name: 'openai-gpt-3.5-turbo',
config: {
maxTokens: 500,
temperature: 0.3,
topP: 0.9,
frequencyPenalty: 0.1
}
},
'high-quality': {
name: 'openai-gpt-4',
config: {
maxTokens: 1500,
temperature: 0.7,
topP: 0.95
}
},
'cost-optimized': {
name: 'openai-gpt-3.5-turbo',
config: {
maxTokens: 300,
temperature: 0.1,
stop: ['\n\n', '###']
}
}
};
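A minimal sketch of applying these presets at runtime, assuming the pipeline accepts an llm entry in the same { name, config } shape used for the embedder and retriever above:
// Pick a preset per request profile and build a pipeline with it (sketch; plugin names as above)
import { createRagPipeline } from '@DevilsDev/rag-pipeline-utils';

function pipelineFor(useCase) {
  const llm = modelConfigs[useCase] || modelConfigs['cost-optimized']; // fall back to the cheapest preset
  return createRagPipeline({
    embedder: { name: 'openai', config: { batchSize: 100, cacheEnabled: true } },
    retriever: { name: 'pinecone', config: { topK: 5 } },
    llm
  });
}

const fastPipeline = pipelineFor('fast-responses');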
Streaming Optimization:
// Implement streaming for better perceived performance. Assumes `this` is bound to an
// object exposing `llm.generateStream` (for example, attach this generator as a pipeline method).
async function* optimizedGenerate(prompt, options = {}) {
const stream = await this.llm.generateStream(prompt, {
...options,
bufferSize: 10, // Buffer tokens for smoother streaming
flushInterval: 50, // Flush every 50ms
earlyStop: true // Stop on complete sentences
});
let buffer = '';
for await (const token of stream) {
buffer += token;
// Flush on sentence boundaries for better UX
if (buffer.match(/[.!?]\s/)) {
yield buffer;
buffer = '';
}
}
if (buffer) yield buffer;
}
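A usage sketch for the generator above; it assumes the generator is bound to an object that exposes llm.generateStream:
// Stream an answer to the terminal sentence by sentence (usage sketch)
async function streamAnswer(pipeline, prompt) {
  const generate = optimizedGenerate.bind(pipeline); // bind so `this.llm` resolves inside the generator
  for await (const chunk of generate(prompt, { maxTokens: 500 })) {
    process.stdout.write(chunk); // partial output appears immediately, improving perceived latency
  }
  process.stdout.write('\n');
}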
Token Usage Optimization:
// Optimize prompts for token efficiency
class TokenOptimizer {
constructor(tokenizer) {
this.tokenizer = tokenizer;
this.maxContextTokens = 4096;
this.maxResponseTokens = 1000;
}
optimizePrompt(context, query) {
const basePrompt = `Context: {context}\n\nQuestion: ${query}\n\nAnswer:`;
const baseTokens = this.tokenizer.encode(basePrompt.replace('{context}', '')).length;
const availableTokens = this.maxContextTokens - baseTokens - this.maxResponseTokens;
// Truncate context to fit token limit
const optimizedContext = this.truncateContext(context, availableTokens);
return basePrompt.replace('{context}', optimizedContext);
}
truncateContext(context, maxTokens) {
const sentences = context.split(/[.!?]+/);
let truncated = '';
let tokenCount = 0;
for (const sentence of sentences) {
const sentenceTokens = this.tokenizer.encode(sentence).length;
if (tokenCount + sentenceTokens > maxTokens) break;
truncated += sentence + '.';
tokenCount += sentenceTokens;
}
return truncated;
}
}
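A usage sketch, assuming the tiktoken npm package as the tokenizer (any object whose encode() returns an array-like of token ids works):
// Build a token-budgeted prompt with a real tokenizer (usage sketch)
import { encoding_for_model } from 'tiktoken';

const tokenizer = encoding_for_model('gpt-3.5-turbo'); // encode(text) returns an array of token ids
const optimizer = new TokenOptimizer(tokenizer);
const prompt = optimizer.optimizePrompt(retrievedContext, 'How do I enable caching?'); // retrievedContext: text returned by your retriever
tokenizer.free(); // tiktoken encoders hold WASM memory and should be released when done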
🔄 Pipeline-Level Optimization
Streaming Architecture
// Implement full pipeline streaming. createDocumentStream, createEmbeddingStream,
// chunkDocument, and processEmbedding are project-specific helpers omitted for brevity.
class StreamingPipeline {
constructor(config) {
this.config = config;
this.bufferSize = config.bufferSize || 1000;
this.concurrency = config.concurrency || 3;
}
async* processStream(documents) {
const documentStream = this.createDocumentStream(documents);
const chunkStream = this.createChunkStream(documentStream);
const embeddingStream = this.createEmbeddingStream(chunkStream);
for await (const batch of embeddingStream) {
// Process embeddings in parallel
const results = await Promise.allSettled(
batch.map(item => this.processEmbedding(item))
);
yield results.filter(r => r.status === 'fulfilled').map(r => r.value);
}
}
async* createChunkStream(documentStream) {
let buffer = [];
for await (const document of documentStream) {
const chunks = await this.chunkDocument(document);
buffer.push(...chunks);
if (buffer.length >= this.bufferSize) {
yield buffer.splice(0, this.bufferSize);
}
}
if (buffer.length > 0) yield buffer;
}
}
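A consumption sketch for the streaming pipeline above; the progress logging is illustrative:
// Drive the streaming pipeline and report progress batch by batch (usage sketch)
async function runStreaming(documents) {
  const pipeline = new StreamingPipeline({ bufferSize: 500, concurrency: 3 });
  let processed = 0;
  for await (const batch of pipeline.processStream(documents)) {
    processed += batch.length;
    console.log(`Processed ${processed} chunks so far`); // memory stays bounded by bufferSize
  }
}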
Memory Management
// Memory-efficient processing for large datasets. createBatches and processBatch are
// project-specific helpers omitted for brevity.
class MemoryOptimizedPipeline {
constructor(options = {}) {
this.maxMemoryUsage = options.maxMemory || '2GB';
this.gcThreshold = options.gcThreshold || 0.8;
this.batchSize = options.batchSize || 100;
}
async processLargeDataset(documents) {
const batches = this.createBatches(documents, this.batchSize);
for (const batch of batches) {
// Monitor memory usage
const memoryUsage = process.memoryUsage();
const memoryPercent = memoryUsage.heapUsed / memoryUsage.heapTotal;
if (memoryPercent > this.gcThreshold) {
// Force garbage collection
if (global.gc) global.gc();
// Reduce batch size if memory pressure continues
if (memoryPercent > 0.9) {
this.batchSize = Math.max(10, Math.floor(this.batchSize * 0.8));
}
}
await this.processBatch(batch);
// Clear batch references
batch.length = 0;
}
}
}
Caching Strategy
// Multi-level caching implementation. LRUCache comes from the lru-cache package;
// RedisCache and DiskCache stand in for whichever Redis and disk-backed cache clients you use.
class CacheManager {
constructor(options = {}) {
// L1: In-memory cache (fastest)
this.l1Cache = new LRUCache({
max: options.l1Size || 1000,
ttl: options.l1TTL || 300000 // 5 minutes
});
// L2: Redis cache (shared across instances)
this.l2Cache = new RedisCache({
host: options.redisHost,
ttl: options.l2TTL || 3600000 // 1 hour
});
// L3: Disk cache (persistent)
this.l3Cache = new DiskCache({
directory: options.cacheDir || './cache',
maxSize: options.l3Size || '1GB'
});
}
async get(key) {
// Try L1 first
let value = this.l1Cache.get(key);
if (value) return value;
// Try L2
value = await this.l2Cache.get(key);
if (value) {
this.l1Cache.set(key, value);
return value;
}
// Try L3
value = await this.l3Cache.get(key);
if (value) {
// Promote the entry back into the faster tiers
this.l1Cache.set(key, value);
await this.l2Cache.set(key, value);
return value;
}
return null;
}
async set(key, value) {
this.l1Cache.set(key, value);
await this.l2Cache.set(key, value);
await this.l3Cache.set(key, value);
}
}
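A sketch of putting the cache manager in front of embedding calls so repeated texts skip the embedding API; the key scheme is an assumption:
// Get-or-compute: consult the cache tiers before paying for an embedding call (sketch)
async function cachedEmbed(cacheManager, embedder, text) {
  const key = `embedding:${text}`; // simple key scheme; hash long texts in practice
  const cached = await cacheManager.get(key);
  if (cached) return cached;
  const embedding = await embedder.embed(text);
  await cacheManager.set(key, embedding);
  return embedding;
}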
📈 Monitoring & Profiling
Performance Metrics Collection
// Built-in performance monitoring
import { createPerformanceMonitor } from '@DevilsDev/rag-pipeline-utils';
const monitor = createPerformanceMonitor({
metrics: ['throughput', 'latency', 'memory', 'tokens', 'errors'],
interval: 1000, // Collect every second
retention: 3600, // Keep 1 hour of data
alerts: {
highLatency: { threshold: 5000, action: 'log' },
memoryUsage: { threshold: 0.9, action: 'gc' },
errorRate: { threshold: 0.05, action: 'alert' }
}
});
const pipeline = createRagPipeline({
monitor,
// ... other config
});
// Access performance data
const metrics = monitor.getMetrics();
console.log('Average latency:', metrics.latency.average);
console.log('Throughput:', metrics.throughput.current);
Profiling Tools
# Profile memory usage (resolve the CLI entry point so node can load it directly)
node --inspect --max-old-space-size=4096 $(which rag-pipeline) ingest ./large-dataset
# Profile CPU usage
rag-pipeline benchmark --profile-cpu --output cpu-profile.json
# Generate performance report
rag-pipeline analyze-performance \
--input performance-logs.json \
--output performance-report.html \
--include-recommendations
Custom Metrics
// Define custom performance metrics
class CustomMetrics {
constructor() {
this.metrics = new Map();
this.startTimes = new Map();
}
startTimer(operation) {
this.startTimes.set(operation, Date.now());
}
endTimer(operation) {
const startTime = this.startTimes.get(operation);
if (startTime) {
const duration = Date.now() - startTime;
this.recordMetric(operation, duration);
this.startTimes.delete(operation);
}
}
recordMetric(name, value) {
if (!this.metrics.has(name)) {
this.metrics.set(name, []);
}
this.metrics.get(name).push({
value,
timestamp: Date.now()
});
}
getStats(name) {
const values = this.metrics.get(name) || [];
if (values.length === 0) return null;
const nums = values.map(v => v.value);
return {
count: nums.length,
average: nums.reduce((a, b) => a + b, 0) / nums.length,
min: Math.min(...nums),
max: Math.max(...nums),
p95: this.percentile(nums, 0.95),
p99: this.percentile(nums, 0.99)
};
}
percentile(nums, p) {
// Nearest-rank percentile over a sorted copy of the samples
const sorted = [...nums].sort((a, b) => a - b);
const index = Math.min(sorted.length - 1, Math.max(0, Math.ceil(p * sorted.length) - 1));
return sorted[index];
}
}
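A usage sketch that times repeated queries; pipeline.query stands in for whatever operation you want to measure:
// Time repeated queries and report latency statistics (usage sketch)
async function measureQueries(pipeline, queries) {
  const metrics = new CustomMetrics();
  for (const q of queries) {
    metrics.startTimer('query');
    await pipeline.query(q); // swap in the operation you actually want to measure
    metrics.endTimer('query');
  }
  console.log(metrics.getStats('query')); // { count, average, min, max, p95, p99 }
}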
⚡ Production Optimization
Deployment Configuration
# Docker optimization
FROM node:18-alpine
WORKDIR /app
# Optimize Node.js for production
ENV NODE_ENV=production
ENV NODE_OPTIONS="--max-old-space-size=4096"
# Install production dependencies only
COPY package*.json ./
RUN npm ci --omit=dev --no-audit
# Copy application
COPY . .
# worker_threads are stable on Node 18, so no experimental flag is needed;
# the V8 --optimize-for-size flag is passed directly to node rather than via NODE_OPTIONS
CMD ["node", "--optimize-for-size", "index.js"]
Load Balancing
// Horizontal scaling with load balancing
class LoadBalancer {
constructor(instances) {
this.instances = instances;
this.currentIndex = 0;
this.healthChecks = new Map();
}
async getHealthyInstance() {
const startIndex = this.currentIndex;
do {
const instance = this.instances[this.currentIndex];
this.currentIndex = (this.currentIndex + 1) % this.instances.length;
if (await this.isHealthy(instance)) {
return instance;
}
} while (this.currentIndex !== startIndex);
throw new Error('No healthy instances available');
}
async isHealthy(instance) {
try {
const response = await instance.healthCheck();
return response.status === 'healthy';
} catch (error) {
return false;
}
}
}
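A dispatch sketch for the load balancer above; the per-instance query() method is an assumption about your worker interface:
// Route each incoming request to the next healthy pipeline instance (sketch)
const balancer = new LoadBalancer([instanceA, instanceB, instanceC]); // pre-created pipeline workers

async function handleRequest(query) {
  const instance = await balancer.getHealthyInstance();
  return instance.query(query); // assumes each instance wraps a pipeline behind a query() method
}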
Auto-scaling Configuration
// Auto-scaling based on load. collectMetrics, createInstance, and terminateInstance
// are deployment-specific and omitted for brevity.
class AutoScaler {
constructor(options = {}) {
this.minInstances = options.min || 1;
this.maxInstances = options.max || 10;
this.targetCPU = options.targetCPU || 0.7;
this.scaleUpThreshold = options.scaleUpThreshold || 0.8;
this.scaleDownThreshold = options.scaleDownThreshold || 0.3;
this.instances = [];
}
async monitor() {
const metrics = await this.collectMetrics();
const avgCPU = metrics.cpu.average;
const avgMemory = metrics.memory.average; // not used by the CPU-based policy below; useful for logging
if (avgCPU > this.scaleUpThreshold && this.instances.length < this.maxInstances) {
await this.scaleUp();
} else if (avgCPU < this.scaleDownThreshold && this.instances.length > this.minInstances) {
await this.scaleDown();
}
}
async scaleUp() {
const newInstance = await this.createInstance();
this.instances.push(newInstance);
console.log(`Scaled up to ${this.instances.length} instances`);
}
async scaleDown() {
const instance = this.instances.pop();
await this.terminateInstance(instance);
console.log(`Scaled down to ${this.instances.length} instances`);
}
}
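The scaler only acts when monitor() is called, so in practice you would run it on a timer; a minimal sketch:
// Evaluate scaling decisions every 30 seconds (sketch)
const scaler = new AutoScaler({ min: 2, max: 10, targetCPU: 0.7 });
const scalingLoop = setInterval(() => {
  scaler.monitor().catch(err => console.error('Scaling check failed:', err));
}, 30000);
// Call clearInterval(scalingLoop) during shutdown to stop the loop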
🎯 Performance Best Practices
1. Configuration Optimization
{
"performance": {
"embedder": {
"batchSize": 100,
"maxConcurrency": 5,
"cacheEnabled": true,
"timeout": 30000
},
"retriever": {
"topK": 5,
"approximateSearch": true,
"connectionPoolSize": 10
},
"llm": {
"maxTokens": 1000,
"temperature": 0.3,
"streaming": true
},
"pipeline": {
"parallelProcessing": true,
"memoryLimit": "2GB",
"gcThreshold": 0.8
}
}
}
2. Resource Management
- Memory: Use streaming for large datasets
- CPU: Leverage parallel processing
- Network: Implement connection pooling
- Storage: Use appropriate caching strategies
3. Monitoring Checklist
- Track response latency (target: less than 2s)
- Monitor throughput (documents/second)
- Watch memory usage (keep under 80%)
- Monitor error rates (keep under 1%)
- Track token usage costs
- Set up alerting for anomalies
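A sketch that turns this checklist into an automated check against the monitor introduced earlier; the memory and errors field names are assumptions about the metrics shape:
// Compare live metrics against the checklist targets and flag violations (sketch)
function checkTargets(monitor) {
  const m = monitor.getMetrics();
  const violations = [];
  if (m.latency.average > 2000) violations.push('latency above 2s target');
  if (m.memory && m.memory.heapUsedRatio > 0.8) violations.push('memory above 80%'); // assumed field name
  if (m.errors && m.errors.rate > 0.01) violations.push('error rate above 1%');       // assumed field name
  return violations; // an empty array means all checklist targets are currently met
}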
📊 Performance Benchmarks
Reference Performance
| Component | Throughput | Latency (P95) | Memory Usage |
| --- | --- | --- | --- |
| PDF Loader | 50 docs/sec | 200ms | 100MB |
| OpenAI Embedder | 1000 texts/sec | 500ms | 50MB |
| Pinecone Retriever | 500 queries/sec | 100ms | 25MB |
| GPT-4 Generation | 10 queries/sec | 3000ms | 75MB |
Optimization Targets
- Embedding: greater than 500 texts/second
- Retrieval: under 200ms P95 latency
- Generation: under 5s P95 latency
- Memory: under 1GB for 10k documents
- Error Rate: under 0.5%
This performance guide provides comprehensive optimization strategies for @DevilsDev/rag-pipeline-utils. For troubleshooting performance issues, see the Troubleshooting Guide.