Cache Module

SQLite-backed caching infrastructure with shared token dictionary for efficient storage.

Overview

The cache module provides:

  • TokenCache - SQLite-backed cache with connection management
  • Compressor - Abstract base class for pluggable type-specific compressors
  • JsonCompressor - Tokenised compressor for JSON-serialisable data

Design Philosophy

The cache uses SQLite for storage, providing:

  • Fast indexed key lookup
  • Built-in concurrency via SQLite locking
  • In-memory mode via :memory: for testing
  • Incremental updates without rewriting

See Caching Architecture for full design details.
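The properties listed above can be tried with Python's standard sqlite3 module alone. The following is a minimal sketch, assuming a hypothetical single-table schema (`entries` with a `hash` primary key); it is not the schema TokenCache actually creates.

```python
import sqlite3

# Hypothetical schema for illustration only; the real TokenCache
# schema may differ.
conn = sqlite3.connect(":memory:")  # in-memory database, discarded on close
conn.execute(
    "CREATE TABLE entries (hash TEXT PRIMARY KEY, data BLOB NOT NULL)"
)

# Incremental update: each write touches only the affected row,
# the rest of the database is not rewritten.
conn.execute("INSERT INTO entries VALUES (?, ?)", ("abc123", b"first"))
conn.execute(
    "INSERT OR REPLACE INTO entries VALUES (?, ?)", ("abc123", b"second")
)
conn.commit()

# Keyed lookup is served by the index SQLite builds for the primary key.
row = conn.execute(
    "SELECT data FROM entries WHERE hash = ?", ("abc123",)
).fetchone()
count = conn.execute("SELECT COUNT(*) FROM entries").fetchone()[0]

# The query plan reports a SEARCH (index lookup) rather than a full SCAN.
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT data FROM entries WHERE hash = ?",
    ("abc123",),
).fetchall()
conn.close()
```

The last column of each `plan` row holds the human-readable plan step; its exact wording varies between SQLite versions.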

Usage

Basic In-Memory Cache

from causaliq_core.cache import TokenCache

# In-memory cache (fast, non-persistent)
with TokenCache(":memory:") as cache:
    assert cache.table_exists("tokens")
    assert cache.table_exists("cache_entries")

File-Based Persistent Cache

from causaliq_core.cache import TokenCache

# File-based cache (persistent)
with TokenCache("my_cache.db") as cache:
    # Data persists across sessions
    print(f"Entries: {cache.entry_count()}")
    print(f"Tokens: {cache.token_count()}")

Transaction Support

from causaliq_core.cache import TokenCache

with TokenCache(":memory:") as cache:
    # Transactions auto-commit on success, rollback on exception
    with cache.transaction() as cursor:
        cursor.execute("INSERT INTO tokens (token) VALUES (?)", ("example",))

Token Dictionary

The cache maintains a shared token dictionary for cross-entry compression. Compressors use this to convert strings to compact integer IDs:

from causaliq_core.cache import TokenCache

with TokenCache(":memory:") as cache:
    # Get or create token IDs (used by compressors)
    id1 = cache.get_or_create_token("hello")  # Returns 1
    id2 = cache.get_or_create_token("world")  # Returns 2
    id1_again = cache.get_or_create_token("hello")  # Returns 1 (cached)

    # Look up token by ID (used for decompression)
    token = cache.get_token(1)  # Returns "hello"
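
To see why a shared dictionary saves space, the sketch below packs a sequence of strings as 16-bit token IDs. It is an illustration under stated assumptions: a plain dict stands in for the cache's token table, and the byte layout (little-endian uint16 values) is hypothetical, not JsonCompressor's actual encoding.

```python
import struct
from typing import Dict, List

# Stand-ins for the shared token dictionary (hypothetical, in-memory only).
token_to_id: Dict[str, int] = {}
id_to_token: Dict[int, str] = {}


def token_id(token: str) -> int:
    """Return the ID for a token, assigning the next free ID if it is new."""
    if token not in token_to_id:
        new_id = len(token_to_id) + 1  # IDs start at 1
        token_to_id[token] = new_id
        id_to_token[new_id] = token
    return token_to_id[token]


def pack(tokens: List[str]) -> bytes:
    """Encode tokens as consecutive little-endian uint16 IDs."""
    ids = [token_id(t) for t in tokens]
    return struct.pack(f"<{len(ids)}H", *ids)


def unpack(blob: bytes) -> List[str]:
    """Decode a blob produced by pack() back into token strings."""
    ids = struct.unpack(f"<{len(blob) // 2}H", blob)
    return [id_to_token[i] for i in ids]


tokens = ["role", "user", "role", "content"]
blob = pack(tokens)
print(len(blob))             # 8 bytes: four uint16 IDs
print(unpack(blob) == tokens)  # True
```

Repeated strings such as "role" cost two bytes per occurrence once they are in the dictionary, and the dictionary entry itself is shared by every cache entry.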

Storing and Retrieving Entries

Cache entries are stored as binary blobs with a hash key:

from causaliq_core.cache import TokenCache

with TokenCache(":memory:") as cache:
    # Store an entry
    cache.put("abc123", b"response data")

    # Check if entry exists
    if cache.exists("abc123"):
        # Retrieve entry
        data = cache.get("abc123")  # Returns b"response data"

    # Store with metadata
    cache.put("def456", b"data", metadata=b"extra info")
    result = cache.get_with_metadata("def456")
    # result = (b"data", b"extra info")

    # Delete entry
    cache.delete("abc123")

Auto-Compression with Registered Compressor

Set a compressor to automatically compress/decompress entries:

from causaliq_core.cache import TokenCache
from causaliq_core.cache.compressors import JsonCompressor

with TokenCache(":memory:") as cache:
    # Set compressor for automatic compression
    cache.set_compressor(JsonCompressor())

    # Store data (auto-compressed)
    cache.put_data("hash1", {"role": "user", "content": "Hello"})

    # Retrieve data (auto-decompressed)
    data = cache.get_data("hash1")
    # data = {"role": "user", "content": "Hello"}

    # Store with metadata
    cache.put_data("hash2",
                   {"response": "Hi!"},
                   metadata={"latency_ms": 150})
    result = cache.get_data_with_metadata("hash2")
    # result = ({"response": "Hi!"}, {"latency_ms": 150})

Hash Collision Handling

Pass the key_json parameter so that the cache does not return incorrect data when different keys produce the same truncated hash:

from causaliq_core.cache import TokenCache
from causaliq_core.cache.compressors import JsonCompressor

with TokenCache(":memory:") as cache:
    cache.set_compressor(JsonCompressor())

    # Store with original key for collision safety
    key_json = '{"model": "gpt-4", "prompt": "Hello"}'
    cache.put_data("abc123", {"response": "Hi!"}, key_json=key_json)

    # Retrieve with key_json verification
    data = cache.get_data("abc123", key_json=key_json)

Exporting and Importing Entries

Export cache entries to files for backup, migration, or sharing. Import entries from files into a cache:

from pathlib import Path
from causaliq_core.cache import TokenCache
from causaliq_core.cache.compressors import JsonCompressor

# Export entries to directory
with TokenCache("my_cache.db") as cache:
    cache.set_compressor(JsonCompressor())

    # Export all entries to directory
    # Creates one file per entry: {hash}.json
    count = cache.export_entries(Path("./export"))
    print(f"Exported {count} entries")

# Import entries from directory
with TokenCache("new_cache.db") as cache:
    cache.set_compressor(JsonCompressor())

    # Import all .json files from directory
    # Uses filename (without extension) as hash key
    count = cache.import_entries(Path("./export"))
    print(f"Imported {count} entries")

Export behaviour:

  • Creates output directory if it doesn't exist
  • Writes each entry to {hash}.{ext} (e.g., abc123.json)
  • Uses compressor's export() method for human-readable format
  • Returns count of exported entries

Import behaviour:

  • Reads all files in directory (skips subdirectories)
  • Uses filename stem as hash key (e.g., abc123.json → key abc123)
  • Uses compressor's import_() method to parse content
  • Returns count of imported entries
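
The export and import behaviour described above amounts to a file-per-entry round trip. The sketch below reproduces it with pathlib and json on a plain dict; the helper names `export_to_dir` and `import_from_dir` are hypothetical and do not go through TokenCache or a compressor.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def export_to_dir(entries: Dict[str, Any], output_dir: Path, ext: str = "json") -> int:
    """Write each entry to {hash}.{ext}, creating the directory if needed."""
    output_dir.mkdir(parents=True, exist_ok=True)
    for key, value in entries.items():
        path = output_dir / f"{key}.{ext}"
        path.write_text(json.dumps(value, indent=2), encoding="utf-8")
    return len(entries)


def import_from_dir(input_dir: Path) -> Dict[str, Any]:
    """Read every file in input_dir, keyed by filename stem; skip subdirectories."""
    if not input_dir.is_dir():
        raise FileNotFoundError(input_dir)
    entries: Dict[str, Any] = {}
    for path in sorted(input_dir.iterdir()):
        if path.is_file():
            entries[path.stem] = json.loads(path.read_text(encoding="utf-8"))
    return entries


original = {"abc123": {"response": "Hi!"}, "def456": {"latency_ms": 150}}
with tempfile.TemporaryDirectory() as tmp:
    export_dir = Path(tmp) / "export"
    exported = export_to_dir(original, export_dir)
    (export_dir / "nested").mkdir()  # subdirectories are ignored on import
    restored = import_from_dir(export_dir)
```

Because the key is recovered from the filename stem alone, renaming an exported file changes the key under which it is imported.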

API Reference

TokenCache

TokenCache(db_path: str | Path)

SQLite-backed cache with shared token dictionary.

Attributes:

  • db_path

    Path to SQLite database file, or ":memory:" for in-memory.

  • conn (Connection) –

    SQLite connection (None until open() is called or the context is entered).

Example

with TokenCache(":memory:") as cache:
    cache.put("abc123", b"hello")
    data = cache.get("abc123")

Parameters:

  • db_path

    (str | Path) –

    Path to SQLite database file. Use ":memory:" for in-memory database (fast, non-persistent).

Methods:

  • open

    Open the database connection and initialise schema.

  • close

    Close the database connection.

  • transaction

    Context manager for a database transaction.

  • table_exists

    Check if a table exists in the database.

  • entry_count

    Count cache entries.

  • token_count

    Count tokens in the dictionary.

  • list_entries

    List all cache entries with metadata.

  • total_hits

    Get total cache hits across all entries.

  • get_or_create_token

    Get token ID, creating a new entry if needed.

  • get_token

    Get token string by ID.

  • set_compressor

    Set the compressor for automatic data compression.

  • get_compressor

    Get the current compressor.

  • has_compressor

    Check if a compressor is set.

  • put

    Store a cache entry with collision handling.

  • get

    Retrieve a cache entry and increment hit count.

  • get_with_metadata

    Retrieve a cache entry with its metadata.

  • exists

    Check if a cache entry exists.

  • delete

    Delete a cache entry.

  • put_data

    Store data using the registered compressor.

  • get_data

    Retrieve and decompress data using the registered compressor.

  • get_data_with_metadata

    Retrieve and decompress data with metadata.

  • export_entries

    Export cache entries to human-readable files.

  • import_entries

    Import human-readable files into the cache.

is_open property

is_open: bool

Check if the cache connection is open.

is_memory property

is_memory: bool

Check if this is an in-memory database.

conn property

conn: Connection

Get the database connection, raising if not connected.

open

open() -> TokenCache

Open the database connection and initialise schema.

Returns:

  • TokenCache

    The cache instance, with its connection open.

Raises:

  • RuntimeError

    If already connected.

close

close() -> None

Close the database connection.

transaction

transaction() -> Iterator[Cursor]

Context manager for a database transaction.

Commits on success, rolls back on exception.

Yields:

  • Cursor

    SQLite cursor for executing statements.
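
The commit-on-success, rollback-on-exception pattern can be written with contextlib and sqlite3. This is a minimal sketch of the pattern as a free function; it is not necessarily how TokenCache.transaction() is implemented, and the `tokens` table definition here is a hypothetical stand-in.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[sqlite3.Cursor]:
    """Yield a cursor; commit on success, roll back on any exception."""
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tokens (id INTEGER PRIMARY KEY, token TEXT UNIQUE)")
conn.commit()

# Successful block: the insert is committed.
with transaction(conn) as cur:
    cur.execute("INSERT INTO tokens (token) VALUES (?)", ("kept",))

# Failing block: the insert is rolled back and the exception propagates.
try:
    with transaction(conn) as cur:
        cur.execute("INSERT INTO tokens (token) VALUES (?)", ("discarded",))
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

tokens = [row[0] for row in conn.execute("SELECT token FROM tokens")]
print(tokens)  # ['kept']
```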

table_exists

table_exists(table_name: str) -> bool

Check if a table exists in the database.

Parameters:

  • table_name

    (str) –

    Name of the table to check.

Returns:

  • bool

    True if table exists, False otherwise.
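
A check like this is commonly implemented by querying SQLite's built-in `sqlite_master` catalogue. The sketch below shows that approach as a standalone function; whether TokenCache uses exactly this query is an assumption.

```python
import sqlite3


def table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """Return True if a table with this name is listed in sqlite_master."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table_name,),
    ).fetchone()
    return row is not None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tokens (id INTEGER PRIMARY KEY, token TEXT)")

print(table_exists(conn, "tokens"))   # True
print(table_exists(conn, "missing"))  # False
```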

entry_count

entry_count() -> int

Count cache entries.

Returns:

  • int

    Number of entries in the cache.

token_count

token_count() -> int

Count tokens in the dictionary.

Returns:

  • int

    Number of tokens.

list_entries

list_entries() -> list[dict[str, Any]]

List all cache entries with metadata.

Returns a list of dictionaries containing entry details including hash, key_json, created_at, and metadata blob.

Returns:

  • list[dict[str, Any]]

    List of entry dictionaries with keys: hash, key_json, created_at, metadata (raw bytes or None).

Example

from causaliq_core.cache import TokenCache
from causaliq_core.cache.compressors import JsonCompressor

with TokenCache(":memory:") as cache:
    cache.set_compressor(JsonCompressor())
    cache.put_data("h1", {"test": 1})
    entries = cache.list_entries()
    print(len(entries))  # 1

total_hits

total_hits() -> int

Get total cache hits across all entries.

Returns:

  • int

    Total hit count.
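
Hit counting of this kind can be kept as a per-row counter that get() increments and total_hits() sums. The sketch below assumes a hypothetical `hit_count` column and a simplified table; the real column names and schema may differ.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified schema with a per-entry hit counter.
conn.execute(
    "CREATE TABLE cache_entries ("
    " hash TEXT PRIMARY KEY,"
    " data BLOB NOT NULL,"
    " hit_count INTEGER NOT NULL DEFAULT 0)"
)


def get(conn: sqlite3.Connection, hash_: str) -> Optional[bytes]:
    """Return the entry's data and bump its hit counter; None on a miss."""
    row = conn.execute(
        "SELECT data FROM cache_entries WHERE hash = ?", (hash_,)
    ).fetchone()
    if row is None:
        return None  # misses are not counted
    conn.execute(
        "UPDATE cache_entries SET hit_count = hit_count + 1 WHERE hash = ?",
        (hash_,),
    )
    conn.commit()
    return row[0]


def total_hits(conn: sqlite3.Connection) -> int:
    """Sum hit counters across all entries (0 for an empty table)."""
    return conn.execute(
        "SELECT COALESCE(SUM(hit_count), 0) FROM cache_entries"
    ).fetchone()[0]


hits_when_empty = total_hits(conn)
conn.execute(
    "INSERT INTO cache_entries (hash, data) VALUES (?, ?)", ("abc123", b"x")
)
get(conn, "abc123")
get(conn, "abc123")
get(conn, "missing")
print(total_hits(conn))  # 2
```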

get_or_create_token

get_or_create_token(token: str) -> int

Get token ID, creating a new entry if needed.

This method is used by compressors to compress strings to integer IDs. The token dictionary grows dynamically as new tokens are encountered.

Parameters:

  • token

    (str) –

    The string token to look up or create.

Returns:

  • int

    Integer ID for the token (1-65535 range).

Raises:

  • ValueError

    If token dictionary exceeds uint16 capacity.
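
The lookup-or-insert behaviour and the uint16 limit can be sketched with sqlite3 directly. This is an illustration, not the library's code: the `tokens` table layout, the `max_id` parameter, and the helper `open_dictionary` are assumptions made for the example, and the capacity check assumes tokens are never deleted.

```python
import sqlite3
from typing import Optional

MAX_TOKEN_ID = 0xFFFF  # uint16 ceiling, matching the documented 1-65535 range


def open_dictionary() -> sqlite3.Connection:
    """Create an in-memory token table (hypothetical layout)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tokens (id INTEGER PRIMARY KEY, token TEXT UNIQUE NOT NULL)"
    )
    return conn


def get_or_create_token(
    conn: sqlite3.Connection, token: str, max_id: int = MAX_TOKEN_ID
) -> int:
    """Return the ID for token, inserting it if it is new."""
    row = conn.execute(
        "SELECT id FROM tokens WHERE token = ?", (token,)
    ).fetchone()
    if row is not None:
        return row[0]
    # With no deletions, the row count equals the highest ID in use.
    count = conn.execute("SELECT COUNT(*) FROM tokens").fetchone()[0]
    if count >= max_id:
        raise ValueError("token dictionary exceeds uint16 capacity")
    cursor = conn.execute("INSERT INTO tokens (token) VALUES (?)", (token,))
    conn.commit()
    return cursor.lastrowid


def get_token(conn: sqlite3.Connection, token_id: int) -> Optional[str]:
    """Return the token string for an ID, or None if the ID is unknown."""
    row = conn.execute(
        "SELECT token FROM tokens WHERE id = ?", (token_id,)
    ).fetchone()
    return row[0] if row is not None else None


conn = open_dictionary()
id1 = get_or_create_token(conn, "hello")        # 1
id2 = get_or_create_token(conn, "world")        # 2
id1_again = get_or_create_token(conn, "hello")  # 1 (existing row reused)
```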

get_token

get_token(token_id: int) -> str | None

Get token string by ID.

This method is used by decompressors to expand integer IDs back to strings.

Parameters:

  • token_id

    (int) –

    The integer ID to look up.

Returns:

  • str | None

    The token string, or None if not found.

set_compressor

set_compressor(compressor: Compressor) -> None

Set the compressor for automatic data compression.

Once set, put_data() and get_data() will automatically compress/decompress entries using this compressor.

Parameters:

  • compressor

    (Compressor) –

    Compressor instance for data compression.

Example

from causaliq_core.cache import TokenCache
from causaliq_core.cache.compressors import JsonCompressor

with TokenCache(":memory:") as cache:
    cache.set_compressor(JsonCompressor())
    cache.put_data("key1", {"msg": "hello"})

get_compressor

get_compressor() -> Compressor | None

Get the current compressor.

Returns:

  • Compressor | None

    The registered compressor, or None if not set.

has_compressor

has_compressor() -> bool

Check if a compressor is set.

Returns:

  • bool

    True if compressor is set, False otherwise.

put

put(hash: str, data: bytes, metadata: bytes | None = None, key_json: str = '') -> None

Store a cache entry with collision handling.

If an entry with the same hash already exists but has a different key_json, this is treated as a collision and a new entry is created with an incremented seq. If key_json matches, the existing entry is updated.

Parameters:

  • hash

    (str) –

    Unique identifier for the entry (e.g. SHA-256 truncated).

  • data

    (bytes) –

    Binary data to store.

  • metadata

    (bytes | None, default: None ) –

    Optional binary metadata.

  • key_json

    (str, default: '' ) –

    Original unhashed key as JSON string for collision detection. Empty string if not provided.
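
The collision rule can be illustrated with a table keyed on (hash, seq). The sketch below is a simplified stand-in under stated assumptions: the column layout is hypothetical, seq is assumed to start at 0, and "first matching entry" is taken to mean the lowest seq.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
# Hypothetical layout: several rows may share a hash, distinguished by seq.
conn.execute(
    "CREATE TABLE cache_entries ("
    " hash TEXT NOT NULL, seq INTEGER NOT NULL,"
    " key_json TEXT NOT NULL, data BLOB NOT NULL,"
    " PRIMARY KEY (hash, seq))"
)


def put(conn: sqlite3.Connection, hash_: str, data: bytes, key_json: str = "") -> None:
    """Update the row whose key_json matches, else add a row with the next seq."""
    row = conn.execute(
        "SELECT seq FROM cache_entries WHERE hash = ? AND key_json = ?",
        (hash_, key_json),
    ).fetchone()
    if row is not None:
        conn.execute(
            "UPDATE cache_entries SET data = ? WHERE hash = ? AND seq = ?",
            (data, hash_, row[0]),
        )
    else:
        next_seq = conn.execute(
            "SELECT COALESCE(MAX(seq) + 1, 0) FROM cache_entries WHERE hash = ?",
            (hash_,),
        ).fetchone()[0]
        conn.execute(
            "INSERT INTO cache_entries (hash, seq, key_json, data)"
            " VALUES (?, ?, ?, ?)",
            (hash_, next_seq, key_json, data),
        )
    conn.commit()


def get(conn: sqlite3.Connection, hash_: str, key_json: str = "") -> Optional[bytes]:
    """Return data for the matching key_json, or the lowest-seq row if none is given."""
    if key_json:
        row = conn.execute(
            "SELECT data FROM cache_entries WHERE hash = ? AND key_json = ?",
            (hash_, key_json),
        ).fetchone()
    else:
        row = conn.execute(
            "SELECT data FROM cache_entries WHERE hash = ? ORDER BY seq LIMIT 1",
            (hash_,),
        ).fetchone()
    return row[0] if row is not None else None


put(conn, "abc", b"A", key_json='{"p": 1}')
put(conn, "abc", b"B", key_json='{"p": 2}')   # same hash, different key: seq 1
put(conn, "abc", b"A2", key_json='{"p": 1}')  # same key: row with seq 0 updated
rows = conn.execute("SELECT COUNT(*) FROM cache_entries").fetchone()[0]
```

Passing key_json on both put and get is what makes a truncated hash safe to use as the lookup key: a colliding entry is stored alongside the original instead of overwriting it.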

get

get(hash: str, key_json: str = '') -> bytes | None

Retrieve a cache entry and increment hit count.

If key_json is provided, only returns data if key_json matches. This prevents returning wrong data in case of hash collisions.

Parameters:

  • hash

    (str) –

    Unique identifier for the entry.

  • key_json

    (str, default: '' ) –

    Original unhashed key for collision verification. If empty string, returns first matching entry (legacy mode).

Returns:

  • bytes | None

    Binary data if found, None otherwise.

get_with_metadata

get_with_metadata(hash: str, key_json: str = '') -> tuple[bytes, bytes | None] | None

Retrieve a cache entry with its metadata.

Parameters:

  • hash

    (str) –

    Unique identifier for the entry.

  • key_json

    (str, default: '' ) –

    Original unhashed key for collision verification. If empty string, returns first matching entry (legacy mode).

Returns:

  • tuple[bytes, bytes | None] | None

    Tuple of (data, metadata) if found, None otherwise.

exists

exists(hash: str, key_json: str = '') -> bool

Check if a cache entry exists.

Parameters:

  • hash

    (str) –

    Unique identifier for the entry.

  • key_json

    (str, default: '' ) –

    Original unhashed key for collision verification. If empty string, checks for any entry with this hash.

Returns:

  • bool

    True if entry exists, False otherwise.

delete

delete(hash: str, key_json: str = '') -> bool

Delete a cache entry.

Parameters:

  • hash

    (str) –

    Unique identifier for the entry.

  • key_json

    (str, default: '' ) –

    Original unhashed key for collision verification. If empty string, deletes all entries with this hash.

Returns:

  • bool

    True if entry was deleted, False if it didn't exist.

put_data

put_data(hash: str, data: Any, metadata: Any | None = None, key_json: str = '') -> None

Store data using the registered compressor.

This method automatically compresses the data using the compressor set via set_compressor(). Use put() for raw bytes.

Parameters:

  • hash

    (str) –

    Unique identifier for the entry.

  • data

    (Any) –

    Data to compress and store.

  • metadata

    (Any | None, default: None ) –

    Optional metadata to compress and store.

  • key_json

    (str, default: '' ) –

    Original unhashed key as JSON string for collision detection. Empty string if not provided.

Raises:

  • RuntimeError

    If no compressor is set.

Example

from causaliq_core.cache import TokenCache
from causaliq_core.cache.compressors import JsonCompressor

with TokenCache(":memory:") as cache:
    cache.set_compressor(JsonCompressor())
    cache.put_data("abc", {"key": "value"})

get_data

get_data(hash: str, key_json: str = '') -> Any | None

Retrieve and decompress data using the registered compressor.

This method automatically decompresses the data using the compressor set via set_compressor(). Use get() for raw bytes.

Parameters:

  • hash

    (str) –

    Unique identifier for the entry.

  • key_json

    (str, default: '' ) –

    Original unhashed key for collision verification. If empty string, returns first matching entry (legacy mode).

Returns:

  • Any | None

    Decompressed data if found, None otherwise.

Raises:

  • RuntimeError

    If no compressor is set.

Example

from causaliq_core.cache import TokenCache
from causaliq_core.cache.compressors import JsonCompressor

with TokenCache(":memory:") as cache:
    cache.set_compressor(JsonCompressor())
    cache.put_data("abc", {"key": "value"})
    data = cache.get_data("abc")

get_data_with_metadata

get_data_with_metadata(hash: str, key_json: str = '') -> tuple[Any, Any | None] | None

Retrieve and decompress data with metadata.

Parameters:

  • hash

    (str) –

    Unique identifier for the entry.

  • key_json

    (str, default: '' ) –

    Original unhashed key for collision verification. If empty string, returns first matching entry (legacy mode).

Returns:

  • tuple[Any, Any | None] | None

    Tuple of (decompressed_data, decompressed_metadata) if found, None otherwise. The metadata element may be None if no metadata was stored.

Raises:

  • RuntimeError

    If no compressor is set.

export_entries

export_entries(output_dir: Path, fmt: str | None = None) -> int

Export cache entries to human-readable files.

Each entry is exported to a separate file named {hash}.{ext} where ext is determined by the format or compressor's default_export_format.

Parameters:

  • output_dir

    (Path) –

    Directory to write exported files to. Created if it doesn't exist.

  • fmt

    (str | None, default: None ) –

    Export format (e.g. 'json', 'yaml'). If None, uses the compressor's default_export_format.

Returns:

  • int

    Number of entries exported.

Raises:

  • RuntimeError

    If no compressor is set.

Example

from pathlib import Path
from causaliq_core.cache import TokenCache
from causaliq_core.cache.compressors import JsonCompressor

with TokenCache(":memory:") as cache:
    cache.set_compressor(JsonCompressor())
    cache.put_data("abc123", {"key": "value"})
    count = cache.export_entries(Path("./export"))
    # Creates ./export/abc123.json

import_entries

import_entries(input_dir: Path) -> int

Import human-readable files into the cache.

Each file is imported with its stem (filename without extension) used as the cache hash. The compressor's import_() method reads the file and the data is compressed before storage.

Parameters:

  • input_dir

    (Path) –

    Directory containing files to import.

Returns:

  • int

    Number of entries imported.

Raises:

  • RuntimeError

    If no compressor is set.

  • FileNotFoundError

    If input_dir doesn't exist.

Example

from pathlib import Path
from causaliq_core.cache import TokenCache
from causaliq_core.cache.compressors import JsonCompressor

with TokenCache(":memory:") as cache:
    cache.set_compressor(JsonCompressor())
    count = cache.import_entries(Path("./import"))
    # Imports all files from ./import