AxFlowExecutionPlanner
Builds and manages the execution plan with automatic parallelization.
This class is the core of AxFlow’s performance optimization system. It analyzes the dependency relationships between steps and creates an optimized execution plan that maximizes parallelism while ensuring correct execution order.
Key responsibilities:
- Dependency Analysis: Tracks which fields each step depends on and which fields it produces
- Parallel Grouping: Groups independent steps that can run simultaneously
- Execution Optimization: Creates optimized execution functions that run parallel groups concurrently
- Signature Inference: Provides data for automatic signature generation
The planner works by building a directed acyclic graph (DAG) of dependencies and then creating execution levels where all steps in a level can run in parallel.
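The level-building idea described above can be sketched as follows. This is a minimal illustration, not the planner's actual implementation: the `SketchStep` shape, the `buildLevels` function, and the step names are all hypothetical.

```typescript
// Hypothetical shape for illustration; not the library's AxFlowExecutionStep.
interface SketchStep {
  name: string;
  dependencies: string[]; // fields the step reads
  produces: string[]; // fields the step writes
}

// A step joins the first level at which all of its dependencies are available,
// either as initial fields or as outputs of earlier levels.
function buildLevels(steps: SketchStep[], initialFields: string[]): string[][] {
  const available = new Set<string>(initialFields);
  let pending = steps.slice();
  const levels: string[][] = [];

  while (pending.length > 0) {
    const ready = pending.filter((s) =>
      s.dependencies.every((d) => available.has(d)),
    );
    if (ready.length === 0) {
      // Nothing can run: a dependency is missing or the graph has a cycle.
      throw new Error(
        "Unsatisfiable dependencies: " + pending.map((s) => s.name).join(", "),
      );
    }
    levels.push(ready.map((s) => s.name));
    // Outputs of this level become visible only to later levels.
    for (const s of ready) {
      for (const f of s.produces) available.add(f);
    }
    pending = pending.filter((s) => ready.indexOf(s) === -1);
  }
  return levels;
}

const sketchLevels = buildLevels(
  [
    { name: "summarize", dependencies: ["document"], produces: ["summary"] },
    { name: "extractKeywords", dependencies: ["document"], produces: ["keywords"] },
    { name: "combine", dependencies: ["summary", "keywords"], produces: ["report"] },
  ],
  ["document"],
);
console.log(sketchLevels); // [["summarize", "extractKeywords"], ["combine"]]
```

Here `summarize` and `extractKeywords` both need only `document`, so they share a level. `combine` needs both of their outputs, so it lands in the next level.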
Constructors
Constructor
new AxFlowExecutionPlanner(): AxFlowExecutionPlanner;
Returns
AxFlowExecutionPlanner
Methods
addExecutionStep()
addExecutionStep(
stepFunction: AxFlowStepFunction,
nodeName?: string,
mapping?: (state: any) => any,
stepType?: "map" | "execute" | "merge" | "parallel-map" | "parallel" | "derive",
mapTransform?: (state: any) => any,
mergeOptions?: object,
deriveOptions?: object): void;
Adds an execution step to the plan for analysis and optimization.
This method is called for every operation in the flow (execute, map, merge, etc.) and performs dependency analysis to understand what the step needs and produces. This information is crucial for building the parallel execution plan.
The method handles different types of steps:
- Execute steps: LLM node operations that depend on specific state fields
- Map steps: Transformations that modify the state object
- Merge steps: Operations that combine results from branches or parallel operations
- Other steps: Generic operations that don’t fit other categories
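One way dependency analysis for a mapping function could work is to call it with a recording proxy and note which state fields it reads. The sketch below shows that general technique only. `trackAccessedFields` is a hypothetical helper, and whether the planner uses this approach is an assumption.

```typescript
// Hypothetical helper: record which top-level state fields a mapping reads.
function trackAccessedFields(mapping: (state: any) => any): string[] {
  const accessed: string[] = [];
  const probe = new Proxy({} as Record<string, unknown>, {
    get(_target, prop) {
      if (typeof prop === "string" && accessed.indexOf(prop) === -1) {
        accessed.push(prop);
      }
      return undefined; // placeholder; values are irrelevant for the analysis
    },
  });
  try {
    mapping(probe);
  } catch {
    // A mapping may throw on placeholder values; keep what was recorded so far.
  }
  return accessed;
}

const sketchDeps = trackAccessedFields((state) => ({
  text: state.summary,
  tags: state.keywords,
}));
console.log(sketchDeps); // ["summary", "keywords"]
```

A probe like this only sees fields read on the code path that actually runs, so fields read inside an untaken conditional branch would be missed.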
Parameters
Parameter | Type | Description |
---|---|---|
stepFunction | AxFlowStepFunction | The actual function to execute for this step |
nodeName? | string | Name of the node (for execute steps) |
mapping? | (state: any) => any | Function that maps state to node inputs (for execute steps) |
stepType? | "map" \| "execute" \| "merge" \| "parallel-map" \| "parallel" \| "derive" | Type of step for specialized analysis |
mapTransform? | (state: any) => any | Transformation function (for map steps) |
mergeOptions? | { mergeFunction?: (...args: any[]) => any; resultKey?: string; } | Options for merge operations (result key, merge function) |
mergeOptions.mergeFunction? | (...args: any[]) => any | - |
mergeOptions.resultKey? | string | - |
deriveOptions? | { batchSize?: number; inputFieldName: string; outputFieldName: string; } | - |
deriveOptions.batchSize? | number | - |
deriveOptions.inputFieldName? | string | - |
deriveOptions.outputFieldName? | string | - |
Returns
void
createOptimizedExecution()
createOptimizedExecution(batchSize?: number): AxFlowStepFunction[];
Creates optimized execution functions that implement the parallel execution plan.
This method converts the parallel groups into actual executable functions. It creates a series of steps where:
- Single-step groups execute directly
- Multi-step groups execute in parallel with batch size control
- Results are properly merged to maintain state consistency
The optimized execution can significantly improve performance for flows with independent operations, especially I/O-bound operations like LLM calls.
Performance benefits:
- Reduces total execution time for independent operations
- Overlaps independent I/O-bound operations so less time is spent waiting on any single call
- Maintains correctness through dependency management
- Controls resource usage through batch size limiting
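The combination of parallel groups, batch size control, and result merging can be sketched as below. This is an assumption-laden illustration: `SketchState`, `SketchStepFn`, `runGroup`, and `fakeLlmCall` are hypothetical names, and simple chunking is only one possible batching strategy, not necessarily the one the planner uses.

```typescript
type SketchState = Record<string, unknown>;
type SketchStepFn = (state: SketchState) => Promise<SketchState>;

// Run one parallel group in chunks of at most `batchSize` steps, then merge.
async function runGroup(
  group: SketchStepFn[],
  state: SketchState,
  batchSize?: number,
): Promise<SketchState> {
  const size = batchSize && batchSize > 0 ? batchSize : group.length;
  let merged: SketchState = { ...state };
  for (let i = 0; i < group.length; i += size) {
    const chunk = group.slice(i, i + size);
    // Every step in the group sees the same input state.
    const results = await Promise.all(chunk.map((fn) => fn(state)));
    // Later results overwrite earlier ones on key conflicts.
    for (const r of results) merged = { ...merged, ...r };
  }
  return merged;
}

// Stand-in for an I/O-bound step; tracks how many calls overlap.
let sketchActive = 0;
let sketchPeak = 0;
function fakeLlmCall(key: string): SketchStepFn {
  return async (state) => {
    sketchActive += 1;
    sketchPeak = Math.max(sketchPeak, sketchActive);
    await new Promise<void>((resolve) => setTimeout(resolve, 10));
    sketchActive -= 1;
    return { ...state, [key]: key + " done" };
  };
}

const sketchRun = runGroup(
  [fakeLlmCall("a"), fakeLlmCall("b"), fakeLlmCall("c")],
  { input: "x" },
  2,
);
sketchRun.then((finalState) =>
  console.log(finalState, "peak concurrency:", sketchPeak),
);
```

With a batch size of 2, the three steps run as a chunk of two followed by a chunk of one, so no more than two calls are in flight at once. A single-step group takes the same path: the chunk simply contains one function.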
Parameters
Parameter | Type | Description |
---|---|---|
batchSize? | number | Maximum number of concurrent operations (optional) |
Returns
AxFlowStepFunction[]
Array of optimized step functions ready for execution
getExecutionPlan()
getExecutionPlan(): object;
Gets detailed execution plan information for debugging and analysis.
This method provides comprehensive information about the execution plan, including step counts, parallel grouping details, and the complete dependency structure. It’s particularly useful for:
- Debugging execution flow issues
- Performance analysis and optimization
- Understanding parallelization effectiveness
- Monitoring execution plan complexity
Returns
object
Object containing detailed execution plan metrics and data
Name | Type |
---|---|
groups | AxFlowParallelGroup[] |
maxParallelism | number |
parallelGroups | number |
steps | AxFlowExecutionStep[] |
totalSteps | number |
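To illustrate how the numeric fields might relate to the groups, here is a small sketch. The interpretations are assumptions rather than documented behavior: `parallelGroups` is taken to be the number of groups and `maxParallelism` the size of the widest group. `SketchPlanGroup` and `summarizePlan` are hypothetical names.

```typescript
// Hypothetical shape; the real AxFlowParallelGroup is not shown in this reference.
interface SketchPlanGroup {
  level: number;
  steps: string[];
}

function summarizePlan(groups: SketchPlanGroup[]) {
  const sizes = groups.map((g) => g.steps.length);
  return {
    totalSteps: sizes.reduce((sum, n) => sum + n, 0),
    // Assumed meaning: number of execution levels.
    parallelGroups: groups.length,
    // Assumed meaning: size of the widest group.
    maxParallelism: sizes.reduce((max, n) => Math.max(max, n), 0),
  };
}

const sketchSummary = summarizePlan([
  { level: 0, steps: ["summarize", "extractKeywords"] },
  { level: 1, steps: ["combine"] },
]);
console.log(sketchSummary); // { totalSteps: 3, parallelGroups: 2, maxParallelism: 2 }
```

Under these assumptions, a `maxParallelism` of 1 would indicate that every step depends on the one before it, so the plan offers nothing to parallelize.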
getOptimizedExecutionSteps()
getOptimizedExecutionSteps(): AxFlowStepFunction[];
Gets optimized execution steps for the flow.
This method provides the optimized execution steps that can be used to execute the flow with maximum parallelism while maintaining dependency order.
Returns
AxFlowStepFunction[]
Array of optimized step functions ready for execution
setInitialFields()
setInitialFields(fields: string[]): void;
Sets the initial fields and triggers parallel group rebuilding.
This method is called once the flow knows what input fields are available. It triggers the parallel group analysis which determines the optimal execution strategy for the entire flow.
Parameters
Parameter | Type | Description |
---|---|---|
fields | string[] | Array of field names available at the start of execution |
Returns
void