dev_scheduler.erl - AO Scheduler Device

Overview

Purpose: Simple scheduler scheme for AO processes
Module: dev_scheduler
Device Name: scheduler@1.0
Protocol: AO message scheduling and assignment

This device implements the scheduling functionality for AO processes. It manages message ordering, slot assignments, and coordination between local and remote schedulers. The scheduler ensures deterministic message ordering for process execution.

Supported Operations

  • Schedule Messages: POST new messages to a process's schedule
  • Get Schedule: Retrieve assignments in cursor-traversable format
  • Get Slot: Get current slot information for a process
  • Location Management: Register and discover scheduler locations
  • Status: Get scheduler information
  • Next Assignment: Load schedule and return next assignment for CU flow

Dependencies

  • Erlang/OTP: application, crypto, rand
  • HyperBEAM: hb_ao, hb_http, hb_cache, hb_maps, hb_opts, hb_util, hb_message, hb_private, hb_client, hb_json
  • Related: dev_scheduler_server, dev_scheduler_cache, dev_scheduler_registry, dev_scheduler_formats, dev_process, dev_whois
  • External: ar_timestamp, ar_wallet, hb_gateway_client
  • Includes: include/hb.hrl
  • Testing: eunit

Public Functions Overview

%% Device Info
-spec info() -> InfoMap.
 
%% Local Scheduling
-spec schedule(Msg1, Msg2, Opts) -> {ok, Assignment} | {ok, Redirect} | {error, Reason}.
-spec router(Key, Msg1, Msg2, Opts) -> Result.
-spec location(Msg1, Msg2, Opts) -> {ok, Location} | {error, Reason}.
 
%% CU-Flow Functions
-spec slot(M1, M2, Opts) -> {ok, SlotInfo} | {ok, Redirect}.
-spec status(M1, M2, Opts) -> {ok, StatusInfo}.
-spec next(Msg1, Msg2, Opts) -> {ok, NextResult} | {error, Reason}.
 
%% Lifecycle
-spec start() -> ok.
-spec checkpoint(Msg) -> ok.
 
%% Utilities
-spec parse_schedulers(SchedLoc) -> [Scheduler].
-spec test_process() -> ProcessMsg.

Public Functions

1. info/0

-spec info() -> InfoMap
    when
        InfoMap :: map().

Description: Returns device information including exported functions and configuration. The device uses a default handler to route requests.

Exports: location, status, next, schedule, slot, init, checkpoint

Test Code:
-module(dev_scheduler_info_test).
-include_lib("eunit/include/eunit.hrl").
 
info_exports_test() ->
    Info = dev_scheduler:info(),
    Exports = maps:get(exports, Info),
    ?assert(lists:member(<<"schedule">>, Exports)),
    ?assert(lists:member(<<"slot">>, Exports)),
    ?assert(lists:member(<<"next">>, Exports)).

2. schedule/3

-spec schedule(Msg1, Msg2, Opts) -> {ok, Assignment} | {ok, Redirect} | {error, Reason}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        Assignment :: map(),
        Redirect :: map(),
        Reason :: map().

Description: Router for scheduling operations. Handles both GET (retrieve schedule) and POST (schedule new message) methods based on the method key in Msg2.

Test Code:
-module(dev_scheduler_schedule_test).
-include_lib("eunit/include/eunit.hrl").
 
schedule_post_test() ->
    dev_scheduler:start(),
    Opts = #{priv_wallet => hb:wallet(), store => hb_opts:get(store)},
    Process = dev_scheduler:test_process(),
    ProcMsg = hb_message:commit(Process, Opts),
    Msg2 = #{
        <<"method">> => <<"POST">>,
        <<"body">> => ProcMsg
    },
    {ok, Result} = dev_scheduler:schedule(#{}, Msg2, Opts),
    ?assertMatch(#{<<"slot">> := _}, Result).
 
schedule_get_test() ->
    % GET requires a fully registered process - verify function is exported
    code:ensure_loaded(dev_scheduler),
    ?assert(erlang:function_exported(dev_scheduler, schedule, 3)).

3. router/4

-spec router(Key, Msg1, Msg2, Opts) -> Result
    when
        Key :: binary(),
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        Result :: {ok, term()} | {error, term()}.

Description: Routes scheduler requests to the appropriate handler based on the key. Used internally by the device to dispatch to schedule, slot, status, next, and location handlers.

Test Code:
-module(dev_scheduler_router_test).
-include_lib("eunit/include/eunit.hrl").
 
router_test() ->
    % router/4 routes requests to appropriate handlers
    code:ensure_loaded(dev_scheduler),
    ?assert(erlang:function_exported(dev_scheduler, router, 4)).

4. slot/3

-spec slot(M1, M2, Opts) -> {ok, SlotInfo} | {ok, Redirect}
    when
        M1 :: map(),
        M2 :: map(),
        Opts :: map(),
        SlotInfo :: map(),
        Redirect :: map().

Description: Returns information about the current slot for a process. Includes process ID, current slot number, timestamp, block height, block hash, and the scheduler's wallet addresses. The response is marked cache-control: no-store.

Response Fields:
  • process - Process ID
  • current - Current slot number
  • timestamp - Current timestamp
  • block-height - Current block height
  • block-hash - Current block hash
  • addresses - Scheduler wallet addresses
  • cache-control - Set to no-store
Test Code:
-module(dev_scheduler_slot_test).
-include_lib("eunit/include/eunit.hrl").
 
slot_test() ->
    % slot/3 requires a fully registered process - verify function is exported
    code:ensure_loaded(dev_scheduler),
    ?assert(erlang:function_exported(dev_scheduler, slot, 3)).

5. status/3

-spec status(M1, M2, Opts) -> {ok, StatusInfo}
    when
        M1 :: map(),
        M2 :: map(),
        Opts :: map(),
        StatusInfo :: map().

Description: Returns information about the entire scheduler, including its address and list of registered processes.

Response Fields:
  • address - Scheduler wallet address
  • processes - List of registered process IDs
  • cache-control - Set to no-store
Test Code:
-module(dev_scheduler_status_test).
-include_lib("eunit/include/eunit.hrl").
 
status_test() ->
    dev_scheduler:start(),
    {ok, Status} = dev_scheduler:status(#{}, #{}, #{}),
    ?assertMatch(#{<<"address">> := _, <<"processes">> := _}, Status).

6. next/3

-spec next(Msg1, Msg2, Opts) -> {ok, NextResult} | {error, Reason}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        NextResult :: map(),
        Reason :: map().

Description: Load the schedule for a process into the cache, then return the next assignment. Used by Compute Units (CUs) to get the next message to process. Maintains a local cache of assignments in priv/To-Process.

Response Fields:
  • body - The next assignment message
  • state - Updated state with cached assignments
Test Code:
-module(dev_scheduler_next_test).
-include_lib("eunit/include/eunit.hrl").
 
next_test() ->
    dev_scheduler:start(),
    Opts = #{priv_wallet => hb:wallet(), store => hb_opts:get(store)},
    Process = dev_scheduler:test_process(),
    ProcMsg = hb_message:commit(Process, Opts),
    % Create process and schedule a message
    {ok, _} = dev_scheduler:schedule(
        #{},
        #{<<"method">> => <<"POST">>, <<"body">> => ProcMsg},
        Opts
    ),
    MsgToSchedule = hb_message:commit(#{
        <<"type">> => <<"Message">>,
        <<"target">> => hb_message:id(ProcMsg, all, Opts)
    }, Opts),
    {ok, _} = dev_scheduler:schedule(
        #{},
        #{<<"method">> => <<"POST">>, <<"body">> => MsgToSchedule},
        Opts
    ),
    % Get next assignment
    Msg1 = ProcMsg#{<<"at-slot">> => -1},
    {ok, Result} = dev_scheduler:next(Msg1, #{}, Opts),
    ?assertMatch(#{<<"body">> := _}, Result).

7. location/3

-spec location(Msg1, Msg2, Opts) -> {ok, Location} | {error, Reason}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        Location :: map(),
        Reason :: map().

Description: Router for scheduler location operations. GET retrieves location, POST registers a new location. Location records include URL, nonce, and time-to-live.

Location Fields:
  • data-protocol - <<"ao">>
  • variant - <<"ao.N.1">>
  • type - <<"scheduler-location">>
  • url - Scheduler URL
  • nonce - Version nonce
  • time-to-live - TTL in milliseconds
Test Code:
-module(dev_scheduler_location_test).
-include_lib("eunit/include/eunit.hrl").
 
get_location_test() ->
    dev_scheduler:start(),
    Opts = #{priv_wallet => Wallet = hb:wallet()},
    Address = hb_util:human_id(ar_wallet:to_address(Wallet)),
    % First register a location
    {ok, _} = dev_scheduler:location(
        #{},
        #{<<"method">> => <<"POST">>},
        Opts
    ),
    % Then get it
    {ok, Result} = dev_scheduler:location(
        #{},
        #{<<"method">> => <<"GET">>, <<"address">> => Address},
        Opts
    ),
    ?assertMatch(#{<<"body">> := _}, Result).

8. start/0

-spec start() -> ok.

Description: Helper to ensure the environment is started. Initializes the HyperBEAM application and seeds the random number generator.

Test Code:
-module(dev_scheduler_start_test).
-include_lib("eunit/include/eunit.hrl").
 
start_test() ->
    ?assertEqual(ok, dev_scheduler:start()).

9. checkpoint/1

-spec checkpoint(Msg) -> ok
    when
        Msg :: map().

Description: Create a checkpoint for a process's scheduler state. Used for persistence and recovery.

Test Code:
checkpoint_test() ->
    % checkpoint/1 requires a valid process message - verify function is exported
    code:ensure_loaded(dev_scheduler),
    ?assert(erlang:function_exported(dev_scheduler, checkpoint, 1)).

10. parse_schedulers/1

-spec parse_schedulers(SchedLoc) -> [Scheduler]
    when
        SchedLoc :: binary() | [binary()],
        Scheduler :: binary().

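Description: Parse a scheduler location value into a list of scheduler addresses. A list of binaries is returned unchanged; a binary is split on commas, and double quotes surrounding individual entries are stripped (e.g. <<"\"addr1\",\"addr2\"">> yields [<<"addr1">>, <<"addr2">>]).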

Test Code:
-module(dev_scheduler_parse_test).
-include_lib("eunit/include/eunit.hrl").
 
parse_list_test() ->
    Input = [<<"addr1">>, <<"addr2">>],
    ?assertEqual(Input, dev_scheduler:parse_schedulers(Input)).
 
parse_binary_test() ->
    Input = <<"addr1,addr2,addr3">>,
    ?assertEqual([<<"addr1">>, <<"addr2">>, <<"addr3">>], 
        dev_scheduler:parse_schedulers(Input)).
 
parse_quoted_test() ->
    Input = <<"\"addr1\",\"addr2\"">>,
    ?assertEqual([<<"addr1">>, <<"addr2">>], 
        dev_scheduler:parse_schedulers(Input)).

11. test_process/0

-spec test_process() -> ProcessMsg
    when
        ProcessMsg :: map().

Description: Creates a test process message for use in tests. Returns a properly structured process message with standard AO fields.

Test Code:
-module(dev_scheduler_test_process_test).
-include_lib("eunit/include/eunit.hrl").
 
test_process_test() ->
    Process = dev_scheduler:test_process(),
    ?assert(is_map(Process)),
    ?assertEqual(<<"Process">>, maps:get(<<"type">>, Process)).

HTTP API

GET /~scheduler@1.0/slot

Get current slot information for a process.

Parameters:
  • process - Process ID
Response:
{
    "process": "process-id",
    "current": 42,
    "timestamp": 1234567890,
    "block-height": 1000000,
    "block-hash": "hash...",
    "addresses": ["scheduler-address"]
}

GET /~scheduler@1.0/schedule

Get the schedule for a process in cursor-traversable format.

Parameters:
  • process - Process ID
  • from - (Optional) Starting slot (default: 0)
  • to - (Optional) Ending slot
  • accept - (Optional) Response format

POST /~scheduler@1.0/schedule

Schedule a new message for a process or start a new scheduler.

Body: Signed message to schedule

Response:
{
    "slot": 42,
    "timestamp": 1234567890,
    "block-height": 1000000
}

GET /~scheduler@1.0/status

Get scheduler status information.

GET/POST /~scheduler@1.0/location

Get or register scheduler location records.


Common Patterns

%% Start scheduler environment
dev_scheduler:start().
 
%% Create and schedule a new process
Process = #{
    <<"type">> => <<"Process">>,
    <<"scheduler">> => SchedulerAddress,
    <<"module">> => ModuleID
},
ProcMsg = hb_message:commit(Process, Opts),
{ok, Assignment} = dev_scheduler:schedule(
    #{},
    #{<<"method">> => <<"POST">>, <<"body">> => ProcMsg},
    Opts
).
 
%% Schedule a message to a process
Message = #{
    <<"type">> => <<"Message">>,
    <<"target">> => ProcessID,
    <<"action">> => <<"Eval">>,
    <<"data">> => <<"code here">>
},
MsgSigned = hb_message:commit(Message, Opts),
{ok, Assignment} = dev_scheduler:schedule(
    #{},
    #{<<"method">> => <<"POST">>, <<"body">> => MsgSigned},
    Opts
).
 
%% Get current slot
{ok, SlotInfo} = dev_scheduler:slot(
    #{},
    #{<<"process">> => ProcessID},
    Opts
).
 
%% Get schedule with range
{ok, Schedule} = dev_scheduler:schedule(
    #{},
    #{
        <<"method">> => <<"GET">>,
        <<"process">> => ProcessID,
        <<"from">> => 0,
        <<"to">> => 100
    },
    Opts
).
 
%% Get next assignment for CU processing
ProcessState = #{
    <<"at-slot">> => CurrentSlot,
    <<"id">> => ProcessID
},
{ok, #{<<"body">> := NextAssignment, <<"state">> := NewState}} =
    dev_scheduler:next(ProcessState, #{}, Opts).

Configuration Options

| Option | Default | Description |
|--------|---------|-------------|
| `verify_assignments` | `true` | Verify message signatures before scheduling |
| `scheduler_follow_redirects` | `true` | Follow redirects to remote schedulers |
| `scheduler_in_memory_cache` | `true` | Cache assignments in process memory |
| `scheduler_ignore_local_cache` | `false` | Skip local cache lookups |
| `scheduler_lookahead` | `true` | Enable lookahead worker for next assignments |
| `scheduler_location_ttl` | `3600000` | Location record time-to-live (ms) |
| `scheduler_location_notify_peers` | `[]` | Peers to notify on location changes |

Scheduling Flow

POST Schedule Flow

1. Extract message to schedule from request
2. Determine Process ID (from message or target)
3. Filter to only committed keys
4. Find appropriate scheduler server
5. If local: schedule via dev_scheduler_server
6. If remote: follow redirect or proxy
7. Verify message if required
8. For Process type: cache and upload to Arweave
9. Return assignment with slot number

GET Schedule Flow

1. Extract Process ID from request
2. Parse from/to slot range
3. Find appropriate scheduler server
4. If local: generate local schedule
5. If remote: fetch from remote and merge with local cache
6. Return assignments in requested format

Next Assignment Flow

1. Get cached assignments from process state
2. Get last processed slot
3. Check message cache, lookahead worker, or local cache
4. If not found: fetch from scheduler server
5. Validate next slot matches expected
6. Update process state with remaining assignments
7. Return next assignment and updated state

Redirect Handling

When a process's scheduler is remote, the device returns or follows redirects:

#{
    <<"status">> => 307,
    <<"location">> => <<"https://remote-scheduler.com/process-id">>,
    <<"body">> => <<"Redirecting to scheduler: ...">>,
    <<"variant">> => <<"ao.N.1">>
}

Error Handling

Common Errors

Invalid Message:
{error, #{
    <<"status">> => 400,
    <<"body">> => <<"Message invalid: Committed components cannot be validated.">>,
    <<"reason">> => Err
}}
Message Not Valid:
{error, #{
    <<"status">> => 400,
    <<"body">> => <<"Message is not valid.">>,
    <<"reason">> => <<"Given message is invalid.">>
}}
Slot Not Available:
{error, #{
    <<"status">> => 404,
    <<"reason">> => <<"Requested slot not yet available in schedule.">>
}}
Unexpected Slot:
{error, #{
    <<"status">> => 404,
    <<"reason">> => <<"Received assignment slot does not match expected slot.">>,
    <<"unexpected-slot">> => UnexpectedSlot,
    <<"expected-slot">> => ExpectedSlot
}}

References

  • Scheduler Server - dev_scheduler_server.erl
  • Scheduler Cache - dev_scheduler_cache.erl
  • Scheduler Registry - dev_scheduler_registry.erl
  • Format Conversion - dev_scheduler_formats.erl
  • Process Device - dev_process.erl
  • Gateway Client - hb_gateway_client.erl

Notes

  1. Deterministic Ordering: Messages are assigned monotonically increasing slot numbers
  2. Lookahead Workers: Background processes prefetch next assignments for performance
  3. Local Caching: Assignments cached locally to reduce network requests
  4. Redirect Support: Transparent proxying to remote schedulers
  5. Verification Options: Configurable message verification before scheduling
  6. Arweave Upload: Processes automatically uploaded to Arweave for permanence
  7. Location Records: Scheduler locations registered and cached for discovery
  8. Nonce Validation: Location updates require increasing nonce values
  9. Format Support: Multiple response formats including AOS-2
  10. Checkpoint Support: State checkpointing for recovery
  11. Multi-Scheduler: Processes can specify multiple scheduler authorities
  12. TTL Management: Location records have configurable time-to-live