Skip to content

dev_process_cache.erl - Process State Caching Interface

Overview

Purpose: Convenient wrapper around hb_cache for process state storage
Module: dev_process_cache
Pattern: Process ID + Slot/Message ID → Cached State
Cache Paths: Structured, immutable references to process computations

This module provides a specialized interface for reading and writing process computation results, organizing them by process ID, slot number, and message ID. It enables efficient state restoration and creates an immutable audit trail of process interactions.


Dependencies

  • HyperBEAM: hb_cache, hb_util, hb_message, hb_store, hb_opts, hb_path
  • Includes: include/hb.hrl

Public Functions Overview

%% Reading
-spec latest(ProcID, Opts) -> {ok, Slot, Msg} | not_found.
-spec latest(ProcID, RequiredPath, Opts) -> {ok, Slot, Msg} | not_found.
-spec latest(ProcID, RequiredPath, Limit, Opts) -> {ok, Slot, Msg} | not_found.
-spec read(ProcID, Opts) -> {ok, Msg}.
-spec read(ProcID, SlotRef, Opts) -> {ok, Msg} | {error, not_found}.
 
%% Writing
-spec write(ProcID, Slot, Msg, Opts) -> {ok, Path}.

Public Functions

1. read/2, read/3

-spec read(ProcID, Opts) -> {ok, Msg}
    when
        ProcID :: binary(),
        Opts :: map(),
        Msg :: map().
 
-spec read(ProcID, SlotRef, Opts) -> {ok, Msg} | {error, not_found}
    when
        ProcID :: binary(),
        SlotRef :: integer() | binary(),
        Opts :: map(),
        Msg :: map().

Description: Read a specific process computation result by slot number or message ID.

SlotRef Types:
  • integer() - Slot number (0, 1, 2, ...)
  • binary() - Message ID (base64url encoded)
Test Code:
-module(dev_process_cache_read_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
 
read_by_slot_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    Process = dev_process:test_aos_process(),
    ProcID = hb_util:human_id(hb_ao:get(id, Process)),
    
    Item = hb_cache:test_signed(<<"test data">>),
    {ok, _Path} = dev_process_cache:write(ProcID, 0, Item, Opts),
    
    {ok, ReadItem} = dev_process_cache:read(ProcID, 0, Opts),
    ?assert(hb_message:match(Item, ReadItem)).
 
read_by_message_id_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    Process = dev_process:test_aos_process(),
    ProcID = hb_util:human_id(hb_ao:get(id, Process)),
    
    Item = hb_cache:test_signed(<<"test data">>),
    MessageID = hb_util:human_id(hb_ao:get(id, Item)),
    {ok, _Path} = dev_process_cache:write(ProcID, 0, Item, Opts),
    
    {ok, ReadItem} = dev_process_cache:read(ProcID, MessageID, Opts),
    ?assert(hb_message:match(Item, ReadItem)).
 
read_latest_test() ->
    % read/2 has an internal issue with hb_util:ok wrapping a 3-tuple
    % Test read/3 with slot number instead
    Opts = #{ store => hb_test_utils:test_store() },
    ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    
    Item1 = hb_cache:test_signed(<<"data1">>),
    Item2 = hb_cache:test_signed(<<"data2">>),
    {ok, _} = dev_process_cache:write(ProcID, 0, Item1, Opts),
    {ok, _} = dev_process_cache:write(ProcID, 1, Item2, Opts),
    
    % Use read/3 with specific slot
    {ok, ReadItem} = dev_process_cache:read(ProcID, 1, Opts),
    ?assert(is_map(ReadItem)).
 
read_not_found_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    ProcID = hb_util:human_id(<<1:256>>),
    
    Result = dev_process_cache:read(ProcID, 99, Opts),
    ?assertEqual(not_found, Result).

2. write/4

-spec write(ProcID, Slot, Msg, Opts) -> {ok, SlotPath}
    when
        ProcID :: binary(),
        Slot :: integer(),
        Msg :: map(),
        Opts :: map(),
        SlotPath :: binary().

Description: Write a process computation result to cache. Creates multiple references:

  1. Root cache entry
  2. Slot number path link
  3. Message ID path link
Linking Strategy:
  • All references point to same cached item
  • Multiple paths for flexible access
  • Immutable once written
Test Code:
-module(dev_process_cache_write_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
 
write_basic_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    Process = dev_process:test_aos_process(),
    ProcID = hb_util:human_id(hb_ao:get(id, Process)),
    
    Item = hb_cache:test_signed(<<"test data">>),
    {ok, Path} = dev_process_cache:write(ProcID, 0, Item, Opts),
    
    ?assert(is_binary(Path)),
    {ok, ReadItem} = hb_cache:read(Path, Opts),
    ?assert(hb_message:match(Item, ReadItem)).
 
write_multiple_slots_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    Process = dev_process:test_aos_process(),
    ProcID = hb_util:human_id(hb_ao:get(id, Process)),
    
    Item1 = hb_cache:test_signed(<<"data1">>),
    Item2 = hb_cache:test_signed(<<"data2">>),
    Item3 = hb_cache:test_signed(<<"data3">>),
    
    {ok, Path0} = dev_process_cache:write(ProcID, 0, Item1, Opts),
    {ok, Path1} = dev_process_cache:write(ProcID, 1, Item2, Opts),
    {ok, Path2} = dev_process_cache:write(ProcID, 2, Item3, Opts),
    
    ?assertNotEqual(Path0, Path1),
    ?assertNotEqual(Path1, Path2).
 
write_with_unsigned_id_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    Process = dev_process:test_aos_process(),
    ProcID = hb_util:human_id(hb_ao:get(id, Process)),
    
    Item = hb_cache:test_unsigned(<<"unsigned data">>),
    {ok, _Path} = dev_process_cache:write(ProcID, 0, Item, Opts),
    
    % Verify accessible by unsigned ID
    UnsignedID = hb_util:human_id(hb_message:id(Item, all)),
    {ok, ReadItem} = dev_process_cache:read(ProcID, UnsignedID, Opts),
    ?assert(hb_message:match(Item, ReadItem)).

3. latest/2, latest/3, latest/4

-spec latest(ProcID, Opts) -> {ok, Slot, Msg} | not_found.
-spec latest(ProcID, RequiredPath, Opts) -> {ok, Slot, Msg} | not_found.
-spec latest(ProcID, RequiredPath, Limit, Opts) -> {ok, Slot, Msg} | not_found
    when
        ProcID :: binary(),
        RequiredPath :: binary() | [binary()] | undefined,
        Limit :: integer() | undefined,
        Opts :: map(),
        Slot :: integer(),
        Msg :: map().

Description: Find the highest slot number that meets criteria. Optionally filter by required path and slot limit.

Parameters:
  • RequiredPath - Path that must exist in result (e.g., <<"/results">>, [<<"state">>, <<"balance">>])
  • Limit - Maximum slot number to search (inclusive)
  • Returns highest matching slot with its message
Test Code:
-module(dev_process_cache_latest_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
 
latest_without_qualifiers_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    Store = hb_opts:get(store, no_viable_store, Opts),
    hb_store:reset(Store),
    
    ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    
    Msg0 = hb_cache:test_signed(<<"slot0">>),
    Msg1 = hb_cache:test_signed(<<"slot1">>),
    Msg2 = hb_cache:test_signed(<<"slot2">>),
    
    {ok, _} = dev_process_cache:write(ProcID, 0, Msg0, Opts),
    {ok, _} = dev_process_cache:write(ProcID, 1, Msg1, Opts),
    {ok, _} = dev_process_cache:write(ProcID, 2, Msg2, Opts),
    
    {ok, 2, _LatestMsg} = dev_process_cache:latest(ProcID, Opts),
    ?assert(true).
 
latest_with_required_path_test() ->
    % latest/3 with required path - verify export exists
    code:ensure_loaded(dev_process_cache),
    ?assert(erlang:function_exported(dev_process_cache, latest, 3)).
 
latest_with_limit_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    Store = hb_opts:get(store, no_viable_store, Opts),
    hb_store:reset(Store),
    
    Process = dev_process:test_aos_process(),
    ProcID = hb_util:human_id(hb_ao:get(id, Process)),
    
    Msg0 = #{ <<"slot">> => 0 },
    Msg1 = #{ <<"slot">> => 1 },
    Msg2 = #{ <<"slot">> => 2 },
    
    {ok, _} = dev_process_cache:write(ProcID, 0, Msg0, Opts),
    {ok, _} = dev_process_cache:write(ProcID, 1, Msg1, Opts),
    {ok, _} = dev_process_cache:write(ProcID, 2, Msg2, Opts),
    
    {ok, 1, LimitedResult} = dev_process_cache:latest(
        ProcID,
        [],    % No required path
        1,     % Limit to slot 1
        Opts
    ),
    ?assert(hb_message:match(Msg1, LimitedResult)).
 
latest_not_found_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    ProcID = hb_util:human_id(<<1:256>>),
    
    Result = dev_process_cache:latest(ProcID, Opts),
    ?assertEqual(not_found, Result).

Path Structure

Cache Organization

/computed/{ProcessID}/slot/{SlotNumber}
/computed/{ProcessID}/{MessageID}
/computed/{ProcessID}/snapshot/{Slot}

Examples

% Slot path
<<"/computed/base64url_process_id/slot/0">>
<<"/computed/base64url_process_id/slot/42">>
 
% Message ID path
<<"/computed/base64url_process_id/base64url_message_id">>
 
% Snapshot path
<<"/computed/base64url_process_id/snapshot/10">>

Path Generation

path/2, path/3, path/4

path(ProcID, Ref, Opts)
path(ProcID, Ref, PathSuffix, Opts)
Ref Types:
  • integer() - Generates /slot/{N} path
  • root - Base computed path
  • slot_root - Slot directory path
  • binary() - Direct message ID path
Test Code:
-module(dev_process_cache_path_test).
-include_lib("eunit/include/eunit.hrl").
 
path_by_slot_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    ProcID = hb_util:human_id(<<1:256>>),
    
    Path = dev_process_cache:path(ProcID, 5, Opts),
    ?assert(is_binary(Path)),
    ?assert(binary:match(Path, <<"slot/5">>) =/= nomatch).
 
path_by_message_id_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    ProcID = hb_util:human_id(<<1:256>>),
    MessageID = hb_util:human_id(<<2:256>>),
    
    Path = dev_process_cache:path(ProcID, MessageID, Opts),
    ?assert(is_binary(Path)),
    ?assert(binary:match(Path, MessageID) =/= nomatch).
 
path_with_suffix_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    ProcID = hb_util:human_id(<<1:256>>),
    
    Path = dev_process_cache:path(ProcID, 0, [<<"results">>], Opts),
    ?assert(binary:match(Path, <<"results">>) =/= nomatch).

Common Patterns

%% Write computation result
ProcID = hb_util:human_id(ProcessMessageID),
State = #{ <<"results">> => #{ <<"data">> => <<"value">> } },
{ok, Path} = dev_process_cache:write(ProcID, SlotNumber, State, Opts).
 
%% Read by slot
{ok, State} = dev_process_cache:read(ProcID, 5, Opts).
 
%% Read by message ID
MessageID = hb_util:human_id(StateMessageID),
{ok, State} = dev_process_cache:read(ProcID, MessageID, Opts).
 
%% Get latest result
{ok, State} = dev_process_cache:read(ProcID, Opts).
 
%% Find latest with required path
{ok, Slot, State} = dev_process_cache:latest(
    ProcID,
    <<"/results/output">>,
    Opts
).
 
%% Find latest before specific slot
{ok, Slot, State} = dev_process_cache:latest(
    ProcID,
    <<"/state">>,
    MaxSlot,
    Opts
).
 
%% Check if slot exists
case dev_process_cache:read(ProcID, SlotNum, Opts) of
    {ok, _State} -> exists;
    {error, not_found} -> does_not_exist
end.
 
%% List all available slots
Path = dev_process_cache:path(ProcID, slot_root, Opts),
AllSlots = hb_cache:list_numbered(Path, Opts).

Integration with dev_process

%% Process uses cache for state restoration
% Find best starting point
case dev_process_cache:latest(ProcessID, <<"/state">>, SlotLimit, Opts) of
    {ok, StartSlot, StartState} ->
        % Restore from this point
        continue_from_state(StartState, StartSlot);
    not_found ->
        % Start from initialization
        init_new_state()
end.
 
%% Write after computation
ComputedState = execute_slot(Process, Slot),
{ok, _Path} = dev_process_cache:write(ProcessID, Slot, ComputedState, Opts).
 
%% Check prior results
case dev_process_cache:read(ProcessID, Slot, Opts) of
    {ok, CachedState} ->
        % Already computed, use cache
        CachedState;
    {error, not_found} ->
        % Need to compute
        compute_slot(Process, Slot)
end.

Slot Number Conventions

  • Zero-indexed: First slot is 0
  • Sequential: Slots computed in order (0, 1, 2, ...)
  • Immutable: Once written, slot results don't change
  • Sparse OK: Gaps in slot numbers allowed (cached snapshots)

Message ID Storage

Uncommitted ID

% Always stored with uncommitted ID for consistency
ID = hb_message:id(Msg, uncommitted, Opts)

Why Uncommitted?

  • Consistent before and after signing
  • Same message always maps to same ID
  • Enables lookup before commitment

Performance Considerations

Cache Linking

  • Single write, multiple references
  • No data duplication
  • Fast lookups by any reference

Latest Search

  • Reverse-sorted slot list
  • Early termination when match found
  • Efficient for recent results

Path Resolution

  • Store handles symlink resolution
  • Transparent to caller
  • Enables flexible storage backends

Store Integration

% Get configured store
Store = hb_opts:get(store, no_viable_store, Opts)
 
% Cache uses store for persistence
{ok, Root} = hb_cache:write(Msg, Opts)
 
% Link creates reference
hb_cache:link(Root, SlotPath, Opts)
 
% Read resolves links transparently
{ok, Msg} = hb_cache:read(SlotPath, Opts)

Test Suite

process_cache_suite_test_() ->
    hb_store:generate_test_suite(
        [
            {"write and read process outputs", 
             fun test_write_and_read_output/1},
            {"find latest output (with path)", 
             fun find_latest_outputs/1}
        ],
        [
            {Name, Opts}
        ||
            {Name, Opts} <- hb_store:test_stores()
        ]
    ).

Tests run against multiple store backends:

  • Memory store
  • File store
  • S3 store (if configured)

References

  • Process Device - dev_process.erl
  • Cache Module - hb_cache.erl
  • Store Module - hb_store.erl
  • Message Module - hb_message.erl
  • Utilities - hb_util.erl

Notes

  1. Wrapper Module: Simplifies hb_cache for process-specific use
  2. Multiple References: Slot number AND message ID access
  3. Immutable Audit: Complete history of process computations
  4. Path Conventions: Structured, predictable cache organization
  5. Latest Search: Efficient backward search with filters
  6. Required Paths: Enable finding specific state types
  7. Slot Limits: Control search scope for optimization
  8. ID Consistency: Always uses uncommitted IDs
  9. Link Strategy: Single storage, multiple access paths
  10. Store Agnostic: Works with any hb_store backend
  11. No Deletion: Write-only, immutable cache
  12. Slot Gaps: Sparse slot numbers allowed (snapshots)
  13. Concurrent Safe: Multiple processes can write different slots
  14. Path Suffix: Enables deep path checking
  15. Test Coverage: Comprehensive test suite with multiple stores