Workflow Cache API

The workflow cache provides SQLite-based storage for workflow step results, enabling conservative execution and reproducibility. It is built on causaliq-core's TokenCache infrastructure.

causaliq-core Foundation

The workflow cache imports the following from causaliq-core:

  • TokenCache - SQLite-based caching with tokenised JSON storage
  • JsonCompressor - Compressor for JSON tokenisation
  • Compressor - Abstract compressor interface

Core Classes

causaliq_workflow.cache.WorkflowCache

WorkflowCache

WorkflowCache(db_path: str | Path)

High-level cache for workflow step results.

Provides a simplified interface for storing and retrieving workflow results as CacheEntry objects. Uses matrix variable values as cache keys, with SHA-256 hashing for compact storage.

Each entry contains metadata and named objects (e.g., 'graph', 'confidences'), allowing a single workflow step to produce multiple outputs that are stored together.

Attributes:

  • db_path

    Path to SQLite database file, or ":memory:" for in-memory.

Example

from causaliq_workflow.cache import WorkflowCache, CacheEntry

with WorkflowCache(":memory:") as cache:
    entry = CacheEntry()
    entry.metadata["node_count"] = 5
    entry.add_object("dag", "graphml", "...", "learn")
    key = {"algorithm": "pc", "network": "asia"}
    cache.put(key, entry)
    result = cache.get(key)
    print(result.metadata)  # {'node_count': 5}

Parameters:

  • db_path

    (str | Path) –

    Path to SQLite database file. Use ":memory:" for in-memory database (fast, non-persistent).

Methods:

  • open

    Open the database connection and initialise schema.

  • close

    Close the database connection.

  • put

    Store a workflow entry in the cache.

  • get

    Retrieve a workflow entry from the cache.

  • exists

    Check if a cache entry exists.

  • entry_count

    Count cache entries (excluding internal config).

  • export

    Export cache entries to directory or zip file.

  • import_entries

    Import cache entries from directory or zip file.

open

open() -> WorkflowCache

Open the database connection and initialise schema.

Returns:

  • WorkflowCache

    The opened cache instance (self), enabling method chaining.

Raises:

  • RuntimeError

    If already connected.

close

close() -> None

Close the database connection.

put

put(key_data: dict[str, Any], entry: CacheEntry) -> str

Store a workflow entry in the cache.

If an entry with the same key already exists, it is replaced.

Validates that key_data uses the same matrix variable names as existing entries to ensure cache consistency.

Parameters:

  • key_data
    (dict[str, Any]) –

    Dictionary of matrix variable values (cache key).

  • entry
    (CacheEntry) –

    CacheEntry containing metadata and objects.

Returns:

  • str

    The hash key used for storage.

Raises:

  • MatrixSchemaError

    If key_data uses different variable names than existing entries.

Example

with WorkflowCache(":memory:") as cache:
    entry = CacheEntry()
    entry.metadata["result"] = "ok"
    key = {"algorithm": "pc"}
    hash_key = cache.put(key, entry)

get

get(key_data: dict[str, Any]) -> CacheEntry | None

Retrieve a workflow entry from the cache.

Parameters:

  • key_data
    (dict[str, Any]) –

    Dictionary of matrix variable values (cache key).

Returns:

  • CacheEntry | None

    CacheEntry if found, None otherwise.

Example

with WorkflowCache(":memory:") as cache:
    entry = CacheEntry()
    entry.metadata["result"] = "ok"
    cache.put({"algo": "pc"}, entry)
    result = cache.get({"algo": "pc"})
    print(result.metadata)  # {'result': 'ok'}

exists

exists(key_data: dict[str, Any]) -> bool

Check if a cache entry exists.

Parameters:

  • key_data
    (dict[str, Any]) –

    Dictionary of matrix variable values (cache key).

Returns:

  • bool

    True if entry exists, False otherwise.

Example

with WorkflowCache(":memory:") as cache:
    cache.exists({"algo": "pc"})  # False
    cache.put({"algo": "pc"}, CacheEntry())
    cache.exists({"algo": "pc"})  # True

entry_count

entry_count() -> int

Count cache entries (excluding internal config).

Returns:

  • int

    Number of entries in the cache.

export

export(output_path: str | Path, matrix_keys: list[str] | None = None) -> int

Export cache entries to directory or zip file.

Creates a hierarchical directory structure based on matrix variable values. Each entry's objects are written as individual files.

The output format is determined by the path extension:

  • Path ending in .zip: creates a zip archive
  • Otherwise: creates a directory structure

Parameters:

  • output_path
    (str | Path) –

    Path to output directory or .zip file.

  • matrix_keys
    (list[str] | None, default: None ) –

    Ordered list of matrix variable names for directory hierarchy. If None, uses stored matrix key order from workflow, or falls back to alphabetical order.

Returns:

  • int

    Number of entries exported.

Example

with WorkflowCache("cache.db") as cache:
    # Export to dir: asia/pc/graph.graphml
    cache.export("./out", ["dataset", "algorithm"])
    # Export to zip file
    cache.export("./out.zip", ["dataset"])

import_entries

import_entries(input_path: str | Path) -> int

Import cache entries from directory or zip file.

Reads entries exported by export() and stores them back into the cache. The input format is determined by the path:

  • Path ending in .zip: reads from a zip archive
  • Otherwise: reads from a directory structure

Parameters:

  • input_path
    (str | Path) –

    Path to input directory or .zip file.

Returns:

  • int

    Number of entries imported.

Raises:

  • FileNotFoundError

    If input_path does not exist.

Example

with WorkflowCache("cache.db") as cache:
    # Import from directory
    cache.import_entries("./exported")
    # Import from zip file
    cache.import_entries("./exported.zip")

causaliq_workflow.cache.CacheEntry

CacheEntry dataclass

CacheEntry(metadata: Dict[str, Any] = dict(), objects: Dict[str, CacheObject] = dict())

A cached workflow result containing metadata and typed objects.

Represents a single cache entry identified by matrix variable values. Contains workflow metadata and zero or more typed objects. Each object has a semantic type (e.g., 'dag', 'pdg') and serialisation format.

Objects are keyed by their semantic type. Each entry may contain at most one object of each type.

The entry structure maps directly to TokenCache storage:

  • metadata → TokenCache metadata field
  • objects → TokenCache data field
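
To illustrate the mapping, the sketch below uses plain dictionaries in place of the real classes. The dict shapes follow the documented to_dict() keys ('format', 'action', 'content'); the stand-in functions to_storage and from_storage are hypothetical, since the real CacheEntry delegates this work to TokenCache.

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical stand-ins illustrating the documented mapping between a
# CacheEntry and the two TokenCache fields; not the library implementation.

def to_storage(metadata: Dict[str, Any],
               objects: Dict[str, Dict[str, Any]]) -> Tuple[dict, dict]:
    """Return (data, metadata) in the shape TokenCache.put_data() expects."""
    return dict(objects), dict(metadata)

def from_storage(data: Optional[Dict[str, Any]],
                 metadata: Optional[Dict[str, Any]]) -> Tuple[dict, dict]:
    """Rebuild (metadata, objects) from storage, tolerating missing fields."""
    return dict(metadata or {}), dict(data or {})

# Objects are keyed by semantic type; values follow the to_dict() shape.
objects = {"dag": {"format": "graphml", "action": "learn", "content": "..."}}
data, meta = to_storage({"node_count": 5}, objects)
meta2, objects2 = from_storage(data, meta)
assert meta2 == {"node_count": 5}
assert objects2["dag"]["format"] == "graphml"
```

The round trip shows why the split is convenient: metadata stays queryable on its own, while the objects dict carries arbitrarily large serialised content.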

Attributes:

  • metadata (Dict[str, Any]) –

    Workflow metadata dictionary.

  • objects (Dict[str, CacheObject]) –

    Typed objects dictionary (type → CacheObject).

Example

entry = CacheEntry()
entry.metadata["node_count"] = 5
entry.add_object("pdg", "graphml", "...")
entry.add_object("trace", "json", '{"iterations": [...]}')

Methods:

add_object

add_object(
    obj_type: str, obj_format: str, content: Any, action: str = "unknown"
) -> None

Add or replace a typed object.

Parameters:

  • obj_type
    (str) –

    Semantic object type (e.g., 'dag', 'pdg', 'trace').

  • obj_format
    (str) –

    Serialisation format (e.g., 'graphml', 'json').

  • content
    (Any) –

    Object content.

  • action
    (str, default: 'unknown' ) –

    Name of the action creating this object.

Example

entry = CacheEntry()
entry.add_object("pdg", "graphml", "", "merge")

from_action_result classmethod

from_action_result(
    metadata: Dict[str, Any], objects: list[Dict[str, Any]]
) -> CacheEntry

Create from action result format.

Converts the action return format (metadata dict and objects list) to a CacheEntry. Objects are keyed by their 'type' field.

Parameters:

  • metadata
    (Dict[str, Any]) –

    Action metadata dictionary.

  • objects
    (list[Dict[str, Any]]) –

    List of object dicts with 'type', 'format', 'action', 'content'.

Returns:

  • CacheEntry

    New CacheEntry with objects keyed by their 'type' field.

Example

entry = CacheEntry.from_action_result(
    {"node_count": 5},
    [{"type": "pdg", "format": "graphml",
      "action": "merge_graphs", "content": "..."}]
)

from_storage classmethod

from_storage(
    data: Dict[str, Any] | None, metadata: Dict[str, Any] | None
) -> CacheEntry

Create from TokenCache storage format.

Parameters:

  • data
    (Dict[str, Any] | None) –

    Objects dict from TokenCache.get_data().

  • metadata
    (Dict[str, Any] | None) –

    Metadata dict from TokenCache.

Returns:

  • CacheEntry

    New CacheEntry reconstructed from the stored data and metadata.

get_object

get_object(obj_type: str) -> CacheObject | None

Get an object by type.

Parameters:

  • obj_type
    (str) –

    Object type to retrieve (e.g., 'dag', 'pdg').

Returns:

  • CacheObject | None

    CacheObject if found, None otherwise.

has_object

has_object(obj_type: str) -> bool

Check if an object type exists.

Parameters:

  • obj_type
    (str) –

    Object type to check.

Returns:

  • bool

    True if object exists.

object_types

object_types() -> list[str]

Get list of object types in this entry.

Returns:

  • list[str]

    List of object types.

remove_object

remove_object(obj_type: str) -> bool

Remove an object by type.

Parameters:

  • obj_type
    (str) –

    Object type to remove.

Returns:

  • bool

    True if object was removed, False if not found.

to_action_result

to_action_result() -> tuple[Dict[str, Any], list[Dict[str, Any]]]

Convert to action result format.

Returns:

  • tuple[Dict[str, Any], list[Dict[str, Any]]]

    Tuple of (metadata, objects_list) matching action return format.

to_storage

to_storage() -> tuple[Dict[str, Any], Dict[str, Any]]

Convert to storage format for TokenCache.

Returns:

  • tuple[Dict[str, Any], Dict[str, Any]]

    Tuple of (data, metadata) for TokenCache.put_data():

      • data: Objects dict serialised to dicts
      • metadata: Entry metadata dict

causaliq_workflow.cache.CacheObject

CacheObject dataclass

CacheObject(format: str, action: str, content: Any)

A typed object within a cache entry.

Represents a single piece of data with a serialisation format and the action that created it. Objects are keyed by semantic type (e.g., 'dag', 'pdg') within a CacheEntry.

Attributes:

  • format (str) –

    Serialisation format (e.g., 'graphml', 'json', 'csv').

  • action (str) –

    Name of the action that created this object.

  • content (Any) –

    The object content (string for serialised formats).

Example

obj = CacheObject(
    format="graphml",
    action="migrate_trace",
    content="..."
)
obj.format   # 'graphml'
obj.action   # 'migrate_trace'

Methods:

  • from_dict

    Create from dictionary.

  • to_dict

    Convert to dictionary for serialisation.

from_dict classmethod

from_dict(data: Dict[str, Any]) -> CacheObject

Create from dictionary.

Parameters:

  • data
    (Dict[str, Any]) –

    Dictionary with 'format', 'action', and 'content' keys.

Returns:

  • CacheObject

    New CacheObject built from the dictionary values.

to_dict

to_dict() -> Dict[str, Any]

Convert to dictionary for serialisation.

Returns:

  • Dict[str, Any]

    Dictionary with 'format', 'action', and 'content' keys.
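
The to_dict()/from_dict() pair gives a lossless round trip. The stand-in dataclass below (CacheObjectSketch, a hypothetical name) mirrors the documented fields and dictionary keys to show the round-trip contract; it is not the library's own implementation.

```python
from dataclasses import dataclass, asdict
from typing import Any, Dict

# Hypothetical stand-in mirroring the documented CacheObject fields and
# the to_dict()/from_dict() round trip; not the library implementation.
@dataclass
class CacheObjectSketch:
    format: str
    action: str
    content: Any

    def to_dict(self) -> Dict[str, Any]:
        # Produces the documented {'format', 'action', 'content'} shape.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CacheObjectSketch":
        return cls(format=data["format"], action=data["action"],
                   content=data["content"])

obj = CacheObjectSketch(format="graphml", action="learn", content="...")
assert CacheObjectSketch.from_dict(obj.to_dict()) == obj
```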

Exception Handling

causaliq_workflow.cache.MatrixSchemaError

MatrixSchemaError

Raised when matrix variable keys don't match existing cache entries.

Once a cache contains entries, all subsequent entries must use the same matrix variable names. This ensures cache consistency and prevents accidental data corruption from mismatched workflows.

Example

# Cache has entries with keys {"algorithm", "dataset"};
# trying to add an entry keyed by {"algorithm", "network"} raises:
raise MatrixSchemaError(
    "Matrix keys mismatch: got {'algorithm', 'network'}, "
    "expected {'algorithm', 'dataset'}"
)


Usage Examples

Basic Cache Operations

from causaliq_workflow.cache import WorkflowCache, CacheEntry

# Create and use cache with context manager
with WorkflowCache("results/experiment.db") as cache:
    # Create an entry with metadata
    entry = CacheEntry()
    entry.metadata["algorithm"] = "pc"
    entry.metadata["alpha"] = 0.05
    entry.metadata["node_count"] = 8

    # Add a graph object
    graphml_content = '''<?xml version="1.0" encoding="UTF-8"?>
    <graphml xmlns="http://graphml.graphdrawing.org/xmlns">
      <graph id="G" edgedefault="directed">
        <node id="A"/><node id="B"/>
        <edge source="A" target="B"/>
      </graph>
    </graphml>'''
    entry.add_object("graph", "graphml", graphml_content)

    # Store with matrix key
    key = {"network": "asia", "algorithm": "pc", "sample_size": "1000"}
    cache.put(key, entry)

    # Check existence
    if cache.exists(key):
        print("Entry found in cache")

    # Retrieve entry
    result = cache.get(key)
    print(f"Algorithm: {result.metadata['algorithm']}")

Matrix Key Hashing

The cache uses SHA-256 hashing of matrix variable values as keys:

from causaliq_workflow.cache import WorkflowCache

with WorkflowCache(":memory:") as cache:
    # These keys produce different hashes
    key1 = {"algorithm": "pc", "network": "asia"}
    key2 = {"algorithm": "ges", "network": "asia"}

    # Key order doesn't matter - sorted before hashing
    key3 = {"network": "asia", "algorithm": "pc"}  # Same hash as key1
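
A minimal sketch of how such an order-independent key hash can be computed with the standard library. The matrix_hash helper and its JSON canonicalisation are assumptions for illustration; the cache's actual serialisation is internal, though the SHA-256 hash and 16-hex-character truncation match the Architecture Notes below.

```python
import hashlib
import json

def matrix_hash(key_data: dict) -> str:
    """Illustrative only: hash the key with keys sorted so that insertion
    order does not matter, then truncate to 16 hex characters."""
    canonical = json.dumps(key_data, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]

key1 = {"algorithm": "pc", "network": "asia"}
key3 = {"network": "asia", "algorithm": "pc"}  # same values, different order
assert matrix_hash(key1) == matrix_hash(key3)

key2 = {"algorithm": "ges", "network": "asia"}
assert matrix_hash(key1) != matrix_hash(key2)
```

Sorting keys before serialisation is what makes key1 and key3 collide deliberately: the hash depends only on the key-value pairs, not on dict ordering.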

Matrix Schema Validation

Once a cache contains entries, all subsequent entries must use the same matrix variable names:

from causaliq_workflow.cache import WorkflowCache, CacheEntry, MatrixSchemaError

with WorkflowCache(":memory:") as cache:
    entry = CacheEntry()

    # First entry establishes schema
    cache.put({"algorithm": "pc", "network": "asia"}, entry)

    # This raises MatrixSchemaError - wrong keys
    try:
        cache.put({"method": "pc", "dataset": "asia"}, entry)
    except MatrixSchemaError as e:
        print(f"Schema error: {e}")

Export and Import

from causaliq_workflow.cache import WorkflowCache
from pathlib import Path

# Export cache to directory
with WorkflowCache("experiment.db") as cache:
    exported = cache.export(Path("./exported"))
    print(f"Exported {exported} entries")

# Export to zip file
with WorkflowCache("experiment.db") as cache:
    exported = cache.export(Path("./results.zip"))

# Import from directory
with WorkflowCache("new_cache.db") as cache:
    imported = cache.import_entries(Path("./exported"))
    print(f"Imported {imported} entries")

In-Memory Cache

Use :memory: for fast, non-persistent caching:

from causaliq_workflow.cache import WorkflowCache, CacheEntry

# In-memory cache for testing
with WorkflowCache(":memory:") as cache:
    entry = CacheEntry()
    entry.metadata["test"] = True

    cache.put({"key": "value"}, entry)
    assert cache.entry_count() == 1
# Cache automatically closed and discarded

Architecture Notes

The WorkflowCache wraps causaliq-core's TokenCache with workflow-specific functionality:

  • Matrix key hashing - SHA-256 hash of sorted matrix values (16 hex chars)
  • Schema validation - Ensures consistent matrix variable names across entries
  • Entry model - CacheEntry with a metadata dict and typed objects keyed by semantic type
  • Export/Import - Convert entries to/from open standard formats (GraphML, JSON)

The cache design focuses on:

  • Speed - Fast existence checks and lookups via hash keys
  • Compactness - JSON tokenisation reduces storage size
  • Reproducibility - Entries can be exported to human-readable formats
