Workflow Cache API
The workflow cache provides SQLite-based storage for workflow step results,
enabling conservative execution and reproducibility. It is built on
causaliq-core's TokenCache infrastructure.
causaliq-core Foundation
The workflow cache imports the following from causaliq-core:
- TokenCache - SQLite-based caching with tokenised JSON storage
- JsonCompressor - Compressor for JSON tokenisation
- Compressor - Abstract compressor interface
Core Classes
causaliq_workflow.cache.WorkflowCache
WorkflowCache
WorkflowCache(db_path: str | Path)
High-level cache for workflow step results.
Provides a simplified interface for storing and retrieving workflow results as CacheEntry objects. Uses matrix variable values as cache keys, with SHA-256 hashing for compact storage.
Each entry contains metadata and named objects (e.g., 'graph', 'confidences'), allowing a single workflow step to produce multiple outputs that are stored together.
Attributes:
- db_path – Path to SQLite database file, or ":memory:" for in-memory.
Example
>>> from causaliq_workflow.cache import WorkflowCache, CacheEntry
>>> with WorkflowCache(":memory:") as cache:
...     entry = CacheEntry()
...     entry.metadata["node_count"] = 5
...     entry.add_object("dag", "graphml", "...", "learn")
...     key = {"algorithm": "pc", "network": "asia"}
...     hash_key = cache.put(key, entry)
...     result = cache.get(key)
...     print(result.metadata)
{'node_count': 5}
Parameters:
- db_path (str | Path) – Path to SQLite database file. Use ":memory:" for an in-memory database (fast, non-persistent).
Methods:
- open – Open the database connection and initialise schema.
- close – Close the database connection.
- put – Store a workflow entry in the cache.
- get – Retrieve a workflow entry from the cache.
- exists – Check if a cache entry exists.
- entry_count – Count cache entries (excluding internal config).
- export – Export cache entries to directory or zip file.
- import_entries – Import cache entries from directory or zip file.
open
open() -> WorkflowCache
Open the database connection and initialise schema.
Returns:
- WorkflowCache – self, for method chaining.
Raises:
- RuntimeError – If already connected.
close
close() -> None
Close the database connection.
put
put(key_data: dict[str, Any], entry: CacheEntry) -> str
Store a workflow entry in the cache.
If an entry with the same key already exists, it is replaced.
Validates that key_data uses the same matrix variable names as existing entries to ensure cache consistency.
Parameters:
- key_data (dict[str, Any]) – Dictionary of matrix variable values (cache key).
- entry (CacheEntry) – CacheEntry containing metadata and objects.
Returns:
- str – The hash key used for storage.
Raises:
- MatrixSchemaError – If key_data uses different variable names than existing entries.
Example
>>> with WorkflowCache(":memory:") as cache:
...     entry = CacheEntry()
...     entry.metadata["result"] = "ok"
...     key = {"algorithm": "pc"}
...     hash_key = cache.put(key, entry)
get
get(key_data: dict[str, Any]) -> CacheEntry | None
Retrieve a workflow entry from the cache.
Parameters:
- key_data (dict[str, Any]) – Dictionary of matrix variable values (cache key).
Returns:
- CacheEntry | None – CacheEntry if found, None otherwise.
Example
>>> with WorkflowCache(":memory:") as cache:
...     entry = CacheEntry()
...     entry.metadata["result"] = "ok"
...     hash_key = cache.put({"algo": "pc"}, entry)
...     result = cache.get({"algo": "pc"})
...     print(result.metadata)
{'result': 'ok'}
exists
exists(key_data: dict[str, Any]) -> bool
Check if a cache entry exists.
Parameters:
- key_data (dict[str, Any]) – Dictionary of matrix variable values (cache key).
Returns:
- bool – True if entry exists, False otherwise.
Example
>>> with WorkflowCache(":memory:") as cache:
...     cache.exists({"algo": "pc"})  # False
...     cache.put({"algo": "pc"}, CacheEntry())
...     cache.exists({"algo": "pc"})  # True
entry_count
entry_count() -> int
Count cache entries (excluding internal config).
Returns:
- int – Number of entries in the cache.
export
export(output_path: str | Path, matrix_keys: list[str] | None = None) -> int
Export cache entries to directory or zip file.
Creates a hierarchical directory structure based on matrix variable values. Each entry's objects are written as individual files.
The output format is determined by the path extension:
- Path ending in .zip: creates a zip archive
- Otherwise: creates a directory structure
Parameters:
- output_path (str | Path) – Path to output directory or .zip file.
- matrix_keys (list[str] | None, default: None) – Ordered list of matrix variable names for the directory hierarchy. If None, uses the stored matrix key order from the workflow, or falls back to alphabetical order.
Returns:
- int – Number of entries exported.
Example
>>> with WorkflowCache("cache.db") as cache:
...     # Export to dir: asia/pc/graph.graphml
...     cache.export("./out", ["dataset", "algorithm"])
...     # Export to zip file
...     cache.export("./out.zip", ["dataset"])
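The example's comment shows the layout produced for keys {"dataset": "asia", "algorithm": "pc"}: one directory level per matrix key, in order, then one file per object. As an illustration only (not the library's actual code; `export_path` is a hypothetical helper), the layout can be sketched as:

```python
from pathlib import Path

# Hypothetical helper illustrating the export layout: one directory level
# per matrix key (in the given order), then one file per object.
def export_path(key_data: dict, matrix_keys: list,
                obj_type: str, obj_format: str) -> Path:
    parts = [str(key_data[k]) for k in matrix_keys]
    return Path(*parts) / f"{obj_type}.{obj_format}"

p = export_path({"dataset": "asia", "algorithm": "pc"},
                ["dataset", "algorithm"], "graph", "graphml")
print(p)  # asia/pc/graph.graphml on POSIX systems
```

Reordering matrix_keys reorders the hierarchy, which is why export() accepts an explicit key order.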
import_entries
import_entries(input_path: str | Path) -> int
Import cache entries from directory or zip file.
Reads entries exported by export() and stores them back into
the cache. The input format is determined by the path:
- Path ending in .zip: reads from a zip archive
- Otherwise: reads from a directory structure
Parameters:
- input_path (str | Path) – Path to input directory or .zip file.
Returns:
- int – Number of entries imported.
Raises:
- FileNotFoundError – If input_path does not exist.
Example
>>> with WorkflowCache("cache.db") as cache:
...     # Import from directory
...     cache.import_entries("./exported")
...     # Import from zip file
...     cache.import_entries("./exported.zip")
causaliq_workflow.cache.CacheEntry
CacheEntry
dataclass
CacheEntry(metadata: Dict[str, Any] = dict(), objects: Dict[str, CacheObject] = dict())
A cached workflow result containing metadata and typed objects.
Represents a single cache entry identified by matrix variable values. Contains workflow metadata and zero or more typed objects. Each object has a semantic type (e.g., 'dag', 'pdg') and serialisation format.
Objects are keyed by their semantic type. Each entry may contain at most one object of each type.
The entry structure maps directly to TokenCache storage:
- metadata → TokenCache metadata field
- objects → TokenCache data field
Attributes:
- metadata (Dict[str, Any]) – Workflow metadata dictionary.
- objects (Dict[str, CacheObject]) – Typed objects dictionary (type → CacheObject).
Example
>>> entry = CacheEntry()
>>> entry.metadata["node_count"] = 5
>>> entry.add_object("pdg", "graphml", "...")
>>> entry.add_object("trace", "json", '{"iterations": [...]}')
Methods:
- add_object – Add or replace a typed object.
- from_action_result – Create from action result format.
- from_storage – Create from TokenCache storage format.
- get_object – Get an object by type.
- has_object – Check if an object type exists.
- object_types – Get list of object types in this entry.
- remove_object – Remove an object by type.
- to_action_result – Convert to action result format.
- to_storage – Convert to storage format for TokenCache.
add_object
add_object(
obj_type: str, obj_format: str, content: Any, action: str = "unknown"
) -> None
Add or replace a typed object.
Parameters:
- obj_type (str) – Semantic object type (e.g., 'dag', 'pdg', 'trace').
- obj_format (str) – Serialisation format (e.g., 'graphml', 'json').
- content (Any) – Object content.
- action (str, default: 'unknown') – Name of the action creating this object.
Example
>>> entry = CacheEntry()
>>> entry.add_object("pdg", "graphml", "...", "merge")
from_action_result
classmethod
from_action_result(
metadata: Dict[str, Any], objects: list[Dict[str, Any]]
) -> CacheEntry
Create from action result format.
Converts the action return format (metadata dict and objects list) to a CacheEntry. Objects are keyed by their 'type' field.
Parameters:
- metadata (Dict[str, Any]) – Action metadata dictionary.
- objects (list[Dict[str, Any]]) – List of object dicts with 'type', 'format', 'action', and 'content' keys.
Returns:
- CacheEntry – CacheEntry instance.
Example
>>> entry = CacheEntry.from_action_result(
...     {"node_count": 5},
...     [{"type": "pdg", "format": "graphml",
...       "action": "merge_graphs", "content": "..."}]
... )
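The conversion itself amounts to re-keying the objects list by each dict's 'type' field. A plain-dict sketch of that step (illustrative only, not the library's code):

```python
# Objects as returned by an action: a list of self-describing dicts.
objects_list = [
    {"type": "pdg", "format": "graphml", "action": "merge_graphs",
     "content": "..."},
    {"type": "trace", "format": "json", "action": "merge_graphs",
     "content": "{}"},
]

# Re-key by 'type', dropping the now-redundant 'type' field; since objects
# are keyed by type, a later object of the same type would replace an
# earlier one.
objects_by_type = {
    o["type"]: {k: v for k, v in o.items() if k != "type"}
    for o in objects_list
}
print(sorted(objects_by_type))  # ['pdg', 'trace']
```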
from_storage
classmethod
from_storage(
data: Dict[str, Any] | None, metadata: Dict[str, Any] | None
) -> CacheEntry
Create from TokenCache storage format.
Parameters:
- data (Dict[str, Any] | None) – Objects dict from TokenCache.get_data().
- metadata (Dict[str, Any] | None) – Metadata dict from TokenCache.
Returns:
- CacheEntry – CacheEntry instance.
get_object
get_object(obj_type: str) -> CacheObject | None
Get an object by type.
Parameters:
- obj_type (str) – Object type to retrieve (e.g., 'dag', 'pdg').
Returns:
- CacheObject | None – CacheObject if found, None otherwise.
has_object
has_object(obj_type: str) -> bool
Check if an object type exists.
Parameters:
- obj_type (str) – Object type to check.
Returns:
- bool – True if the object exists.
object_types
object_types() -> list[str]
Get list of object types in this entry.
Returns:
- list[str] – List of object types.
remove_object
remove_object(obj_type: str) -> bool
Remove an object by type.
Parameters:
- obj_type (str) – Object type to remove.
Returns:
- bool – True if the object was removed, False if not found.
to_action_result
to_action_result() -> tuple[Dict[str, Any], list[Dict[str, Any]]]
Convert to action result format.
Returns:
- tuple[Dict[str, Any], list[Dict[str, Any]]] – Tuple of (metadata, objects_list) matching the action return format.
to_storage
to_storage() -> tuple[Dict[str, Any], Dict[str, Any]]
Convert to storage format for TokenCache.
Returns:
- tuple[Dict[str, Any], Dict[str, Any]] – Tuple of (data, metadata) for TokenCache.put_data():
  - data: objects dict serialised to plain dicts
  - metadata: entry metadata dict
causaliq_workflow.cache.CacheObject
CacheObject
dataclass
CacheObject(format: str, action: str, content: Any)
A typed object within a cache entry.
Represents a single piece of data with a serialisation format and the action that created it. Objects are keyed by semantic type (e.g., 'dag', 'pdg') within a CacheEntry.
Attributes:
- format (str) – Serialisation format (e.g., 'graphml', 'json', 'csv').
- action (str) – Name of the action that created this object.
- content (Any) – The object content (string for serialised formats).
Example
>>> obj = CacheObject(
...     format="graphml",
...     action="migrate_trace",
...     content="..."
... )
>>> obj.format
'graphml'
>>> obj.action
'migrate_trace'
Methods:
- from_dict – Create from dictionary.
- to_dict – Convert to dictionary for serialisation.
from_dict
classmethod
from_dict(data: Dict[str, Any]) -> CacheObject
Create from dictionary.
Parameters:
- data (Dict[str, Any]) – Dictionary with 'format', 'action', and 'content' keys.
Returns:
- CacheObject – CacheObject instance.
to_dict
to_dict() -> Dict[str, Any]
Convert to dictionary for serialisation.
Returns:
- Dict[str, Any] – Dictionary with 'format', 'action', and 'content' keys.
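Because CacheObject is a plain dataclass, from_dict and to_dict are natural inverses. A standalone re-creation of the shape (the real class lives in causaliq_workflow.cache; this sketch only mirrors its documented fields and may differ from the actual implementation):

```python
from dataclasses import dataclass, asdict
from typing import Any, Dict

@dataclass
class CacheObject:
    format: str
    action: str
    content: Any

    def to_dict(self) -> Dict[str, Any]:
        # Emits exactly the 'format', 'action', 'content' keys.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CacheObject":
        return cls(format=data["format"], action=data["action"],
                   content=data["content"])

obj = CacheObject(format="json", action="learn", content='{"edges": 3}')
assert CacheObject.from_dict(obj.to_dict()) == obj  # lossless round trip
```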
Exception Handling
causaliq_workflow.cache.MatrixSchemaError
MatrixSchemaError
Raised when matrix variable keys don't match existing cache entries.
Once a cache contains entries, all subsequent entries must use the same matrix variable names. This ensures cache consistency and prevents accidental data corruption from mismatched workflows.
Example
Cache has entries with keys {"algorithm", "dataset"}
Trying to add entry with {"algorithm", "network"} raises
raise MatrixSchemaError( ... "Matrix keys mismatch: got {'algorithm', 'network'}, " ... "expected {'algorithm', 'dataset'}" ... )
Usage Examples
Basic Cache Operations
from causaliq_workflow.cache import WorkflowCache, CacheEntry
# Create and use cache with context manager
with WorkflowCache("results/experiment.db") as cache:
    # Create an entry with metadata
    entry = CacheEntry()
    entry.metadata["algorithm"] = "pc"
    entry.metadata["alpha"] = 0.05
    entry.metadata["node_count"] = 8

    # Add a graph object
    graphml_content = '''<?xml version="1.0" encoding="UTF-8"?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns">
  <graph id="G" edgedefault="directed">
    <node id="A"/><node id="B"/>
    <edge source="A" target="B"/>
  </graph>
</graphml>'''
    entry.add_object("graph", "graphml", graphml_content)

    # Store with matrix key
    key = {"network": "asia", "algorithm": "pc", "sample_size": "1000"}
    cache.put(key, entry)

    # Check existence
    if cache.exists(key):
        print("Entry found in cache")

    # Retrieve entry
    result = cache.get(key)
    print(f"Algorithm: {result.metadata['algorithm']}")
Matrix Key Hashing
The cache uses SHA-256 hashing of matrix variable values as keys:
from causaliq_workflow.cache import WorkflowCache
with WorkflowCache(":memory:") as cache:
    # These keys produce different hashes
    key1 = {"algorithm": "pc", "network": "asia"}
    key2 = {"algorithm": "ges", "network": "asia"}

    # Key order doesn't matter - sorted before hashing
    key3 = {"network": "asia", "algorithm": "pc"}  # Same hash as key1
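The Architecture Notes state the stored key is a 16-hex-char SHA-256 hash of the sorted matrix values. The exact canonicalisation is an internal detail of the library; the idea can be sketched as follows, where `matrix_hash` is a hypothetical stand-in, not the library's function:

```python
import hashlib
import json

# Hypothetical sketch of matrix key hashing: canonicalise with sorted keys
# so dict insertion order never matters, then truncate the SHA-256 digest
# to 16 hex characters for compact storage.
def matrix_hash(key_data: dict) -> str:
    canonical = json.dumps(key_data, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]

assert matrix_hash({"algorithm": "pc", "network": "asia"}) == \
       matrix_hash({"network": "asia", "algorithm": "pc"})   # order-independent
assert matrix_hash({"algorithm": "pc", "network": "asia"}) != \
       matrix_hash({"algorithm": "ges", "network": "asia"})  # value-sensitive
```

Sorting before hashing is what makes key1 and key3 above equivalent cache keys.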
Matrix Schema Validation
Once a cache contains entries, all subsequent entries must use the same matrix variable names:
from causaliq_workflow.cache import WorkflowCache, CacheEntry, MatrixSchemaError
with WorkflowCache(":memory:") as cache:
    entry = CacheEntry()

    # First entry establishes the schema
    cache.put({"algorithm": "pc", "network": "asia"}, entry)

    # This raises MatrixSchemaError - wrong keys
    try:
        cache.put({"method": "pc", "dataset": "asia"}, entry)
    except MatrixSchemaError as e:
        print(f"Schema error: {e}")
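The check itself amounts to a set comparison of key names. A hedged sketch of that logic (`check_matrix_keys` is hypothetical; the real validation lives inside WorkflowCache.put and raises MatrixSchemaError rather than ValueError):

```python
# Hypothetical sketch of the schema check: once a key set is established,
# every later entry must use exactly the same matrix variable names.
def check_matrix_keys(existing: set, incoming: set) -> None:
    if existing and incoming != existing:
        raise ValueError(  # the library raises MatrixSchemaError here
            f"Matrix keys mismatch: got {sorted(incoming)}, "
            f"expected {sorted(existing)}"
        )

check_matrix_keys(set(), {"algorithm", "network"})  # first entry: ok
check_matrix_keys({"algorithm", "network"}, {"algorithm", "network"})  # ok
try:
    check_matrix_keys({"algorithm", "network"}, {"method", "dataset"})
except ValueError as e:
    print(e)  # prints the mismatch message
```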
Export and Import
from causaliq_workflow.cache import WorkflowCache
from pathlib import Path
# Export cache to directory
with WorkflowCache("experiment.db") as cache:
    exported = cache.export(Path("./exported"))
    print(f"Exported {exported} entries")

# Export to zip file
with WorkflowCache("experiment.db") as cache:
    exported = cache.export(Path("./results.zip"))

# Import from directory
with WorkflowCache("new_cache.db") as cache:
    imported = cache.import_entries(Path("./exported"))
    print(f"Imported {imported} entries")
In-Memory Cache
Use :memory: for fast, non-persistent caching:
from causaliq_workflow.cache import WorkflowCache, CacheEntry
# In-memory cache for testing
with WorkflowCache(":memory:") as cache:
    entry = CacheEntry()
    entry.metadata["test"] = True
    cache.put({"key": "value"}, entry)
    assert cache.entry_count() == 1
# Cache automatically closed and discarded
Architecture Notes
The WorkflowCache wraps causaliq-core's TokenCache with workflow-specific functionality:
- Matrix key hashing - SHA-256 hash of sorted matrix values (16 hex chars)
- Schema validation - Ensures consistent matrix variable names across entries
- Entry model - CacheEntry with metadata dict and named objects list
- Export/Import - Convert entries to/from open standard formats (GraphML, JSON)
The cache design focuses on:
- Speed - Fast existence checks and lookups via hash keys
- Compactness - JSON tokenisation reduces storage size
- Reproducibility - Entries can be exported to human-readable formats