Skip to content

dev_scheduler_cache.erl - Scheduler Assignment Cache

Overview

Purpose: Provide persistent caching for scheduler assignments and location records
Module: dev_scheduler_cache
Cache Prefix: ~scheduler@1.0
Storage: Configurable via scheduler_store option

This module provides a caching layer for scheduler assignments and scheduler location records. It supports separate volatile and non-volatile storage configurations, enabling assignments to be stored temporarily while location records persist across restarts.

Supported Operations

  • Assignment Writing: Store assignments with process/slot indexing
  • Assignment Reading: Retrieve assignments by process ID and slot
  • Assignment Listing: List all slots for a process
  • Latest Assignment: Find the most recent assignment for a process
  • Location Records: Cache scheduler location information

Dependencies

  • HyperBEAM: hb_cache, hb_store, hb_opts, hb_ao, hb_message, hb_util, hb_maps
  • Scheduler: dev_scheduler_formats
  • Testing: eunit
  • Includes: include/hb.hrl

Public Functions Overview

%% Assignment Operations
-spec write(Assignment, Opts) -> ok | {error, Reason}.
-spec write_spawn(InitMessage, Opts) -> {ok, Path} | {error, Reason}.
-spec read(ProcID, Slot, Opts) -> {ok, Assignment} | not_found.
-spec list(ProcID, Opts) -> [Slot].
-spec latest(ProcID, Opts) -> {Slot, HashChain} | not_found.
 
%% Location Operations
-spec write_location(LocationMsg, Opts) -> ok | {error, Reason}.
-spec read_location(Address, Opts) -> {ok, Location} | not_found.

Public Functions

1. write/2

-spec write(Assignment, Opts) -> ok | {error, Reason}
    when
        Assignment :: map(),
        Opts :: map(),
        Reason :: term().

Description: Write an assignment message into the cache. Creates symlinks from process ID and slot number to the underlying data for efficient retrieval.

Test Code:
-module(dev_scheduler_cache_write_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Writes a single assignment for a fresh process ID and checks that it can
%% be read back from the cache under the same process ID and slot.
write_assignment_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"write-test">>),
    hb_store:start(TestStore),
    Stores = [TestStore],
    Opts = #{ store => Stores, scheduler_store => Stores },
    ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    WriteResult =
        dev_scheduler_cache:write(
            #{
                <<"variant">> => <<"ao.N.1">>,
                <<"process">> => ProcID,
                <<"slot">> => 1,
                <<"hash-chain">> => crypto:strong_rand_bytes(64)
            },
            Opts
        ),
    ?assertEqual(ok, WriteResult),
    % The assignment must now be retrievable by process ID and slot.
    {ok, Cached} = dev_scheduler_cache:read(ProcID, 1, Opts),
    ?assertEqual(1, hb_ao:get(<<"slot">>, Cached, Opts)).
 
%% Writes assignments for slots 1..10 of one process and checks that the
%% cache lists exactly ten slots for it.
write_multiple_slots_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"multi-slot">>),
    hb_store:start(TestStore),
    Opts = #{ store => [TestStore], scheduler_store => [TestStore] },
    ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    % Each write must succeed on its own; a failure aborts the test at the
    % offending slot.
    WriteSlot =
        fun(SlotNum) ->
            Result =
                dev_scheduler_cache:write(
                    #{
                        <<"variant">> => <<"ao.N.1">>,
                        <<"process">> => ProcID,
                        <<"slot">> => SlotNum,
                        <<"hash-chain">> => crypto:strong_rand_bytes(64)
                    },
                    Opts
                ),
            ?assertEqual(ok, Result)
        end,
    lists:foreach(WriteSlot, lists:seq(1, 10)),
    ?assertEqual(10, length(dev_scheduler_cache:list(ProcID, Opts))).

2. write_spawn/2

-spec write_spawn(InitMessage, Opts) -> {ok, Path} | {error, Reason}
    when
        InitMessage :: map(),
        Opts :: map(),
        Path :: binary(),
        Reason :: term().

Description: Write the initial spawn/process message to the cache. Used when starting a new scheduler server to persist the process definition.

Test Code:
-module(dev_scheduler_cache_spawn_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Writes a process-definition (spawn) message and checks that the cache
%% reports success with some path.
write_spawn_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"spawn-test">>),
    hb_store:start(TestStore),
    Opts = #{ store => [TestStore], scheduler_store => [TestStore] },
    ProcessDef = #{
        <<"type">> => <<"Process">>,
        <<"variant">> => <<"ao.N.1">>,
        <<"scheduler">> => <<"test-scheduler">>
    },
    ?assertMatch({ok, _}, dev_scheduler_cache:write_spawn(ProcessDef, Opts)).

3. read/3

-spec read(ProcID, Slot, Opts) -> {ok, Assignment} | not_found
    when
        ProcID :: binary(),
        Slot :: integer() | binary(),
        Opts :: map(),
        Assignment :: map().

Description: Get an assignment message from the cache by process ID and slot number. Automatically handles AOS2 format conversion for legacy assignments.

Test Code:
-module(dev_scheduler_cache_read_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Stores an assignment at slot 5 and checks that reading slot 5 returns a
%% message whose `slot` key is 5.
read_existing_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"read-test">>),
    hb_store:start(TestStore),
    Opts = #{ store => [TestStore], scheduler_store => [TestStore] },
    ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    ok =
        dev_scheduler_cache:write(
            #{
                <<"variant">> => <<"ao.N.1">>,
                <<"process">> => ProcID,
                <<"slot">> => 5,
                <<"hash-chain">> => <<"test-chain">>
            },
            Opts
        ),
    {ok, Cached} = dev_scheduler_cache:read(ProcID, 5, Opts),
    ?assertEqual(5, hb_ao:get(<<"slot">>, Cached, Opts)).
 
%% Reading a slot of a process that was never written yields `not_found`.
read_not_found_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"read-notfound">>),
    hb_store:start(TestStore),
    Opts = #{ store => [TestStore], scheduler_store => [TestStore] },
    UnknownProc = hb_util:human_id(crypto:strong_rand_bytes(32)),
    Lookup = dev_scheduler_cache:read(UnknownProc, 999, Opts),
    ?assertEqual(not_found, Lookup).
 
%% Stores an assignment at slot 7 and reads it back twice: once addressing
%% the slot as an integer and once as a binary. Both reads must succeed.
read_integer_slot_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"read-int">>),
    hb_store:start(TestStore),
    Opts = #{ store => [TestStore], scheduler_store => [TestStore] },
    ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    SlotSeven = #{
        <<"variant">> => <<"ao.N.1">>,
        <<"process">> => ProcID,
        <<"slot">> => 7,
        <<"hash-chain">> => <<"chain">>
    },
    ok = dev_scheduler_cache:write(SlotSeven, Opts),
    % Slot given as an integer.
    {ok, _} = dev_scheduler_cache:read(ProcID, 7, Opts),
    % Slot given as a binary.
    {ok, _} = dev_scheduler_cache:read(ProcID, <<"7">>, Opts).

4. list/2

-spec list(ProcID, Opts) -> [Slot]
    when
        ProcID :: binary(),
        Opts :: map(),
        Slot :: integer().

Description: Get all slot numbers that have assignments cached for a process. Returns a list of integers.

Test Code:
-module(dev_scheduler_cache_list_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Listing the slots of a process with no cached assignments yields `[]`.
list_empty_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"list-empty">>),
    hb_store:start(TestStore),
    Opts = #{ store => [TestStore], scheduler_store => [TestStore] },
    UnknownProc = hb_util:human_id(crypto:strong_rand_bytes(32)),
    Listed = dev_scheduler_cache:list(UnknownProc, Opts),
    ?assertEqual([], Listed).
 
%% Writes assignments at a sparse set of slots and checks that listing the
%% process returns exactly those slots (compared after sorting).
list_with_assignments_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"list-assign">>),
    hb_store:start(TestStore),
    Opts = #{ store => [TestStore], scheduler_store => [TestStore] },
    ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    % Already in ascending order, so it can be compared to the sorted listing.
    ExpectedSlots = [1, 5, 10, 15, 20],
    WriteSlot =
        fun(SlotNum) ->
            ok =
                dev_scheduler_cache:write(
                    #{
                        <<"variant">> => <<"ao.N.1">>,
                        <<"process">> => ProcID,
                        <<"slot">> => SlotNum,
                        <<"hash-chain">> => <<"chain">>
                    },
                    Opts
                )
        end,
    lists:foreach(WriteSlot, ExpectedSlots),
    ?assertEqual(
        ExpectedSlots,
        lists:sort(dev_scheduler_cache:list(ProcID, Opts))
    ).

5. latest/2

-spec latest(ProcID, Opts) -> {Slot, HashChain} | not_found
    when
        ProcID :: binary(),
        Opts :: map(),
        Slot :: integer(),
        HashChain :: binary().

Description: Get the latest (highest slot number) assignment from the cache for a process. Returns both the slot number and the hash-chain value.

Test Code:
-module(dev_scheduler_cache_latest_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Asking for the latest assignment of a process with nothing cached yields
%% `not_found`.
latest_empty_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"latest-empty">>),
    hb_store:start(TestStore),
    Opts = #{ store => [TestStore], scheduler_store => [TestStore] },
    UnknownProc = hb_util:human_id(crypto:strong_rand_bytes(32)),
    Latest = dev_scheduler_cache:latest(UnknownProc, Opts),
    ?assertEqual(not_found, Latest).
 
%% Writes assignments out of slot order and checks that `latest/2` reports
%% the highest slot (100) together with that slot's hash chain, rather than
%% the most recently written one.
latest_with_assignments_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"latest-assign">>),
    hb_store:start(TestStore),
    Opts = #{ store => [TestStore], scheduler_store => [TestStore] },
    ProcID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    HashChain = <<"latest-hash-chain">>,
    % Only slot 100 carries the chain value the test looks for.
    ChainFor =
        fun(100) -> HashChain;
           (_Other) -> <<"other-chain">>
        end,
    WriteSlot =
        fun(SlotNum) ->
            ok =
                dev_scheduler_cache:write(
                    #{
                        <<"variant">> => <<"ao.N.1">>,
                        <<"process">> => ProcID,
                        <<"slot">> => SlotNum,
                        <<"hash-chain">> => ChainFor(SlotNum)
                    },
                    Opts
                )
        end,
    % Slot 100 is deliberately written in the middle of the sequence.
    lists:foreach(WriteSlot, [1, 50, 100, 25, 75]),
    {LatestSlot, LatestChain} = dev_scheduler_cache:latest(ProcID, Opts),
    ?assertEqual(100, LatestSlot),
    ?assertEqual(HashChain, LatestChain).

6. read_location/2

-spec read_location(Address, Opts) -> {ok, Location} | not_found
    when
        Address :: binary(),
        Opts :: map(),
        Location :: map().

Description: Read the latest known scheduler location for a given address. Used for scheduler discovery.

Test Code:
-module(dev_scheduler_cache_location_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Looking up a location for an address that was never written yields
%% `not_found`.
read_location_not_found_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"loc-notfound">>),
    hb_store:start(TestStore),
    Opts = #{ store => [TestStore], scheduler_store => [TestStore] },
    UnknownAddress = hb_util:human_id(crypto:strong_rand_bytes(32)),
    Lookup = dev_scheduler_cache:read_location(UnknownAddress, Opts),
    ?assertEqual(not_found, Lookup).
 
%% Writes a location record committed with a fresh wallet and checks that it
%% can be read back, as a map, under that wallet's address.
read_location_found_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"loc-found">>),
    Wallet = ar_wallet:new(),
    hb_store:start(TestStore),
    Stores = [TestStore],
    Opts = #{
        store => Stores,
        scheduler_store => Stores,
        priv_wallet => Wallet
    },
    SignerAddress = hb_util:human_id(ar_wallet:to_address(Wallet)),
    Unsigned = #{
        <<"type">> => <<"scheduler-location">>,
        <<"url">> => <<"http://scheduler.example.com">>
    },
    Signed = hb_message:commit(Unsigned, Opts),
    ok = dev_scheduler_cache:write_location(Signed, Opts),
    {ok, Location} = dev_scheduler_cache:read_location(SignerAddress, Opts),
    ?assert(is_map(Location)).

7. write_location/2

-spec write_location(LocationMsg, Opts) -> ok | {error, Reason}
    when
        LocationMsg :: map(),
        Opts :: map(),
        Reason :: binary() | term().

Description: Write a scheduler location record to the cache. Creates symlinks for each signer address to enable lookup by scheduler address.

Test Code:
-module(dev_scheduler_cache_write_location_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Writing a committed location record to the cache returns `ok`.
write_location_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"write-loc">>),
    Wallet = ar_wallet:new(),
    hb_store:start(TestStore),
    Stores = [TestStore],
    Opts = #{
        store => Stores,
        scheduler_store => Stores,
        priv_wallet => Wallet
    },
    Unsigned = #{
        <<"type">> => <<"scheduler-location">>,
        <<"url">> => <<"http://test.scheduler.com">>,
        <<"nonce">> => 1
    },
    Signed = hb_message:commit(Unsigned, Opts),
    WriteResult = dev_scheduler_cache:write_location(Signed, Opts),
    ?assertEqual(ok, WriteResult).
 
%% Commits one location record with two different wallets, writes it once,
%% and checks that it can be looked up under either signer's address.
write_location_multiple_signers_test() ->
    TestStore = hb_test_utils:test_store(hb_store_fs, <<"write-multi">>),
    Wallet1 = ar_wallet:new(),
    Wallet2 = ar_wallet:new(),
    hb_store:start(TestStore),
    % Identical store configuration; only the signing wallet differs.
    Opts1 = #{
        store => [TestStore],
        scheduler_store => [TestStore],
        priv_wallet => Wallet1
    },
    Opts2 = #{
        store => [TestStore],
        scheduler_store => [TestStore],
        priv_wallet => Wallet2
    },
    Unsigned = #{
        <<"type">> => <<"scheduler-location">>,
        <<"url">> => <<"http://multi.scheduler.com">>
    },
    SignedOnce = hb_message:commit(Unsigned, Opts1),
    SignedTwice = hb_message:commit(SignedOnce, Opts2),
    ok = dev_scheduler_cache:write_location(SignedTwice, Opts1),
    % Both signers should be able to find the location
    {ok, _} =
        dev_scheduler_cache:read_location(
            hb_util:human_id(ar_wallet:to_address(Wallet1)),
            Opts1
        ),
    {ok, _} =
        dev_scheduler_cache:read_location(
            hb_util:human_id(ar_wallet:to_address(Wallet2)),
            Opts1
        ).

Cache Path Structure

~scheduler@1.0/
├── assignments/
│   └── {ProcessID}/
│       ├── 0 -> /path/to/assignment
│       ├── 1 -> /path/to/assignment
│       └── ...
└── locations/
    └── {SchedulerAddress} -> /path/to/location

Common Patterns

%% Configure separate volatile and non-volatile stores.
%% `store` is the main store list; `scheduler_store` is the store list this
%% module is configured with for scheduler data.
Opts = #{
    store => [NonVolatileStore],
    scheduler_store => [VolatileStore]
}.
 
%% Write assignment and verify.
%% The `ok =` match is the verification: any `{error, Reason}` returned by
%% write/2 raises a badmatch instead of being silently ignored.
Assignment = #{
    <<"variant">> => <<"ao.N.1">>,
    <<"process">> => ProcessID,
    <<"slot">> => Slot,
    <<"hash-chain">> => HashChain,
    <<"body">> => Message
},
ok = dev_scheduler_cache:write(Assignment, Opts).
 
%% Resume scheduling from latest assignment.
%% latest/2 returns `not_found` when nothing is cached for the process; in
%% that case start from slot -1 with an empty hash chain. The result is bound
%% once from the value of the `case`, so no variables are exported out of
%% individual branches.
{StartSlot, InitialChain} =
    case dev_scheduler_cache:latest(ProcessID, Opts) of
        not_found -> {-1, <<>>};
        {_LatestSlot, _LatestChain} = Latest -> Latest
    end.
 
%% Iterate through all assignments, in ascending slot order.
%% Every listed slot is expected to be readable; a miss raises a badmatch.
Slots = dev_scheduler_cache:list(ProcessID, Opts),
Assignments = [
    begin
        {ok, A} = dev_scheduler_cache:read(ProcessID, Slot, Opts),
        A
    end
    || Slot <- lists:sort(Slots)
].
 
%% Cache scheduler location.
%% The record is committed (signed) with the wallet configured in Opts before
%% being written.
%% NOTE(review): `time-to-live` of 3600000 is presumably milliseconds
%% (one hour) — confirm the unit against the scheduler implementation.
LocationMsg = hb_message:commit(#{
    <<"type">> => <<"scheduler-location">>,
    <<"url">> => <<"http://my-scheduler.example.com:8734">>,
    <<"nonce">> => Nonce,
    <<"time-to-live">> => 3600000
}, Opts),
ok = dev_scheduler_cache:write_location(LocationMsg, Opts).

Storage Configuration

Volatile vs Non-Volatile Storage

The module supports separate stores for assignments (potentially volatile) and general data:

%% Assignments in a dedicated scheduler store, other data in the main store.
%% NOTE(review): both stores below use hb_store_fs, so the "volatile" store is
%% only RAM-backed if its directory lives on a RAM-backed filesystem — confirm
%% the intended store module for truly volatile storage.
Opts = #{
    store => [#{ <<"store-module">> => hb_store_fs, <<"name">> => <<"data">> }],
    scheduler_store => [#{ <<"store-module">> => hb_store_fs, <<"name">> => <<"volatile">> }]
}.

Store Reset Behavior

  • Resetting scheduler_store clears all cached assignments
  • Resetting main store does not affect assignment symlinks
  • Location records persist with the scheduler store

References

  • Main Scheduler - dev_scheduler.erl
  • Scheduler Server - dev_scheduler_server.erl
  • Format Conversion - dev_scheduler_formats.erl
  • Cache System - hb_cache.erl
  • Store Interface - hb_store.erl

Notes

  1. Symlink Structure: Uses filesystem symlinks for efficient slot-based lookups
  2. Format Detection: Automatically detects and converts AOS2 (ao.TN.1) assignments
  3. Slot Normalization: Handles both integer and binary slot numbers
  4. Store Merging: Combines scheduler_store with main store configuration
  5. Location Indexing: Creates symlinks for all signers of location records
  6. Human IDs: Uses human-readable encoded IDs for all path components
  7. Numbered Listing: Uses hb_cache:list_numbered/2 for slot enumeration
  8. Hash Chain: Preserves hash-chain values for schedule continuity
  9. Error Handling: Returns {error, Reason} for write failures
  10. Cache Loading: Uses hb_cache:ensure_all_loaded/2 for lazy loading