Documentation

Build LLM-powered agents
with production-ready TypeScript

DSPy for TypeScript. Working with LLMs is complex—they don't always do what you want. DSPy makes it easier to build amazing things with LLMs. Just define your inputs and outputs (signature) and an efficient prompt is auto-generated and used. Connect together various signatures to build complex systems and workflows using LLMs.

15+ LLM Providers
End-to-end Streaming
Auto Prompt Tuning

AxFlow Documentation

AxFlow is a powerful workflow orchestration system for building complex AI applications with automatic dependency analysis, parallel execution, and flexible control flow patterns.

Table of Contents

Quick Start

Basic Flow

import { ai, AxFlow } from "@ax-llm/ax";

// Create AI instance
const llm = ai({ name: "openai", apiKey: process.env.OPENAI_APIKEY! });

// Create a simple flow
const flow = new AxFlow<{ userInput: string }, { responseText: string }>()
  .node("testNode", "userInput:string -> responseText:string")
  .execute("testNode", (state) => ({ userInput: state.userInput }))
  .map((state) => ({ responseText: state.testNodeResult.responseText }));

// Execute the flow
const result = await flow.forward(llm, { userInput: "Hello world" });
console.log(result.responseText);

Constructor Options

// Basic constructor
const flow = new AxFlow();

// With options
const flow = new AxFlow({ autoParallel: false });

// With explicit typing
const flow = new AxFlow<InputType, OutputType>();

// With options and typing
const flow = new AxFlow<InputType, OutputType>({
  autoParallel: true,
  batchSize: 5,
});

Why AxFlow is the Future

🚀 Automatic Performance Optimization:

🛡️ Production-Ready Resilience:

Compared to Traditional Approaches:

Real-World Superpowers:

“AxFlow doesn’t just execute AI workflows—it orchestrates the future of intelligent systems with automatic performance optimization”

Ready to build the impossible? AxFlow extends AxProgramWithSignature, giving you access to the entire Ax ecosystem: optimization, streaming, tracing, function calling, and more. The future of AI development is declarative, adaptive, and beautiful.

Core Concepts

1. Node Definition

Nodes define the available operations in your flow. You must define nodes before executing them.

// String signature (creates AxGen automatically)
flow.node("processor", "input:string -> output:string");

// With multiple outputs
flow.node("analyzer", "text:string -> sentiment:string, confidence:number");

// Complex field types
flow.node(
  "extractor",
  "documentText:string -> processedResult:string, entities:string[]",
);

2. State Evolution

State grows as you execute nodes, with results stored in {nodeName}Result format:

// Initial state: { userInput: "Hello" }
flow.execute("processor", (state) => ({ input: state.userInput }));
// State becomes: { userInput: "Hello", processorResult: { output: "Processed Hello" } }

flow.execute("analyzer", (state) => ({ text: state.processorResult.output }));
// State becomes: {
//   userInput: "Hello",
//   processorResult: { output: "Processed Hello" },
//   analyzerResult: { sentiment: "positive", confidence: 0.8 }
// }

3. State Transformation

Use map() to transform state between operations:

flow.map((state) => ({
  ...state,
  processedInput: state.userInput.toLowerCase(),
  timestamp: Date.now(),
}));

API Reference

Core Methods

node(name: string, signature: string, options?: object)

Define a node with the given signature.

flow.node("summarizer", "documentText:string -> summary:string");
flow.node("classifier", "text:string -> category:string", { debug: true });

Alias: n() - Short alias for node()

flow.n("summarizer", "documentText:string -> summary:string");

nodeExtended(name: string, baseSignature: string | AxSignature, extensions: object)

Create a node with extended signature by adding additional input/output fields.

// Add chain-of-thought reasoning
flow.nodeExtended("reasoner", "question:string -> answer:string", {
  prependOutputs: [
    { name: "reasoning", type: f.internal(f.string("Step-by-step reasoning")) },
  ],
});

// Add context and confidence
flow.nodeExtended("analyzer", "input:string -> output:string", {
  appendInputs: [{ name: "context", type: f.optional(f.string("Context")) }],
  appendOutputs: [{ name: "confidence", type: f.number("Confidence score") }],
});

Extension Options:

Alias: nx() - Short alias for nodeExtended()

flow.nx("reasoner", "question:string -> answer:string", {
  prependOutputs: [
    { name: "reasoning", type: f.internal(f.string("Step-by-step reasoning")) },
  ],
});

execute(nodeName: string, mapping: Function, options?: object)

Execute a node with input mapping.

flow.execute("summarizer", (state) => ({
  documentText: state.document,
}));

// With AI override
flow.execute("processor", mapping, { ai: alternativeAI });

map(transform: Function)

Transform the current state synchronously or asynchronously.

// Synchronous transformation
flow.map((state) => ({
  ...state,
  upperCaseResult: state.processorResult.output.toUpperCase(),
}));

// Asynchronous transformation
flow.map(async (state) => {
  const apiData = await fetchFromAPI(state.query);
  return {
    ...state,
    enrichedData: apiData,
  };
});

// Parallel asynchronous transformations
flow.map([
  async (state) => ({ ...state, result1: await api1(state.data) }),
  async (state) => ({ ...state, result2: await api2(state.data) }),
  async (state) => ({ ...state, result3: await api3(state.data) }),
], { parallel: true });

Alias: m() - Short alias for map() (supports both sync and async)

// Async with alias
flow.m(async (state) => {
  const processed = await processAsync(state.input);
  return { ...state, processed };
});

returns(transform: Function) / r(transform: Function)

Terminal transformation that sets the final output type of the flow. Use this as the last transformation to get proper TypeScript type inference for the flow result.

const typedFlow = flow<{ input: string }>()
  .map((state) => ({ ...state, processed: true, count: 42 }))
  .returns((state) => ({
    result: state.processed ? "done" : "pending",
    totalCount: state.count,
  })); // TypeScript now properly infers the output type

// Result is typed as { result: string; totalCount: number }
const result = await typedFlow.forward(llm, { input: "test" });
console.log(result.result); // Type-safe access
console.log(result.totalCount); // Type-safe access

Key Benefits:

Aliases:

// These are equivalent:
flow.returns((state) => ({ output: state.value }));
flow.r((state) => ({ output: state.value }));

When to Use:

Control Flow Methods

while(condition: Function) / endWhile()

Create loops that execute while condition is true.

flow
  .map((state) => ({ ...state, counter: 0 }))
  .while((state) => state.counter < 3)
  .map((state) => ({ ...state, counter: state.counter + 1 }))
  .execute("processor", (state) => ({ input: `iteration ${state.counter}` }))
  .endWhile();

branch(predicate: Function) / when(value) / merge()

Conditional branching based on predicate evaluation.

flow
  .branch((state) => state.complexity)
  .when("simple")
  .execute("simpleProcessor", mapping)
  .when("complex")
  .execute("complexProcessor", mapping)
  .merge()
  .map((state) => ({
    result: state.simpleProcessorResult?.output ||
      state.complexProcessorResult?.output,
  }));

parallel(subFlows: Function[]) / merge(key: string, mergeFunction: Function)

Execute multiple sub-flows in parallel.

flow
  .parallel([
    (subFlow) =>
      subFlow.execute("analyzer1", (state) => ({ text: state.input })),
    (subFlow) =>
      subFlow.execute("analyzer2", (state) => ({ text: state.input })),
    (subFlow) =>
      subFlow.execute("analyzer3", (state) => ({ text: state.input })),
  ])
  .merge("combinedResults", (result1, result2, result3) => ({
    analysis1: result1.analyzer1Result.analysis,
    analysis2: result2.analyzer2Result.analysis,
    analysis3: result3.analyzer3Result.analysis,
  }));

label(name: string) / feedback(condition: Function, labelName: string, maxIterations?: number)

Create labeled points for feedback loops.

flow
  .map((state) => ({ ...state, attempts: 0 }))
  .label("retry-point")
  .map((state) => ({ ...state, attempts: state.attempts + 1 }))
  .execute("processor", (state) => ({ input: state.userInput }))
  .execute("validator", (state) => ({ output: state.processorResult.output }))
  .feedback(
    (state) => !state.validatorResult.isValid && state.attempts < 3,
    "retry-point",
  );

Advanced Methods

derive(outputField: string, inputField: string, transform: Function, options?: object)

Create derived fields from array or scalar inputs with parallel processing support.

// Derive from array with parallel processing
flow.derive(
  "processedItems",
  "items",
  (item, index) => `processed-${item}-${index}`,
  {
    batchSize: 2,
  },
);

// Derive from scalar
flow.derive("upperText", "inputText", (text) => text.toUpperCase());

Control Flow Patterns

1. Sequential Processing

const sequentialFlow = new AxFlow<{ input: string }, { finalResult: string }>()
  .node("step1", "input:string -> intermediate:string")
  .node("step2", "intermediate:string -> output:string")
  .execute("step1", (state) => ({ input: state.input }))
  .execute(
    "step2",
    (state) => ({ intermediate: state.step1Result.intermediate }),
  )
  .map((state) => ({ finalResult: state.step2Result.output }));

2. Conditional Processing

const conditionalFlow = new AxFlow<
  { query: string; isComplex: boolean },
  { response: string }
>()
  .node("simpleHandler", "query:string -> response:string")
  .node("complexHandler", "query:string -> response:string")
  .branch((state) => state.isComplex)
  .when(true)
  .execute("complexHandler", (state) => ({ query: state.query }))
  .when(false)
  .execute("simpleHandler", (state) => ({ query: state.query }))
  .merge()
  .map((state) => ({
    response: state.complexHandlerResult?.response ||
      state.simpleHandlerResult?.response,
  }));

3. Iterative Processing

const iterativeFlow = new AxFlow<
  { content: string },
  { finalContent: string }
>()
  .node("processor", "content:string -> processedContent:string")
  .node("qualityChecker", "content:string -> qualityScore:number")
  .map((state) => ({ currentContent: state.content, iteration: 0 }))
  .while((state) => state.iteration < 3 && (state.qualityScore || 0) < 0.8)
  .map((state) => ({ ...state, iteration: state.iteration + 1 }))
  .execute("processor", (state) => ({ content: state.currentContent }))
  .execute(
    "qualityChecker",
    (state) => ({ content: state.processorResult.processedContent }),
  )
  .map((state) => ({
    ...state,
    currentContent: state.processorResult.processedContent,
    qualityScore: state.qualityCheckerResult.qualityScore,
  }))
  .endWhile()
  .map((state) => ({ finalContent: state.currentContent }));

4. Parallel Processing with Auto-Parallelization

AxFlow automatically detects independent operations and runs them in parallel:

const autoParallelFlow = new AxFlow<
  { text: string },
  { combinedAnalysis: string }
>()
  .node("sentimentAnalyzer", "text:string -> sentiment:string")
  .node("topicExtractor", "text:string -> topics:string[]")
  .node("entityRecognizer", "text:string -> entities:string[]")
  // These three execute automatically in parallel! ⚡
  .execute("sentimentAnalyzer", (state) => ({ text: state.text }))
  .execute("topicExtractor", (state) => ({ text: state.text }))
  .execute("entityRecognizer", (state) => ({ text: state.text }))
  // This waits for all three to complete
  .map((state) => ({
    combinedAnalysis: JSON.stringify({
      sentiment: state.sentimentAnalyzerResult.sentiment,
      topics: state.topicExtractorResult.topics,
      entities: state.entityRecognizerResult.entities,
    }),
  }));

// Check execution plan
const plan = autoParallelFlow.getExecutionPlan();
console.log("Parallel groups:", plan.parallelGroups);
console.log("Max parallelism:", plan.maxParallelism);

5. Asynchronous Operations

AxFlow supports asynchronous transformations in map operations, enabling API calls, database queries, and other async operations within your flow:

const asyncFlow = new AxFlow<
  { userQuery: string },
  { enrichedData: string; apiCallTime: number }
>()
  .node("processor", "enrichedData:string -> processedResult:string")
  // Single async map - API enrichment
  .map(async (state) => {
    const startTime = Date.now();
    const apiData = await fetchFromExternalAPI(state.userQuery);
    const duration = Date.now() - startTime;

    return {
      ...state,
      enrichedData: apiData,
      apiCallTime: duration,
    };
  })
  // Execute AI processing on enriched data
  .execute("processor", (state) => ({ enrichedData: state.enrichedData }))
  // Parallel async operations
  .map([
    async (state) => {
      const sentiment = await analyzeSentiment(state.processedResult);
      return { ...state, sentiment };
    },
    async (state) => {
      const entities = await extractEntities(state.processedResult);
      return { ...state, entities };
    },
    async (state) => {
      const summary = await generateSummary(state.processedResult);
      return { ...state, summary };
    },
  ], { parallel: true })
  .map((state) => ({
    enrichedData: state.enrichedData,
    apiCallTime: state.apiCallTime,
  }));

Mixed Sync/Async Processing

You can mix synchronous and asynchronous operations seamlessly:

const mixedFlow = new AxFlow<{ rawData: string }, { result: string }>()
  // Sync preprocessing
  .map((state) => ({
    ...state,
    cleanedData: state.rawData.trim().toLowerCase(),
  }))
  // Async validation
  .map(async (state) => {
    const isValid = await validateWithAPI(state.cleanedData);
    return { ...state, isValid };
  })
  // More sync processing
  .map((state) => ({
    ...state,
    timestamp: Date.now(),
  }))
  // Final async processing
  .map(async (state) => {
    if (state.isValid) {
      const processed = await processData(state.cleanedData);
      return { result: processed };
    }
    return { result: "Invalid data" };
  });

Performance Considerations

// Parallel execution (faster)
flow.map([asyncOp1, asyncOp2, asyncOp3], { parallel: true });

// Sequential execution (slower but controlled)
flow
  .map(asyncOp1)
  .map(asyncOp2)
  .map(asyncOp3);

6. Self-Healing with Feedback Loops

const selfHealingFlow = new AxFlow<{ input: string }, { output: string }>()
  .node("processor", "input:string -> output:string, confidence:number")
  .node("validator", "output:string -> isValid:boolean, issues:string[]")
  .node("fixer", "output:string, issues:string[] -> fixedOutput:string")
  .map((state) => ({ ...state, attempts: 0 }))
  .label("process")
  .map((state) => ({ ...state, attempts: state.attempts + 1 }))
  .execute("processor", (state) => ({ input: state.input }))
  .execute("validator", (state) => ({ output: state.processorResult.output }))
  .feedback(
    (state) => !state.validatorResult.isValid && state.attempts < 3,
    "process",
  )
  // If still invalid after retries, try to fix
  .branch((state) => state.validatorResult.isValid)
  .when(false)
  .execute("fixer", (state) => ({
    output: state.processorResult.output,
    issues: state.validatorResult.issues,
  }))
  .map((state) => ({
    output: state.fixerResult.fixedOutput,
  }))
  .when(true)
  .map((state) => ({
    output: state.processorResult.output,
  }))
  .merge();

Advanced Features

Instrumentation and Optimization (v13.0.24+)

Example

import { ai, flow } from "@ax-llm/ax";
import { context, trace } from "@opentelemetry/api";

const llm = ai({ name: "mock" });
const tracer = trace.getTracer("axflow");

const wf = flow<{ userQuestion: string }>()
  .node("summarizer", "documentText:string -> summaryText:string")
  .execute("summarizer", (s) => ({ documentText: s.userQuestion }))
  .returns((s) => ({ finalAnswer: (s as any).summarizerResult.summaryText }));

const parentCtx = context.active();
const out = await wf.forward(llm, { userQuestion: "hi" }, {
  tracer,
  traceContext: parentCtx,
});

1. Auto-Parallelization

AxFlow automatically analyzes dependencies and runs independent operations in parallel:

// Disable auto-parallelization globally
const sequentialFlow = new AxFlow({ autoParallel: false });

// Disable for specific execution
const result = await flow.forward(llm, input, { autoParallel: false });

// Get execution plan information
const plan = flow.getExecutionPlan();
console.log(
  `Will run ${plan.parallelGroups} parallel groups with max ${plan.maxParallelism} concurrent operations`,
);

2. Dynamic AI Context

Use different AI services for different nodes:

flow
  .execute("fastProcessor", mapping, { ai: speedAI })
  .execute("powerfulAnalyzer", mapping, { ai: powerAI })
  .execute("defaultProcessor", mapping); // Uses default AI from forward()

3. Batch Processing with Derive

const batchFlow = new AxFlow<{ items: string[] }, { processedItems: string[] }>(
  {
    autoParallel: true,
    batchSize: 3, // Process 3 items at a time
  },
)
  .derive("processedItems", "items", (item, index) => {
    return `processed-${item}-${index}`;
  }, { batchSize: 2 }); // Override batch size for this operation

4. Error Handling

try {
  const result = await flow.forward(llm, input);
} catch (error) {
  console.error("Flow execution failed:", error);
}

5. Program Integration

AxFlow integrates with the dspy-ts ecosystem:

// Get signature
const signature = flow.getSignature();

// Set examples (if applicable)
flow.setExamples(examples);

// Get traces and usage
const traces = flow.getTraces();
const usage = flow.getUsage();

Best Practices

1. Node Naming

Use descriptive names that clearly indicate the node’s purpose:

// ❌ Unclear
flow.node("proc1", signature);

// ✅ Clear
flow.node("documentSummarizer", signature);
flow.node("sentimentAnalyzer", signature);

2. State Management

Keep state flat and predictable:

// ✅ Good - flat structure
flow.map((state) => ({
  ...state,
  processedText: state.rawText.toLowerCase(),
  timestamp: Date.now(),
}));

// ❌ Avoid - deep nesting
flow.map((state) => ({
  data: {
    processed: {
      text: state.rawText.toLowerCase(),
    },
  },
}));

3. Error Prevention

Always define nodes before executing them:

// ✅ Correct order
flow
  .node("processor", signature)
  .execute("processor", mapping);

// ❌ Will throw error
flow
  .execute("processor", mapping) // Node not defined yet!
  .node("processor", signature);

4. Loop Safety

Ensure loop conditions can change:

// ✅ Safe - counter increments
flow
  .map((state) => ({ ...state, counter: 0 }))
  .while((state) => state.counter < 5)
  .map((state) => ({ ...state, counter: state.counter + 1 })) // Condition changes
  .execute("processor", mapping)
  .endWhile();

// ❌ Infinite loop - condition never changes
flow
  .while((state) => state.isProcessing) // This never changes!
  .execute("processor", mapping)
  .endWhile();

5. Parallel Design

Structure flows to maximize automatic parallelization:

// ✅ Parallel-friendly - independent operations
flow
  .execute("analyzer1", (state) => ({ text: state.input })) // Can run in parallel
  .execute("analyzer2", (state) => ({ text: state.input })) // Can run in parallel
  .execute("combiner", (state) => ({ // Waits for both
    input1: state.analyzer1Result.output,
    input2: state.analyzer2Result.output,
  }));

// ❌ Sequential - unnecessary dependencies
flow
  .execute("analyzer1", (state) => ({ text: state.input }))
  .execute("analyzer2", (state) => ({
    text: state.input,
    context: state.analyzer1Result.output, // Creates dependency!
  }));

Examples

Extended Node Patterns with nx

import { AxFlow, f } from "@ax-llm/ax";

// Chain-of-thought reasoning pattern
const reasoningFlow = new AxFlow<{ question: string }, { answer: string }>()
  .nx("reasoner", "question:string -> answer:string", {
    prependOutputs: [
      {
        name: "reasoning",
        type: f.internal(f.string("Step-by-step reasoning")),
      },
    ],
  })
  .execute("reasoner", (state) => ({ question: state.question }))
  .map((state) => ({ answer: state.reasonerResult.answer }));

// Confidence scoring pattern
const confidenceFlow = new AxFlow<
  { input: string },
  { result: string; confidence: number }
>()
  .nx("analyzer", "input:string -> result:string", {
    appendOutputs: [
      { name: "confidence", type: f.number("Confidence score 0-1") },
    ],
  })
  .execute("analyzer", (state) => ({ input: state.input }))
  .map((state) => ({
    result: state.analyzerResult.result,
    confidence: state.analyzerResult.confidence,
  }));

// Contextual processing pattern
const contextualFlow = new AxFlow<
  { query: string; context?: string },
  { response: string }
>()
  .nx("processor", "query:string -> response:string", {
    appendInputs: [
      { name: "context", type: f.optional(f.string("Additional context")) },
    ],
  })
  .execute("processor", (state) => ({
    query: state.query,
    context: state.context,
  }))
  .map((state) => ({ response: state.processorResult.response }));

Document Processing Pipeline

const documentPipeline = flow<{ document: string }>()
  .node("summarizer", "documentText:string -> summary:string")
  .node("sentimentAnalyzer", "documentText:string -> sentiment:string")
  .node("keywordExtractor", "documentText:string -> keywords:string[]")
  // These run automatically in parallel
  .execute("summarizer", (state) => ({ documentText: state.document }))
  .execute("sentimentAnalyzer", (state) => ({ documentText: state.document }))
  .execute("keywordExtractor", (state) => ({ documentText: state.document }))
  // Use returns() for proper type inference
  .returns((state) => ({
    summary: state.summarizerResult.summary,
    sentiment: state.sentimentAnalyzerResult.sentiment,
    keywords: state.keywordExtractorResult.keywords,
  }));

// TypeScript now knows the exact return type:
// { summary: string; sentiment: string; keywords: string[] }
const result = await documentPipeline.forward(llm, { document: "..." });
console.log(result.summary); // Fully typed
console.log(result.sentiment); // Fully typed
console.log(result.keywords); // Fully typed

Quality-Driven Content Creation

const contentCreator = new AxFlow<
  { topic: string; targetQuality: number },
  { finalContent: string; iterations: number }
>()
  .node("writer", "topic:string -> content:string")
  .node("qualityChecker", "content:string -> score:number, feedback:string")
  .node("improver", "content:string, feedback:string -> improvedContent:string")
  .map((state) => ({ currentContent: "", iteration: 0, bestScore: 0 }))
  // Initial writing
  .execute("writer", (state) => ({ topic: state.topic }))
  .map((state) => ({
    ...state,
    currentContent: state.writerResult.content,
    iteration: 1,
  }))
  // Improvement loop
  .while((state) =>
    state.iteration < 5 && state.bestScore < state.targetQuality
  )
  .execute("qualityChecker", (state) => ({ content: state.currentContent }))
  .branch((state) => state.qualityCheckerResult.score > state.bestScore)
  .when(true)
  .execute("improver", (state) => ({
    content: state.currentContent,
    feedback: state.qualityCheckerResult.feedback,
  }))
  .map((state) => ({
    ...state,
    currentContent: state.improverResult.improvedContent,
    bestScore: state.qualityCheckerResult.score,
    iteration: state.iteration + 1,
  }))
  .when(false)
  .map((state) => ({ ...state, iteration: 5 })) // Exit loop
  .merge()
  .endWhile()
  .map((state) => ({
    finalContent: state.currentContent,
    iterations: state.iteration,
  }));

Async Data Enrichment Pipeline

const enrichmentPipeline = new AxFlow<
  { userQuery: string },
  { finalResult: string; metadata: object }
>()
  .node("analyzer", "enrichedData:string -> analysis:string")
  // Parallel async data enrichment from multiple sources
  .map([
    async (state) => {
      const userProfile = await fetchUserProfile(state.userQuery);
      return { ...state, userProfile };
    },
    async (state) => {
      const contextData = await fetchContextData(state.userQuery);
      return { ...state, contextData };
    },
    async (state) => {
      const historicalData = await fetchHistoricalData(state.userQuery);
      return { ...state, historicalData };
    },
  ], { parallel: true })
  // Combine enriched data
  .map(async (state) => {
    const combinedData = await combineDataSources({
      userProfile: state.userProfile,
      context: state.contextData,
      historical: state.historicalData,
    });

    return {
      ...state,
      enrichedData: combinedData,
      metadata: {
        sources: ["userProfile", "contextData", "historicalData"],
        timestamp: Date.now(),
      },
    };
  })
  // Process with AI
  .execute("analyzer", (state) => ({ enrichedData: state.enrichedData }))
  // Final async post-processing
  .map(async (state) => {
    const enhanced = await enhanceResult(state.analyzerResult.analysis);
    return {
      finalResult: enhanced,
      metadata: state.metadata,
    };
  });

Real-time Data Processing with Async Maps

const realTimeProcessor = new AxFlow<
  { streamData: string[] },
  { processedResults: string[]; stats: object }
>()
  // Async preprocessing of each item in parallel
  .map(async (state) => {
    const startTime = Date.now();

    // Process each item in batches with async operations
    const processedItems = await Promise.all(
      state.streamData.map(async (item, index) => {
        const enriched = await enrichDataItem(item);
        const validated = await validateItem(enriched);
        return { item: validated, index, timestamp: Date.now() };
      }),
    );

    const processingTime = Date.now() - startTime;

    return {
      ...state,
      processedItems,
      processingStats: {
        totalItems: state.streamData.length,
        processingTime,
        itemsPerSecond: state.streamData.length / (processingTime / 1000),
      },
    };
  })
  // Parallel async quality checks
  .map([
    async (state) => {
      const qualityScore = await calculateQualityScore(state.processedItems);
      return { ...state, qualityScore };
    },
    async (state) => {
      const anomalies = await detectAnomalies(state.processedItems);
      return { ...state, anomalies };
    },
    async (state) => {
      const trends = await analyzeTrends(state.processedItems);
      return { ...state, trends };
    },
  ], { parallel: true })
  // Final aggregation
  .map((state) => ({
    processedResults: state.processedItems.map((item) => item.item),
    stats: {
      ...state.processingStats,
      qualityScore: state.qualityScore,
      anomaliesFound: state.anomalies?.length || 0,
      trendsDetected: state.trends?.length || 0,
    },
  }));

Multi-Model Research System

const researchSystem = new AxFlow<
  { query: string },
  { answer: string; sources: string[]; confidence: number }
>()
  .node("queryGenerator", "researchQuestion:string -> searchQuery:string")
  .node("retriever", "searchQuery:string -> retrievedDocument:string")
  .node(
    "answerGenerator",
    "retrievedDocument:string, researchQuestion:string -> researchAnswer:string",
  )
  .execute("queryGenerator", (state) => ({ researchQuestion: state.query }))
  .execute(
    "retriever",
    (state) => ({ searchQuery: state.queryGeneratorResult.searchQuery }),
  )
  .execute("answerGenerator", (state) => ({
    retrievedDocument: state.retrieverResult.retrievedDocument,
    researchQuestion: state.query,
  }))
  .map((state) => ({
    answer: state.answerGeneratorResult.researchAnswer,
    sources: [state.retrieverResult.retrievedDocument],
    confidence: 0.85,
  }));

Troubleshooting

Common Errors

  1. “Node ‘nodeName’ not found”

    • Ensure you call .node() before .execute()
  2. “endWhile() called without matching while()”

    • Every .while() needs a matching .endWhile()
  3. “when() called without matching branch()”

    • Every .when() needs to be inside a .branch() / .merge() block
  4. “merge() called without matching branch()”

    • Every .branch() needs a matching .merge()
  5. “Label ‘labelName’ not found”

    • Ensure the label exists before using it in .feedback()

Performance Issues

  1. Operations running sequentially instead of parallel

    • Check for unnecessary dependencies in your mappings
    • Use flow.getExecutionPlan() to debug
  2. Memory issues with large datasets

    • Use batchSize option to control parallel execution
    • Consider using .derive() for array processing

Type Errors

  1. State property not found
    • Use .map() to ensure required properties exist
    • Check the spelling of result field names ({nodeName}Result)

This documentation provides a comprehensive guide to AxFlow based on the actual implementation and test cases. All examples have been verified against the test suite to ensure accuracy.