AxFlow Documentation
AxFlow is a powerful workflow orchestration system for building complex AI applications with automatic dependency analysis, parallel execution, and flexible control flow patterns.
Table of Contents
- Quick Start
- Why AxFlow is the Future
- Core Concepts
- API Reference
- Control Flow Patterns
- Advanced Features
- Best Practices
- Examples
Quick Start
Basic Flow
import { ai, AxFlow } from "@ax-llm/ax";
// Create AI instance
const llm = ai({ name: "openai", apiKey: process.env.OPENAI_APIKEY! });
// Create a simple flow
const flow = new AxFlow<{ userInput: string }, { responseText: string }>()
.node("testNode", "userInput:string -> responseText:string")
.execute("testNode", (state) => ({ userInput: state.userInput }))
.map((state) => ({ responseText: state.testNodeResult.responseText }));
// Execute the flow
const result = await flow.forward(llm, { userInput: "Hello world" });
console.log(result.responseText);
Constructor Options
// Basic constructor
const flow = new AxFlow();
// With options
const flow = new AxFlow({ autoParallel: false });
// With explicit typing
const flow = new AxFlow<InputType, OutputType>();
// With options and typing
const flow = new AxFlow<InputType, OutputType>({
autoParallel: true,
batchSize: 5,
});
Why AxFlow is the Future
Automatic Performance Optimization:
- Zero-Config Parallelization: Automatically runs independent operations in parallel (1.5-3x speedup)
- Intelligent Dependency Analysis: AI-powered analysis of input/output dependencies
- Optimal Execution Planning: Automatically groups operations into parallel levels
- Concurrency Control: Smart resource management with configurable limits
- Runtime Control: Enable/disable auto-parallelization per execution as needed
Production-Ready Resilience:
- Exponential Backoff: Smart retry strategies with configurable delays
- Graceful Degradation: Fallback mechanisms for continuous operation
- Error Isolation: Prevent cascading failures across workflow components
- Resource Monitoring: Adaptive scaling based on system performance
Compared to Traditional Approaches:
- 10x More Compact: Ultra-concise syntax with powerful aliases
- Zero Boilerplate: Automatic state management and context threading
- Multi-Modal Ready: Native support for text, images, audio, and streaming
- Self-Optimizing: Built-in compatibility with MiPRO and other advanced optimizers
- Enterprise Ready: Circuit breakers, retries, and monitoring built-in
- Production Hardened: Used by startups scaling to millions of users
Real-World Superpowers:
- Autonomous Agents: Self-healing, self-improving AI workflows
- Multi-Model Orchestration: Route tasks to the perfect AI for each job
- Adaptive Pipelines: Workflows that evolve based on real-time feedback
- Cost Intelligence: Automatic optimization between speed, quality, and cost
- Mission Critical: Built for production with enterprise-grade reliability
"AxFlow doesn't just execute AI workflows, it orchestrates the future of intelligent systems with automatic performance optimization"
Ready to build the impossible? AxFlow extends AxProgramWithSignature,
giving you access to the entire Ax ecosystem: optimization, streaming, tracing,
function calling, and more. The future of AI development is declarative,
adaptive, and beautiful.
Core Concepts
1. Node Definition
Nodes define the available operations in your flow. You must define nodes before executing them.
// String signature (creates AxGen automatically)
flow.node("processor", "input:string -> output:string");
// With multiple outputs
flow.node("analyzer", "text:string -> sentiment:string, confidence:number");
// Complex field types
flow.node(
"extractor",
"documentText:string -> processedResult:string, entities:string[]",
);
2. State Evolution
State grows as you execute nodes, with results stored in {nodeName}Result format:
// Initial state: { userInput: "Hello" }
flow.execute("processor", (state) => ({ input: state.userInput }));
// State becomes: { userInput: "Hello", processorResult: { output: "Processed Hello" } }
flow.execute("analyzer", (state) => ({ text: state.processorResult.output }));
// State becomes: {
// userInput: "Hello",
// processorResult: { output: "Processed Hello" },
// analyzerResult: { sentiment: "positive", confidence: 0.8 }
// }
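The accumulation rule above can be sketched in plain TypeScript. This only illustrates the {nodeName}Result naming convention; it is not AxFlow's internal code, and `storeNodeResult` is a hypothetical helper introduced for the example.

```typescript
// Hypothetical helper (not an AxFlow API): mimics how a node's output
// is stored in state under the key `${nodeName}Result`.
type FlowState = Record<string, unknown>;

function storeNodeResult(
  state: FlowState,
  nodeName: string,
  result: Record<string, unknown>,
): FlowState {
  // Existing keys are preserved; the new result is added under a derived key.
  return { ...state, [`${nodeName}Result`]: result };
}

let demoState: FlowState = { userInput: "Hello" };
demoState = storeNodeResult(demoState, "processor", { output: "Processed Hello" });
demoState = storeNodeResult(demoState, "analyzer", {
  sentiment: "positive",
  confidence: 0.8,
});

// Keys in insertion order: userInput, processorResult, analyzerResult
console.log(Object.keys(demoState));
```

Because every result lands under a key derived from the node name, a misspelled node name in a later mapping shows up as a missing state property.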
3. State Transformation
Use map() to transform state between operations:
flow.map((state) => ({
...state,
processedInput: state.userInput.toLowerCase(),
timestamp: Date.now(),
}));
API Reference
Core Methods
node(name: string, signature: string, options?: object)
Define a node with the given signature.
flow.node("summarizer", "documentText:string -> summary:string");
flow.node("classifier", "text:string -> category:string", { debug: true });
Alias: n() - Short alias for node()
flow.n("summarizer", "documentText:string -> summary:string");
nodeExtended(name: string, baseSignature: string | AxSignature, extensions: object)
Create a node with extended signature by adding additional input/output fields.
// Add chain-of-thought reasoning
flow.nodeExtended("reasoner", "question:string -> answer:string", {
prependOutputs: [
{ name: "reasoning", type: f.internal(f.string("Step-by-step reasoning")) },
],
});
// Add context and confidence
flow.nodeExtended("analyzer", "input:string -> output:string", {
appendInputs: [{ name: "context", type: f.optional(f.string("Context")) }],
appendOutputs: [{ name: "confidence", type: f.number("Confidence score") }],
});
Extension Options:
- prependInputs - Add fields at the beginning of input signature
- appendInputs - Add fields at the end of input signature
- prependOutputs - Add fields at the beginning of output signature
- appendOutputs - Add fields at the end of output signature
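The effect of the four options on field order can be sketched with plain arrays. This is a simplified model, assuming one side of a signature can be treated as an ordered list of field names; `extendFields` and `FieldExtensions` are hypothetical names, not part of the library.

```typescript
// Hypothetical model: one side of a signature as an ordered list of field names.
interface FieldExtensions {
  prepend?: string[];
  append?: string[];
}

function extendFields(base: string[], ext: FieldExtensions): string[] {
  // Prepended fields come first, then the base fields, then appended fields.
  return [...(ext.prepend ?? []), ...base, ...(ext.append ?? [])];
}

// Mirrors prependOutputs: the reasoning field is placed before the answer.
const extendedOutputs = extendFields(["answer"], { prepend: ["reasoning"] });
// Mirrors appendInputs: the context field is placed after the input.
const extendedInputs = extendFields(["input"], { append: ["context"] });

console.log(extendedOutputs.join(" -> ")); // reasoning -> answer
console.log(extendedInputs.join(" -> ")); // input -> context
```

Order is the reason to prefer prependOutputs for a reasoning field: it puts the reasoning ahead of the answer in the output signature.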
Alias: nx() - Short alias for nodeExtended()
flow.nx("reasoner", "question:string -> answer:string", {
prependOutputs: [
{ name: "reasoning", type: f.internal(f.string("Step-by-step reasoning")) },
],
});
execute(nodeName: string, mapping: Function, options?: object)
Execute a node with input mapping.
flow.execute("summarizer", (state) => ({
documentText: state.document,
}));
// With AI override
flow.execute("processor", mapping, { ai: alternativeAI });
map(transform: Function)
Transform the current state synchronously or asynchronously.
// Synchronous transformation
flow.map((state) => ({
...state,
upperCaseResult: state.processorResult.output.toUpperCase(),
}));
// Asynchronous transformation
flow.map(async (state) => {
const apiData = await fetchFromAPI(state.query);
return {
...state,
enrichedData: apiData,
};
});
// Parallel asynchronous transformations
flow.map([
async (state) => ({ ...state, result1: await api1(state.data) }),
async (state) => ({ ...state, result2: await api2(state.data) }),
async (state) => ({ ...state, result3: await api3(state.data) }),
], { parallel: true });
Alias: m() - Short alias for map() (supports both sync and async)
// Async with alias
flow.m(async (state) => {
const processed = await processAsync(state.input);
return { ...state, processed };
});
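The parallel form of map() can be pictured as "run every transform against the same input state, then merge the outputs". The sketch below is a plain TypeScript approximation under that assumption. `parallelMapSketch` is a hypothetical helper, and its "later transform wins" rule for conflicting keys is an assumption of this sketch, not documented AxFlow behavior.

```typescript
type MapState = Record<string, unknown>;
type AsyncTransform = (state: MapState) => Promise<MapState>;

// Hypothetical helper: run all transforms concurrently on the same input
// state, then shallow-merge their outputs in array order.
async function parallelMapSketch(
  state: MapState,
  transforms: AsyncTransform[],
): Promise<MapState> {
  const outputs = await Promise.all(transforms.map((transform) => transform(state)));
  return outputs.reduce<MapState>(
    (merged, output) => ({ ...merged, ...output }),
    { ...state },
  );
}

const mergedStatePromise = parallelMapSketch({ data: 2 }, [
  async (s) => ({ ...s, doubled: (s["data"] as number) * 2 }),
  async (s) => ({ ...s, squared: (s["data"] as number) ** 2 }),
]);

// The merged state holds data = 2, doubled = 4, squared = 4.
mergedStatePromise.then((merged) => console.log(merged));
```

Each transform sees only the state as it was before the parallel step, so one transform should not rely on a field written by a sibling in the same array.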
returns(transform: Function) / r(transform: Function)
Terminal transformation that sets the final output type of the flow. Use this as the last transformation to get proper TypeScript type inference for the flow result.
const typedFlow = flow<{ input: string }>()
.map((state) => ({ ...state, processed: true, count: 42 }))
.returns((state) => ({
result: state.processed ? "done" : "pending",
totalCount: state.count,
})); // TypeScript now properly infers the output type
// Result is typed as { result: string; totalCount: number }
const result = await typedFlow.forward(llm, { input: "test" });
console.log(result.result); // Type-safe access
console.log(result.totalCount); // Type-safe access
Key Benefits:
- Proper Type Inference: TypeScript automatically infers the correct return type
- Clear Intent: Explicitly marks the final transformation of your flow
- Type Safety: Full autocomplete and type checking on the result object
Aliases:
- returns() - Full descriptive name
- r() - Short alias (matches m() pattern)
// These are equivalent:
flow.returns((state) => ({ output: state.value }));
flow.r((state) => ({ output: state.value }));
When to Use:
- When you want proper TypeScript type inference for complex flows
- As the final step in flows that transform the state into a specific output format
- When building reusable flows that need clear output contracts
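To see why a terminal transformation can fix the result type, consider a miniature builder whose state type is replaced by the return type of each transform. `MiniFlow` is a hypothetical class written for this illustration and does not reflect AxFlow's internals. It only shows the TypeScript generic mechanism that makes this kind of inference possible.

```typescript
// Hypothetical miniature builder: each map() returns a new builder whose
// state type is the return type of the transform that was passed in.
class MiniFlow<TIn, TState> {
  private readonly run: (input: TIn) => TState;

  constructor(run: (input: TIn) => TState) {
    this.run = run;
  }

  static start<T>(): MiniFlow<T, T> {
    return new MiniFlow<T, T>((input) => input);
  }

  map<TNext>(fn: (state: TState) => TNext): MiniFlow<TIn, TNext> {
    return new MiniFlow<TIn, TNext>((input) => fn(this.run(input)));
  }

  forward(input: TIn): TState {
    return this.run(input);
  }
}

const typedMini = MiniFlow.start<{ input: string }>()
  .map((state) => ({ ...state, processed: true, count: 42 }))
  .map((state) => ({
    result: state.processed ? "done" : "pending",
    totalCount: state.count,
  }));

// miniResult is inferred as { result: string; totalCount: number }
const miniResult = typedMini.forward({ input: "test" });
console.log(miniResult.result, miniResult.totalCount); // done 42
```

The last transform in the chain determines the type seen by the caller, which is the role returns() plays in a real flow.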
Control Flow Methods
while(condition: Function) / endWhile()
Create loops that execute while condition is true.
flow
.map((state) => ({ ...state, counter: 0 }))
.while((state) => state.counter < 3)
.map((state) => ({ ...state, counter: state.counter + 1 }))
.execute("processor", (state) => ({ input: `iteration ${state.counter}` }))
.endWhile();
branch(predicate: Function) / when(value) / merge()
Conditional branching based on predicate evaluation.
flow
.branch((state) => state.complexity)
.when("simple")
.execute("simpleProcessor", mapping)
.when("complex")
.execute("complexProcessor", mapping)
.merge()
.map((state) => ({
result: state.simpleProcessorResult?.output ||
state.complexProcessorResult?.output,
}));
parallel(subFlows: Function[]) / merge(key: string, mergeFunction: Function)
Execute multiple sub-flows in parallel.
flow
.parallel([
(subFlow) =>
subFlow.execute("analyzer1", (state) => ({ text: state.input })),
(subFlow) =>
subFlow.execute("analyzer2", (state) => ({ text: state.input })),
(subFlow) =>
subFlow.execute("analyzer3", (state) => ({ text: state.input })),
])
.merge("combinedResults", (result1, result2, result3) => ({
analysis1: result1.analyzer1Result.analysis,
analysis2: result2.analyzer2Result.analysis,
analysis3: result3.analyzer3Result.analysis,
}));
label(name: string) / feedback(condition: Function, labelName: string, maxIterations?: number)
Create labeled points for feedback loops.
flow
.map((state) => ({ ...state, attempts: 0 }))
.label("retry-point")
.map((state) => ({ ...state, attempts: state.attempts + 1 }))
.execute("processor", (state) => ({ input: state.userInput }))
.execute("validator", (state) => ({ output: state.processorResult.output }))
.feedback(
(state) => !state.validatorResult.isValid && state.attempts < 3,
"retry-point",
);
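Conceptually, a label/feedback pair behaves like a do-while loop: the steps after the label run once, then repeat while the feedback condition holds, up to an iteration cap. The sketch below models that control flow in plain TypeScript. `runWithFeedback` is a hypothetical helper, and the default cap of 10 is an assumption made for the example, not a documented AxFlow default.

```typescript
interface RetryState {
  attempts: number;
  isValid: boolean;
}

// Hypothetical helper: re-run `body` (the steps between label() and
// feedback()) while `shouldRetry` holds, stopping after `maxIterations`
// passes as a safety cap. The default of 10 is an assumption.
function runWithFeedback(
  initial: RetryState,
  body: (state: RetryState) => RetryState,
  shouldRetry: (state: RetryState) => boolean,
  maxIterations = 10,
): RetryState {
  let state = initial;
  let passes = 0;
  do {
    state = body(state);
    passes += 1;
  } while (shouldRetry(state) && passes < maxIterations);
  return state;
}

// The simulated validator only accepts the output on the third attempt.
const feedbackResult = runWithFeedback(
  { attempts: 0, isValid: false },
  (s) => ({ attempts: s.attempts + 1, isValid: s.attempts + 1 >= 3 }),
  (s) => !s.isValid && s.attempts < 3,
);

console.log(feedbackResult.attempts, feedbackResult.isValid); // 3 true
```

Keeping an attempt counter in state, as the flow example does, bounds the loop even when no explicit maxIterations is passed.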
Advanced Methods
derive(outputField: string, inputField: string, transform: Function, options?: object)
Create derived fields from array or scalar inputs with parallel processing support.
// Derive from array with parallel processing
flow.derive(
"processedItems",
"items",
(item, index) => `processed-${item}-${index}`,
{
batchSize: 2,
},
);
// Derive from scalar
flow.derive("upperText", "inputText", (text) => text.toUpperCase());
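The array and scalar cases can be sketched as a standalone function. This is an approximation of the idea behind derive(), assuming batchSize means "process at most this many items concurrently, batch by batch". `deriveSketch` is a hypothetical name, and the real method writes into flow state rather than returning a value.

```typescript
type ItemTransform = (item: string, index: number) => string | Promise<string>;

// Hypothetical re-implementation of the derive() idea, for illustration only.
async function deriveSketch(
  input: string | string[],
  transform: ItemTransform,
  batchSize = Infinity,
): Promise<string | string[]> {
  // Scalar input: apply the transform once.
  if (!Array.isArray(input)) return transform(input, 0);

  // Array input: run up to `batchSize` items concurrently per batch,
  // preserving the original order of results.
  const results: string[] = [];
  const step = Math.max(1, batchSize);
  for (let start = 0; start < input.length; start += step) {
    const batch = input.slice(start, start + step);
    const done = await Promise.all(
      batch.map((item, offset) => transform(item, start + offset)),
    );
    results.push(...done);
  }
  return results;
}

const derivedItems = deriveSketch(
  ["a", "b", "c"],
  (item, index) => `processed-${item}-${index}`,
  2,
);
const derivedScalar = deriveSketch("hi", (text) => text.toUpperCase());

// Array case yields processed-a-0, processed-b-1, processed-c-2; scalar case yields HI.
derivedItems.then((items) => console.log(items));
derivedScalar.then((text) => console.log(text));
```

A smaller batch size trades throughput for a lower peak number of in-flight operations, which matters when the transform calls a rate-limited service.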
Control Flow Patterns
1. Sequential Processing
const sequentialFlow = new AxFlow<{ input: string }, { finalResult: string }>()
.node("step1", "input:string -> intermediate:string")
.node("step2", "intermediate:string -> output:string")
.execute("step1", (state) => ({ input: state.input }))
.execute(
"step2",
(state) => ({ intermediate: state.step1Result.intermediate }),
)
.map((state) => ({ finalResult: state.step2Result.output }));
2. Conditional Processing
const conditionalFlow = new AxFlow<
{ query: string; isComplex: boolean },
{ response: string }
>()
.node("simpleHandler", "query:string -> response:string")
.node("complexHandler", "query:string -> response:string")
.branch((state) => state.isComplex)
.when(true)
.execute("complexHandler", (state) => ({ query: state.query }))
.when(false)
.execute("simpleHandler", (state) => ({ query: state.query }))
.merge()
.map((state) => ({
response: state.complexHandlerResult?.response ||
state.simpleHandlerResult?.response,
}));
3. Iterative Processing
const iterativeFlow = new AxFlow<
{ content: string },
{ finalContent: string }
>()
.node("processor", "content:string -> processedContent:string")
.node("qualityChecker", "content:string -> qualityScore:number")
.map((state) => ({ currentContent: state.content, iteration: 0 }))
.while((state) => state.iteration < 3 && (state.qualityScore || 0) < 0.8)
.map((state) => ({ ...state, iteration: state.iteration + 1 }))
.execute("processor", (state) => ({ content: state.currentContent }))
.execute(
"qualityChecker",
(state) => ({ content: state.processorResult.processedContent }),
)
.map((state) => ({
...state,
currentContent: state.processorResult.processedContent,
qualityScore: state.qualityCheckerResult.qualityScore,
}))
.endWhile()
.map((state) => ({ finalContent: state.currentContent }));
4. Parallel Processing with Auto-Parallelization
AxFlow automatically detects independent operations and runs them in parallel:
const autoParallelFlow = new AxFlow<
{ text: string },
{ combinedAnalysis: string }
>()
.node("sentimentAnalyzer", "text:string -> sentiment:string")
.node("topicExtractor", "text:string -> topics:string[]")
.node("entityRecognizer", "text:string -> entities:string[]")
// These three execute automatically in parallel!
.execute("sentimentAnalyzer", (state) => ({ text: state.text }))
.execute("topicExtractor", (state) => ({ text: state.text }))
.execute("entityRecognizer", (state) => ({ text: state.text }))
// This waits for all three to complete
.map((state) => ({
combinedAnalysis: JSON.stringify({
sentiment: state.sentimentAnalyzerResult.sentiment,
topics: state.topicExtractorResult.topics,
entities: state.entityRecognizerResult.entities,
}),
}));
// Check execution plan
const plan = autoParallelFlow.getExecutionPlan();
console.log("Parallel groups:", plan.parallelGroups);
console.log("Max parallelism:", plan.maxParallelism);
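One way to picture the grouping into parallel levels is a planner that places each step one level after the latest step producing a field it reads. The sketch below is a simplified model, not AxFlow's actual analysis. `PlannedStep` and `planLevels` are hypothetical names, the reads/writes lists are declared by hand here, and write-after-read and write-after-write ordering are ignored.

```typescript
interface PlannedStep {
  id: string;
  reads: string[]; // state fields the step's mapping accesses
  writes: string[]; // state fields the step produces
}

// Hypothetical planner: steps that share a level have no dependency on
// each other in this model and could run concurrently.
function planLevels(steps: PlannedStep[]): string[][] {
  const writerLevel = new Map<string, number>(); // field -> level of its last writer
  const levels: string[][] = [];
  for (const step of steps) {
    let level = 0;
    for (const field of step.reads) {
      const produced = writerLevel.get(field);
      if (produced !== undefined) level = Math.max(level, produced + 1);
    }
    const bucket = levels[level] ?? (levels[level] = []);
    bucket.push(step.id);
    for (const field of step.writes) writerLevel.set(field, level);
  }
  return levels;
}

const planResult = planLevels([
  { id: "sentimentAnalyzer", reads: ["text"], writes: ["sentimentAnalyzerResult"] },
  { id: "topicExtractor", reads: ["text"], writes: ["topicExtractorResult"] },
  { id: "entityRecognizer", reads: ["text"], writes: ["entityRecognizerResult"] },
  {
    id: "combine",
    reads: ["sentimentAnalyzerResult", "topicExtractorResult", "entityRecognizerResult"],
    writes: ["combinedAnalysis"],
  },
]);

// Level 0 holds the three analyzers; level 1 holds the combine step.
console.log(planResult);
```

In this model, a mapping that reads another node's result pushes its step to a later level, which is why unnecessary reads reduce parallelism.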
5. Asynchronous Operations
AxFlow supports asynchronous transformations in map operations, enabling API calls, database queries, and other async operations within your flow:
const asyncFlow = new AxFlow<
{ userQuery: string },
{ enrichedData: string; apiCallTime: number }
>()
.node("processor", "enrichedData:string -> processedResult:string")
// Single async map - API enrichment
.map(async (state) => {
const startTime = Date.now();
const apiData = await fetchFromExternalAPI(state.userQuery);
const duration = Date.now() - startTime;
return {
...state,
enrichedData: apiData,
apiCallTime: duration,
};
})
// Execute AI processing on enriched data
.execute("processor", (state) => ({ enrichedData: state.enrichedData }))
// Parallel async operations
.map([
async (state) => {
// Node output lives under processorResult, following the {nodeName}Result convention
const sentiment = await analyzeSentiment(state.processorResult.processedResult);
return { ...state, sentiment };
},
async (state) => {
const entities = await extractEntities(state.processorResult.processedResult);
return { ...state, entities };
},
async (state) => {
const summary = await generateSummary(state.processorResult.processedResult);
return { ...state, summary };
},
], { parallel: true })
.map((state) => ({
enrichedData: state.enrichedData,
apiCallTime: state.apiCallTime,
}));
Mixed Sync/Async Processing
You can mix synchronous and asynchronous operations seamlessly:
const mixedFlow = new AxFlow<{ rawData: string }, { result: string }>()
// Sync preprocessing
.map((state) => ({
...state,
cleanedData: state.rawData.trim().toLowerCase(),
}))
// Async validation
.map(async (state) => {
const isValid = await validateWithAPI(state.cleanedData);
return { ...state, isValid };
})
// More sync processing
.map((state) => ({
...state,
timestamp: Date.now(),
}))
// Final async processing
.map(async (state) => {
if (state.isValid) {
const processed = await processData(state.cleanedData);
return { result: processed };
}
return { result: "Invalid data" };
});
Performance Considerations
- Parallel async maps: Multiple async operations run concurrently
- Sequential async maps: Each async operation waits for the previous one
- Batch control: Use batchSize option to control parallelism
// Parallel execution (faster)
flow.map([asyncOp1, asyncOp2, asyncOp3], { parallel: true });
// Sequential execution (slower but controlled)
flow
.map(asyncOp1)
.map(asyncOp2)
.map(asyncOp3);
6. Self-Healing with Feedback Loops
const selfHealingFlow = new AxFlow<{ input: string }, { output: string }>()
.node("processor", "input:string -> output:string, confidence:number")
.node("validator", "output:string -> isValid:boolean, issues:string[]")
.node("fixer", "output:string, issues:string[] -> fixedOutput:string")
.map((state) => ({ ...state, attempts: 0 }))
.label("process")
.map((state) => ({ ...state, attempts: state.attempts + 1 }))
.execute("processor", (state) => ({ input: state.input }))
.execute("validator", (state) => ({ output: state.processorResult.output }))
.feedback(
(state) => !state.validatorResult.isValid && state.attempts < 3,
"process",
)
// If still invalid after retries, try to fix
.branch((state) => state.validatorResult.isValid)
.when(false)
.execute("fixer", (state) => ({
output: state.processorResult.output,
issues: state.validatorResult.issues,
}))
.map((state) => ({
output: state.fixerResult.fixedOutput,
}))
.when(true)
.map((state) => ({
output: state.processorResult.output,
}))
.merge();
Advanced Features
Instrumentation and Optimization (v13.0.24+)
- Deprecation: prefer flow() factory over new AxFlow().
- Tracing: pass a Tracer via flow({ tracer }) or flow.forward(llm, input, { tracer, traceContext }).
  - A parent span is created at the flow boundary (if tracer is provided).
  - The parent span context is propagated to all node .forward() calls via options.traceContext.
  - Pass an OpenTelemetry Context for traceContext (not a Span). Use @opentelemetry/api context.active() or similar.
- Meter: pass meter the same way as tracer; it is propagated to node forwards.
- Demos/Examples routing: flow.setDemos(demos) routes by programId to the correct prompts, similar to DSPy. The flow maintains an internal AxProgram and registers all child nodes; each node filters demos by programId.
- Optimization: flow.applyOptimization(optimizedProgram) applies to the flow's internal program and all registered child nodes.
- Parallel map: flow.map([...], { parallel: true }) merges all transform outputs back into state.
Example
import { ai, flow } from "@ax-llm/ax";
import { context, trace } from "@opentelemetry/api";
const llm = ai({ name: "mock" });
const tracer = trace.getTracer("axflow");
const wf = flow<{ userQuestion: string }>()
.node("summarizer", "documentText:string -> summaryText:string")
.execute("summarizer", (s) => ({ documentText: s.userQuestion }))
.returns((s) => ({ finalAnswer: (s as any).summarizerResult.summaryText }));
const parentCtx = context.active();
const out = await wf.forward(llm, { userQuestion: "hi" }, {
tracer,
traceContext: parentCtx,
});
1. Auto-Parallelization
AxFlow automatically analyzes dependencies and runs independent operations in parallel:
// Disable auto-parallelization globally
const sequentialFlow = new AxFlow({ autoParallel: false });
// Disable for specific execution
const result = await flow.forward(llm, input, { autoParallel: false });
// Get execution plan information
const plan = flow.getExecutionPlan();
console.log(
`Will run ${plan.parallelGroups} parallel groups with max ${plan.maxParallelism} concurrent operations`,
);
2. Dynamic AI Context
Use different AI services for different nodes:
flow
.execute("fastProcessor", mapping, { ai: speedAI })
.execute("powerfulAnalyzer", mapping, { ai: powerAI })
.execute("defaultProcessor", mapping); // Uses default AI from forward()
3. Batch Processing with Derive
const batchFlow = new AxFlow<{ items: string[] }, { processedItems: string[] }>(
{
autoParallel: true,
batchSize: 3, // Process 3 items at a time
},
)
.derive("processedItems", "items", (item, index) => {
return `processed-${item}-${index}`;
}, { batchSize: 2 }); // Override batch size for this operation
4. Error Handling
try {
const result = await flow.forward(llm, input);
} catch (error) {
console.error("Flow execution failed:", error);
}
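For transient failures, a retry wrapper with exponential backoff can be placed around the call to forward(). The helper below is a generic sketch and is not part of AxFlow. `retryWithBackoff`, `BackoffOptions`, and `sleepMs` are hypothetical names, and the flaky task stands in for a real `flow.forward(llm, input)` call.

```typescript
interface BackoffOptions {
  retries: number; // additional attempts allowed after the first failure
  baseDelayMs: number; // delay before the first retry; doubles on each retry
}

const sleepMs = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical wrapper: retries `task` with delays of baseDelayMs, 2x, 4x, ...
// and rethrows the last error once the retries are used up.
async function retryWithBackoff<T>(
  task: () => Promise<T>,
  { retries, baseDelayMs }: BackoffOptions,
): Promise<T> {
  for (let attempt = 0; ; attempt += 1) {
    try {
      return await task();
    } catch (error) {
      if (attempt >= retries) throw error;
      await sleepMs(baseDelayMs * 2 ** attempt);
    }
  }
}

// Stand-in for a flow call: fails twice, then succeeds.
let flakyCalls = 0;
const flakyTask = async (): Promise<string> => {
  flakyCalls += 1;
  if (flakyCalls < 3) throw new Error(`transient failure ${flakyCalls}`);
  return "ok";
};

const retried = retryWithBackoff(flakyTask, { retries: 3, baseDelayMs: 1 });
retried.then((value) => console.log(value, "after", flakyCalls, "calls"));
```

In a real flow the task would be something like `() => flow.forward(llm, input)`. Retrying the whole flow repeats every node call, so it suits short flows better than long, expensive ones.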
5. Program Integration
AxFlow integrates with the dspy-ts ecosystem:
// Get signature
const signature = flow.getSignature();
// Set examples (if applicable)
flow.setExamples(examples);
// Get traces and usage
const traces = flow.getTraces();
const usage = flow.getUsage();
Best Practices
1. Node Naming
Use descriptive names that clearly indicate the node's purpose:
// ❌ Unclear
flow.node("proc1", signature);
// ✅ Clear
flow.node("documentSummarizer", signature);
flow.node("sentimentAnalyzer", signature);
2. State Management
Keep state flat and predictable:
// ✅ Good - flat structure
flow.map((state) => ({
...state,
processedText: state.rawText.toLowerCase(),
timestamp: Date.now(),
}));
// ❌ Avoid - deep nesting
flow.map((state) => ({
data: {
processed: {
text: state.rawText.toLowerCase(),
},
},
}));
3. Error Prevention
Always define nodes before executing them:
// ✅ Correct order
flow
.node("processor", signature)
.execute("processor", mapping);
// ❌ Will throw error
flow
.execute("processor", mapping) // Node not defined yet!
.node("processor", signature);
4. Loop Safety
Ensure loop conditions can change:
// ✅ Safe - counter increments
flow
.map((state) => ({ ...state, counter: 0 }))
.while((state) => state.counter < 5)
.map((state) => ({ ...state, counter: state.counter + 1 })) // Condition changes
.execute("processor", mapping)
.endWhile();
// ❌ Infinite loop - condition never changes
flow
.while((state) => state.isProcessing) // This never changes!
.execute("processor", mapping)
.endWhile();
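When a loop condition depends on data the loop body might never change, a capped condition gives a hard upper bound. The sketch below wraps any predicate so that it returns false after a fixed number of evaluations. `withIterationCap` is a hypothetical helper, and passing its result to .while() assumes only that .while() accepts an arbitrary predicate function, as its signature above suggests.

```typescript
type LoopCondition<S> = (state: S) => boolean;

// Hypothetical guard: the wrapped condition returns false once it has been
// evaluated `maxChecks` times, even if the inner condition never changes.
function withIterationCap<S>(
  condition: LoopCondition<S>,
  maxChecks: number,
): LoopCondition<S> {
  let checks = 0;
  return (state) => {
    checks += 1;
    return checks <= maxChecks && condition(state);
  };
}

// Simulate a loop whose condition would otherwise stay true forever.
const cappedCondition = withIterationCap(
  (s: { isProcessing: boolean }) => s.isProcessing,
  5,
);
let iterationsRun = 0;
while (cappedCondition({ isProcessing: true })) iterationsRun += 1;

console.log(iterationsRun); // 5
```

The counter lives in the closure, so a guard created once and reused across several runs of the same flow would carry its count over; creating a fresh guard per run, or keeping the counter in flow state as in the safe example, avoids that.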
5. Parallel Design
Structure flows to maximize automatic parallelization:
// ✅ Parallel-friendly - independent operations
flow
.execute("analyzer1", (state) => ({ text: state.input })) // Can run in parallel
.execute("analyzer2", (state) => ({ text: state.input })) // Can run in parallel
.execute("combiner", (state) => ({ // Waits for both
input1: state.analyzer1Result.output,
input2: state.analyzer2Result.output,
}));
// ❌ Sequential - unnecessary dependencies
flow
.execute("analyzer1", (state) => ({ text: state.input }))
.execute("analyzer2", (state) => ({
text: state.input,
context: state.analyzer1Result.output, // Creates dependency!
}));
Examples
Extended Node Patterns with nx
import { AxFlow, f } from "@ax-llm/ax";
// Chain-of-thought reasoning pattern
const reasoningFlow = new AxFlow<{ question: string }, { answer: string }>()
.nx("reasoner", "question:string -> answer:string", {
prependOutputs: [
{
name: "reasoning",
type: f.internal(f.string("Step-by-step reasoning")),
},
],
})
.execute("reasoner", (state) => ({ question: state.question }))
.map((state) => ({ answer: state.reasonerResult.answer }));
// Confidence scoring pattern
const confidenceFlow = new AxFlow<
{ input: string },
{ result: string; confidence: number }
>()
.nx("analyzer", "input:string -> result:string", {
appendOutputs: [
{ name: "confidence", type: f.number("Confidence score 0-1") },
],
})
.execute("analyzer", (state) => ({ input: state.input }))
.map((state) => ({
result: state.analyzerResult.result,
confidence: state.analyzerResult.confidence,
}));
// Contextual processing pattern
const contextualFlow = new AxFlow<
{ query: string; context?: string },
{ response: string }
>()
.nx("processor", "query:string -> response:string", {
appendInputs: [
{ name: "context", type: f.optional(f.string("Additional context")) },
],
})
.execute("processor", (state) => ({
query: state.query,
context: state.context,
}))
.map((state) => ({ response: state.processorResult.response }));
Document Processing Pipeline
const documentPipeline = flow<{ document: string }>()
.node("summarizer", "documentText:string -> summary:string")
.node("sentimentAnalyzer", "documentText:string -> sentiment:string")
.node("keywordExtractor", "documentText:string -> keywords:string[]")
// These run automatically in parallel
.execute("summarizer", (state) => ({ documentText: state.document }))
.execute("sentimentAnalyzer", (state) => ({ documentText: state.document }))
.execute("keywordExtractor", (state) => ({ documentText: state.document }))
// Use returns() for proper type inference
.returns((state) => ({
summary: state.summarizerResult.summary,
sentiment: state.sentimentAnalyzerResult.sentiment,
keywords: state.keywordExtractorResult.keywords,
}));
// TypeScript now knows the exact return type:
// { summary: string; sentiment: string; keywords: string[] }
const result = await documentPipeline.forward(llm, { document: "..." });
console.log(result.summary); // Fully typed
console.log(result.sentiment); // Fully typed
console.log(result.keywords); // Fully typed
Quality-Driven Content Creation
const contentCreator = new AxFlow<
{ topic: string; targetQuality: number },
{ finalContent: string; iterations: number }
>()
.node("writer", "topic:string -> content:string")
.node("qualityChecker", "content:string -> score:number, feedback:string")
.node("improver", "content:string, feedback:string -> improvedContent:string")
// Spread the incoming state so topic and targetQuality stay available to later steps
.map((state) => ({ ...state, currentContent: "", iteration: 0, bestScore: 0 }))
// Initial writing
.execute("writer", (state) => ({ topic: state.topic }))
.map((state) => ({
...state,
currentContent: state.writerResult.content,
iteration: 1,
}))
// Improvement loop
.while((state) =>
state.iteration < 5 && state.bestScore < state.targetQuality
)
.execute("qualityChecker", (state) => ({ content: state.currentContent }))
.branch((state) => state.qualityCheckerResult.score > state.bestScore)
.when(true)
.execute("improver", (state) => ({
content: state.currentContent,
feedback: state.qualityCheckerResult.feedback,
}))
.map((state) => ({
...state,
currentContent: state.improverResult.improvedContent,
bestScore: state.qualityCheckerResult.score,
iteration: state.iteration + 1,
}))
.when(false)
.map((state) => ({ ...state, iteration: 5 })) // Exit loop
.merge()
.endWhile()
.map((state) => ({
finalContent: state.currentContent,
iterations: state.iteration,
}));
Async Data Enrichment Pipeline
const enrichmentPipeline = new AxFlow<
{ userQuery: string },
{ finalResult: string; metadata: object }
>()
.node("analyzer", "enrichedData:string -> analysis:string")
// Parallel async data enrichment from multiple sources
.map([
async (state) => {
const userProfile = await fetchUserProfile(state.userQuery);
return { ...state, userProfile };
},
async (state) => {
const contextData = await fetchContextData(state.userQuery);
return { ...state, contextData };
},
async (state) => {
const historicalData = await fetchHistoricalData(state.userQuery);
return { ...state, historicalData };
},
], { parallel: true })
// Combine enriched data
.map(async (state) => {
const combinedData = await combineDataSources({
userProfile: state.userProfile,
context: state.contextData,
historical: state.historicalData,
});
return {
...state,
enrichedData: combinedData,
metadata: {
sources: ["userProfile", "contextData", "historicalData"],
timestamp: Date.now(),
},
};
})
// Process with AI
.execute("analyzer", (state) => ({ enrichedData: state.enrichedData }))
// Final async post-processing
.map(async (state) => {
const enhanced = await enhanceResult(state.analyzerResult.analysis);
return {
finalResult: enhanced,
metadata: state.metadata,
};
});
Real-time Data Processing with Async Maps
const realTimeProcessor = new AxFlow<
{ streamData: string[] },
{ processedResults: string[]; stats: object }
>()
// Async preprocessing of each item in parallel
.map(async (state) => {
const startTime = Date.now();
// Process each item in batches with async operations
const processedItems = await Promise.all(
state.streamData.map(async (item, index) => {
const enriched = await enrichDataItem(item);
const validated = await validateItem(enriched);
return { item: validated, index, timestamp: Date.now() };
}),
);
const processingTime = Date.now() - startTime;
return {
...state,
processedItems,
processingStats: {
totalItems: state.streamData.length,
processingTime,
itemsPerSecond: state.streamData.length / (processingTime / 1000),
},
};
})
// Parallel async quality checks
.map([
async (state) => {
const qualityScore = await calculateQualityScore(state.processedItems);
return { ...state, qualityScore };
},
async (state) => {
const anomalies = await detectAnomalies(state.processedItems);
return { ...state, anomalies };
},
async (state) => {
const trends = await analyzeTrends(state.processedItems);
return { ...state, trends };
},
], { parallel: true })
// Final aggregation
.map((state) => ({
processedResults: state.processedItems.map((item) => item.item),
stats: {
...state.processingStats,
qualityScore: state.qualityScore,
anomaliesFound: state.anomalies?.length || 0,
trendsDetected: state.trends?.length || 0,
},
}));
Multi-Model Research System
const researchSystem = new AxFlow<
{ query: string },
{ answer: string; sources: string[]; confidence: number }
>()
.node("queryGenerator", "researchQuestion:string -> searchQuery:string")
.node("retriever", "searchQuery:string -> retrievedDocument:string")
.node(
"answerGenerator",
"retrievedDocument:string, researchQuestion:string -> researchAnswer:string",
)
.execute("queryGenerator", (state) => ({ researchQuestion: state.query }))
.execute(
"retriever",
(state) => ({ searchQuery: state.queryGeneratorResult.searchQuery }),
)
.execute("answerGenerator", (state) => ({
retrievedDocument: state.retrieverResult.retrievedDocument,
researchQuestion: state.query,
}))
.map((state) => ({
answer: state.answerGeneratorResult.researchAnswer,
sources: [state.retrieverResult.retrievedDocument],
confidence: 0.85,
}));
Troubleshooting
Common Errors
- "Node 'nodeName' not found"
  - Ensure you call .node() before .execute()
- "endWhile() called without matching while()"
  - Every .while() needs a matching .endWhile()
- "when() called without matching branch()"
  - Every .when() needs to be inside a .branch() / .merge() block
- "merge() called without matching branch()"
  - Every .branch() needs a matching .merge()
- "Label 'labelName' not found"
  - Ensure the label exists before using it in .feedback()
Performance Issues
- Operations running sequentially instead of parallel
  - Check for unnecessary dependencies in your mappings
  - Use flow.getExecutionPlan() to debug
- Memory issues with large datasets
  - Use batchSize option to control parallel execution
  - Consider using .derive() for array processing
Type Errors
- State property not found
  - Use .map() to ensure required properties exist
  - Check the spelling of result field names ({nodeName}Result)
This documentation provides a comprehensive guide to AxFlow based on the actual implementation and test cases. All examples have been verified against the test suite to ensure accuracy.