WorkflowExecutor Implementation Design
Overview
The WorkflowExecutor class provides the foundation for parsing and executing GitHub Actions-style YAML workflows with matrix expansion support. This implementation focuses on the parsing and preparation phase, establishing the infrastructure for workflow execution.
Implementation Architecture
Class Structure
class WorkflowExecutor:
"""Parse and execute GitHub Actions-style workflows with matrix expansion."""
def parse_workflow(self, workflow_path: Union[str, Path]) -> Dict[str, Any]
def expand_matrix(self, matrix: Dict[str, List[Any]]) -> List[Dict[str, Any]]
def construct_paths(self, job: Dict[str, Any], data_root: str,
output_root: str, workflow_id: str) -> Dict[str, str]
Key Design Decisions
1. Integration with Existing Schema Validation
The WorkflowExecutor leverages the existing causaliq_workflow.schema module for workflow validation:
def parse_workflow(self, workflow_path):
workflow = load_workflow_file(workflow_path) # Existing function
validate_workflow(workflow) # Existing function
return workflow
Rationale: Reuse proven validation logic, maintain single source of truth for workflow structure validation.
2. Matrix Expansion Strategy
Matrix expansion uses cartesian product generation:
def expand_matrix(self, matrix):
variables = list(matrix.keys())
value_lists = list(matrix.values())
combinations = list(itertools.product(*value_lists))
jobs = []
for combination in combinations:
job = dict(zip(variables, combination))
jobs.append(job)
return jobs
Rationale: - Simple, predictable algorithm matching GitHub Actions behaviour - Easy to understand and debug - Supports arbitrary matrix dimensions - Deterministic ordering for reproducible results
3. Path Construction Pattern
Paths follow the established pattern from the examples:
# Input: {data_root}/{dataset}/input.csv
# Output: {output_root}/{workflow_id}/{dataset}_{algorithm}/
Rationale: - Consistent with documentation examples - Organises outputs by workflow and experiment parameters - Supports hierarchical result organisation - Compatible with existing action framework expectations
4. Error Handling Strategy
All methods use consistent error propagation:
try:
# Core logic
except Exception as e:
raise WorkflowExecutionError(f"Operation failed: {e}") from e
Rationale: - Maintains error chain for debugging - Provides consistent error interface - Follows established patterns in the codebase
Implementation Phases
Phase 1: Parsing Foundation (Complete ✅)
Scope: Basic workflow parsing and matrix expansion - Parse and validate YAML workflow files - Expand matrix variables into job configurations - Construct file paths from matrix variables - Comprehensive error handling
Test Coverage: 100% with edge case coverage - Unit tests with mocked dependencies - Functional tests with real YAML files - Exception handling verification
Phase 2: Execution Engine (Future)
Scope: Step execution and orchestration
- Execute workflow steps with action coordination
- Environment variable management
- Conditional execution (if: conditions)
- Step output handling and dependencies
Phase 3: Advanced Features (Future)
Scope: Enterprise workflow features
- DASK task graph integration
- Progress monitoring and status reporting
- Resource management and limits
- Workflow queue management
Integration Points
With Action Framework
# Future integration pattern
for step in workflow["steps"]:
action = ActionRegistry.get_action(step["uses"])
inputs = construct_action_inputs(step["with"], job_context)
result = action.run(inputs)
With Schema Validation
# Current integration
workflow = load_workflow_file(path) # schema.py
validate_workflow(workflow) # schema.py
jobs = executor.expand_matrix(workflow["matrix"]) # workflow.py
With DASK (Future)
# Planned integration
jobs = executor.expand_matrix(matrix)
task_graph = DaskBuilder.build_workflow_graph(jobs, workflow)
results = executor.execute_dask_graph(task_graph)
Testing Strategy
Unit Tests (Isolated Logic)
- Matrix expansion algorithm verification
- Path construction with various inputs
- Exception handling edge cases
- Mocked dependencies for isolation
Functional Tests (Real Operations)
- YAML file parsing with real workflow files
- Filesystem operations for temporary test files
- Integration with schema validation
- End-to-end workflow parsing scenarios
Edge Case Coverage
- Exception handling in matrix expansion (
itertools.productfailures) - Empty matrices and default value handling
- Invalid workflow file scenarios
- Missing matrix variable handling
Performance Characteristics
Matrix Expansion Scaling
- Time Complexity: O(n₁ × n₂ × ... × nₖ) where nᵢ is the size of each matrix dimension
- Space Complexity: O(jobs) linear in the number of generated job combinations
- Practical Limits: Reasonable for typical causal discovery experiments (< 1000 jobs)
Memory Usage
- Workflow parsing: Linear in YAML file size
- Matrix expansion: Linear in number of generated jobs
- Path construction: Constant per job
Future Optimisations
- Lazy matrix expansion for large matrices
- Streaming job processing for memory efficiency
- Job batching for DASK execution
Alignment with CausalIQ Standards
Development Guidelines Compliance
- ✅ Small incremental changes: 99-line focused implementation
- ✅ 100% test coverage: Comprehensive unit and functional tests
- ✅ CI compliance: All formatting, linting, and type checking standards met
- ✅ British English: Documentation and code comments
- ✅ Type safety: Complete type annotations with mypy validation
Architecture Principles
- ✅ GitHub Actions foundation: Consistent with CI/CD workflow patterns
- ✅ Action-based components: Integration with existing action framework
- ✅ Schema-first design: Leverage existing validation infrastructure
- ✅ Incremental functionality: Foundation for future execution features
This design provides a solid foundation for workflow execution while maintaining the incremental development approach and high quality standards established in the CausalIQ Workflow project.