WorkflowExecutor Implementation Design

Overview

The WorkflowExecutor class provides the foundation for parsing and executing GitHub Actions-style YAML workflows with matrix expansion support. This implementation focuses on the parsing and preparation phase, establishing the infrastructure for workflow execution.

Implementation Architecture

Class Structure

from pathlib import Path
from typing import Any, Dict, List, Union

class WorkflowExecutor:
    """Parse and execute GitHub Actions-style workflows with matrix expansion."""

    def parse_workflow(self, workflow_path: Union[str, Path]) -> Dict[str, Any]: ...
    def expand_matrix(self, matrix: Dict[str, List[Any]]) -> List[Dict[str, Any]]: ...
    def construct_paths(self, job: Dict[str, Any], data_root: str,
                        output_root: str, workflow_id: str) -> Dict[str, str]: ...

Key Design Decisions

1. Integration with Existing Schema Validation

The WorkflowExecutor leverages the existing causaliq_workflow.schema module for workflow validation:

def parse_workflow(self, workflow_path):
    workflow = load_workflow_file(workflow_path)  # Existing function
    validate_workflow(workflow)                   # Existing function
    return workflow

Rationale: Reusing the proven validation logic maintains a single source of truth for workflow structure validation.

2. Matrix Expansion Strategy

Matrix expansion uses cartesian product generation:

def expand_matrix(self, matrix):
    """Expand matrix variables into the cartesian product of job dicts."""
    variables = list(matrix.keys())
    value_lists = list(matrix.values())
    combinations = list(itertools.product(*value_lists))  # requires `import itertools`

    jobs = []
    for combination in combinations:
        job = dict(zip(variables, combination))
        jobs.append(job)
    return jobs

Rationale:

  • Simple, predictable algorithm matching GitHub Actions behaviour
  • Easy to understand and debug
  • Supports arbitrary matrix dimensions
  • Deterministic ordering for reproducible results
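For illustration, the same algorithm as a standalone function applied to a small example matrix (the dataset and algorithm names here are hypothetical, not taken from a real workflow):

```python
import itertools

def expand_matrix(matrix):
    """Expand a matrix of variables into the cartesian product of jobs."""
    variables = list(matrix.keys())
    value_lists = list(matrix.values())
    return [dict(zip(variables, combination))
            for combination in itertools.product(*value_lists)]

# Hypothetical matrix values for illustration:
matrix = {"dataset": ["asia", "cancer"], "algorithm": ["pc", "ges"]}
jobs = expand_matrix(matrix)
# jobs[0] is {'dataset': 'asia', 'algorithm': 'pc'}; four jobs in total.
```

Because `itertools.product` iterates the last variable fastest, the ordering is deterministic for a given insertion order of matrix keys.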

3. Path Construction Pattern

Paths follow the established pattern from the examples:

# Input: {data_root}/{dataset}/input.csv
# Output: {output_root}/{workflow_id}/{dataset}_{algorithm}/

Rationale:

  • Consistent with documentation examples
  • Organises outputs by workflow and experiment parameters
  • Supports hierarchical result organisation
  • Compatible with existing action framework expectations
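A minimal sketch of how `construct_paths` could implement this pattern. The returned key names (`"input"`, `"output"`) and the standalone-function form are assumptions for illustration, not the actual implementation:

```python
from pathlib import Path
from typing import Any, Dict

def construct_paths(job: Dict[str, Any], data_root: str,
                    output_root: str, workflow_id: str) -> Dict[str, str]:
    # Input:  {data_root}/{dataset}/input.csv
    input_path = Path(data_root) / str(job["dataset"]) / "input.csv"
    # Output: {output_root}/{workflow_id}/{dataset}_{algorithm}/
    output_dir = (Path(output_root) / workflow_id
                  / f"{job['dataset']}_{job['algorithm']}")
    return {"input": str(input_path), "output": str(output_dir)}

# Hypothetical job and roots:
paths = construct_paths({"dataset": "asia", "algorithm": "pc"},
                        data_root="data", output_root="results",
                        workflow_id="run-001")
```

Using `pathlib.Path` keeps the construction portable across operating systems.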

4. Error Handling Strategy

All methods use consistent error propagation:

try:
    # Core logic
except Exception as e:
    raise WorkflowExecutionError(f"Operation failed: {e}") from e

Rationale:

  • Maintains error chain for debugging
  • Provides consistent error interface
  • Follows established patterns in the codebase
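The pattern can be demonstrated end to end with a hypothetical helper (`load_matrix_size` is invented for this sketch; the real methods wrap their own core logic the same way):

```python
class WorkflowExecutionError(Exception):
    """Raised when any stage of workflow execution fails."""

def load_matrix_size(raw: str) -> int:
    # Hypothetical helper: wrap low-level failures in the workflow-level
    # error type; `from e` keeps the original traceback attached.
    try:
        return int(raw)
    except Exception as e:
        raise WorkflowExecutionError(f"Operation failed: {e}") from e

try:
    load_matrix_size("not-a-number")
except WorkflowExecutionError as err:
    # __cause__ holds the original ValueError, preserving the error chain.
    original = err.__cause__
```

Callers only need to catch `WorkflowExecutionError`, while debuggers and logs still see the full underlying traceback via the exception's `__cause__`.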

Implementation Phases

Phase 1: Parsing Foundation (Complete ✅)

Scope: Basic workflow parsing and matrix expansion

  • Parse and validate YAML workflow files
  • Expand matrix variables into job configurations
  • Construct file paths from matrix variables
  • Comprehensive error handling

Test Coverage: 100% with edge case coverage

  • Unit tests with mocked dependencies
  • Functional tests with real YAML files
  • Exception handling verification

Phase 2: Execution Engine (Future)

Scope: Step execution and orchestration

  • Execute workflow steps with action coordination
  • Environment variable management
  • Conditional execution (if: conditions)
  • Step output handling and dependencies

Phase 3: Advanced Features (Future)

Scope: Enterprise workflow features

  • DASK task graph integration
  • Progress monitoring and status reporting
  • Resource management and limits
  • Workflow queue management

Integration Points

With Action Framework

# Future integration pattern
for step in workflow["steps"]:
    action = ActionRegistry.get_action(step["uses"])
    inputs = construct_action_inputs(step["with"], job_context)
    result = action.run(inputs)

With Schema Validation

# Current integration
workflow = load_workflow_file(path)     # schema.py
validate_workflow(workflow)             # schema.py
jobs = executor.expand_matrix(workflow["matrix"])  # workflow.py

With DASK (Future)

# Planned integration
jobs = executor.expand_matrix(matrix)
task_graph = DaskBuilder.build_workflow_graph(jobs, workflow)
results = executor.execute_dask_graph(task_graph)

Testing Strategy

Unit Tests (Isolated Logic)

  • Matrix expansion algorithm verification
  • Path construction with various inputs
  • Exception handling edge cases
  • Mocked dependencies for isolation

Functional Tests (Real Operations)

  • YAML file parsing with real workflow files
  • Filesystem operations for temporary test files
  • Integration with schema validation
  • End-to-end workflow parsing scenarios

Edge Case Coverage

  • Exception handling in matrix expansion (itertools.product failures)
  • Empty matrices and default value handling
  • Invalid workflow file scenarios
  • Missing matrix variable handling
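A sketch of how the matrix-expansion unit tests might look, using a standalone copy of the documented algorithm (the actual test suite and fixture names are assumptions):

```python
import itertools

def expand_matrix(matrix):
    """Standalone copy of the documented expansion algorithm."""
    variables = list(matrix.keys())
    return [dict(zip(variables, combination))
            for combination in itertools.product(*matrix.values())]

def test_expand_matrix_dimensions():
    # A 2 x 3 matrix should yield exactly 6 jobs, each with both keys.
    jobs = expand_matrix({"a": [1, 2], "b": ["x", "y", "z"]})
    assert len(jobs) == 6
    assert all(set(job) == {"a", "b"} for job in jobs)

def test_expand_matrix_empty():
    # With no variables, itertools.product() yields one empty tuple,
    # so the empty matrix expands to a single empty job.
    assert expand_matrix({}) == [{}]
```

The empty-matrix case is worth pinning down in a test because it follows directly from `itertools.product` semantics rather than from an explicit branch in the code.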

Performance Characteristics

Matrix Expansion Scaling

  • Time Complexity: O(n₁ × n₂ × ... × nₖ) where nᵢ is the size of each matrix dimension
  • Space Complexity: O(jobs) linear in the number of generated job combinations
  • Practical Limits: Reasonable for typical causal discovery experiments (< 1000 jobs)
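The job count for a given matrix is just the product of its dimension sizes, which makes it easy to check against the practical limit before expanding (the matrix values here are hypothetical):

```python
import math

# Hypothetical experiment matrix: 3 datasets x 2 algorithms x 10 seeds.
matrix = {"dataset": ["asia", "cancer", "sachs"],
          "algorithm": ["pc", "ges"],
          "seed": list(range(10))}
total_jobs = math.prod(len(values) for values in matrix.values())
# 3 * 2 * 10 = 60 jobs, well under the ~1000-job practical limit.
```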

Memory Usage

  • Workflow parsing: Linear in YAML file size
  • Matrix expansion: Linear in number of generated jobs
  • Path construction: Constant per job

Future Optimisations

  • Lazy matrix expansion for large matrices
  • Streaming job processing for memory efficiency
  • Job batching for DASK execution
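Lazy expansion could be sketched as a generator variant of the current algorithm, yielding one job at a time instead of materialising the full list (a possible direction, not the planned implementation):

```python
import itertools
from typing import Any, Dict, Iterator, List

def iter_matrix(matrix: Dict[str, List[Any]]) -> Iterator[Dict[str, Any]]:
    """Yield job dicts one at a time rather than building the full list."""
    variables = list(matrix.keys())
    for combination in itertools.product(*matrix.values()):
        yield dict(zip(variables, combination))

jobs = iter_matrix({"dataset": ["asia", "cancer"], "algorithm": ["pc", "ges"]})
first = next(jobs)  # only one job materialised so far
```

This keeps memory usage constant in the number of jobs, at the cost of losing random access and `len()` over the job list.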

Alignment with CausalIQ Standards

Development Guidelines Compliance

  • Small incremental changes: 99-line focused implementation
  • 100% test coverage: Comprehensive unit and functional tests
  • CI compliance: All formatting, linting, and type checking standards met
  • British English: Documentation and code comments
  • Type safety: Complete type annotations with mypy validation

Architecture Principles

  • GitHub Actions foundation: Consistent with CI/CD workflow patterns
  • Action-based components: Integration with existing action framework
  • Schema-first design: Leverage existing validation infrastructure
  • Incremental functionality: Foundation for future execution features

This design provides a solid foundation for workflow execution while maintaining the incremental development approach and high quality standards established in the CausalIQ Workflow project.