Skip to content

dev_scheduler_server.erl - Scheduler Server Process

Overview

Purpose: Long-lived server process that sequences messages for an AO process
Module: dev_scheduler_server
Role: Bottleneck for slot assignment to prevent duplicate slot allocation
Timeout: 10 seconds default for scheduling requests

This module implements the core scheduling server that assigns messages to sequential slots for a specific AO process. It acts as a deliberate serialization point to ensure that no two messages can be assigned to the same slot, maintaining schedule integrity.

Supported Operations

  • Server Lifecycle: Start and stop scheduler servers
  • Message Scheduling: Assign messages to sequential slots
  • Server Information: Query current slot and state
  • Hash Chain: Maintain cryptographic chain linking assignments

Dependencies

  • HyperBEAM: hb_ao, hb_opts, hb_message, hb_util, hb_maps, hb_client, hb_private, hb_name
  • Scheduler: dev_scheduler, dev_scheduler_cache, dev_scheduler_registry
  • Arweave: ar_wallet, ar_timestamp
  • Testing: eunit
  • Includes: include/hb.hrl

Constants

%% Default timeout for scheduling requests (milliseconds)
-define(DEFAULT_TIMEOUT, 10000).

Public Functions Overview

%% Server Lifecycle
-spec start(ProcID, Proc, Opts) -> pid().
-spec stop(ProcID) -> ok.
 
%% Scheduling
-spec schedule(ProcID, Message) -> Assignment.
 
%% Information
-spec info(ProcID) -> State.

Public Functions

1. start/3

-spec start(ProcID, Proc, Opts) -> pid()
    when
        ProcID :: binary(),
        Proc :: map(),
        Opts :: map().

Description: Start a scheduling server for a given process. Registers the server with the name service, writes the process to cache, and initializes from the latest known assignment if resuming.

Server State Structure:
#{
    id => ProcID,
    current => CurrentSlot,        % -1 for new process
    hash_chain => HashChainBinary,
    wallets => [CommitmentWallets],
    mode => SchedulingMode,
    opts => Opts
}
Test Code:
-module(dev_scheduler_server_start_test).
-include_lib("eunit/include/eunit.hrl").
 
%% A scheduler server started for a brand-new process is alive and reports
%% `current => -1`, meaning no slot has been assigned yet.
start_new_process_test() ->
    SchedulerWallet = ar_wallet:new(),
    Opts = #{
        store => hb_test_utils:test_store(),
        priv_wallet => SchedulerWallet,
        scheduling_mode => local_confirmation
    },
    %% The process names its own scheduler by the wallet's human-readable address.
    SchedulerAddress = hb_util:human_id(ar_wallet:to_address(SchedulerWallet)),
    Proc = hb_message:commit(
        #{
            <<"type">> => <<"Process">>,
            <<"scheduler">> => SchedulerAddress
        },
        Opts
    ),
    ProcID = hb_message:id(Proc, all, Opts),
    dev_scheduler_registry:start(),
    ServerPid = dev_scheduler_server:start(ProcID, Proc, Opts),
    ?assert(is_pid(ServerPid)),
    ?assert(is_process_alive(ServerPid)),
    ServerState = dev_scheduler_server:info(ServerPid),
    ?assertEqual(-1, maps:get(current, ServerState)).

2. schedule/2

-spec schedule(ProcID, Message) -> Assignment
    when
        ProcID :: binary() | pid(),
        Message :: map(),
        Assignment :: map().

Description: Schedule a message by assigning it to the next slot. Can accept either an AO process ID (which is looked up in the registry) or an Erlang PID directly. Returns the assignment message with slot number, hash chain, and Arweave block info.

Assignment Structure:
#{
    <<"path">> => Path,
    <<"data-protocol">> => <<"ao">>,
    <<"variant">> => <<"ao.N.1">>,
    <<"process">> => ProcessID,
    <<"epoch">> => <<"0">>,
    <<"slot">> => SlotNumber,
    <<"block-height">> => ArweaveHeight,
    <<"block-hash">> => ArweaveBlockHash,
    <<"block-timestamp">> => ArweaveTimestamp,
    <<"timestamp">> => LocalTimestamp,
    <<"hash-chain">> => HashChainValue,
    <<"body">> => OriginalMessage,
    <<"type">> => <<"Assignment">>
}
Test Code:
-module(dev_scheduler_server_schedule_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Scheduling the process message and then one ordinary message yields slots
%% 0 and 1, and the second assignment carries a hash chain and a body.
schedule_message_test() ->
    SchedulerWallet = ar_wallet:new(),
    Opts = #{
        store => hb_test_utils:test_store(),
        priv_wallet => SchedulerWallet,
        scheduling_mode => local_confirmation
    },
    SchedulerAddress = hb_util:human_id(ar_wallet:to_address(SchedulerWallet)),
    Proc = hb_message:commit(
        #{
            <<"type">> => <<"Process">>,
            <<"scheduler">> => SchedulerAddress
        },
        Opts
    ),
    dev_scheduler_registry:start(),
    ProcID = hb_message:id(Proc, all, Opts),
    ServerPid = dev_scheduler_registry:find(ProcID, Proc, Opts),
    %% The process message itself is scheduled first and takes slot 0.
    ProcAssignment = dev_scheduler_server:schedule(ServerPid, Proc),
    ?assertEqual(0, maps:get(<<"slot">>, ProcAssignment)),
    %% The next message must land in the following slot.
    TestMessage = hb_message:commit(
        #{
            <<"type">> => <<"Message">>,
            <<"action">> => <<"Test">>
        },
        Opts
    ),
    MsgAssignment = dev_scheduler_server:schedule(ServerPid, TestMessage),
    ?assertEqual(1, maps:get(<<"slot">>, MsgAssignment)),
    ?assert(maps:is_key(<<"hash-chain">>, MsgAssignment)),
    ?assert(maps:is_key(<<"body">>, MsgAssignment)).
 
%% Five messages scheduled one after another receive slots 0..4 in order.
schedule_sequential_slots_test() ->
    Wallet = ar_wallet:new(),
    Opts = #{
        store => hb_test_utils:test_store(),
        priv_wallet => Wallet,
        scheduling_mode => aggressive
    },
    Proc = hb_message:commit(#{
        <<"type">> => <<"Process">>,
        <<"scheduler">> => hb_util:human_id(ar_wallet:to_address(Wallet))
    }, Opts),
    dev_scheduler_registry:start(),
    ProcID = hb_message:id(Proc, all, Opts),
    Pid = dev_scheduler_registry:find(ProcID, Proc, Opts),
    %% The assertion below depends on the schedule/2 calls happening in list
    %% order. A list comprehension evaluates its generator left to right,
    %% whereas the evaluation order of lists:map/2 is documented as
    %% implementation-dependent.
    Assignments = [
        dev_scheduler_server:schedule(
            Pid,
            hb_message:commit(#{ <<"data">> => <<"test">> }, Opts)
        )
     || _ <- lists:seq(1, 5)
    ],
    Slots = [maps:get(<<"slot">>, A) || A <- Assignments],
    ?assertEqual([0, 1, 2, 3, 4], Slots).

3. info/1

-spec info(ProcID) -> State
    when
        ProcID :: pid(),
        State :: map().

Description: Get the current state of the scheduling server, including current slot, hash chain, and configuration.

Test Code:
-module(dev_scheduler_server_info_test).
-include_lib("eunit/include/eunit.hrl").
 
%% info/1 returns a map that exposes at least the `current`, `hash_chain`
%% and `mode` keys of the server state.
info_test() ->
    SchedulerWallet = ar_wallet:new(),
    Opts = #{
        store => hb_test_utils:test_store(),
        priv_wallet => SchedulerWallet,
        scheduling_mode => local_confirmation
    },
    SchedulerAddress = hb_util:human_id(ar_wallet:to_address(SchedulerWallet)),
    Proc = hb_message:commit(
        #{
            <<"type">> => <<"Process">>,
            <<"scheduler">> => SchedulerAddress
        },
        Opts
    ),
    dev_scheduler_registry:start(),
    ProcID = hb_message:id(Proc, all, Opts),
    ServerPid = dev_scheduler_registry:find(ProcID, Proc, Opts),
    ServerState = dev_scheduler_server:info(ServerPid),
    ?assert(is_map(ServerState)),
    %% Check each expected state key in the same order as before.
    lists:foreach(
        fun(Key) -> ?assert(maps:is_key(Key, ServerState)) end,
        [current, hash_chain, mode]
    ).

4. stop/1

-spec stop(ProcID) -> ok
    when
        ProcID :: pid().

Description: Stop a scheduling server process.

Test Code:
-module(dev_scheduler_server_stop_test).
-include_lib("eunit/include/eunit.hrl").
 
%% stop/1 terminates a running scheduler server.
%%
%% The test monitors the server and waits for its 'DOWN' message instead of
%% sleeping for a fixed interval, so it neither races a slow shutdown nor
%% wastes time on a fast one.
stop_test() ->
    Wallet = ar_wallet:new(),
    Opts = #{
        store => hb_test_utils:test_store(),
        priv_wallet => Wallet,
        scheduling_mode => local_confirmation
    },
    Proc = hb_message:commit(#{
        <<"type">> => <<"Process">>,
        <<"scheduler">> => hb_util:human_id(ar_wallet:to_address(Wallet))
    }, Opts),
    dev_scheduler_registry:start(),
    Pid = dev_scheduler_registry:find(
        hb_message:id(Proc, all, Opts),
        Proc,
        Opts
    ),
    ?assert(is_process_alive(Pid)),
    %% Monitor before asking the server to stop so the exit cannot be missed.
    MonitorRef = erlang:monitor(process, Pid),
    dev_scheduler_server:stop(Pid),
    receive
        {'DOWN', MonitorRef, process, Pid, _Reason} -> ok
    after 1000 ->
        %% Bounded wait: fail loudly rather than hang the suite.
        erlang:demonitor(MonitorRef, [flush]),
        error(scheduler_server_did_not_stop)
    end,
    ?assertNot(is_process_alive(Pid)).

Scheduling Modes

The server supports different confirmation modes that control when the scheduling response is returned:

| Mode | Trigger Point | Use Case |
|------|---------------|----------|
| aggressive | Immediately after assignment creation | Maximum throughput |
| local_confirmation | After local cache write | Balance of speed and durability |
| remote_confirmation | After Arweave upload | Maximum durability |
%% Reply to the client only at the stage that matches the server's mode.
%%
%% `Mode` names the stage that has just completed. If it is exactly the mode
%% stored under `mode` in `State`, `{scheduled, Message, Assignment}` is sent
%% to `ReplyPID` (and that tuple is returned); otherwise nothing is sent and
%% `ok` is returned. A `State` without a `mode` key raises `{badkey, mode}`.
maybe_inform_recipient(Mode, ReplyPID, Message, Assignment, State) ->
    ConfiguredMode = maps:get(mode, State),
    case ConfiguredMode =:= Mode of
        true -> ReplyPID ! {scheduled, Message, Assignment};
        false -> ok
    end.

Hash Chain

Each assignment includes a hash chain value that cryptographically links it to all previous assignments:

%% Extend the hash chain with one more message.
%%
%% The new link is the SHA-256 digest of the previous chain value followed by
%% the message ID obtained from `hb_message:id(Message, all, Opts)`.
next_hashchain(HashChain, Message, Opts) ->
    MessageID = hb_message:id(Message, all, Opts),
    Preimage = <<HashChain/binary, MessageID/binary>>,
    crypto:hash(sha256, Preimage).

This provides a tamper-evident log of the schedule.


Common Patterns

%% Start scheduler and schedule messages.
%% NOTE: this is a shell-style walkthrough; `Store` and `Messages` are not
%% defined here and must already be bound by the caller.
dev_scheduler_registry:start(),
Wallet = ar_wallet:new(),
Opts = #{
    store => Store,
    priv_wallet => Wallet,
    scheduling_mode => local_confirmation
},
%% The process declares its scheduler as the address of the wallet above.
Process = hb_message:commit(#{
    <<"type">> => <<"Process">>,
    <<"scheduler">> => hb_util:human_id(ar_wallet:to_address(Wallet))
}, Opts),
ProcID = hb_message:id(Process, all, Opts),
%% Look up the scheduler server for this process through the registry.
Pid = dev_scheduler_registry:find(ProcID, Process, Opts),
 
%% Schedule the process message itself before any other message
ProcessAssignment = dev_scheduler_server:schedule(Pid, Process),
 
%% Schedule messages one at a time, printing the slot each one received
lists:foreach(
    fun(Msg) ->
        Assignment = dev_scheduler_server:schedule(Pid, Msg),
        io:format("Assigned to slot ~p~n", [maps:get(<<"slot">>, Assignment)])
    end,
    Messages
).
 
%% Check current state: `current` holds the server's current slot
Info = dev_scheduler_server:info(Pid),
CurrentSlot = maps:get(current, Info),
io:format("Current slot: ~p~n", [CurrentSlot]).
 
%% Graceful shutdown
dev_scheduler_server:stop(Pid).

Timeout Handling

The server implements timeout handling to prevent stale requests from being processed:

%% Client side: attach an abort time (now + ?DEFAULT_TIMEOUT) to the request.
%% NOTE(review): scheduler_time/0 is not shown in this excerpt; presumably it
%% returns a time in the same unit as ?DEFAULT_TIMEOUT - confirm.
AbortTime = scheduler_time() + ?DEFAULT_TIMEOUT,
ErlangProcID ! {schedule, Message, self(), AbortTime}.
 
%% Server side: a request whose abort time has already passed is dropped and
%% the loop continues with the state unchanged; otherwise it is assigned.
case scheduler_time() > AbortTime of
    true ->
        % Request too old, ignore
        server(State);
    false ->
        % Still within its deadline: assign it and loop with the new state
        server(assign(State, Message, Reply))
end.

Multi-Wallet Commitment

Processes can specify multiple schedulers, and the server will commit assignments with all appropriate wallets:

%% Collect the wallets this node should use to commit assignments for a process.
%%
%% The scheduler value is read from the process message's `scheduler` key,
%% falling back to `scheduler-location` and finally to `[]`. Each scheduler
%% parsed from that value is kept only when `hb_opts:as/2` returns
%% `{ok, Map}` with a `priv_wallet` entry; all others are dropped.
commitment_wallets(ProcMsg, Opts) ->
    Candidates = [
        {ProcMsg, <<"scheduler">>},
        {ProcMsg, <<"scheduler-location">>}
    ],
    SchedulerVal = hb_ao:get_first(Candidates, [], Opts),
    WalletFor =
        fun(Scheduler) ->
            case hb_opts:as(Scheduler, Opts) of
                {ok, #{ priv_wallet := Wallet }} -> {true, Wallet};
                _ -> false
            end
        end,
    lists:filtermap(WalletFor, dev_scheduler:parse_schedulers(SchedulerVal)).

References

  • Main Scheduler - dev_scheduler.erl
  • Scheduler Registry - dev_scheduler_registry.erl
  • Scheduler Cache - dev_scheduler_cache.erl
  • Client Upload - hb_client.erl
  • Message Handling - hb_message.erl

Notes

  1. Serialization Point: Single process ensures no duplicate slot assignments
  2. Linked Process: Server is linked to creator via spawn_link
  3. Name Registration: Throws if another scheduler already registered for process
  4. Hash Chain Continuity: Resume from latest assignment preserves chain integrity
  5. Async Upload: Arweave uploads happen asynchronously in background
  6. Only Committed: Only committed (signed) keys from messages are included in assignments
  7. Timestamp Sources: Includes both Arweave block time and local scheduler time
  8. Error Recovery: Assignment errors are caught and logged, server continues
  9. Mode Flexibility: Different modes trade off latency vs durability
  10. Path Preservation: Original request path is preserved in assignment