Documentation

Build LLM-powered agents
with production-ready TypeScript

DSPy for TypeScript. Working with LLMs is complexβ€”they don't always do what you want. DSPy makes it easier to build amazing things with LLMs. Just define your inputs and outputs (signature) and an efficient prompt is auto-generated and used. Connect together various signatures to build complex systems and workflows using LLMs.

15+ LLM Providers
End-to-end Streaming
Auto Prompt Tuning

Advanced RAG with AxFlow: axRAG

axRAG is a powerful, production-ready RAG (Retrieval-Augmented Generation) implementation built on AxFlow that provides advanced multi-hop retrieval, self-healing quality loops, and intelligent query refinement.

Key Features

Basic Usage

import { AxAI, axRAG } from "@ax-llm/ax";

const llm = ai({
  name: "openai",
  apiKey: process.env.OPENAI_APIKEY,
});

// Your vector database query function
const queryVectorDB = async (query: string): Promise<string> => {
  // Connect to your vector database (Pinecone, Weaviate, etc.)
  // Return relevant context for the query
  return await yourVectorDB.query(query);
};

// Create a powerful RAG pipeline
const rag = axRAG(queryVectorDB, {
  maxHops: 3,           // Maximum retrieval iterations
  qualityThreshold: 0.8, // Quality score threshold (0-1)
  maxIterations: 2,      // Max parallel sub-query iterations
  qualityTarget: 0.85,   // Target quality for healing loops
  debug: true           // Enable detailed logging
});

// Execute RAG with complex question
const result = await rag.forward(llm, {
  originalQuestion: "How do machine learning algorithms impact privacy in financial services?"
});

console.log("Answer:", result.finalAnswer);
console.log("Quality:", result.qualityAchieved);
console.log("Sources:", result.retrievedContexts.length);
console.log("Hops:", result.totalHops);
console.log("Healing attempts:", result.healingAttempts);

Advanced Configuration

// Production-ready RAG with full configuration
const advancedRAG = axRAG(queryVectorDB, {
  // Multi-hop retrieval settings
  maxHops: 4,                    // More thorough retrieval
  qualityThreshold: 0.75,        // Lower threshold for faster execution
  
  // Parallel processing settings  
  maxIterations: 3,              // More sub-query iterations
  
  // Self-healing settings
  qualityTarget: 0.9,            // Higher quality target
  disableQualityHealing: false,  // Enable healing loops
  
  // Debug and monitoring
  debug: true,                   // Detailed execution logging
  logger: customLogger,          // Custom logging function
});

// Execute with complex research query
const result = await advancedRAG.forward(llm, {
  originalQuestion: "What are the latest developments in quantum computing for cryptography, including both opportunities and security risks?"
});

RAG Pipeline Architecture

The axRAG implementation uses a sophisticated 4-phase approach:

Phase 1: Multi-hop Context Retrieval

Phase 2: Parallel Sub-query Processing

Phase 3: Answer Generation

Phase 4: Self-healing Quality Loops

AxFlow Pipeline Implementation

The axRAG implementation showcases the power of AxFlow to build complex, real-world LLM-powered pipelines that solve sophisticated problems. Below is the commented AxFlow pipeline code that demonstrates how intricate multi-hop RAG systems can be elegantly constructed using AxFlow’s declarative approach:

export const axRAG = (
  queryFn: (query: string) => Promise<string>,
  options?: {
    maxHops?: number;
    qualityThreshold?: number;
    maxIterations?: number;
    qualityTarget?: number;
    disableQualityHealing?: boolean;
    logger?: AxFlowLoggerFunction;
    debug?: boolean;
  }
) => {
  // Extract configuration with sensible defaults
  const maxHops = options?.maxHops ?? 3;
  const qualityThreshold = options?.qualityThreshold ?? 0.8;
  const maxIterations = options?.maxIterations ?? 2;
  const qualityTarget = options?.qualityTarget ?? 0.85;
  const disableQualityHealing = options?.disableQualityHealing ?? false;

  return (
    new AxFlow<
      { originalQuestion: string },
      {
        finalAnswer: string;
        totalHops: number;
        retrievedContexts: string[];
        iterationCount: number;
        healingAttempts: number;
        qualityAchieved: number;
      }
    >({
      logger: options?.logger,
      debug: options?.debug,
    })
      // πŸ—οΈ STEP 1: Define AI-powered processing nodes
      // Each node represents a specialized AI task with typed inputs/outputs
      .node(
        'queryGenerator',
        'originalQuestion:string, previousContext?:string -> searchQuery:string, queryReasoning:string'
      )
      .node(
        'contextualizer',
        'retrievedDocument:string, accumulatedContext?:string -> enhancedContext:string'
      )
      .node(
        'qualityAssessor',
        'currentContext:string, originalQuestion:string -> completenessScore:number, missingAspects:string[]'
      )
      .node(
        'questionDecomposer',
        'complexQuestion:string -> subQuestions:string[], decompositionReason:string'
      )
      .node(
        'evidenceSynthesizer',
        'collectedEvidence:string[], originalQuestion:string -> synthesizedEvidence:string, evidenceGaps:string[]'
      )
      .node(
        'gapAnalyzer',
        'synthesizedEvidence:string, evidenceGaps:string[], originalQuestion:string -> needsMoreInfo:boolean, focusedQueries:string[]'
      )
      .node(
        'answerGenerator',
        'finalContext:string, originalQuestion:string -> comprehensiveAnswer:string, confidenceLevel:number'
      )
      .node(
        'queryRefiner',
        'originalQuestion:string, currentContext:string, missingAspects:string[] -> refinedQuery:string'
      )
      .node(
        'qualityValidator',
        'generatedAnswer:string, userQuery:string -> qualityScore:number, issues:string[]'
      )
      .node(
        'answerHealer',
        'originalAnswer:string, healingDocument:string, issues?:string[] -> healedAnswer:string'
      )

      // πŸš€ STEP 2: Initialize comprehensive pipeline state
      // AxFlow maintains this state throughout the entire pipeline execution
      .map((state) => ({
        ...state,
        maxHops,
        qualityThreshold,
        maxIterations,
        qualityTarget,
        disableQualityHealing,
        currentHop: 0,
        accumulatedContext: '',
        retrievedContexts: [] as string[],
        completenessScore: 0,
        searchQuery: state.originalQuestion,
        shouldContinue: true,
        iteration: 0,
        allEvidence: [] as string[],
        evidenceSources: [] as string[],
        needsMoreInfo: true,
        healingAttempts: 0,
        currentQuality: 0,
        shouldContinueHealing: true,
        currentAnswer: '',
        currentIssues: [] as string[],
      }))

      // πŸ”„ PHASE 1: Multi-hop Retrieval with Iterative Refinement
      // AxFlow's .while() enables sophisticated looping with dynamic conditions
      .while(
        (state) =>
          state.currentHop < state.maxHops &&
          state.completenessScore < state.qualityThreshold &&
          state.shouldContinue
      )
      
      // Increment hop counter for each iteration
      .map((state) => ({
        ...state,
        currentHop: state.currentHop + 1,
      }))

      // 🧠 Generate intelligent search query using AI
      .execute('queryGenerator', (state) => ({
        originalQuestion: state.originalQuestion,
        previousContext: state.accumulatedContext || undefined,
      }))

      // πŸ“š Execute vector database retrieval using provided queryFn
      .map(async (state) => {
        const searchQuery = state.queryGeneratorResult.searchQuery as string;
        const retrievedDocument = await queryFn(searchQuery);
        return {
          ...state,
          retrievalResult: {
            retrievedDocument,
            retrievalConfidence: 0.9,
          },
        };
      })

      // πŸ”— Contextualize retrieved document with existing knowledge
      .execute('contextualizer', (state) => ({
        retrievedDocument: state.retrievalResult.retrievedDocument,
        accumulatedContext: state.accumulatedContext || undefined,
      }))

      // πŸ“Š Assess quality and completeness of current context
      .execute('qualityAssessor', (state) => ({
        currentContext: state.contextualizerResult.enhancedContext,
        originalQuestion: state.originalQuestion,
      }))

      // πŸ“ˆ Update state with enhanced context and quality metrics
      .map((state) => ({
        ...state,
        accumulatedContext: state.contextualizerResult.enhancedContext,
        retrievedContexts: [
          ...state.retrievedContexts,
          state.retrievalResult.retrievedDocument,
        ],
        completenessScore: state.qualityAssessorResult.completenessScore as number,
        searchQuery: state.queryGeneratorResult.searchQuery as string,
        shouldContinue:
          (state.qualityAssessorResult.completenessScore as number) < state.qualityThreshold,
      }))

      // 🎯 Conditional query refinement using AxFlow's branching
      .branch(
        (state) => state.shouldContinue && state.currentHop < state.maxHops
      )
      .when(true)
      .execute('queryRefiner', (state) => ({
        originalQuestion: state.originalQuestion,
        currentContext: state.accumulatedContext,
        missingAspects: state.qualityAssessorResult.missingAspects,
      }))
      .map((state) => ({
        ...state,
        searchQuery: state.queryRefinerResult?.refinedQuery || state.searchQuery,
      }))
      .when(false)
      .map((state) => state) // No refinement needed
      .merge()

      .endWhile()

      // ⚑ PHASE 2: Advanced Parallel Sub-query Processing
      // Initialize evidence collection from Phase 1 results
      .map((state) => ({
        ...state,
        allEvidence: state.retrievedContexts.length > 0 ? state.retrievedContexts : [],
      }))

      // πŸ”„ Iterative sub-query processing loop
      .while(
        (state) => state.iteration < state.maxIterations && state.needsMoreInfo
      )
      .map((state) => ({
        ...state,
        iteration: state.iteration + 1,
      }))

      // 🧩 Question decomposition for first iteration
      .branch((state) => state.iteration === 1)
      .when(true)
      .execute('questionDecomposer', (state) => ({
        complexQuestion: state.originalQuestion,
      }))
      .map((state) => ({
        ...state,
        currentQueries: state.questionDecomposerResult.subQuestions,
      }))
      .when(false)
      // Use focused queries from gap analysis for subsequent iterations
      .map((state) => ({
        ...state,
        currentQueries: ((state as any).gapAnalyzerResult?.focusedQueries as string[]) || [],
      }))
      .merge()

      // πŸš€ Parallel retrieval execution for multiple queries
      .map(async (state) => {
        const queries = state.currentQueries || [];
        const retrievalResults =
          queries.length > 0
            ? await Promise.all(queries.map((query: string) => queryFn(query)))
            : [];
        return {
          ...state,
          retrievalResults,
        };
      })

      // πŸ§ͺ Synthesize evidence from multiple sources
      .execute('evidenceSynthesizer', (state) => {
        const evidence = [
          ...(state.allEvidence || []),
          ...(state.retrievalResults || []),
        ].filter(Boolean);

        return {
          collectedEvidence: evidence.length > 0 ? evidence : ['No evidence collected yet'],
          originalQuestion: state.originalQuestion,
        };
      })

      // πŸ” Analyze information gaps and determine next steps
      .execute('gapAnalyzer', (state) => ({
        synthesizedEvidence: state.evidenceSynthesizerResult.synthesizedEvidence,
        evidenceGaps: state.evidenceSynthesizerResult.evidenceGaps,
        originalQuestion: state.originalQuestion,
      }))

      // πŸ“Š Update state with synthesized evidence and gap analysis
      .map((state) => ({
        ...state,
        allEvidence: [...state.allEvidence, ...state.retrievalResults],
        evidenceSources: [...state.evidenceSources, `Iteration ${state.iteration} sources`],
        needsMoreInfo: state.gapAnalyzerResult.needsMoreInfo,
        synthesizedEvidence: state.evidenceSynthesizerResult.synthesizedEvidence,
      }))

      .endWhile()

      // πŸ“ PHASE 3: Generate comprehensive initial answer
      .execute('answerGenerator', (state) => ({
        finalContext:
          state.accumulatedContext ||
          state.synthesizedEvidence ||
          state.allEvidence.join('\n'),
        originalQuestion: state.originalQuestion,
      }))

      // πŸ₯ PHASE 4: Self-healing Quality Validation and Improvement
      // Conditional quality healing based on configuration
      .branch((state) => !state.disableQualityHealing)
      .when(true)
      
      // Validate initial answer quality
      .execute('qualityValidator', (state) => ({
        generatedAnswer: state.answerGeneratorResult.comprehensiveAnswer,
        userQuery: state.originalQuestion,
      }))
      .map((state) => ({
        ...state,
        currentAnswer: state.answerGeneratorResult.comprehensiveAnswer as string,
        currentQuality: state.qualityValidatorResult.qualityScore as number,
        currentIssues: state.qualityValidatorResult.issues as string[],
        shouldContinueHealing:
          (state.qualityValidatorResult.qualityScore as number) < state.qualityTarget,
      }))

      // πŸ”„ Healing loop for iterative quality improvement
      .while(
        (state) => state.healingAttempts < 3 && state.shouldContinueHealing
      )
      .map((state) => ({
        ...state,
        healingAttempts: state.healingAttempts + 1,
      }))

      // 🩹 Retrieve healing context to address specific issues
      .map(async (state) => {
        const healingQuery = `${state.originalQuestion} addressing issues: ${(state.currentIssues as string[])?.join(', ') || 'quality improvement'}`;
        const healingDocument = await queryFn(healingQuery);
        return {
          ...state,
          healingResult: { healingDocument },
        };
      })

      // πŸ”§ Apply healing to improve answer quality
      .execute('answerHealer', (state) => ({
        originalAnswer: state.currentAnswer,
        healingDocument: state.healingResult.healingDocument,
        issues: state.currentIssues,
      }))

      // βœ… Re-validate after healing application
      .execute('qualityValidator', (state) => ({
        generatedAnswer: state.answerHealerResult.healedAnswer,
        userQuery: state.originalQuestion,
      }))
      .map((state) => ({
        ...state,
        currentAnswer: state.answerHealerResult.healedAnswer as string,
        currentQuality: state.qualityValidatorResult.qualityScore as number,
        currentIssues: state.qualityValidatorResult.issues as string[],
        shouldContinueHealing:
          (state.qualityValidatorResult.qualityScore as number) < state.qualityTarget,
      }))

      .endWhile()
      .when(false)
      // Skip quality healing - use answer directly from Phase 3
      .map((state) => ({
        ...state,
        currentAnswer: state.answerGeneratorResult.comprehensiveAnswer,
        currentQuality: 1.0, // Assume perfect quality when disabled
        currentIssues: [] as string[],
        shouldContinueHealing: false,
      }))
      .merge()

      // 🎯 Final output transformation
      .map((state) => ({
        finalAnswer: state.currentAnswer,
        totalHops: state.currentHop,
        retrievedContexts: state.retrievedContexts,
        iterationCount: state.iteration,
        healingAttempts: state.healingAttempts,
        qualityAchieved: state.currentQuality,
      }))
  );
};

🌟 Why This Demonstrates AxFlow’s Power

This axRAG implementation showcases AxFlow’s unique capabilities for building enterprise-grade LLM pipelines:

πŸ—οΈ Complex State Management: AxFlow seamlessly manages complex state transformations across 20+ pipeline steps, handling async operations, branching logic, and iterative loops without losing state consistency.

πŸ”„ Advanced Control Flow: The pipeline uses AxFlow’s .while(), .branch(), .when(), and .merge() operators to implement sophisticated control flow that would be complex and error-prone with traditional code.

⚑ Automatic Parallelization: AxFlow automatically parallelizes operations where possible, such as the parallel sub-query processing in Phase 2, maximizing performance without manual coordination.

🧠 AI-Native Design: Each .node() defines an AI task with typed signatures, making the pipeline self-documenting and enabling automatic prompt optimization and validation.

πŸ›‘οΈ Production Reliability: Built-in error handling, retry logic, state recovery, and comprehensive logging make this production-ready for real-world applications.

πŸ“Š Observability: The debug mode and logging capabilities provide complete visibility into the pipeline execution, essential for debugging and optimization.

This level of sophisticationβ€”multi-hop reasoning, self-healing quality loops, parallel processing, and intelligent branchingβ€”demonstrates how AxFlow enables developers to build the kinds of advanced LLM systems that solve real-world problems with enterprise reliability and maintainability.

Debug Mode Visualization

Enable debug: true to see the complete RAG pipeline execution:

πŸ”„ [ AXFLOW START ]
Input Fields: originalQuestion
Total Steps: 18
═══════════════════════════════════════

⚑ [ STEP 3 EXECUTE ] Node: contextualizer in 1.62s
New Fields: contextualizerResult
Result: {
  "enhancedContext": "Machine learning in financial services raises privacy concerns through data collection, algorithmic bias, and potential for discrimination. Regulations like GDPR require explicit consent and data protection measures."
}
────────────────────────────────────────

⚑ [ STEP 9 EXECUTE ] Node: evidenceSynthesizer in 1.42s  
New Fields: evidenceSynthesizerResult
Result: {
  "synthesizedEvidence": "Comprehensive analysis of ML privacy implications including regulatory compliance, bias prevention, and consumer protection measures.",
  "evidenceGaps": ["Technical implementation details", "Industry-specific case studies"]
}
────────────────────────────────────────

βœ… [ AXFLOW COMPLETE ]
Total Time: 12.49s
Steps Executed: 15
═══════════════════════════════════════

Integration with Vector Databases

// Weaviate integration
import { axDB } from "@ax-llm/ax";

const weaviateDB = new axDB("weaviate", {
  url: "http://localhost:8080",
});

const queryWeaviate = async (query: string): Promise<string> => {
  const embedding = await llm.embed({ texts: [query] });
  const results = await weaviateDB.query({
    table: "documents",
    values: embedding.embeddings[0],
    limit: 5,
  });
  return results.map(r => r.metadata.text).join('\n');
};

// Pinecone integration  
import { axDB } from "@ax-llm/ax";

const pineconeDB = new axDB("pinecone", {
  apiKey: process.env.PINECONE_API_KEY,
  environment: "us-west1-gcp",
});

const queryPinecone = async (query: string): Promise<string> => {
  const embedding = await llm.embed({ texts: [query] });
  const results = await pineconeDB.query({
    table: "knowledge-base",
    values: embedding.embeddings[0],
    topK: 10,
  });
  return results.matches.map(m => m.metadata.content).join('\n');
};

Performance Optimization

// Optimized for speed
const fastRAG = axRAG(queryFn, {
  maxHops: 2,              // Fewer hops for speed
  qualityThreshold: 0.7,   // Lower quality threshold
  maxIterations: 1,        // Single iteration
  disableQualityHealing: true, // Skip healing for speed
});

// Optimized for quality
const qualityRAG = axRAG(queryFn, {
  maxHops: 5,              // Thorough retrieval
  qualityThreshold: 0.9,   // High quality threshold  
  maxIterations: 3,        // Multiple iterations
  qualityTarget: 0.95,     // Very high healing target
  disableQualityHealing: false,
});

// Balanced configuration
const balancedRAG = axRAG(queryFn, {
  maxHops: 3,
  qualityThreshold: 0.8,
  maxIterations: 2,
  qualityTarget: 0.85,
});

Simple RAG Alternative

For basic use cases, axRAG also provides axSimpleRAG:

import { axSimpleRAG } from "@ax-llm/ax";

// Simple single-hop RAG
const simpleRAG = axSimpleRAG(queryVectorDB);

const result = await simpleRAG.forward(llm, {
  question: "What is renewable energy?"
});

console.log("Answer:", result.answer);
console.log("Context:", result.context);

Why axRAG is Powerful

πŸš€ Production-Ready Architecture:

🧠 Advanced Intelligence:

πŸ”§ Enterprise Features:

⚑ Performance Optimized:

β€œaxRAG doesn’t just retrieve and generateβ€”it thinks, analyzes, and iteratively improves to deliver the highest quality answers possible”

The axRAG function represents the future of RAG systems: intelligent, self-improving, and production-ready with enterprise-grade reliability built on AxFlow’s powerful orchestration capabilities.