dev_scheduler_cache.erl - Scheduler Assignment Cache
Overview
Purpose: Provide persistent caching for scheduler assignments and location records
Module: dev_scheduler_cache
Cache Prefix: ~scheduler@1.0
Storage: Configurable via scheduler_store option
This module provides a caching layer for scheduler assignments and scheduler location records. It supports separate volatile and non-volatile storage configurations, enabling assignments to be stored temporarily while location records persist across restarts.
Supported Operations
- Assignment Writing: Store assignments with process/slot indexing
- Assignment Reading: Retrieve assignments by process ID and slot
- Assignment Listing: List all slots for a process
- Latest Assignment: Find the most recent assignment for a process
- Location Records: Cache scheduler location information
Dependencies
- HyperBEAM: hb_cache, hb_store, hb_opts, hb_ao, hb_message, hb_util, hb_maps
- Scheduler: dev_scheduler_formats
- Testing: eunit
- Includes: include/hb.hrl
Public Functions Overview
%% Assignment Operations
-spec write(Assignment, Opts) -> ok | {error, Reason}.
-spec write_spawn(InitMessage, Opts) -> {ok, Path} | {error, Reason}.
-spec read(ProcID, Slot, Opts) -> {ok, Assignment} | not_found.
-spec list(ProcID, Opts) -> [Slot].
-spec latest(ProcID, Opts) -> {Slot, HashChain} | not_found.
%% Location Operations
-spec write_location(LocationMsg, Opts) -> ok | {error, Reason}.
-spec read_location(Address, Opts) -> {ok, Location} | not_found.
Public Functions
1. write/2
-spec write(Assignment, Opts) -> ok | {error, Reason}
when
Assignment :: map(),
Opts :: map(),
Reason :: term().
Description: Write an assignment message into the cache. Creates symlinks from process ID and slot number to the underlying data for efficient retrieval.
Test Code:
-module(dev_scheduler_cache_write_test).
-include_lib("eunit/include/eunit.hrl").
%% Writes one assignment at slot 1 and checks that it can be read back under
%% the same process ID and slot.
write_assignment_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"write-test">>),
    hb_store:start(TestStore),
    Stores = [TestStore],
    CacheOpts = #{ store => Stores, scheduler_store => Stores },
    Process = hb_util:human_id(crypto:strong_rand_bytes(32)),
    SlotOne =
        #{
            <<"variant">> => <<"ao.N.1">>,
            <<"process">> => Process,
            <<"slot">> => 1,
            <<"hash-chain">> => crypto:strong_rand_bytes(64)
        },
    ?assertEqual(ok, dev_scheduler_cache:write(SlotOne, CacheOpts)),
    %% Round-trip: the slot stored above must come back unchanged.
    {ok, Stored} = dev_scheduler_cache:read(Process, 1, CacheOpts),
    ?assertEqual(1, hb_ao:get(<<"slot">>, Stored, CacheOpts)).
write_multiple_slots_test() ->
Store = hb_test_utils:test_store(hb_store_fs, <<"multi-slot">>),
hb_store:start(Store),
Opts = #{ store => [Store], scheduler_store => [Store] },
ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
lists:foreach(
fun(Slot) ->
Assignment = #{
<<"variant">> => <<"ao.N.1">>,
<<"process">> => ProcID,
<<"slot">> => Slot,
<<"hash-chain">> => crypto:strong_rand_bytes(64)
},
?assertEqual(ok, dev_scheduler_cache:write(Assignment, Opts))
end,
lists:seq(1, 10)
),
Slots = dev_scheduler_cache:list(ProcID, Opts),
?assertEqual(10, length(Slots)).
2. write_spawn/2
-spec write_spawn(InitMessage, Opts) -> {ok, Path} | {error, Reason}
when
InitMessage :: map(),
Opts :: map(),
Path :: binary(),
Reason :: term().
Description: Write the initial spawn/process message to the cache. Used when starting a new scheduler server to persist the process definition.
Test Code:
-module(dev_scheduler_cache_spawn_test).
-include_lib("eunit/include/eunit.hrl").
write_spawn_test() ->
Store = hb_test_utils:test_store(hb_store_fs, <<"spawn-test">>),
hb_store:start(Store),
Opts = #{ store => [Store], scheduler_store => [Store] },
InitMessage = #{
<<"type">> => <<"Process">>,
<<"variant">> => <<"ao.N.1">>,
<<"scheduler">> => <<"test-scheduler">>
},
Result = dev_scheduler_cache:write_spawn(InitMessage, Opts),
?assertMatch({ok, _}, Result).
3. read/3
-spec read(ProcID, Slot, Opts) -> {ok, Assignment} | not_found
when
ProcID :: binary(),
Slot :: integer() | binary(),
Opts :: map(),
Assignment :: map().
Description: Get an assignment message from the cache by process ID and slot number. Automatically handles AOS2 format conversion for legacy assignments.
Test Code:
-module(dev_scheduler_cache_read_test).
-include_lib("eunit/include/eunit.hrl").
%% Stores an assignment at slot 5, then checks that read/3 returns a message
%% whose slot is still 5.
read_existing_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"read-test">>),
    hb_store:start(TestStore),
    Stores = [TestStore],
    CacheOpts = #{ store => Stores, scheduler_store => Stores },
    Process = hb_util:human_id(crypto:strong_rand_bytes(32)),
    SlotFive =
        #{
            <<"variant">> => <<"ao.N.1">>,
            <<"process">> => Process,
            <<"slot">> => 5,
            <<"hash-chain">> => <<"test-chain">>
        },
    ok = dev_scheduler_cache:write(SlotFive, CacheOpts),
    {ok, Fetched} = dev_scheduler_cache:read(Process, 5, CacheOpts),
    ?assertEqual(5, hb_ao:get(<<"slot">>, Fetched, CacheOpts)).
%% Asks for slot 999 of a random process ID for which this test wrote nothing
%% and expects `not_found`.
read_not_found_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"read-notfound">>),
    hb_store:start(TestStore),
    Stores = [TestStore],
    CacheOpts = #{ store => Stores, scheduler_store => Stores },
    Process = hb_util:human_id(crypto:strong_rand_bytes(32)),
    ?assertEqual(
        not_found,
        dev_scheduler_cache:read(Process, 999, CacheOpts)
    ).
read_integer_slot_test() ->
Store = hb_test_utils:test_store(hb_store_fs, <<"read-int">>),
hb_store:start(Store),
Opts = #{ store => [Store], scheduler_store => [Store] },
ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
Assignment = #{
<<"variant">> => <<"ao.N.1">>,
<<"process">> => ProcID,
<<"slot">> => 7,
<<"hash-chain">> => <<"chain">>
},
ok = dev_scheduler_cache:write(Assignment, Opts),
% Read with integer slot
{ok, _} = dev_scheduler_cache:read(ProcID, 7, Opts),
% Read with binary slot
{ok, _} = dev_scheduler_cache:read(ProcID, <<"7">>, Opts).
4. list/2
-spec list(ProcID, Opts) -> [Slot]
when
ProcID :: binary(),
Opts :: map(),
Slot :: integer().
Description: Get all slot numbers that have assignments cached for a process. Returns a list of integers.
Test Code:
-module(dev_scheduler_cache_list_test).
-include_lib("eunit/include/eunit.hrl").
%% Lists the slots of a random process ID for which this test wrote nothing
%% and expects the empty list.
list_empty_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"list-empty">>),
    hb_store:start(TestStore),
    Stores = [TestStore],
    CacheOpts = #{ store => Stores, scheduler_store => Stores },
    Process = hb_util:human_id(crypto:strong_rand_bytes(32)),
    ?assertEqual([], dev_scheduler_cache:list(Process, CacheOpts)).
list_with_assignments_test() ->
Store = hb_test_utils:test_store(hb_store_fs, <<"list-assign">>),
hb_store:start(Store),
Opts = #{ store => [Store], scheduler_store => [Store] },
ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
Slots = [1, 5, 10, 15, 20],
lists:foreach(
fun(Slot) ->
Assignment = #{
<<"variant">> => <<"ao.N.1">>,
<<"process">> => ProcID,
<<"slot">> => Slot,
<<"hash-chain">> => <<"chain">>
},
ok = dev_scheduler_cache:write(Assignment, Opts)
end,
Slots
),
Listed = dev_scheduler_cache:list(ProcID, Opts),
?assertEqual(Slots, lists:sort(Listed)).
5. latest/2
-spec latest(ProcID, Opts) -> {Slot, HashChain} | not_found
when
ProcID :: binary(),
Opts :: map(),
Slot :: integer(),
HashChain :: binary().
Description: Get the latest (highest slot number) assignment from the cache for a process. Returns both the slot number and the hash-chain value.
Test Code:
-module(dev_scheduler_cache_latest_test).
-include_lib("eunit/include/eunit.hrl").
%% Queries latest/2 for a random process ID for which this test wrote nothing
%% and expects `not_found`.
latest_empty_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"latest-empty">>),
    hb_store:start(TestStore),
    Stores = [TestStore],
    CacheOpts = #{ store => Stores, scheduler_store => Stores },
    Process = hb_util:human_id(crypto:strong_rand_bytes(32)),
    ?assertEqual(not_found, dev_scheduler_cache:latest(Process, CacheOpts)).
latest_with_assignments_test() ->
Store = hb_test_utils:test_store(hb_store_fs, <<"latest-assign">>),
hb_store:start(Store),
Opts = #{ store => [Store], scheduler_store => [Store] },
ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
HashChain = <<"latest-hash-chain">>,
lists:foreach(
fun(Slot) ->
Assignment = #{
<<"variant">> => <<"ao.N.1">>,
<<"process">> => ProcID,
<<"slot">> => Slot,
<<"hash-chain">> =>
if Slot == 100 -> HashChain;
true -> <<"other-chain">>
end
},
ok = dev_scheduler_cache:write(Assignment, Opts)
end,
[1, 50, 100, 25, 75]
),
{LatestSlot, LatestChain} = dev_scheduler_cache:latest(ProcID, Opts),
?assertEqual(100, LatestSlot),
?assertEqual(HashChain, LatestChain).
6. read_location/2
-spec read_location(Address, Opts) -> {ok, Location} | not_found
when
Address :: binary(),
Opts :: map(),
Location :: map().
Description: Read the latest known scheduler location for a given address. Used for scheduler discovery.
Test Code:
-module(dev_scheduler_cache_location_test).
-include_lib("eunit/include/eunit.hrl").
%% Looks up a random address for which this test wrote no location record and
%% expects `not_found`.
read_location_not_found_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"loc-notfound">>),
    hb_store:start(TestStore),
    Stores = [TestStore],
    CacheOpts = #{ store => Stores, scheduler_store => Stores },
    UnknownAddress = hb_util:human_id(crypto:strong_rand_bytes(32)),
    ?assertEqual(
        not_found,
        dev_scheduler_cache:read_location(UnknownAddress, CacheOpts)
    ).
read_location_found_test() ->
Store = hb_test_utils:test_store(hb_store_fs, <<"loc-found">>),
Wallet = ar_wallet:new(),
hb_store:start(Store),
Opts = #{
store => [Store],
scheduler_store => [Store],
priv_wallet => Wallet
},
Address = hb_util:human_id(ar_wallet:to_address(Wallet)),
LocationMsg = hb_message:commit(#{
<<"type">> => <<"scheduler-location">>,
<<"url">> => <<"http://scheduler.example.com">>
}, Opts),
ok = dev_scheduler_cache:write_location(LocationMsg, Opts),
{ok, Location} = dev_scheduler_cache:read_location(Address, Opts),
?assert(is_map(Location)).
7. write_location/2
-spec write_location(LocationMsg, Opts) -> ok | {error, Reason}
when
LocationMsg :: map(),
Opts :: map(),
Reason :: binary() | term().
Description: Write a scheduler location record to the cache. Creates symlinks for each signer address to enable lookup by scheduler address.
Test Code:
-module(dev_scheduler_cache_write_location_test).
-include_lib("eunit/include/eunit.hrl").
%% Commits a scheduler-location message using a newly generated wallet and
%% checks that write_location/2 returns `ok` for it.
write_location_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"write-loc">>),
    TestWallet = ar_wallet:new(),
    hb_store:start(TestStore),
    Stores = [TestStore],
    CacheOpts =
        #{
            store => Stores,
            scheduler_store => Stores,
            priv_wallet => TestWallet
        },
    Uncommitted =
        #{
            <<"type">> => <<"scheduler-location">>,
            <<"url">> => <<"http://test.scheduler.com">>,
            <<"nonce">> => 1
        },
    Committed = hb_message:commit(Uncommitted, CacheOpts),
    ?assertEqual(ok, dev_scheduler_cache:write_location(Committed, CacheOpts)).
write_location_multiple_signers_test() ->
Store = hb_test_utils:test_store(hb_store_fs, <<"write-multi">>),
Wallet1 = ar_wallet:new(),
Wallet2 = ar_wallet:new(),
hb_store:start(Store),
Opts1 = #{ store => [Store], scheduler_store => [Store], priv_wallet => Wallet1 },
Opts2 = #{ store => [Store], scheduler_store => [Store], priv_wallet => Wallet2 },
LocationMsg = hb_message:commit(
hb_message:commit(#{
<<"type">> => <<"scheduler-location">>,
<<"url">> => <<"http://multi.scheduler.com">>
}, Opts1),
Opts2
),
ok = dev_scheduler_cache:write_location(LocationMsg, Opts1),
% Both signers should be able to find the location
Addr1 = hb_util:human_id(ar_wallet:to_address(Wallet1)),
Addr2 = hb_util:human_id(ar_wallet:to_address(Wallet2)),
{ok, _} = dev_scheduler_cache:read_location(Addr1, Opts1),
{ok, _} = dev_scheduler_cache:read_location(Addr2, Opts1).
Cache Path Structure
~scheduler@1.0/
├── assignments/
│   └── {ProcessID}/
│       ├── 0 -> /path/to/assignment
│       ├── 1 -> /path/to/assignment
│       └── ...
└── locations/
    └── {SchedulerAddress} -> /path/to/location
Common Patterns
%% Configure separate volatile and non-volatile stores
Opts = #{
store => [NonVolatileStore],
scheduler_store => [VolatileStore]
}.
%% Write assignment and verify
Assignment = #{
<<"variant">> => <<"ao.N.1">>,
<<"process">> => ProcessID,
<<"slot">> => Slot,
<<"hash-chain">> => HashChain,
<<"body">> => Message
},
ok = dev_scheduler_cache:write(Assignment, Opts).
%% Resume scheduling from latest assignment
%% latest/2 returns either `not_found` or `{Slot, HashChain}`. The result of
%% the whole `case` is bound once, rather than binding StartSlot and
%% InitialChain separately inside each branch and exporting them from it.
%% With no cached assignment the starting point is slot -1 with an empty chain.
{StartSlot, InitialChain} =
    case dev_scheduler_cache:latest(ProcessID, Opts) of
        not_found -> {-1, <<>>};
        {Slot, Chain} -> {Slot, Chain}
    end.
%% Iterate through all assignments
Slots = dev_scheduler_cache:list(ProcessID, Opts),
Assignments = lists:map(
fun(Slot) ->
{ok, A} = dev_scheduler_cache:read(ProcessID, Slot, Opts),
A
end,
lists:sort(Slots)
).
%% Cache scheduler location
LocationMsg = hb_message:commit(#{
<<"type">> => <<"scheduler-location">>,
<<"url">> => <<"http://my-scheduler.example.com:8734">>,
<<"nonce">> => Nonce,
<<"time-to-live">> => 3600000
}, Opts),
ok = dev_scheduler_cache:write_location(LocationMsg, Opts).
Storage Configuration
Volatile vs Non-Volatile Storage
The module supports separate stores for assignments (potentially volatile) and general data:
%% Assignments go to a dedicated store ("volatile"), other data to the main store ("data").
%% Both stores in this example use hb_store_fs.
Opts = #{
store => [#{ <<"store-module">> => hb_store_fs, <<"name">> => <<"data">> }],
scheduler_store => [#{ <<"store-module">> => hb_store_fs, <<"name">> => <<"volatile">> }]
}.
Store Reset Behavior
- Resetting scheduler_store clears all cached assignments
- Resetting main store does not affect assignment symlinks
- Location records persist with the scheduler store
References
- Main Scheduler - dev_scheduler.erl
- Scheduler Server - dev_scheduler_server.erl
- Format Conversion - dev_scheduler_formats.erl
- Cache System - hb_cache.erl
- Store Interface - hb_store.erl
Notes
- Symlink Structure: Uses filesystem symlinks for efficient slot-based lookups
- Format Detection: Automatically detects and converts AOS2 (ao.TN.1) assignments
- Slot Normalization: Handles both integer and binary slot numbers
- Store Merging: Combines scheduler_store with main store configuration
- Location Indexing: Creates symlinks for all signers of location records
- Human IDs: Uses human-readable encoded IDs for all path components
- Numbered Listing: Uses hb_cache:list_numbered/2 for slot enumeration
- Hash Chain: Preserves hash-chain values for schedule continuity
- Error Handling: Returns {error, Reason} for write failures
- Cache Loading: Uses hb_cache:ensure_all_loaded/2 for lazy loading