dev_scheduler_server.erl - Scheduler Server Process
Overview
Purpose: Long-lived server process that sequences messages for an AO process
Module: dev_scheduler_server
Role: Bottleneck for slot assignment to prevent duplicate slot allocation
Timeout: 10 seconds default for scheduling requests
This module implements the core scheduling server that assigns messages to sequential slots for a specific AO process. It acts as a deliberate serialization point to ensure that no two messages can be assigned to the same slot, maintaining schedule integrity.
Supported Operations
- Server Lifecycle: Start and stop scheduler servers
- Message Scheduling: Assign messages to sequential slots
- Server Information: Query current slot and state
- Hash Chain: Maintain cryptographic chain linking assignments
Dependencies
- HyperBEAM: hb_ao, hb_opts, hb_message, hb_util, hb_maps, hb_client, hb_private, hb_name
- Scheduler: dev_scheduler, dev_scheduler_cache, dev_scheduler_registry
- Arweave: ar_wallet, ar_timestamp
- Testing: eunit
- Includes: include/hb.hrl
Constants
%% Default timeout for scheduling requests (milliseconds)
-define(DEFAULT_TIMEOUT, 10000).
Public Functions Overview
%% Server Lifecycle
-spec start(ProcID, Proc, Opts) -> pid().
-spec stop(ProcID) -> ok.
%% Scheduling
-spec schedule(ProcID, Message) -> Assignment.
%% Information
-spec info(ProcID) -> State.
Public Functions
1. start/3
-spec start(ProcID, Proc, Opts) -> pid()
when
ProcID :: binary(),
Proc :: map(),
Opts :: map().
Description: Start a scheduling server for a given process. Registers the server with the name service, writes the process to cache, and initializes from the latest known assignment if resuming.
Server State Structure:
#{
id => ProcID,
current => CurrentSlot, % -1 for new process
hash_chain => HashChainBinary,
wallets => [CommitmentWallets],
mode => SchedulingMode,
opts => Opts
}
-module(dev_scheduler_server_start_test).
-include_lib("eunit/include/eunit.hrl").
%% A freshly started scheduler server must be a live process whose state
%% reports slot -1, the value used when nothing has been scheduled yet.
start_new_process_test() ->
    SchedulerWallet = ar_wallet:new(),
    TestStore = hb_test_utils:test_store(),
    NodeOpts = #{
        scheduling_mode => local_confirmation,
        priv_wallet => SchedulerWallet,
        store => TestStore
    },
    SchedulerAddress = hb_util:human_id(ar_wallet:to_address(SchedulerWallet)),
    ProcessMsg = hb_message:commit(
        #{<<"scheduler">> => SchedulerAddress, <<"type">> => <<"Process">>},
        NodeOpts
    ),
    ProcessID = hb_message:id(ProcessMsg, all, NodeOpts),
    dev_scheduler_registry:start(),
    ServerPid = dev_scheduler_server:start(ProcessID, ProcessMsg, NodeOpts),
    ?assert(is_pid(ServerPid)),
    ?assert(is_process_alive(ServerPid)),
    ServerState = dev_scheduler_server:info(ServerPid),
    CurrentSlot = maps:get(current, ServerState),
    ?assertEqual(-1, CurrentSlot).
2. schedule/2
-spec schedule(ProcID, Message) -> Assignment
when
ProcID :: binary() | pid(),
Message :: map(),
Assignment :: map().
Description: Schedule a message by assigning it to the next slot. Can accept either an AO process ID (which is looked up in the registry) or an Erlang PID directly. Returns the assignment message with slot number, hash chain, and Arweave block info.
Assignment Structure:
#{
<<"path">> => Path,
<<"data-protocol">> => <<"ao">>,
<<"variant">> => <<"ao.N.1">>,
<<"process">> => ProcessID,
<<"epoch">> => <<"0">>,
<<"slot">> => SlotNumber,
<<"block-height">> => ArweaveHeight,
<<"block-hash">> => ArweaveBlockHash,
<<"block-timestamp">> => ArweaveTimestamp,
<<"timestamp">> => LocalTimestamp,
<<"hash-chain">> => HashChainValue,
<<"body">> => OriginalMessage,
<<"type">> => <<"Assignment">>
}
-module(dev_scheduler_server_schedule_test).
-include_lib("eunit/include/eunit.hrl").
%% Scheduling the process message and then a follow-up message must yield
%% slots 0 and 1; the second assignment must carry both a hash chain and
%% a body.
schedule_message_test() ->
    SchedulerWallet = ar_wallet:new(),
    TestStore = hb_test_utils:test_store(),
    NodeOpts = #{
        scheduling_mode => local_confirmation,
        priv_wallet => SchedulerWallet,
        store => TestStore
    },
    SchedulerAddress = hb_util:human_id(ar_wallet:to_address(SchedulerWallet)),
    ProcessMsg = hb_message:commit(
        #{<<"scheduler">> => SchedulerAddress, <<"type">> => <<"Process">>},
        NodeOpts
    ),
    dev_scheduler_registry:start(),
    ProcessID = hb_message:id(ProcessMsg, all, NodeOpts),
    ServerPid = dev_scheduler_registry:find(ProcessID, ProcessMsg, NodeOpts),
    %% The process message itself is scheduled first.
    ProcessAssignment = dev_scheduler_server:schedule(ServerPid, ProcessMsg),
    ?assertEqual(0, maps:get(<<"slot">>, ProcessAssignment)),
    %% A follow-up message is expected in the next slot.
    TestMsg = hb_message:commit(
        #{<<"action">> => <<"Test">>, <<"type">> => <<"Message">>},
        NodeOpts
    ),
    MsgAssignment = dev_scheduler_server:schedule(ServerPid, TestMsg),
    ?assertEqual(1, maps:get(<<"slot">>, MsgAssignment)),
    lists:foreach(
        fun(Key) -> ?assert(maps:is_key(Key, MsgAssignment)) end,
        [<<"hash-chain">>, <<"body">>]
    ).
%% Five messages scheduled back-to-back against one server must receive the
%% consecutive slots 0 through 4, in submission order.
schedule_sequential_slots_test() ->
    SchedulerWallet = ar_wallet:new(),
    TestStore = hb_test_utils:test_store(),
    NodeOpts = #{
        scheduling_mode => aggressive,
        priv_wallet => SchedulerWallet,
        store => TestStore
    },
    SchedulerAddress = hb_util:human_id(ar_wallet:to_address(SchedulerWallet)),
    ProcessMsg = hb_message:commit(
        #{<<"scheduler">> => SchedulerAddress, <<"type">> => <<"Process">>},
        NodeOpts
    ),
    dev_scheduler_registry:start(),
    ProcessID = hb_message:id(ProcessMsg, all, NodeOpts),
    ServerPid = dev_scheduler_registry:find(ProcessID, ProcessMsg, NodeOpts),
    %% Commit and schedule one message per iteration, keeping each assignment.
    Assignments = [
        begin
            TestMsg = hb_message:commit(#{<<"data">> => <<"test">>}, NodeOpts),
            dev_scheduler_server:schedule(ServerPid, TestMsg)
        end
     || _ <- lists:seq(1, 5)
    ],
    AssignedSlots = lists:map(
        fun(Assignment) -> maps:get(<<"slot">>, Assignment) end,
        Assignments
    ),
    ?assertEqual([0, 1, 2, 3, 4], AssignedSlots).
3. info/1
-spec info(ProcID) -> State
when
ProcID :: pid(),
State :: map().
Description: Get the current state of the scheduling server, including current slot, hash chain, and configuration.
Test Code:
-module(dev_scheduler_server_info_test).
-include_lib("eunit/include/eunit.hrl").
%% info/1 must return a map exposing at least the `current`, `hash_chain`
%% and `mode` keys of the server state.
info_test() ->
    SchedulerWallet = ar_wallet:new(),
    TestStore = hb_test_utils:test_store(),
    NodeOpts = #{
        scheduling_mode => local_confirmation,
        priv_wallet => SchedulerWallet,
        store => TestStore
    },
    SchedulerAddress = hb_util:human_id(ar_wallet:to_address(SchedulerWallet)),
    ProcessMsg = hb_message:commit(
        #{<<"scheduler">> => SchedulerAddress, <<"type">> => <<"Process">>},
        NodeOpts
    ),
    dev_scheduler_registry:start(),
    ProcessID = hb_message:id(ProcessMsg, all, NodeOpts),
    ServerPid = dev_scheduler_registry:find(ProcessID, ProcessMsg, NodeOpts),
    ServerState = dev_scheduler_server:info(ServerPid),
    ?assert(is_map(ServerState)),
    lists:foreach(
        fun(Key) -> ?assert(maps:is_key(Key, ServerState)) end,
        [current, hash_chain, mode]
    ).
4. stop/1
-spec stop(ProcID) -> ok
when
ProcID :: pid().
Description: Stop a scheduling server process.
Test Code:
-module(dev_scheduler_server_stop_test).
-include_lib("eunit/include/eunit.hrl").
%% stop/1 must terminate the scheduler server process.
%%
%% Termination is observed through a process monitor rather than a fixed
%% sleep: the test continues as soon as the 'DOWN' message arrives and fails
%% explicitly if the server is still running after one second.
stop_test() ->
    Wallet = ar_wallet:new(),
    Opts = #{
        store => hb_test_utils:test_store(),
        priv_wallet => Wallet,
        scheduling_mode => local_confirmation
    },
    Proc = hb_message:commit(
        #{
            <<"type">> => <<"Process">>,
            <<"scheduler">> => hb_util:human_id(ar_wallet:to_address(Wallet))
        },
        Opts
    ),
    dev_scheduler_registry:start(),
    Pid = dev_scheduler_registry:find(
        hb_message:id(Proc, all, Opts),
        Proc,
        Opts
    ),
    ?assert(is_process_alive(Pid)),
    %% Monitor before stopping so the exit cannot be missed.
    MonitorRef = erlang:monitor(process, Pid),
    dev_scheduler_server:stop(Pid),
    receive
        {'DOWN', MonitorRef, process, Pid, _Reason} -> ok
    after 1000 ->
        erlang:demonitor(MonitorRef, [flush]),
        erlang:error(scheduler_server_did_not_stop)
    end,
    ?assertNot(is_process_alive(Pid)).
Scheduling Modes
The server supports different confirmation modes that control when the scheduling response is returned:
| Mode | Trigger Point | Use Case |
|---|---|---|
| aggressive | Immediately after assignment creation | Maximum throughput |
| local_confirmation | After local cache write | Balance of speed and durability |
| remote_confirmation | After Arweave upload | Maximum durability |
%% Reply to the waiting client only when the stage being reported (`Mode`)
%% is the mode stored under `mode` in the server `State`; for any other
%% stage this is a no-op returning `ok`.
maybe_inform_recipient(Mode, ReplyPID, Message, Assignment, State) ->
    ConfiguredMode = maps:get(mode, State),
    if
        ConfiguredMode =:= Mode ->
            ReplyPID ! {scheduled, Message, Assignment};
        true ->
            ok
    end.
Hash Chain
Each assignment includes a hash chain value that cryptographically links it to all previous assignments:
%% Extend the chain: SHA-256 over the previous chain value concatenated
%% with the ID of the message being assigned.
next_hashchain(HashChain, Message, Opts) ->
    MessageID = hb_message:id(Message, all, Opts),
    ChainInput = <<HashChain/binary, MessageID/binary>>,
    crypto:hash(sha256, ChainInput).
This provides a tamper-evident log of the schedule.
Common Patterns
%% Start scheduler and schedule messages
%% NOTE: `Store` and `Messages` are not bound anywhere in this snippet; the
%% caller must supply a store and the list of messages to schedule.
dev_scheduler_registry:start(),
Wallet = ar_wallet:new(),
Opts = #{
store => Store,
priv_wallet => Wallet,
scheduling_mode => local_confirmation
},
%% The process message names this wallet's address as its scheduler.
Process = hb_message:commit(#{
<<"type">> => <<"Process">>,
<<"scheduler">> => hb_util:human_id(ar_wallet:to_address(Wallet))
}, Opts),
ProcID = hb_message:id(Process, all, Opts),
%% The value returned by find/3 is what schedule/2 and info/1 are called
%% with below.
Pid = dev_scheduler_registry:find(ProcID, Process, Opts),
%% Schedule the process
ProcessAssignment = dev_scheduler_server:schedule(Pid, Process),
%% Schedule messages
%% Each message is scheduled in list order and its assigned slot is printed.
lists:foreach(
fun(Msg) ->
Assignment = dev_scheduler_server:schedule(Pid, Msg),
io:format("Assigned to slot ~p~n", [maps:get(<<"slot">>, Assignment)])
end,
Messages
).
%% Check current state
%% `current` is the slot number held in the server state.
Info = dev_scheduler_server:info(Pid),
CurrentSlot = maps:get(current, Info),
io:format("Current slot: ~p~n", [CurrentSlot]).
%% Graceful shutdown
dev_scheduler_server:stop(Pid).
Timeout Handling
The server implements timeout handling to prevent stale requests from being processed:
%% Client sends abort time with request
AbortTime = scheduler_time() + ?DEFAULT_TIMEOUT,
ErlangProcID ! {schedule, Message, self(), AbortTime}.
%% Server ignores requests past their abort time
case scheduler_time() > AbortTime of
true ->
% Request too old, ignore
server(State);
false ->
server(assign(State, Message, Reply))
end.
Multi-Wallet Commitment
Processes can specify multiple schedulers, and the server will commit assignments with all appropriate wallets:
%% Collect the wallets to commit assignments with.
%%
%% Reads the first available of the process message's `scheduler` and
%% `scheduler-location` values, parses it into individual schedulers, and
%% keeps the `priv_wallet` of every scheduler for which `hb_opts:as/2`
%% returns `{ok, Map}` with that key. Schedulers without such a result are
%% dropped.
commitment_wallets(ProcMsg, Opts) ->
    LookupPaths = [
        {ProcMsg, <<"scheduler">>},
        {ProcMsg, <<"scheduler-location">>}
    ],
    SchedulerVal = hb_ao:get_first(LookupPaths, [], Opts),
    Schedulers = dev_scheduler:parse_schedulers(SchedulerVal),
    SelectWallet =
        fun(Scheduler) ->
            case hb_opts:as(Scheduler, Opts) of
                {ok, #{priv_wallet := Wallet}} -> {true, Wallet};
                _ -> false
            end
        end,
    lists:filtermap(SelectWallet, Schedulers).
References
- Main Scheduler - dev_scheduler.erl
- Scheduler Registry - dev_scheduler_registry.erl
- Scheduler Cache - dev_scheduler_cache.erl
- Client Upload - hb_client.erl
- Message Handling - hb_message.erl
Notes
- Serialization Point: Single process ensures no duplicate slot assignments
- Linked Process: Server is linked to creator via spawn_link
- Name Registration: Throws if another scheduler already registered for process
- Hash Chain Continuity: Resume from latest assignment preserves chain integrity
- Async Upload: Arweave uploads happen asynchronously in background
- Only Committed: Only attested keys from messages are included in assignments
- Timestamp Sources: Includes both Arweave block time and local scheduler time
- Error Recovery: Assignment errors are caught and logged, server continues
- Mode Flexibility: Different modes trade off latency vs durability
- Path Preservation: Original request path is preserved in assignment