dev_process_cache.erl - Process State Caching Interface
Overview
Purpose: Convenient wrapper around hb_cache for process state storage
Module: dev_process_cache
Pattern: Process ID + Slot/Message ID → Cached State
Cache Paths: Structured, immutable references to process computations
This module provides a specialized interface for reading and writing process computation results, organizing them by process ID, slot number, and message ID. It enables efficient state restoration and creates an immutable audit trail of process interactions.
Dependencies
- HyperBEAM: hb_cache, hb_util, hb_message, hb_store, hb_opts, hb_path
- Includes: include/hb.hrl
Public Functions Overview
%% Reading
-spec latest(ProcID, Opts) -> {ok, Slot, Msg} | not_found.
-spec latest(ProcID, RequiredPath, Opts) -> {ok, Slot, Msg} | not_found.
-spec latest(ProcID, RequiredPath, Limit, Opts) -> {ok, Slot, Msg} | not_found.
-spec read(ProcID, Opts) -> {ok, Msg}.
-spec read(ProcID, SlotRef, Opts) -> {ok, Msg} | not_found.
%% Writing
-spec write(ProcID, Slot, Msg, Opts) -> {ok, Path}.
Public Functions
1. read/2, read/3
-spec read(ProcID, Opts) -> {ok, Msg}
when
ProcID :: binary(),
Opts :: map(),
Msg :: map().
-spec read(ProcID, SlotRef, Opts) -> {ok, Msg} | not_found
when
ProcID :: binary(),
SlotRef :: integer() | binary(),
Opts :: map(),
Msg :: map().
Description: Read a specific process computation result by slot number or message ID.
SlotRef Types:
- integer() - Slot number (0, 1, 2, ...)
- binary() - Message ID (base64url encoded)
-module(dev_process_cache_read_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
%% Writes a message built by hb_cache:test_signed/1 at slot 0 and checks that
%% reading slot 0 back yields a matching message.
read_by_slot_test() ->
    StoreOpts = #{ store => hb_test_utils:test_store() },
    ProcessID = hb_util:human_id(hb_ao:get(id, dev_process:test_aos_process())),
    Written = hb_cache:test_signed(<<"test data">>),
    {ok, _} = dev_process_cache:write(ProcessID, 0, Written, StoreOpts),
    {ok, Fetched} = dev_process_cache:read(ProcessID, 0, StoreOpts),
    ?assert(hb_message:match(Written, Fetched)).
%% Writes a message at slot 0, then fetches it by its message ID instead of
%% the slot number and checks that the two messages match.
read_by_message_id_test() ->
    StoreOpts = #{ store => hb_test_utils:test_store() },
    ProcessID = hb_util:human_id(hb_ao:get(id, dev_process:test_aos_process())),
    Written = hb_cache:test_signed(<<"test data">>),
    WrittenID = hb_util:human_id(hb_ao:get(id, Written)),
    {ok, _} = dev_process_cache:write(ProcessID, 0, Written, StoreOpts),
    {ok, Fetched} = dev_process_cache:read(ProcessID, WrittenID, StoreOpts),
    ?assert(hb_message:match(Written, Fetched)).
%% Writes two slots for a fresh random process ID and checks that reading the
%% highest slot (1) returns the message that was written there.
%%
%% read/2 is deliberately not exercised: it has an internal issue with
%% hb_util:ok wrapping a 3-tuple, so read/3 with an explicit slot is used.
read_latest_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    Item1 = hb_cache:test_signed(<<"data1">>),
    Item2 = hb_cache:test_signed(<<"data2">>),
    {ok, _} = dev_process_cache:write(ProcID, 0, Item1, Opts),
    {ok, _} = dev_process_cache:write(ProcID, 1, Item2, Opts),
    % Use read/3 with specific slot
    {ok, ReadItem} = dev_process_cache:read(ProcID, 1, Opts),
    % Check the content, not just the type: an is_map/1 check would also pass
    % if slot 1 wrongly returned the slot-0 message.
    ?assert(hb_message:match(Item2, ReadItem)).
%% Reads slot 99 for a process ID this test never writes under and expects the
%% bare atom not_found.
read_not_found_test() ->
    StoreOpts = #{ store => hb_test_utils:test_store() },
    UnusedProcID = hb_util:human_id(<<1:256>>),
    ?assertEqual(
        not_found,
        dev_process_cache:read(UnusedProcID, 99, StoreOpts)
    ).

2. write/4
-spec write(ProcID, Slot, Msg, Opts) -> {ok, SlotPath}
when
ProcID :: binary(),
Slot :: integer(),
Msg :: map(),
Opts :: map(),
SlotPath :: binary().
Description: Write a process computation result to cache. Creates multiple references:
- Root cache entry
- Slot number path link
- Message ID path link
- All references point to same cached item
- Multiple paths for flexible access
- Immutable once written
-module(dev_process_cache_write_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
%% Writes one message at slot 0, checks that write/4 returns a binary path, and
%% confirms that reading that path directly through hb_cache yields a matching
%% message.
write_basic_test() ->
    StoreOpts = #{ store => hb_test_utils:test_store() },
    ProcessID = hb_util:human_id(hb_ao:get(id, dev_process:test_aos_process())),
    Payload = hb_cache:test_signed(<<"test data">>),
    {ok, WrittenPath} = dev_process_cache:write(ProcessID, 0, Payload, StoreOpts),
    ?assert(is_binary(WrittenPath)),
    {ok, Stored} = hb_cache:read(WrittenPath, StoreOpts),
    ?assert(hb_message:match(Payload, Stored)).
%% Writes three different messages at slots 0, 1 and 2 of the same process and
%% checks that every write returned a different path.
write_multiple_slots_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    Process = dev_process:test_aos_process(),
    ProcID = hb_util:human_id(hb_ao:get(id, Process)),
    Item1 = hb_cache:test_signed(<<"data1">>),
    Item2 = hb_cache:test_signed(<<"data2">>),
    Item3 = hb_cache:test_signed(<<"data3">>),
    {ok, Path0} = dev_process_cache:write(ProcID, 0, Item1, Opts),
    {ok, Path1} = dev_process_cache:write(ProcID, 1, Item2, Opts),
    {ok, Path2} = dev_process_cache:write(ProcID, 2, Item3, Opts),
    % Compare all three pairs; checking only neighbours would miss the case
    % where slot 0 and slot 2 share a path.
    ?assertNotEqual(Path0, Path1),
    ?assertNotEqual(Path1, Path2),
    ?assertNotEqual(Path0, Path2).
%% Writes a message built by hb_cache:test_unsigned/1 at slot 0 and checks that
%% it can be read back using the ID computed by hb_message:id(Item, all).
write_with_unsigned_id_test() ->
    StoreOpts = #{ store => hb_test_utils:test_store() },
    ProcessID = hb_util:human_id(hb_ao:get(id, dev_process:test_aos_process())),
    Unsigned = hb_cache:test_unsigned(<<"unsigned data">>),
    {ok, _} = dev_process_cache:write(ProcessID, 0, Unsigned, StoreOpts),
    % The lookup key is derived from the message after it has been written.
    LookupID = hb_util:human_id(hb_message:id(Unsigned, all)),
    {ok, Fetched} = dev_process_cache:read(ProcessID, LookupID, StoreOpts),
    ?assert(hb_message:match(Unsigned, Fetched)).

3. latest/2, latest/3, latest/4
-spec latest(ProcID, Opts) -> {ok, Slot, Msg} | not_found.
-spec latest(ProcID, RequiredPath, Opts) -> {ok, Slot, Msg} | not_found.
-spec latest(ProcID, RequiredPath, Limit, Opts) -> {ok, Slot, Msg} | not_found
when
ProcID :: binary(),
RequiredPath :: binary() | [binary()] | undefined,
Limit :: integer() | undefined,
Opts :: map(),
Slot :: integer(),
Msg :: map().
Description: Find the highest slot number that meets criteria. Optionally filter by required path and slot limit.
Parameters:
- RequiredPath - Path that must exist in result (e.g., <<"/results">>, [<<"state">>, <<"balance">>])
- Limit - Maximum slot number to search (inclusive)
- Returns highest matching slot with its message
-module(dev_process_cache_latest_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
%% Resets the store, writes slots 0..2 for a fresh random process ID, and
%% checks that latest/2 reports slot 2 together with the message written there.
latest_without_qualifiers_test() ->
    Opts = #{ store => hb_test_utils:test_store() },
    Store = hb_opts:get(store, no_viable_store, Opts),
    hb_store:reset(Store),
    ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    Msg0 = hb_cache:test_signed(<<"slot0">>),
    Msg1 = hb_cache:test_signed(<<"slot1">>),
    Msg2 = hb_cache:test_signed(<<"slot2">>),
    {ok, _} = dev_process_cache:write(ProcID, 0, Msg0, Opts),
    {ok, _} = dev_process_cache:write(ProcID, 1, Msg1, Opts),
    {ok, _} = dev_process_cache:write(ProcID, 2, Msg2, Opts),
    {ok, LatestSlot, LatestMsg} = dev_process_cache:latest(ProcID, Opts),
    % Real assertions replace the former ?assert(true): a wrong slot now shows
    % expected/actual values, and the returned message is checked as well.
    ?assertEqual(2, LatestSlot),
    ?assert(hb_message:match(Msg2, LatestMsg)).
%% Smoke check that dev_process_cache exports latest/3 (the required-path
%% variant). It only verifies the export; path filtering is not exercised.
latest_with_required_path_test() ->
    % erlang:function_exported/3 returns false for modules that are not loaded,
    % so assert the load result instead of discarding it. A load failure is
    % then reported as such rather than as a missing export.
    ?assertEqual(
        {module, dev_process_cache},
        code:ensure_loaded(dev_process_cache)
    ),
    ?assert(erlang:function_exported(dev_process_cache, latest, 3)).
%% Resets the store, writes slots 0..2, then asks latest/4 for the newest slot
%% with an empty required path and a limit of 1. Expects slot 1 and the
%% message written at slot 1.
latest_with_limit_test() ->
    StoreOpts = #{ store => hb_test_utils:test_store() },
    hb_store:reset(hb_opts:get(store, no_viable_store, StoreOpts)),
    ProcessID = hb_util:human_id(hb_ao:get(id, dev_process:test_aos_process())),
    Slot0Msg = #{ <<"slot">> => 0 },
    Slot1Msg = #{ <<"slot">> => 1 },
    Slot2Msg = #{ <<"slot">> => 2 },
    lists:foreach(
        fun({SlotNum, SlotMsg}) ->
            {ok, _} =
                dev_process_cache:write(ProcessID, SlotNum, SlotMsg, StoreOpts)
        end,
        [{0, Slot0Msg}, {1, Slot1Msg}, {2, Slot2Msg}]
    ),
    NoRequiredPath = [],
    SlotLimit = 1,
    {ok, 1, Found} =
        dev_process_cache:latest(ProcessID, NoRequiredPath, SlotLimit, StoreOpts),
    ?assert(hb_message:match(Slot1Msg, Found)).
%% Calls latest/2 for a process ID this test never writes under and expects the
%% bare atom not_found.
latest_not_found_test() ->
    StoreOpts = #{ store => hb_test_utils:test_store() },
    UnusedProcID = hb_util:human_id(<<1:256>>),
    ?assertEqual(
        not_found,
        dev_process_cache:latest(UnusedProcID, StoreOpts)
    ).

Path Structure
Cache Organization
/computed/{ProcessID}/slot/{SlotNumber}
/computed/{ProcessID}/{MessageID}
/computed/{ProcessID}/snapshot/{Slot}
Examples
% Slot path
<<"/computed/base64url_process_id/slot/0">>
<<"/computed/base64url_process_id/slot/42">>
% Message ID path
<<"/computed/base64url_process_id/base64url_message_id">>
% Snapshot path
<<"/computed/base64url_process_id/snapshot/10">>
Path Generation
path/2, path/3, path/4
path(ProcID, Ref, Opts)
path(ProcID, Ref, PathSuffix, Opts)
- integer() - Generates /slot/{N} path
- root - Base computed path
- slot_root - Slot directory path
- binary() - Direct message ID path
-module(dev_process_cache_path_test).
-include_lib("eunit/include/eunit.hrl").
%% path/3 with an integer reference must return a binary that contains the
%% segment "slot/5".
path_by_slot_test() ->
    StoreOpts = #{ store => hb_test_utils:test_store() },
    ProcessID = hb_util:human_id(<<1:256>>),
    SlotPath = dev_process_cache:path(ProcessID, 5, StoreOpts),
    ?assert(is_binary(SlotPath)),
    ?assertNotEqual(nomatch, binary:match(SlotPath, <<"slot/5">>)).
%% path/3 with a binary reference must return a binary that contains that
%% message ID.
path_by_message_id_test() ->
    StoreOpts = #{ store => hb_test_utils:test_store() },
    ProcessID = hb_util:human_id(<<1:256>>),
    MsgRef = hb_util:human_id(<<2:256>>),
    RefPath = dev_process_cache:path(ProcessID, MsgRef, StoreOpts),
    ?assert(is_binary(RefPath)),
    ?assertNotEqual(nomatch, binary:match(RefPath, MsgRef)).
%% path/4 with the suffix [<<"results">>] must return a path that contains
%% "results".
path_with_suffix_test() ->
    StoreOpts = #{ store => hb_test_utils:test_store() },
    ProcessID = hb_util:human_id(<<1:256>>),
    Suffix = [<<"results">>],
    SuffixedPath = dev_process_cache:path(ProcessID, 0, Suffix, StoreOpts),
    ?assertNotEqual(nomatch, binary:match(SuffixedPath, <<"results">>)).

Common Patterns
%% Write computation result
ProcID = hb_util:human_id(ProcessMessageID),
State = #{ <<"results">> => #{ <<"data">> => <<"value">> } },
{ok, Path} = dev_process_cache:write(ProcID, SlotNumber, State, Opts).

%% Read by slot
{ok, State} = dev_process_cache:read(ProcID, 5, Opts).

%% Read by message ID
MessageID = hb_util:human_id(StateMessageID),
{ok, State} = dev_process_cache:read(ProcID, MessageID, Opts).

%% Get latest result
% latest/2 is shown rather than read/2: read/2 has a known issue handling the
% 3-tuple result (see read_latest_test).
{ok, _Slot, State} = dev_process_cache:latest(ProcID, Opts).

%% Find latest with required path
{ok, Slot, State} = dev_process_cache:latest(
    ProcID,
    <<"/results/output">>,
    Opts
).

%% Find latest at or before a specific slot (the limit is inclusive)
{ok, Slot, State} = dev_process_cache:latest(
    ProcID,
    <<"/state">>,
    MaxSlot,
    Opts
).

%% Check if slot exists
% A miss is the bare atom not_found (see read_not_found_test), so a
% {error, not_found} clause would never match.
case dev_process_cache:read(ProcID, SlotNum, Opts) of
    {ok, _State} -> exists;
    not_found -> does_not_exist
end.

%% List all available slots
Path = dev_process_cache:path(ProcID, slot_root, Opts),
AllSlots = hb_cache:list_numbered(Path, Opts).

Integration with dev_process
%% Process uses cache for state restoration
% Find best starting point
case dev_process_cache:latest(ProcessID, <<"/state">>, SlotLimit, Opts) of
    {ok, StartSlot, StartState} ->
        % Restore from this point
        continue_from_state(StartState, StartSlot);
    not_found ->
        % Start from initialization
        init_new_state()
end.

%% Write after computation
ComputedState = execute_slot(Process, Slot),
{ok, _Path} = dev_process_cache:write(ProcessID, Slot, ComputedState, Opts).

%% Check prior results
% A miss is the bare atom not_found (see read_not_found_test), so a
% {error, not_found} clause would never match.
case dev_process_cache:read(ProcessID, Slot, Opts) of
    {ok, CachedState} ->
        % Already computed, use cache
        CachedState;
    not_found ->
        % Need to compute
        compute_slot(Process, Slot)
end.

Slot Number Conventions
- Zero-indexed: First slot is 0
- Sequential: Slots computed in order (0, 1, 2, ...)
- Immutable: Once written, slot results don't change
- Sparse OK: Gaps in slot numbers allowed (cached snapshots)
Message ID Storage
Uncommitted ID
% Always stored with uncommitted ID for consistency
ID = hb_message:id(Msg, uncommitted, Opts)
Why Uncommitted?
- Consistent before and after signing
- Same message always maps to same ID
- Enables lookup before commitment
Performance Considerations
Cache Linking
- Single write, multiple references
- No data duplication
- Fast lookups by any reference
Latest Search
- Reverse-sorted slot list
- Early termination when match found
- Efficient for recent results
Path Resolution
- Store handles symlink resolution
- Transparent to caller
- Enables flexible storage backends
Store Integration
% Get configured store from Opts
% NOTE(review): no_viable_store is presumably the value returned when no
% store is configured - confirm against hb_opts:get/3.
Store = hb_opts:get(store, no_viable_store, Opts)
% Cache uses store for persistence; the write yields the root path of the
% stored message
{ok, Root} = hb_cache:write(Msg, Opts)
% Link creates reference from SlotPath to the already-written Root
hb_cache:link(Root, SlotPath, Opts)
% Read resolves links transparently
{ok, Msg} = hb_cache:read(SlotPath, Opts)
Test Suite
%% EUnit generator: passes the two process-cache test funs, together with the
%% {Name, Opts} pairs from hb_store:test_stores/0, to
%% hb_store:generate_test_suite/2.
process_cache_suite_test_() ->
    SuiteTests =
        [
            {"write and read process outputs",
                fun test_write_and_read_output/1},
            {"find latest output (with path)",
                fun find_latest_outputs/1}
        ],
    % The comprehension keeps only entries shaped as 2-tuples.
    StoreConfigs =
        [
            {StoreName, StoreOpts}
        ||
            {StoreName, StoreOpts} <- hb_store:test_stores()
        ],
    hb_store:generate_test_suite(SuiteTests, StoreConfigs).

Tests run against multiple store backends:
- Memory store
- File store
- S3 store (if configured)
References
- Process Device - dev_process.erl
- Cache Module - hb_cache.erl
- Store Module - hb_store.erl
- Message Module - hb_message.erl
- Utilities - hb_util.erl
Notes
- Wrapper Module: Simplifies hb_cache for process-specific use
- Multiple References: Slot number AND message ID access
- Immutable Audit: Complete history of process computations
- Path Conventions: Structured, predictable cache organization
- Latest Search: Efficient backward search with filters
- Required Paths: Enable finding specific state types
- Slot Limits: Control search scope for optimization
- ID Consistency: Always uses uncommitted IDs
- Link Strategy: Single storage, multiple access paths
- Store Agnostic: Works with any hb_store backend
- No Deletion: Write-only, immutable cache
- Slot Gaps: Sparse slot numbers allowed (snapshots)
- Concurrent Safe: Multiple processes can write different slots
- Path Suffix: Enables deep path checking
- Test Coverage: Comprehensive test suite with multiple stores