Workflow Engine

The workflow execution engine provides workflow parsing, validation, and execution with matrix expansion and template variable support.

Core Classes

causaliq_workflow.workflow

WorkflowExecutor

WorkflowExecutor()

Parse and execute GitHub Actions-style workflows with matrix expansion.

This class handles the parsing of YAML workflow files and expansion of matrix strategies into individual experiment jobs. It provides the foundation for executing multi-step causal discovery workflows with parameterised experiments using flexible action parameter templating.

Methods:

parse_workflow

parse_workflow(
    workflow_path: Union[str, Path], mode: str = "dry-run"
) -> Dict[str, Any]

Parse workflow YAML file with validation.

Parameters:

  • workflow_path
    (Union[str, Path]) –

    Path to workflow YAML file

  • mode
    (str, default: 'dry-run' ) –

    Execution mode for action validation

Returns:

  • Dict[str, Any]

    Parsed and validated workflow dictionary

Raises:

  • WorkflowExecutionError –

    If the workflow file cannot be parsed or fails validation

expand_matrix

expand_matrix(matrix: Dict[str, List[Any]]) -> List[Dict[str, Any]]

Expand matrix variables into individual job configurations.

Generates all combinations from matrix variables using cartesian product. Each combination becomes a separate job configuration.

Range strings like "0-24" are automatically expanded to individual integer values [0, 1, 2, ..., 24].
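The expansion behaviour can be sketched independently of the library: a cartesian product over the value lists, with range strings expanded first. The helper names below are illustrative, not `causaliq_workflow` internals:

```python
import itertools
import re
from typing import Any, Dict, List

def expand_range(value: Any) -> List[Any]:
    # Expand range strings like "0-24" into [0, 1, ..., 24]; pass
    # all other values through unchanged.
    if isinstance(value, str) and re.fullmatch(r"\d+-\d+", value):
        lo, hi = map(int, value.split("-"))
        return list(range(lo, hi + 1))
    return [value]

def expand_matrix(matrix: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    # Flatten range strings within each variable's value list.
    expanded = {
        name: [v for item in values for v in expand_range(item)]
        for name, values in matrix.items()
    }
    # Cartesian product: one job per combination of variable values.
    names = list(expanded)
    return [
        dict(zip(names, combo))
        for combo in itertools.product(*expanded.values())
    ]

jobs = expand_matrix({"algorithm": ["pc", "ges"], "seed": ["0-2"]})
# 2 algorithms x 3 seeds -> 6 jobs
```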

Parameters:

  • matrix
    (Dict[str, List[Any]]) –

    Dictionary mapping variable names to lists of values

Returns:

  • List[Dict[str, Any]]

    List of job configurations with matrix variables expanded

Raises:

  • WorkflowExecutionError –

    If the matrix definition is invalid

execute_workflow

execute_workflow(
    workflow: Dict[str, Any],
    mode: str = "dry-run",
    cli_params: Optional[Dict[str, Any]] = None,
    step_logger: Optional[Callable[[str, str, str, Dict[str, Any]], None]] = None,
) -> List[Dict[str, Any]]

Execute complete workflow with matrix expansion.

Uses two-pass execution:

  1. Validation pass: Iterates all entries (matrix combos/cache entries) and validates action parameters. This catches semantic errors like undefined filter variables before any execution.
  2. Execution pass: If validation passes, executes the workflow.

Caching is controlled at the step level via the 'output' parameter in each step's 'with' block. Each step can write to its own cache.
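The validate-then-execute pattern can be sketched standalone (the function names here are illustrative, not the executor's internals):

```python
from typing import Any, Callable, Dict, List, Optional

def run_two_pass(
    jobs: List[Dict[str, Any]],
    validate: Callable[[Dict[str, Any]], Optional[str]],
    execute: Callable[[Dict[str, Any]], Any],
) -> List[Any]:
    # Pass 1: validate every job up front so semantic errors surface
    # before any work is done.
    errors = [msg for job in jobs if (msg := validate(job)) is not None]
    if errors:
        raise ValueError("; ".join(errors))
    # Pass 2: execute only after all jobs validated cleanly.
    return [execute(job) for job in jobs]

jobs = [{"alpha": 0.01}, {"alpha": 0.05}]
results = run_two_pass(
    jobs,
    validate=lambda job: None if job["alpha"] > 0 else "alpha must be positive",
    execute=lambda job: f"ran with alpha={job['alpha']}",
)
```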

Parameters:

  • workflow
    (Dict[str, Any]) –

    Parsed workflow dictionary

  • mode
    (str, default: 'dry-run' ) –

    Execution mode ('dry-run', 'run', 'compare')

  • cli_params
    (Optional[Dict[str, Any]], default: None ) –

    Additional parameters from CLI

  • step_logger
    (Optional[Callable[[str, str, str, Dict[str, Any]], None]], default: None ) –

    Optional function to log step execution (action_method, step_name, status, matrix_values)

Returns:

  • List[Dict[str, Any]]

    List of job results from matrix expansion

Raises:

  • WorkflowExecutionError –

    If validation or execution of any step fails

Exception Handling

WorkflowExecutionError

Raised when workflow execution fails.


Usage Examples

Basic Workflow Execution

from causaliq_workflow import WorkflowExecutor, WorkflowExecutionError
from causaliq_workflow.registry import ActionRegistry

# Create executor instance
executor = WorkflowExecutor()

try:
    # Parse and validate workflow (includes template variable validation)
    workflow = executor.parse_workflow("experiment.yml")
    print(f"Workflow ID: {workflow['id']}")
    print(f"Description: {workflow['description']}")

    # Execute the complete workflow
    results = executor.execute_workflow(workflow, mode="run")
    print(f"Workflow completed with {len(results)} job results")

except WorkflowExecutionError as e:
    print(f"Workflow execution failed: {e}")

Matrix Expansion

from causaliq_workflow import WorkflowExecutor

executor = WorkflowExecutor()

# Define matrix for parameter sweeps
matrix = {
    "algorithm": ["pc", "ges", "lingam"],
    "dataset": ["asia", "cancer"],
    "alpha": [0.01, 0.05]
}

# Expand into individual parameter combinations
jobs = executor.expand_matrix(matrix)
print(f"Generated {len(jobs)} jobs from matrix")  # Results in 12 jobs (3 × 2 × 2)

for i, job in enumerate(jobs):
    print(f"Job {i}: Algorithm={job['algorithm']}, Dataset={job['dataset']}, Alpha={job['alpha']}")

Template Variable System

from causaliq_workflow import WorkflowExecutor, WorkflowExecutionError

executor = WorkflowExecutor()

# The WorkflowExecutor automatically validates template variables during parsing.
# Template variables ({{variable}}) are checked against the available context.

# Example: Valid template usage
valid_workflow = {
    "id": "test-001",
    "description": "Template validation example", 
    "matrix": {"dataset": ["asia"], "algorithm": ["pc"]},
    "steps": [{
        "uses": "my-custom-action",
        "with": {
            "output": "/results/{{id}}/{{dataset}}_{{algorithm}}.xml",
            "description": "Processing {{dataset}} with {{algorithm}}"
        }
    }]
}

try:
    workflow = executor.parse_workflow_dict(valid_workflow)
    print("Template validation passed!")
except WorkflowExecutionError as e:
    if "Unknown template variables" in str(e):
        print(f"Template validation failed: {e}")
        # Example: "Unknown template variables: missing_var. Available context: id, dataset, algorithm"
    else:
        print(f"Workflow execution failed: {e}")

Advanced Workflow Features

from causaliq_workflow import WorkflowExecutor

executor = WorkflowExecutor()

# Example workflow YAML showing flexible action parameters
workflow_yaml = """
id: "experiment-001"
description: "Flexible causal discovery experiment"
matrix:
  dataset: ["asia", "cancer"]
  algorithm: ["pc", "ges"]
  alpha: [0.01, 0.05]

steps:
  - name: "Structure Learning"
    uses: "my-structure-learner"
    with:
      data_path: "/experiments/data/{{dataset}}.csv"
      output_dir: "/experiments/results/{{id}}/{{algorithm}}/"
      alpha: "{{alpha}}"
      max_iter: 1000

  - name: "Validation"
    uses: "validate-graph"
    with:
      graph_path: "/experiments/results/{{id}}/{{algorithm}}/graph.graphml"
      metrics_output: "/experiments/results/{{id}}/{{algorithm}}/metrics.json"
\"\"\"

# Save and parse the workflow
with open("experiment.yml", "w") as f:
    f.write(workflow_yaml)

workflow = executor.parse_workflow("experiment.yml")

# Matrix expansion creates jobs with substituted variables
if "matrix" in workflow:
    jobs = executor.expand_matrix(workflow["matrix"])
    # Creates 8 jobs (2×2×2) with customizable file paths

    for job in jobs:
        print(f"Job: {job}")
        # Example output: {'dataset': 'asia', 'algorithm': 'pc', 'alpha': 0.01}

Template Variable Context

Template variables can reference:

  1. Workflow properties: {{id}}, {{description}}, {{name}}
  2. Matrix variables: Any variables defined in the matrix section
  3. Step context: Variables available during step execution
  4. File paths: Dynamic path generation using workflow context

Available Template Variables

Context  | Variables                | Example
---------|--------------------------|------------------------------
Workflow | id, description, name    | {{id}}
Matrix   | User-defined matrix vars | {{dataset}}, {{algorithm}}
Paths    | data_root, output_root   | {{output_root}}/results
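A minimal sketch of how {{variable}} substitution against a context dictionary can work (illustrative only; the library's actual template engine may differ):

```python
import re
from typing import Any, Dict

def render_template(template: str, context: Dict[str, Any]) -> str:
    # Replace each {{name}} with the matching context value; raise on
    # unknown variables, mirroring the validation described above.
    def replace(match: "re.Match") -> str:
        name = match.group(1)
        if name not in context:
            raise KeyError(f"Unknown template variable: {name}")
        return str(context[name])

    return re.sub(r"\{\{(\w+)\}\}", replace, template)

path = render_template(
    "/results/{{id}}/{{dataset}}_{{algorithm}}.xml",
    {"id": "test-001", "dataset": "asia", "algorithm": "pc"},
)
# -> "/results/test-001/asia_pc.xml"
```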

Aggregation Processing

Aggregation mode enables workflow steps to consume entries from multiple cache entries. This is essential for research workflows that merge graphs or compute aggregate statistics across parameter sweeps.

Aggregation Configuration

AggregationConfig dataclass

AggregationConfig(
    input_caches: List[str] = list(),
    filter_expr: Optional[str] = None,
    matrix_vars: List[str] = list(),
)

Configuration for aggregation mode execution.

Aggregation mode is activated when a workflow step has:

  • An input parameter specifying workflow cache(s)
  • A matrix definition in the workflow

The matrix variables define the grouping dimensions for aggregation.

Attributes:

  • filter_expr (Optional[str]) –

    Optional filter expression to restrict input entries.

  • input_caches (List[str]) –

    List of input workflow cache paths.

  • matrix_vars (List[str]) –

    Matrix variables defining grouping dimensions.

filter_expr class attribute / instance attribute
filter_expr: Optional[str] = None

input_caches class attribute / instance attribute
input_caches: List[str] = field(default_factory=list)

matrix_vars class attribute / instance attribute
matrix_vars: List[str] = field(default_factory=list)
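Grouping entries by matrix variables can be sketched as follows (a hypothetical helper; the executor's real grouping logic is internal):

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

def group_entries(
    entries: List[Dict[str, Any]], matrix_vars: List[str]
) -> Dict[Tuple[Any, ...], List[Dict[str, Any]]]:
    # Bucket cache entries by their values for the matrix variables;
    # each bucket becomes one aggregation job's input.
    groups: Dict[Tuple[Any, ...], List[Dict[str, Any]]] = defaultdict(list)
    for entry in entries:
        key = tuple(entry[var] for var in matrix_vars)
        groups[key].append(entry)
    return dict(groups)

entries = [
    {"model": "asia", "graph": "g1"},
    {"model": "cancer", "graph": "g2"},
    {"model": "asia", "graph": "g3"},
]
groups = group_entries(entries, ["model"])
# Two buckets: ('asia',) with two entries, ('cancer',) with one
```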

Aggregation Example

import yaml

from causaliq_workflow import WorkflowExecutor

# Example workflow with aggregation
workflow_yaml = """
id: "merge_experiment"
description: "Merge LLM-generated graphs"
matrix:
  model: ["asia", "cancer"]

steps:
  - name: "Merge Graphs"
    uses: "causaliq-analysis"
    with:
      action: "merge_graphs"
      input: "results/llm_graphs.db"  # .db triggers aggregation
      output: "results/merged.db"
\"\"\"

# Execute with aggregation mode
executor = WorkflowExecutor()
workflow = executor.parse_workflow_dict(yaml.safe_load(workflow_yaml))

# For each matrix value (asia, cancer), the action receives
# all cache entries from llm_graphs.db that match that model
results = executor.execute_workflow(workflow, mode="run")

Filter Expressions

Use filters to restrict which entries are aggregated:

steps:
  - name: "Merge Filtered"
    uses: "causaliq-analysis"
    with:
      action: "merge_graphs"
      aggregate: "results/graphs.db"
      filter: "algorithm == 'pc' and sample_size >= 1000"
      output: "results/merged.db"

Error Handling

The WorkflowExecutor provides detailed error reporting for:

  • Parse errors: Invalid YAML/JSON syntax
  • Schema validation: Workflow structure validation
  • Template errors: Unknown or invalid template variables
  • Action errors: Action execution failures
  • Matrix errors: Invalid matrix definitions
