dev_scheduler.erl - AO Scheduler Device
Overview
Purpose: Simple scheduler scheme for AO processes
Module: dev_scheduler
Device Name: scheduler@1.0
Protocol: AO message scheduling and assignment
This device implements the scheduling functionality for AO processes. It manages message ordering, slot assignments, and coordination between local and remote schedulers. The scheduler ensures deterministic message ordering for process execution.
Supported Operations
- Schedule Messages: POST new messages to a process's schedule
- Get Schedule: Retrieve assignments in cursor-traversable format
- Get Slot: Get current slot information for a process
- Location Management: Register and discover scheduler locations
- Status: Get scheduler information
- Next Assignment: Load schedule and return next assignment for CU flow
Dependencies
- Erlang/OTP: application, crypto, rand
- HyperBEAM: hb_ao, hb_http, hb_cache, hb_maps, hb_opts, hb_util, hb_message, hb_private, hb_client, hb_json
- Related: dev_scheduler_server, dev_scheduler_cache, dev_scheduler_registry, dev_scheduler_formats, dev_process, dev_whois
- External: ar_timestamp, ar_wallet, hb_gateway_client
- Includes: include/hb.hrl
- Testing: eunit
Public Functions Overview
%% Device Info
-spec info() -> InfoMap.
%% Local Scheduling
-spec schedule(Msg1, Msg2, Opts) -> {ok, Assignment} | {ok, Redirect} | {error, Reason}.
-spec router(Key, Msg1, Msg2, Opts) -> Result.
-spec location(Msg1, Msg2, Opts) -> {ok, Location} | {error, Reason}.
%% CU-Flow Functions
-spec slot(M1, M2, Opts) -> {ok, SlotInfo} | {ok, Redirect}.
-spec status(M1, M2, Opts) -> {ok, StatusInfo}.
-spec next(Msg1, Msg2, Opts) -> {ok, NextResult} | {error, Reason}.
%% Lifecycle
-spec start() -> ok.
-spec checkpoint(Msg) -> ok.
%% Utilities
-spec parse_schedulers(SchedLoc) -> [Scheduler].
-spec test_process() -> ProcessMsg.
Public Functions
1. info/0
-spec info() -> InfoMap
when
InfoMap :: map().
Description: Returns device information including exported functions and configuration. The device uses a default handler to route requests.
Exports: location, status, next, schedule, slot, init, checkpoint
-module(dev_scheduler_info_test).
-include_lib("eunit/include/eunit.hrl").
info_exports_test() ->
Info = dev_scheduler:info(),
Exports = maps:get(exports, Info),
?assert(lists:member(<<"schedule">>, Exports)),
?assert(lists:member(<<"slot">>, Exports)),
?assert(lists:member(<<"next">>, Exports)).
2. schedule/3
-spec schedule(Msg1, Msg2, Opts) -> {ok, Assignment} | {ok, Redirect} | {error, Reason}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
Assignment :: map(),
Redirect :: map(),
Reason :: map().
Description: Router for scheduling operations. Handles both GET (retrieve schedule) and POST (schedule new message) methods based on the method key in Msg2.
-module(dev_scheduler_schedule_test).
-include_lib("eunit/include/eunit.hrl").
schedule_post_test() ->
dev_scheduler:start(),
Opts = #{priv_wallet => hb:wallet(), store => hb_opts:get(store)},
Process = dev_scheduler:test_process(),
ProcMsg = hb_message:commit(Process, Opts),
Msg2 = #{
<<"method">> => <<"POST">>,
<<"body">> => ProcMsg
},
{ok, Result} = dev_scheduler:schedule(#{}, Msg2, Opts),
?assertMatch(#{<<"slot">> := _}, Result).
schedule_get_test() ->
% GET requires a fully registered process - verify function is exported
code:ensure_loaded(dev_scheduler),
?assert(erlang:function_exported(dev_scheduler, schedule, 3)).
3. router/4
-spec router(Key, Msg1, Msg2, Opts) -> Result
when
Key :: binary(),
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
Result :: {ok, term()} | {error, term()}.
Description: Routes scheduler requests to the appropriate handler based on the key. Used internally by the device to dispatch to schedule, slot, status, next, and location handlers.
-module(dev_scheduler_router_test).
-include_lib("eunit/include/eunit.hrl").
router_test() ->
% router/4 routes requests to appropriate handlers
code:ensure_loaded(dev_scheduler),
?assert(erlang:function_exported(dev_scheduler, router, 4)).
4. slot/3
-spec slot(M1, M2, Opts) -> {ok, SlotInfo} | {ok, Redirect}
when
M1 :: map(),
M2 :: map(),
Opts :: map(),
SlotInfo :: map(),
Redirect :: map().
Description: Returns information about the current slot for a process. Includes process ID, current slot number, timestamp, block height, and block hash.
Response Fields:
- process - Process ID
- current - Current slot number
- timestamp - Current timestamp
- block-height - Current block height
- block-hash - Current block hash
- addresses - Scheduler wallet addresses
- cache-control - Set to no-store
-module(dev_scheduler_slot_test).
-include_lib("eunit/include/eunit.hrl").
slot_test() ->
% slot/3 requires a fully registered process - verify function is exported
code:ensure_loaded(dev_scheduler),
?assert(erlang:function_exported(dev_scheduler, slot, 3)).
5. status/3
-spec status(M1, M2, Opts) -> {ok, StatusInfo}
when
M1 :: map(),
M2 :: map(),
Opts :: map(),
StatusInfo :: map().
Description: Returns information about the entire scheduler, including its address and list of registered processes.
Response Fields:
- address - Scheduler wallet address
- processes - List of registered process IDs
- cache-control - Set to no-store
-module(dev_scheduler_status_test).
-include_lib("eunit/include/eunit.hrl").
status_test() ->
dev_scheduler:start(),
{ok, Status} = dev_scheduler:status(#{}, #{}, #{}),
?assertMatch(#{<<"address">> := _, <<"processes">> := _}, Status).
6. next/3
-spec next(Msg1, Msg2, Opts) -> {ok, NextResult} | {error, Reason}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
NextResult :: map(),
Reason :: map().
Description: Load the schedule for a process into the cache, then return the next assignment. Used by Compute Units (CUs) to get the next message to process. Maintains a local cache of assignments in priv/To-Process.
Response Fields:
- body - The next assignment message
- state - Updated state with cached assignments
-module(dev_scheduler_next_test).
-include_lib("eunit/include/eunit.hrl").
next_test() ->
dev_scheduler:start(),
Opts = #{priv_wallet => hb:wallet(), store => hb_opts:get(store)},
Process = dev_scheduler:test_process(),
ProcMsg = hb_message:commit(Process, Opts),
% Create process and schedule a message
{ok, _} = dev_scheduler:schedule(
#{},
#{<<"method">> => <<"POST">>, <<"body">> => ProcMsg},
Opts
),
MsgToSchedule = hb_message:commit(#{
<<"type">> => <<"Message">>,
<<"target">> => hb_message:id(ProcMsg, all, Opts)
}, Opts),
{ok, _} = dev_scheduler:schedule(
#{},
#{<<"method">> => <<"POST">>, <<"body">> => MsgToSchedule},
Opts
),
% Get next assignment
Msg1 = ProcMsg#{<<"at-slot">> => -1},
{ok, Result} = dev_scheduler:next(Msg1, #{}, Opts),
?assertMatch(#{<<"body">> := _}, Result).
7. location/3
-spec location(Msg1, Msg2, Opts) -> {ok, Location} | {error, Reason}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
Location :: map(),
Reason :: map().
Description: Router for scheduler location operations. GET retrieves location, POST registers a new location. Location records include URL, nonce, and time-to-live.
Location Fields:
- data-protocol - <<"ao">>
- variant - <<"ao.N.1">>
- type - <<"scheduler-location">>
- url - Scheduler URL
- nonce - Version nonce
- time-to-live - TTL in milliseconds
-module(dev_scheduler_location_test).
-include_lib("eunit/include/eunit.hrl").
get_location_test() ->
dev_scheduler:start(),
Opts = #{priv_wallet => Wallet = hb:wallet()},
Address = hb_util:human_id(ar_wallet:to_address(Wallet)),
% First register a location
{ok, _} = dev_scheduler:location(
#{},
#{<<"method">> => <<"POST">>},
Opts
),
% Then get it
{ok, Result} = dev_scheduler:location(
#{},
#{<<"method">> => <<"GET">>, <<"address">> => Address},
Opts
),
?assertMatch(#{<<"body">> := _}, Result).
8. start/0
-spec start() -> ok.
Description: Helper to ensure the environment is started. Initializes the HyperBEAM application and seeds the random number generator.
Test Code:
-module(dev_scheduler_start_test).
-include_lib("eunit/include/eunit.hrl").
start_test() ->
?assertEqual(ok, dev_scheduler:start()).
9. checkpoint/1
-spec checkpoint(Msg) -> ok
when
Msg :: map().
Description: Create a checkpoint for a process's scheduler state. Used for persistence and recovery.
Test Code:
checkpoint_test() ->
% checkpoint/1 requires a valid process message - verify function is exported
code:ensure_loaded(dev_scheduler),
?assert(erlang:function_exported(dev_scheduler, checkpoint, 1)).
10. parse_schedulers/1
-spec parse_schedulers(SchedLoc) -> [Scheduler]
when
SchedLoc :: binary() | [binary()],
Scheduler :: binary().
Description: Parse a scheduler location string into a list of scheduler addresses. Handles both list and comma-separated binary formats.
Test Code:
-module(dev_scheduler_parse_test).
-include_lib("eunit/include/eunit.hrl").
parse_list_test() ->
Input = [<<"addr1">>, <<"addr2">>],
?assertEqual(Input, dev_scheduler:parse_schedulers(Input)).
parse_binary_test() ->
Input = <<"addr1,addr2,addr3">>,
?assertEqual([<<"addr1">>, <<"addr2">>, <<"addr3">>],
dev_scheduler:parse_schedulers(Input)).
parse_quoted_test() ->
Input = <<"\"addr1\",\"addr2\"">>,
?assertEqual([<<"addr1">>, <<"addr2">>],
dev_scheduler:parse_schedulers(Input)).
11. test_process/0
-spec test_process() -> ProcessMsg
when
ProcessMsg :: map().
Description: Creates a test process message for use in tests. Returns a properly structured process message with standard AO fields.
Test Code:
-module(dev_scheduler_test_process_test).
-include_lib("eunit/include/eunit.hrl").
test_process_test() ->
Process = dev_scheduler:test_process(),
?assert(is_map(Process)),
?assertEqual(<<"Process">>, maps:get(<<"type">>, Process)).
HTTP API
GET /~scheduler@1.0/slot
Get current slot information for a process.
Parameters:
- process - Process ID
Response:
{
"process": "process-id",
"current": 42,
"timestamp": 1234567890,
"block-height": 1000000,
"block-hash": "hash...",
"addresses": ["scheduler-address"]
}
GET /~scheduler@1.0/schedule
Get the schedule for a process in cursor-traversable format.
Parameters:
- process - Process ID
- from - (Optional) Starting slot (default: 0)
- to - (Optional) Ending slot
- accept - (Optional) Response format
POST /~scheduler@1.0/schedule
Schedule a new message for a process or start a new scheduler.
Body: Signed message to schedule
Response:
{
"slot": 42,
"timestamp": 1234567890,
"block-height": 1000000
}
GET /~scheduler@1.0/status
Get scheduler status information.
GET/POST /~scheduler@1.0/location
Get or register scheduler location records.
Common Patterns
%% Start scheduler environment
dev_scheduler:start().
%% Create and schedule a new process
Process = #{
<<"type">> => <<"Process">>,
<<"scheduler">> => SchedulerAddress,
<<"module">> => ModuleID
},
ProcMsg = hb_message:commit(Process, Opts),
{ok, Assignment} = dev_scheduler:schedule(
#{},
#{<<"method">> => <<"POST">>, <<"body">> => ProcMsg},
Opts
).
%% Schedule a message to a process
Message = #{
<<"type">> => <<"Message">>,
<<"target">> => ProcessID,
<<"action">> => <<"Eval">>,
<<"data">> => <<"code here">>
},
MsgSigned = hb_message:commit(Message, Opts),
{ok, Assignment} = dev_scheduler:schedule(
#{},
#{<<"method">> => <<"POST">>, <<"body">> => MsgSigned},
Opts
).
%% Get current slot
{ok, SlotInfo} = dev_scheduler:slot(
#{},
#{<<"process">> => ProcessID},
Opts
).
%% Get schedule with range
{ok, Schedule} = dev_scheduler:schedule(
#{},
#{
<<"method">> => <<"GET">>,
<<"process">> => ProcessID,
<<"from">> => 0,
<<"to">> => 100
},
Opts
).
%% Get next assignment for CU processing
ProcessState = #{
<<"at-slot">> => CurrentSlot,
<<"id">> => ProcessID
},
{ok, #{<<"body">> := NextAssignment, <<"state">> := NewState}} =
dev_scheduler:next(ProcessState, #{}, Opts).
Configuration Options
| Option | Default | Description |
|---|---|---|
| verify_assignments | true | Verify message signatures before scheduling |
| scheduler_follow_redirects | true | Follow redirects to remote schedulers |
| scheduler_in_memory_cache | true | Cache assignments in process memory |
| scheduler_ignore_local_cache | false | Skip local cache lookups |
| scheduler_lookahead | true | Enable lookahead worker for next assignments |
| scheduler_location_ttl | 3600000 | Location record time-to-live (ms) |
| scheduler_location_notify_peers | [] | Peers to notify on location changes |
Scheduling Flow
POST Schedule Flow
1. Extract message to schedule from request
2. Determine Process ID (from message or target)
3. Filter to only committed keys
4. Find appropriate scheduler server
5. If local: schedule via dev_scheduler_server
6. If remote: follow redirect or proxy
7. Verify message if required
8. For Process type: cache and upload to Arweave
9. Return assignment with slot number
GET Schedule Flow
1. Extract Process ID from request
2. Parse from/to slot range
3. Find appropriate scheduler server
4. If local: generate local schedule
5. If remote: fetch from remote and merge with local cache
6. Return assignments in requested format
Next Assignment Flow
1. Get cached assignments from process state
2. Get last processed slot
3. Check message cache, lookahead worker, or local cache
4. If not found: fetch from scheduler server
5. Validate next slot matches expected
6. Update process state with remaining assignments
7. Return next assignment and updated state
Redirect Handling
When a process's scheduler is remote, the device returns or follows redirects:
#{
<<"status">> => 307,
<<"location">> => <<"https://remote-scheduler.com/process-id">>,
<<"body">> => <<"Redirecting to scheduler: ...">>,
<<"variant">> => <<"ao.N.1">>
}
Error Handling
Common Errors
Invalid Message:
{error, #{
<<"status">> => 400,
<<"body">> => <<"Message invalid: Committed components cannot be validated.">>,
<<"reason">> => Err
}}
{error, #{
<<"status">> => 400,
<<"body">> => <<"Message is not valid.">>,
<<"reason">> => <<"Given message is invalid.">>
}}
{error, #{
<<"status">> => 404,
<<"reason">> => <<"Requested slot not yet available in schedule.">>
}}
{error, #{
<<"status">> => 404,
<<"reason">> => <<"Received assignment slot does not match expected slot.">>,
<<"unexpected-slot">> => UnexpectedSlot,
<<"expected-slot">> => ExpectedSlot
}}
References
- Scheduler Server - dev_scheduler_server.erl
- Scheduler Cache - dev_scheduler_cache.erl
- Scheduler Registry - dev_scheduler_registry.erl
- Format Conversion - dev_scheduler_formats.erl
- Process Device - dev_process.erl
- Gateway Client - hb_gateway_client.erl
Notes
- Deterministic Ordering: Messages are assigned monotonically increasing slot numbers
- Lookahead Workers: Background processes prefetch next assignments for performance
- Local Caching: Assignments cached locally to reduce network requests
- Redirect Support: Transparent proxying to remote schedulers
- Verification Options: Configurable message verification before scheduling
- Arweave Upload: Processes automatically uploaded to Arweave for permanence
- Location Records: Scheduler locations registered and cached for discovery
- Nonce Validation: Location updates require increasing nonce values
- Format Support: Multiple response formats including AOS-2
- Checkpoint Support: State checkpointing for recovery
- Multi-Scheduler: Processes can specify multiple scheduler authorities
- TTL Management: Location records have configurable time-to-live