Workflow Engine
The workflow execution engine provides workflow parsing, validation, and execution, with support for matrix expansion and template variables.
Core Classes
causaliq_workflow.workflow
WorkflowExecutor
WorkflowExecutor()
Parse and execute GitHub Actions-style workflows with matrix expansion.
This class handles the parsing of YAML workflow files and expansion of matrix strategies into individual experiment jobs. It provides the foundation for executing multi-step causal discovery workflows with parameterised experiments using flexible action parameter templating.
Methods:
- parse_workflow – Parse workflow YAML file with validation.
- expand_matrix – Expand matrix variables into individual job configurations.
- execute_workflow – Execute complete workflow with matrix expansion.
parse_workflow
parse_workflow(
    workflow_path: Union[str, Path], mode: str = "dry-run"
) -> Dict[str, Any]
Parse workflow YAML file with validation.
Parameters:
- workflow_path (Union[str, Path]) – Path to workflow YAML file
- mode (str, default: 'dry-run') – Execution mode for action validation
Returns:
- Dict[str, Any] – Parsed and validated workflow dictionary
Raises:
- WorkflowExecutionError – If workflow parsing or validation fails
expand_matrix
expand_matrix(matrix: Dict[str, List[Any]]) -> List[Dict[str, Any]]
Expand matrix variables into individual job configurations.
Generates all combinations from matrix variables using cartesian product. Each combination becomes a separate job configuration.
Range strings like "0-24" are automatically expanded to individual integer values [0, 1, 2, ..., 24].
Parameters:
- matrix (Dict[str, List[Any]]) – Dictionary mapping variable names to lists of values
Returns:
- List[Dict[str, Any]] – List of job configurations with matrix variables expanded
Raises:
- WorkflowExecutionError – If matrix expansion fails
execute_workflow
execute_workflow(
    workflow: Dict[str, Any],
    mode: str = "dry-run",
    cli_params: Optional[Dict[str, Any]] = None,
    step_logger: Optional[Callable[[str, str, str, Dict[str, Any]], None]] = None,
) -> List[Dict[str, Any]]
Execute complete workflow with matrix expansion.
Uses two-pass execution:
- Validation pass: Iterates all entries (matrix combos/cache entries) and validates action parameters. This catches semantic errors like undefined filter variables before any execution.
- Execution pass: If validation passes, executes the workflow.
Caching is controlled at the step level via the 'output' parameter in each step's 'with' block. Each step can write to its own cache.
Parameters:
- workflow (Dict[str, Any]) – Parsed workflow dictionary
- mode (str, default: 'dry-run') – Execution mode ('dry-run', 'run', 'compare')
- cli_params (Optional[Dict[str, Any]], default: None) – Additional parameters from CLI
- step_logger (Optional[Callable[[str, str, str, Dict[str, Any]], None]], default: None) – Optional function to log step execution (action_method, step_name, status, matrix_values)
Returns:
- List[Dict[str, Any]] – List of job results from matrix expansion
Raises:
- WorkflowExecutionError – If validation or execution fails
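The two-pass pattern described above can be sketched in isolation. The helper `run_two_pass` below is a hypothetical illustration of the pattern, not the engine's own implementation: every job is validated before any job is executed, so semantic errors surface up front.

```python
from typing import Any, Callable, Dict, List


def run_two_pass(
    jobs: List[Dict[str, Any]],
    validate: Callable[[Dict[str, Any]], None],
    execute: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Validate every job before executing any of them."""
    # Pass 1: validation -- expected to raise on invalid parameters.
    for job in jobs:
        validate(job)
    # Pass 2: execution -- only reached if all jobs validated.
    return [execute(job) for job in jobs]


jobs = [{"alpha": 0.01}, {"alpha": 0.05}]
results = run_two_pass(
    jobs,
    validate=lambda job: None,                   # no-op validator
    execute=lambda job: {**job, "status": "ok"},
)
# -> [{'alpha': 0.01, 'status': 'ok'}, {'alpha': 0.05, 'status': 'ok'}]
```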
Exception Handling
WorkflowExecutionError
Raised when workflow execution fails.
Usage Examples
Basic Workflow Execution
from causaliq_workflow import WorkflowExecutor, WorkflowExecutionError

# Create executor instance
executor = WorkflowExecutor()

try:
    # Parse and validate workflow (includes template variable validation)
    workflow = executor.parse_workflow("experiment.yml")
    print(f"Workflow ID: {workflow['id']}")
    print(f"Description: {workflow['description']}")

    # Execute the complete workflow
    results = executor.execute_workflow(workflow, mode="run")
    print("Workflow completed successfully")
except WorkflowExecutionError as e:
    print(f"Workflow execution failed: {e}")
Matrix Expansion
from causaliq_workflow import WorkflowExecutor

executor = WorkflowExecutor()

# Define matrix for parameter sweeps
matrix = {
    "algorithm": ["pc", "ges", "lingam"],
    "dataset": ["asia", "cancer"],
    "alpha": [0.01, 0.05],
}

# Expand into individual parameter combinations
jobs = executor.expand_matrix(matrix)
print(f"Generated {len(jobs)} jobs from matrix")  # 12 jobs (3 × 2 × 2)

for i, job in enumerate(jobs):
    print(f"Job {i}: Algorithm={job['algorithm']}, Dataset={job['dataset']}, Alpha={job['alpha']}")
Template Variable System
# The WorkflowExecutor automatically validates template variables during parsing.
# Template variables ({{variable}}) are checked against the available context.

# Example: valid template usage
valid_workflow = {
    "id": "test-001",
    "description": "Template validation example",
    "matrix": {"dataset": ["asia"], "algorithm": ["pc"]},
    "steps": [{
        "uses": "my-custom-action",
        "with": {
            "output": "/results/{{id}}/{{dataset}}_{{algorithm}}.xml",
            "description": "Processing {{dataset}} with {{algorithm}}",
        },
    }],
}

try:
    workflow = executor.parse_workflow_dict(valid_workflow)
    print("Template validation passed!")
except WorkflowExecutionError as e:
    if "Unknown template variables" in str(e):
        print(f"Template validation failed: {e}")
        # e.g. "Unknown template variables: missing_var. Available context: id, dataset, algorithm"
    else:
        print(f"Workflow execution failed: {e}")
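The validate-then-substitute behaviour can be sketched in a standalone form. Here `render_template` is a hypothetical illustration of the idea (the engine's actual substitution code is not shown in this reference): unknown `{{variable}}` names are rejected with the available context listed, and known names are substituted from the job's context.

```python
import re
from typing import Any, Dict

_TEMPLATE = re.compile(r"\{\{(\w+)\}\}")


def render_template(text: str, context: Dict[str, Any]) -> str:
    """Substitute {{variable}} markers, failing loudly on unknown names."""
    unknown = {name for name in _TEMPLATE.findall(text) if name not in context}
    if unknown:
        raise ValueError(
            f"Unknown template variables: {', '.join(sorted(unknown))}. "
            f"Available context: {', '.join(sorted(context))}"
        )
    return _TEMPLATE.sub(lambda m: str(context[m.group(1)]), text)


path = render_template(
    "/results/{{id}}/{{dataset}}_{{algorithm}}.xml",
    {"id": "test-001", "dataset": "asia", "algorithm": "pc"},
)
# -> "/results/test-001/asia_pc.xml"
```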
Advanced Workflow Features
# Example workflow YAML showing flexible action parameters
workflow_yaml = """
id: "experiment-001"
description: "Flexible causal discovery experiment"
matrix:
  dataset: ["asia", "cancer"]
  algorithm: ["pc", "ges"]
  alpha: [0.01, 0.05]
steps:
  - name: "Structure Learning"
    uses: "my-structure-learner"
    with:
      data_path: "/experiments/data/{{dataset}}.csv"
      output_dir: "/experiments/results/{{id}}/{{algorithm}}/"
      alpha: "{{alpha}}"
      max_iter: 1000
  - name: "Validation"
    uses: "validate-graph"
    with:
      graph_path: "/experiments/results/{{id}}/{{algorithm}}/graph.graphml"
      metrics_output: "/experiments/results/{{id}}/{{algorithm}}/metrics.json"
"""

# Save and parse the workflow
with open("experiment.yml", "w") as f:
    f.write(workflow_yaml)

workflow = executor.parse_workflow("experiment.yml")

# Matrix expansion creates jobs with substituted variables
if "matrix" in workflow:
    jobs = executor.expand_matrix(workflow["matrix"])
    # Creates 8 jobs (2 × 2 × 2) with customisable file paths
    for job in jobs:
        print(f"Job: {job}")
        # Example output: {'dataset': 'asia', 'algorithm': 'pc', 'alpha': 0.01}
Template Variable Context
Template variables can reference:
- Workflow properties: {{id}}, {{description}}, {{name}}
- Matrix variables: Any variables defined in the matrix section
- Step context: Variables available during step execution
- File paths: Dynamic path generation using workflow context
Available Template Variables
| Context | Variables | Example |
|---|---|---|
| Workflow | id, description, name | {{id}} |
| Matrix | User-defined matrix vars | {{dataset}}, {{algorithm}} |
| Paths | data_root, output_root | {{output_root}}/results |
Aggregation Processing
Aggregation mode enables workflow steps to consume entries from multiple cache entries. This is essential for research workflows that merge graphs or compute aggregate statistics across parameter sweeps.
Aggregation Configuration
AggregationConfig
dataclass
AggregationConfig(
    input_caches: List[str] = list(),
    filter_expr: Optional[str] = None,
    matrix_vars: List[str] = list(),
)
Configuration for aggregation mode execution.
Aggregation mode is activated when a workflow step has:
- An input parameter specifying workflow cache(s)
- A matrix definition in the workflow
The matrix variables define the grouping dimensions for aggregation.
Attributes:
- filter_expr (Optional[str]) – Optional filter expression to restrict input entries.
- input_caches (List[str]) – List of input workflow cache paths.
- matrix_vars (List[str]) – Matrix variables defining grouping dimensions.
filter_expr
filter_expr: Optional[str] = None
Optional filter expression to restrict input entries.
input_caches
input_caches: List[str] = field(default_factory=list)
List of input workflow cache paths.
matrix_vars
matrix_vars: List[str] = field(default_factory=list)
Matrix variables defining grouping dimensions.
Aggregation Example
import yaml

from causaliq_workflow import WorkflowExecutor

# Example workflow with aggregation
workflow_yaml = """
id: "merge_experiment"
description: "Merge LLM-generated graphs"
matrix:
  model: ["asia", "cancer"]
steps:
  - name: "Merge Graphs"
    uses: "causaliq-analysis"
    with:
      action: "merge_graphs"
      input: "results/llm_graphs.db"  # .db input triggers aggregation
      output: "results/merged.db"
"""

# Execute with aggregation mode
executor = WorkflowExecutor()
workflow = executor.parse_workflow_dict(yaml.safe_load(workflow_yaml))

# For each matrix value (asia, cancer), the action receives
# all cache entries from llm_graphs.db that match that model
results = executor.execute_workflow(workflow, mode="run")
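The grouping step implied above (one group of cache entries per matrix value) can be sketched independently. The `group_entries` helper is a hypothetical illustration of grouping by matrix variables, not the engine's own code:

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple


def group_entries(
    entries: List[Dict[str, Any]], matrix_vars: List[str]
) -> Dict[Tuple[Any, ...], List[Dict[str, Any]]]:
    """Group cache entries by their values for the matrix variables."""
    groups: Dict[Tuple[Any, ...], List[Dict[str, Any]]] = defaultdict(list)
    for entry in entries:
        key = tuple(entry[var] for var in matrix_vars)
        groups[key].append(entry)
    return dict(groups)


entries = [
    {"model": "asia", "graph": "g1"},
    {"model": "asia", "graph": "g2"},
    {"model": "cancer", "graph": "g3"},
]
groups = group_entries(entries, ["model"])
# ("asia",) maps to two entries, ("cancer",) to one
```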
Filter Expressions
Use filters to restrict which entries are aggregated:
steps:
  - name: "Merge Filtered"
    uses: "causaliq-analysis"
    with:
      action: "merge_graphs"
      aggregate: "results/graphs.db"
      filter: "algorithm == 'pc' and sample_size >= 1000"
      output: "results/merged.db"
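The filter syntax above resembles Python comparison expressions. A minimal sketch of matching one entry against such an expression follows; `matches_filter` is hypothetical (the engine's actual evaluator is not documented here), and a production implementation would parse the expression into a restricted AST rather than call eval.

```python
from typing import Any, Dict


def matches_filter(entry: Dict[str, Any], filter_expr: str) -> bool:
    """Evaluate a filter expression against one entry's fields."""
    # Illustration only: eval with no builtins and the entry as namespace.
    return bool(eval(filter_expr, {"__builtins__": {}}, dict(entry)))


entry = {"algorithm": "pc", "sample_size": 5000}
matches_filter(entry, "algorithm == 'pc' and sample_size >= 1000")  # -> True
```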
Error Handling
The WorkflowExecutor provides detailed error reporting for:
- Parse errors: Invalid YAML/JSON syntax
- Schema validation: Workflow structure validation
- Template errors: Unknown or invalid template variables
- Action errors: Action execution failures
- Matrix errors: Invalid matrix definitions