Action Registry
The action registry provides centralised discovery, management, and execution of workflow actions with plugin architecture support.
The registry uses entry points to discover action providers from installed packages, enabling a clean plugin architecture where actions can be distributed as separate packages.
causaliq-core Integration
The registry imports the following from causaliq-core:
- CausalIQActionProvider - Base class that all discovered actions must implement
- ActionExecutionError - Exception type for action execution failures
Core Classes
causaliq_workflow.registry
ActionRegistry
ActionRegistry()
Registry for discovering and executing workflow actions dynamically.
Uses import-time introspection to automatically discover actions when packages are imported. No configuration needed - just import the package and use 'uses: package-name' in workflows.
Convention: Action packages should export a CausalIQActionProvider subclass named 'ActionProvider' in their __init__.py file to avoid namespace collisions.
Attributes:
- _instance (Optional[ActionRegistry]) – Singleton instance of the ActionRegistry
- _actions (Dict[str, Type[CausalIQActionProvider]]) – Dictionary mapping action names to CausalIQActionProvider classes
- _entry_points (Dict[str, Any]) – Dictionary of lazy-loadable entry points
- _discovery_errors (List[str]) – List of errors encountered during action discovery
Initialises:
- _actions: Dictionary mapping action names to CausalIQActionProvider classes
- _entry_points: Dictionary of entry points for lazy loading
- _discovery_errors: List to collect any discovery errors
Methods:
- get_available_actions – Get dictionary of available action names to classes.
- get_action_class – Get action class by name, loading from entry point if needed.
- has_action – Check if action is available.
- execute_action – Execute action with inputs and workflow context.
- validate_workflow_actions – Validate all actions in workflow exist and can run.
get_available_actions
get_available_actions() -> Dict[str, Type[CausalIQActionProvider]]
Get dictionary of available action names to classes.
Note: Entry points that haven't been loaded yet will not appear in the returned dictionary. Use get_available_action_names() to get all available action names including lazy-loadable ones.
Returns:
- Dict[str, Type[CausalIQActionProvider]] – Dictionary mapping action names to CausalIQActionProvider classes
get_action_class
get_action_class(name: str) -> Type[CausalIQActionProvider]
Get action class by name, loading from entry point if needed.
Parameters:
- name (str) – Action name
Returns:
- Type[CausalIQActionProvider] – CausalIQActionProvider class
Raises:
- ActionRegistryError – If action not found or fails to load
has_action
has_action(name: str) -> bool
Check if action is available.
Parameters:
- name (str) – Action name to check
Returns:
- bool – True if action is available (loaded or lazy-loadable)
execute_action
execute_action(
name: str, inputs: Dict[str, Any], context: WorkflowContext
) -> Dict[str, Any]
Execute action with inputs and workflow context.
Extracts the 'action' key from inputs and passes it separately to the provider's run() method along with the remaining parameters.
Parameters:
- name (str) – Provider name (e.g., 'causaliq/knowledge')
- inputs (Dict[str, Any]) – Action parameters including 'action' key
- context (WorkflowContext) – Complete workflow context
Returns:
- Dict[str, Any] – Action outputs dictionary with structure:
  - status: "success", "skipped", or "error"
  - **metadata: Execution metadata flattened into result
  - objects: List of object dicts
Raises:
- ActionRegistryError – If action not found or execution fails
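The split between the 'action' key and the remaining parameters can be illustrated with a minimal sketch. EchoProvider, its run() signature, and dispatch() are hypothetical stand-ins invented for illustration; the real provider interface is defined in causaliq-core and may differ, and the workflow context is omitted here for brevity.

```python
from typing import Any, Dict


class EchoProvider:
    """Hypothetical stand-in for a CausalIQActionProvider subclass."""

    def run(self, action: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        # Assumed run() signature; the real interface may differ.
        return {
            "status": "success",
            "action": action,
            "objects": [],
            **parameters,
        }


def dispatch(provider: EchoProvider, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Separate 'action' from the other inputs, then call the provider."""
    parameters = dict(inputs)  # copy so the caller's dict is not mutated
    action = parameters.pop("action")  # KeyError if 'action' is missing
    return provider.run(action, parameters)


inputs = {"action": "echo", "message": "Hello"}
result = dispatch(EchoProvider(), inputs)
print(result)
```

Copying the inputs before popping the key keeps the caller's dictionary intact, which matters if the same inputs are reused across matrix jobs.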
validate_workflow_actions
validate_workflow_actions(workflow: Dict[str, Any]) -> List[str]
Validate all actions in workflow exist and can run.
Parameters:
- workflow (Dict[str, Any]) – Parsed workflow dictionary
Returns:
- List[str] – List of validation errors (empty if valid)
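The "list of error strings, empty if valid" contract can be sketched as follows. The workflow shape used here (a top-level "steps" list whose items carry a "uses" key) and the helper find_unknown_actions() are assumptions made for illustration; the page only confirms that workflows reference packages with 'uses: package-name', and the real method may perform further checks.

```python
from typing import Any, Dict, List, Set


def find_unknown_actions(
    workflow: Dict[str, Any], known: Set[str]
) -> List[str]:
    """Return one error string per step that names an unknown action."""
    errors: List[str] = []
    # Assumed layout: workflow["steps"] is a list of dicts with a "uses" key.
    for index, step in enumerate(workflow.get("steps", [])):
        uses = step.get("uses")
        if uses is not None and uses not in known:
            errors.append(f"step {index}: unknown action '{uses}'")
    return errors


workflow = {
    "steps": [
        {"uses": "causaliq-workflow"},
        {"uses": "missing-package"},
    ]
}
errors = find_unknown_actions(workflow, known={"causaliq-workflow"})
print(errors)
```

Collecting all errors rather than raising on the first one lets a caller report every problem in a workflow in a single pass.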
WorkflowContext
dataclass
WorkflowContext(
mode: str,
matrix: Dict[str, List[Any]],
matrix_values: Dict[str, Any] = dict(),
cache: Optional[WorkflowCache] = None,
job_index: int = 0,
total_jobs: int = 1,
)
Workflow context for action execution optimisation.
Provides minimal context needed for actions to optimise across workflows. Actions receive specific data through inputs; context provides meta-information.
Attributes:
- mode (str) – Execution mode ('dry-run', 'run', 'compare')
- matrix (Dict[str, List[Any]]) – Complete matrix definition for cross-job optimisation
- matrix_values (Dict[str, Any]) – Current job's specific matrix variable values
- cache (Optional[WorkflowCache]) – Optional WorkflowCache for storing step results
- job_index (int) – Current job index (0-based) in matrix expansion
- total_jobs (int) – Total number of jobs in matrix expansion
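To make the relationship between matrix, matrix_values, job_index, and total_jobs concrete, here is a minimal sketch of a matrix expansion. It assumes jobs are the Cartesian product of the matrix values in definition order; the helper expand_matrix() is hypothetical and the workflow engine's actual ordering may differ.

```python
from itertools import product
from typing import Any, Dict, List


def expand_matrix(matrix: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Return one dict of matrix values per job (assumed product order)."""
    names = list(matrix)
    combos = product(*(matrix[name] for name in names))
    return [dict(zip(names, combo)) for combo in combos]


matrix = {"dataset": ["asia", "cancer"], "algorithm": ["pc", "ges"]}
jobs = expand_matrix(matrix)
total_jobs = len(jobs)

for job_index, matrix_values in enumerate(jobs):
    # job_index is 0-based, matching the attribute description above.
    print(f"job {job_index} of {total_jobs}: {matrix_values}")
```

Under this assumption an empty matrix expands to a single job with empty matrix_values, which is consistent with the defaults job_index=0 and total_jobs=1.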
matrix_key
property
matrix_key: str
Compute cache key from matrix values.
Returns a truncated SHA-256 hash (16 hex characters) of the matrix variable values, suitable for use as a cache key.
The hash is computed from JSON-serialised matrix_values with sorted keys for deterministic ordering.
Returns:
- str – Truncated hex hash string (16 characters), or empty string if matrix_values is empty.
Example
>>> context = WorkflowContext(
...     mode="run",
...     matrix={"algorithm": ["pc", "ges"]},
...     matrix_values={"algorithm": "pc"}
... )
>>> len(context.matrix_key)
16
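The key derivation described above can be approximated with the standard library. This is a sketch under stated assumptions: the function below is a standalone stand-in, and details such as the JSON separators and text encoding are guesses, so the digests it produces may not match those of the real matrix_key property.

```python
import hashlib
import json
from typing import Any, Dict


def matrix_key(matrix_values: Dict[str, Any]) -> str:
    """Truncated SHA-256 of the JSON-serialised values, sorted by key."""
    if not matrix_values:
        return ""
    # sort_keys makes the serialisation independent of insertion order.
    payload = json.dumps(matrix_values, sort_keys=True)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return digest[:16]


key_a = matrix_key({"dataset": "asia", "algorithm": "pc"})
key_b = matrix_key({"algorithm": "pc", "dataset": "asia"})
print(key_a == key_b)
```

Sorting the keys before hashing is what makes the key deterministic: two jobs with the same matrix values map to the same cache entry regardless of how the dictionary was built.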
Exception Handling
ActionRegistryError
Raised when action registry operations fail.
This exception is raised when:
- Requested action is not found in the registry
- Action discovery fails during module scanning
- Action validation fails
- Other registry-related errors occur
Usage Examples
Basic Registry Operations
from causaliq_workflow.registry import (
    ActionRegistry,
    ActionRegistryError,
    WorkflowContext,
)

# Create registry instance
registry = ActionRegistry()

# List the actions loaded so far
actions = registry.get_available_actions()
for action_name, action_class in actions.items():
    print(f"Action: {action_name} (v{action_class.version})")

# Check if specific action exists
if registry.has_action("causaliq-workflow"):
    action_class = registry.get_action_class("causaliq-workflow")
    print(f"Found action: {action_class.description}")

# Execute action directly through registry.
# The 'action' key travels inside inputs; the mode is set on the context.
try:
    result = registry.execute_action(
        "causaliq-workflow",
        inputs={"action": "echo", "message": "Hello", "nodes": 3},
        context=WorkflowContext(mode="dry-run", matrix={}),
    )
    print(f"Execution result: {result}")
except ActionRegistryError as e:
    print(f"Registry error: {e}")
Entry Point Discovery
Actions are discovered via Python entry points. Packages register their actions in pyproject.toml:
[project.entry-points."causaliq.actions"]
my-package = "my_package:ActionProvider"
The registry discovers entry points at startup (metadata only) and loads them lazily on first use to avoid circular imports.
# Entry points are recorded at construction and loaded when first accessed
registry = ActionRegistry()
# List the actions loaded so far (entry points not yet loaded are omitted)
print(f"Loaded actions: {list(registry.get_available_actions().keys())}")
Workflow Context
from causaliq_workflow.registry import WorkflowContext
# Create workflow context for action execution
context = WorkflowContext(
    mode="run",
    matrix={"dataset": ["asia", "cancer"], "algorithm": ["pc", "ges"]},
    matrix_values={"dataset": "asia", "algorithm": "pc"},
)
# Context provides execution metadata for action optimisation
print(f"Execution mode: {context.mode}")
print(f"Matrix definition: {context.matrix}")
print(f"Current matrix values: {context.matrix_values}")
# Get cache key for current matrix combination
print(f"Matrix key: {context.matrix_key}") # SHA-256 hash, 16 chars
Architecture Notes
The ActionRegistry uses Python's entry point system to automatically find and register actions. Actions are discovered by:
- Entry point scanning - Discovers causaliq.actions entry points from installed packages
- Lazy loading - Entry points recorded at startup but loaded on first use
- Module fallback - Also scans imported modules for CausalIQActionProvider subclasses
- Name-based lookup - Actions identified by their entry point name or name class attribute
This design enables a flexible plugin architecture where actions can be distributed as separate packages and automatically discovered at runtime, while avoiding circular import issues.
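The module fallback and name-based lookup can be illustrated with a small sketch. BaseProvider is a stand-in for CausalIQActionProvider (the real class lives in causaliq-core), and walking __subclasses__() is a simplification swapped in for the registry's scan of imported modules; treat both as assumptions.

```python
from typing import Dict, List, Type


class BaseProvider:
    """Stand-in for CausalIQActionProvider; not the real base class."""

    name: str = ""


class ActionProvider(BaseProvider):
    # Conventional class name; lookup uses the 'name' class attribute.
    name = "my-package"


def scan_subclasses(base: Type[BaseProvider]) -> Dict[str, Type[BaseProvider]]:
    """Index every already-imported subclass of base by its name."""
    found: Dict[str, Type[BaseProvider]] = {}
    stack: List[Type[BaseProvider]] = list(base.__subclasses__())
    while stack:
        cls = stack.pop()
        stack.extend(cls.__subclasses__())  # include indirect subclasses
        if cls.name:  # skip classes that do not declare a name
            found[cls.name] = cls
    return found


actions = scan_subclasses(BaseProvider)
print(sorted(actions))
```

Because every package exports a class called ActionProvider, the class name alone cannot distinguish providers; keying on the name attribute (or the entry point name) is what avoids collisions.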