Process & Scheduling
A beginner's guide to stateful computation units in HyperBEAM
What You'll Learn
By the end of this tutorial, you'll understand:
- dev_process — The core AO process execution coordinator
- dev_scheduler — Message ordering and slot assignment
- dev_push — Recursive message delivery between processes
- dev_cron — Time-based scheduled execution
- Sub-devices — Workers, caching, and registry helpers
These devices form the computation layer that enables stateful, deterministic execution.
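To make "stateful, deterministic execution" concrete, here is a minimal sketch. It assumes, purely for illustration, that a process is a pure function from a message and a state to a new state. The module name `replay_sketch` and the `{set, N}`, `{add, N}`, `{mul, N}` message shapes are hypothetical and are not part of HyperBEAM.

```erlang
-module(replay_sketch).
-export([apply_msg/2, state_at/2]).

%% Hypothetical message handler: a pure function of (Message, State).
apply_msg({set, N}, _State) -> N;
apply_msg({add, N}, State) -> State + N;
apply_msg({mul, N}, State) -> State * N.

%% The state at slot N is the fold of the first N+1 messages over the
%% initial state (0 here). The same ordered messages always produce
%% the same state, which is what makes replay deterministic.
state_at(Slot, Messages) when Slot >= 0, Slot < length(Messages) ->
    Prefix = lists:sublist(Messages, Slot + 1),
    lists:foldl(fun apply_msg/2, 0, Prefix).
```

Because the result depends only on the ordered message list, any node that replays the same schedule should arrive at the same state.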
The Big Picture
Processes are the fundamental unit of computation in HyperBEAM:
┌──────────────────────────────────────────────────────────────┐
│                      Process Lifecycle                       │
│                                                              │
│  Message ──→ Scheduler ──→ Compute ──→ Results ──→ Push      │
│                  │            │           │          │       │
│               Assign       Execute      Cache     Deliver    │
│                Slot          Code       State     Outbox     │
│                  │            │           │          │       │
│               ┌──┴──┐      ┌──┴──┐     ┌──┴──┐    ┌──┴──┐    │
│               │Slot │      │WASM │     │Cache│    │Next │    │
│               │ 0-N │      │Lua  │     │Store│    │Proc │    │
│               └─────┘      └─────┘     └─────┘    └─────┘    │
└──────────────────────────────────────────────────────────────┘
Think of it like a factory assembly line:
- dev_scheduler = Ticket dispenser (assigns order numbers)
- dev_process = Assembly line manager (coordinates everything)
- dev_push = Delivery truck (sends outputs to other processes)
- dev_cron = Timer (triggers actions at intervals)
Let's build each piece.
Part 1: The Process Device
📖 Reference: dev_process
dev_process is the coordinator for AO process execution. It orchestrates scheduling, computation, caching, and message pushing.
Process Definition
Process = #{
<<"device">> => <<"process@1.0">>,
<<"scheduler-device">> => <<"scheduler@1.0">>,
<<"execution-device">> => <<"stack@1.0">>,
<<"execution-stack">> => [
<<"lua@5.3a">>,
<<"patch@1.0">>
],
<<"push-device">> => <<"push@1.0">>
}.
Creating a Test Process
%% Initialize the process system
dev_process:init(),
%% Create a Lua process for testing
Process = dev_process:test_aos_process().
%% Or with custom options
Process = dev_process:test_aos_process(#{
process_cache_frequency => 1 % Cache every slot
}).
%% Or with custom execution stack
Process = dev_process:test_aos_process(#{}, [
<<"wasi@1.0">>,
<<"json-iface@1.0">>,
<<"wasm-64@1.0">>
]).
Scheduling Messages
%% Schedule Lua code for execution
dev_process:schedule_aos_call(Process, <<"X = 42">>),
dev_process:schedule_aos_call(Process, <<"X = X + 1">>),
dev_process:schedule_aos_call(Process, <<"return X">>).
Computing State
GET Mode: Normal Execution
%% Compute state at specific slot
{ok, State0} = hb_ao:resolve(
Process,
#{<<"path">> => <<"compute">>, <<"slot">> => 0},
#{}
).
%% Compute the slot that holds `return X` (the third scheduled call, slot 2)
{ok, State2} = hb_ao:resolve(
Process,
#{<<"path">> => <<"compute">>, <<"slot">> => 2},
#{}
).
%% Get results
Result = hb_ao:get(<<"results/data">>, State2, #{}).
%% => <<"43">>
POST Mode: Dryrun
%% Simulate execution without advancing state
{ok, DryResult} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"compute">>,
<<"method">> => <<"POST">>,
<<"dryrun">> => #{<<"data">> => <<"return 'test'">>}
},
#{}
).
Getting Latest Results
%% Get most recent computed state
{ok, Latest} = hb_ao:resolve(Process, <<"now/results/data">>, #{}).
Process API Endpoints
| Endpoint | Method | Description |
|---|---|---|
| /ProcessID/schedule | POST | Add message to schedule |
| /ProcessID/schedule | GET | View scheduled messages |
| /ProcessID/compute?slot=N | GET | Compute state at slot N |
| /ProcessID/compute | POST | Dryrun with message body |
| /ProcessID/now | GET | Latest computed results |
| /ProcessID/slot | GET | Current slot information |
| /ProcessID/snapshot | GET | State snapshot |
| /ProcessID/push | POST | Push outbox messages |
Part 2: The Scheduler Device
📖 Reference: dev_scheduler
dev_scheduler assigns monotonically increasing slot numbers to messages, ensuring deterministic ordering.
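The idea of monotonically increasing slots can be modeled in a few lines. This is a sketch only: the module `slot_sketch` and its map layout are hypothetical and do not reflect how dev_scheduler stores assignments internally.

```erlang
-module(slot_sketch).
-export([new/0, assign/2]).

%% Scheduler state for one process: the next free slot and the
%% assignments made so far (newest first).
new() -> #{next_slot => 0, assignments => []}.

%% Give Message the next free slot and return the assignment together
%% with the updated scheduler state. Slots never repeat or go backwards.
assign(Message, #{next_slot := Slot, assignments := As} = Sched) ->
    Assignment = #{slot => Slot, message => Message},
    {Assignment, Sched#{next_slot := Slot + 1,
                        assignments := [Assignment | As]}}.
```

Threading the returned state into the next call is what guarantees that each message receives a unique slot in arrival order.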
How Scheduling Works
Message 1 → Slot 0
Message 2 → Slot 1
Message 3 → Slot 2
...
Scheduling a Message
%% Start scheduler
dev_scheduler:start(),
%% Wallet used to sign the process and its messages
Wallet = hb:wallet(),
%% Create and schedule a process
Process = dev_scheduler:test_process(),
SignedProcess = hb_message:commit(Process, #{priv_wallet => Wallet}),
{ok, Assignment} = dev_scheduler:schedule(
#{},
#{<<"method">> => <<"POST">>, <<"body">> => SignedProcess},
#{priv_wallet => Wallet}
).
%% => #{<<"slot">> => 0, <<"timestamp">> => ..., <<"block-height">> => ...}
Scheduling Messages to a Process
%% Schedule a message to target process
Message = hb_message:commit(#{
<<"type">> => <<"Message">>,
<<"target">> => ProcessID,
<<"action">> => <<"Eval">>,
<<"data">> => <<"return 42">>
}, #{priv_wallet => Wallet}),
{ok, Assignment} = dev_scheduler:schedule(
#{},
#{<<"method">> => <<"POST">>, <<"body">> => Message},
#{priv_wallet => Wallet}
).
Getting Slot Information
{ok, SlotInfo} = dev_scheduler:slot(
#{},
#{<<"process">> => ProcessID},
#{}
).
%% => #{
%% <<"process">> => ProcessID,
%% <<"current">> => 42,
%% <<"timestamp">> => 1234567890,
%% <<"block-height">> => 1000000,
%% <<"block-hash">> => <<"hash...">>
%% }
Getting the Schedule
%% Get assignments in a range
{ok, Schedule} = dev_scheduler:schedule(
#{},
#{
<<"method">> => <<"GET">>,
<<"process">> => ProcessID,
<<"from">> => 0,
<<"to">> => 100
},
#{}
).
Scheduler Status
{ok, Status} = dev_scheduler:status(#{}, #{}, #{}).
%% => #{
%% <<"address">> => SchedulerAddress,
%% <<"processes">> => [ProcessID1, ProcessID2, ...]
%% }
Sub-devices
| Sub-device | Purpose |
|---|---|
| dev_scheduler_server | Local schedule management |
| dev_scheduler_cache | Assignment caching |
| dev_scheduler_registry | Process registration |
| dev_scheduler_formats | Response format conversion |
Part 3: The Push Device
📖 Reference: dev_push
dev_push delivers messages from one process's outbox to other processes, recursively processing the entire message tree.
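The recursion can be illustrated with a small self-contained model. It assumes each process is represented by a fun that returns its outbox as a list of `{Target, Message}` pairs. The module `push_sketch` and this representation are hypothetical and are not the dev_push implementation.

```erlang
-module(push_sketch).
-export([push/3, count/1]).

%% Deliver Msg to Target, collect the outbox it produces, and push each
%% outbox entry in turn. Handlers maps a process name to a fun that
%% returns that process's outbox as a list of {NextTarget, NextMsg}.
%% A target with no handler produces an empty outbox.
push(Target, Msg, Handlers) ->
    Handler = maps:get(Target, Handlers, fun(_) -> [] end),
    Outbox = Handler(Msg),
    Children = [push(T, M, Handlers) || {T, M} <- Outbox],
    #{target => Target, message => Msg, children => Children}.

%% Total number of deliveries recorded in a push tree.
count(#{children := Children}) ->
    1 + lists:sum([count(C) || C <- Children]).
```

In this model the recursion stops only when every branch reaches a process with an empty outbox, so handlers that always reply to each other would never terminate.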
How Pushing Works
Process A computes → Outbox has messages for B, C
↓
Push extracts outbox
↓
Schedules messages to B, C
↓
Recursively pushes B's outbox, C's outbox
↓
Continues until no more messages
Push a Computed Slot
%% Push all outbox messages from slot 0
{ok, PushResult} = hb_ao:resolve(
Process,
#{<<"path">> => <<"push">>, <<"slot">> => 0},
#{}
).
Push with Initial Message
%% Push a message directly
{ok, Result} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"push">>,
<<"method">> => <<"POST">>,
<<"body">> => hb_message:commit(#{
<<"target">> => TargetProcessID,
<<"action">> => <<"Transfer">>,
<<"amount">> => <<"100">>
}, #{priv_wallet => Wallet})
},
#{}
).
Recursive Ping-Pong Example
%% Process that sends messages to itself
PingScript = <<"
Handlers.add('Ping',
{ Action = 'Ping' },
function(m)
Count = tonumber(m.Count) or 0
if Count < 10 then
Send({
Target = ao.id,
Action = 'Ping',
Count = Count + 1
})
end
end
)
Send({ Target = ao.id, Action = 'Ping', Count = 1 })
">>,
dev_process:schedule_aos_call(Process, PingScript),
{ok, Result} = hb_ao:resolve(
Process,
#{<<"path">> => <<"push">>, <<"slot">> => 0},
#{}
).
%% Recursively processes all 10 messages
Push Modes
| Mode | Behavior | Use Case |
|---|---|---|
| sync | Wait for completion, return full tree | Interactive requests |
| async | Fire and forget, return immediately | Background processing |
%% Async push (returns immediately)
{ok, _} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"push">>,
<<"slot">> => 0,
<<"push-mode">> => <<"async">>
},
#{}
).
Result Depth Control
%% Only tree structure (minimal data)
{ok, TreeOnly} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"push">>,
<<"slot">> => 0,
<<"result-depth">> => 0
},
#{}
).
%% Full results for first level
{ok, WithResults} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"push">>,
<<"slot">> => 0,
<<"result-depth">> => 1
},
#{}
).
Part 4: The Cron Device
📖 Reference: dev_cron
dev_cron enables time-based scheduling — processes can call themselves at intervals without external triggers.
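The once/every/stop pattern maps naturally onto Erlang timers. This sketch uses only the standard `erlang:send_after/3` and `timer:send_interval/2` calls. The module `cron_sketch` and its task tuples are hypothetical and say nothing about how dev_cron is built.

```erlang
-module(cron_sketch).
-export([once/2, every/2, stop/1]).

%% Send Msg to the calling process a single time after DelayMs.
once(DelayMs, Msg) ->
    {once, erlang:send_after(DelayMs, self(), Msg)}.

%% Send Msg to the calling process every IntervalMs until stopped.
every(IntervalMs, Msg) ->
    {ok, TRef} = timer:send_interval(IntervalMs, Msg),
    {every, TRef}.

%% Cancel a task created by once/2 or every/2.
%% Cancelling a one-shot timer that already fired is harmless.
stop({once, Ref}) ->
    erlang:cancel_timer(Ref),
    ok;
stop({every, TRef}) ->
    {ok, cancel} = timer:cancel(TRef),
    ok.
```

The returned task tuple plays the role of the task ID: the caller keeps it and passes it back to `stop/1` when the work is no longer needed.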
One-Time Execution
%% Execute path once
{ok, TaskID} = dev_cron:once(
#{},
#{
<<"cron-path">> => <<"/process/notify">>,
<<"message">> => <<"Hello">>
},
#{}
).
Recurring Execution
%% Execute every 30 seconds
{ok, TaskID} = dev_cron:every(
#{},
#{
<<"cron-path">> => <<"/process/heartbeat">>,
<<"interval">> => <<"30-seconds">>
},
#{}
).
Interval Formats
| Format | Milliseconds | Example |
|---|---|---|
| N-milliseconds | N | 100-milliseconds |
| N-seconds | N × 1,000 | 30-seconds |
| N-minutes | N × 60,000 | 5-minutes |
| N-hours | N × 3,600,000 | 2-hours |
| N-days | N × 86,400,000 | 1-day |
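The conversions in the table can be checked with a small parser. This is a sketch under the assumption that an interval is always `<amount>-<unit>` and that units are matched by prefix, so `1-day` and `2-days` both work. The module `interval_sketch` is hypothetical and is not the dev_cron parser.

```erlang
-module(interval_sketch).
-export([to_ms/1]).

%% Convert an interval such as <<"30-seconds">> to milliseconds.
%% Input without exactly one amount and one unit fails with badmatch.
to_ms(Interval) when is_binary(Interval) ->
    [AmountBin, Unit] = binary:split(Interval, <<"-">>),
    binary_to_integer(AmountBin) * unit_ms(Unit).

%% Units are matched by prefix, so singular and plural forms are both
%% accepted. An unknown unit fails with function_clause.
unit_ms(<<"millisecond", _/binary>>) -> 1;
unit_ms(<<"second", _/binary>>) -> 1000;
unit_ms(<<"minute", _/binary>>) -> 60 * 1000;
unit_ms(<<"hour", _/binary>>) -> 60 * 60 * 1000;
unit_ms(<<"day", _/binary>>) -> 24 * 60 * 60 * 1000.
```

For example, `interval_sketch:to_ms(<<"5-minutes">>)` evaluates to 300000, matching the N × 60,000 row.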
Stopping a Task
%% Stop by task ID
{ok, Response} = dev_cron:stop(
#{},
#{<<"task">> => TaskID},
#{}
).
Common Use Cases
%% Periodic health check
dev_cron:every(#{}, #{
<<"cron-path">> => <<"/process/health-check">>,
<<"interval">> => <<"5-minutes">>
}, #{}).
%% Daily cleanup
dev_cron:every(#{}, #{
<<"cron-path">> => <<"/storage/cleanup">>,
<<"interval">> => <<"1-day">>
}, #{}).
%% Heartbeat/keep-alive
dev_cron:every(#{}, #{
<<"cron-path">> => <<"/connection/heartbeat">>,
<<"interval">> => <<"30-seconds">>
}, #{}).
Part 5: Workers and Caching
Process Workers
📖 Reference: dev_process_worker
Workers are persistent Erlang processes that maintain state in memory, avoiding repeated initialization.
%% Enable workers for computation
{ok, State} = hb_ao:resolve(
Process,
#{<<"path">> => <<"compute">>, <<"slot">> => 0},
#{spawn_worker => true, process_workers => true}
).
Benefits:
- Avoid repeated WASM/Lua initialization
- Keep execution context loaded in memory
- Guarantee sequential message execution
- Enable concurrent process computation
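The benefits listed above follow from how a long-lived Erlang process behaves. The sketch below is a generic worker loop, assuming a hypothetical `worker_sketch` module. It is not dev_process_worker, but it shows state staying in memory and requests being handled one at a time.

```erlang
-module(worker_sketch).
-export([start/1, call/2, stop/1]).

%% Start a worker that keeps State in memory between calls.
start(State) ->
    spawn(fun() -> loop(State) end).

%% Apply Fun to the worker's state and wait for the new state.
%% The worker handles one request at a time, so calls are sequential.
call(Pid, Fun) ->
    Ref = make_ref(),
    Pid ! {call, self(), Ref, Fun},
    receive
        {Ref, NewState} -> NewState
    after 5000 ->
        exit(timeout)
    end.

%% Ask the worker to exit.
stop(Pid) ->
    Pid ! stop,
    ok.

%% Receive loop: the state is never rebuilt, only updated in place.
loop(State) ->
    receive
        {call, From, Ref, Fun} ->
            NewState = Fun(State),
            From ! {Ref, NewState},
            loop(NewState);
        stop ->
            ok
    end.
```

Starting one such worker per process gives sequential execution within a process, while separate workers can run concurrently with each other.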
Process Cache
📖 Reference: dev_process_cache
Process state is cached by slot number and message ID:
%% Cache paths
/computed/{ProcessID}/slot/{SlotNumber}
/computed/{ProcessID}/{MessageID}
/computed/{ProcessID}/snapshot/{Slot}
Cache configuration:
#{
%% Time-based snapshots (production)
process_snapshot_time => 60, % Every 60 seconds
%% Slot-based snapshots (testing)
process_snapshot_slots => 1, % Every slot
%% Async caching
process_async_cache => true
}
Try It: Complete Workflow
%%% File: test_dev4.erl
-module(test_dev4).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
%% Run with: rebar3 eunit --module=test_dev4
process_basic_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
%% Schedule some computations
dev_process:schedule_aos_call(Process, <<"X = 10">>),
dev_process:schedule_aos_call(Process, <<"X = X * 2">>),
dev_process:schedule_aos_call(Process, <<"return X">>),
%% Compute each slot
{ok, _} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 0}, #{}),
{ok, _} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 1}, #{}),
{ok, State2} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 2}, #{}),
?assertEqual(<<"20">>, hb_ao:get(<<"results/data">>, State2, #{})),
?debugFmt("Process compute: X=10, X*2, return X => ~s", [<<"20">>]).
process_now_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"return 'hello'">>),
{ok, Result} = hb_ao:resolve(Process, <<"now/results/data">>, #{}),
?assertEqual(<<"hello">>, Result),
?debugFmt("Process now: OK", []).
process_dryrun_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
%% Dryrun doesn't advance state
{ok, DryResult} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"compute">>,
<<"method">> => <<"POST">>,
<<"dryrun">> => #{<<"data">> => <<"return 99">>}
},
#{}
),
?assert(is_map(DryResult)),
?debugFmt("Dryrun: OK", []).
scheduler_test() ->
dev_scheduler:start(),
Opts = #{priv_wallet => hb:wallet(), store => hb_opts:get(store)},
%% Create and schedule a process
Process = dev_scheduler:test_process(),
SignedProcess = hb_message:commit(Process, Opts),
{ok, Assignment} = dev_scheduler:schedule(
#{},
#{<<"method">> => <<"POST">>, <<"body">> => SignedProcess},
Opts
),
?assert(maps:is_key(<<"slot">>, Assignment)),
?debugFmt("Scheduler assignment: slot=~p", [maps:get(<<"slot">>, Assignment)]).
scheduler_status_test() ->
dev_scheduler:start(),
{ok, Status} = dev_scheduler:status(#{}, #{}, #{}),
?assert(maps:is_key(<<"address">>, Status)),
?assert(maps:is_key(<<"processes">>, Status)),
?debugFmt("Scheduler status: OK", []).
cron_once_test() ->
{ok, TaskID} = dev_cron:once(
#{},
#{<<"cron-path">> => <<"/test/path">>},
#{}
),
?assert(is_binary(TaskID)),
%% Stop it
{ok, _} = dev_cron:stop(#{}, #{<<"task">> => TaskID}, #{}),
?debugFmt("Cron once: OK", []).
cron_every_test() ->
{ok, TaskID} = dev_cron:every(
#{},
#{
<<"cron-path">> => <<"/test/heartbeat">>,
<<"interval">> => <<"500-milliseconds">>
},
#{}
),
?assert(is_binary(TaskID)),
%% Let it run briefly
timer:sleep(100),
%% Stop it
{ok, _} = dev_cron:stop(#{}, #{<<"task">> => TaskID}, #{}),
?debugFmt("Cron every: OK", []).
complete_workflow_test() ->
?debugFmt("=== Complete Process Workflow ===", []),
%% 1. Initialize
dev_process:init(),
Process = dev_process:test_aos_process(),
?debugFmt("1. Created AOS process", []),
%% 2. Schedule a script that increments a counter on every call
Script = <<"
Counter = Counter or 0
Counter = Counter + 1
return 'Count: ' .. Counter
">>,
dev_process:schedule_aos_call(Process, Script),
dev_process:schedule_aos_call(Process, Script),
dev_process:schedule_aos_call(Process, Script),
?debugFmt("2. Scheduled 3 computations", []),
%% 3. Compute slots
{ok, _} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 0}, #{}),
{ok, _} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 1}, #{}),
{ok, State} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 2}, #{}),
?debugFmt("3. Computed all slots", []),
%% 4. Check result
Result = hb_ao:get(<<"results/data">>, State, #{}),
?assertEqual(<<"Count: 3">>, Result),
?debugFmt("4. Result: ~s", [Result]),
?debugFmt("=== All tests passed! ===", []).
Run the Tests
rebar3 eunit --module=test_dev4
Common Patterns
Pattern 1: Stateful Counter
Process = dev_process:test_aos_process(),
%% Initialize counter
dev_process:schedule_aos_call(Process, <<"
Counter = 0
Handlers.add('Increment',
{ Action = 'Increment' },
function(m)
Counter = Counter + 1
return Counter
end
)
">>),
%% Read the counter before any increments
dev_process:schedule_aos_call(Process, <<"return Counter">>). % 0
%% ... then send messages with Action = 'Increment' to raise it
Pattern 2: Inter-Process Communication
%% Process A sends to Process B
ScriptA = <<"
Handlers.add('Trigger',
{ Action = 'Trigger' },
function(m)
Send({
Target = '", ProcessB_ID/binary, "',
Action = 'DoWork',
Data = 'from A'
})
end
)
">>,
dev_process:schedule_aos_call(ProcessA, ScriptA),
%% Push will deliver to ProcessB
{ok, _} = hb_ao:resolve(ProcessA, #{<<"path">> => <<"push">>, <<"slot">> => 0}, #{}).
Pattern 3: Scheduled Cleanup
%% Setup cleanup cron job
{ok, TaskID} = dev_cron:every(
#{},
#{
<<"cron-path">> => <<"/", ProcessID/binary, "~process@1.0/compute">>,
<<"interval">> => <<"1-hour">>
},
#{}
),
%% The process has a cleanup handler
CleanupScript = <<"
Handlers.add('Cleanup',
function(m) return true end,
function(m)
-- Remove entries older than one day. State is assumed to be a
-- global table whose values each carry a timestamp field.
for k, v in pairs(State) do
if v.timestamp < (os.time() - 86400) then
State[k] = nil
end
end
end
)
">>.
Pattern 4: Worker-Optimized Batch
%% Process many messages with persistent worker
dev_process:init(),
Process = dev_process:test_aos_process(),
%% Schedule 100 messages
lists:foreach(fun(I) ->
Code = iolist_to_binary([<<"N = ">>, integer_to_binary(I)]),
dev_process:schedule_aos_call(Process, Code)
end, lists:seq(1, 100)),
%% Compute with worker (stays in memory)
lists:foreach(fun(Slot) ->
hb_ao:resolve(
Process,
#{<<"path">> => <<"compute">>, <<"slot">> => Slot},
#{process_workers => true}
)
end, lists:seq(0, 99)).
Quick Reference Card
📖 Reference: dev_process | dev_scheduler | dev_push | dev_cron
%% === PROCESS DEVICE ===
%% Create test process
dev_process:init(),
Process = dev_process:test_aos_process().
%% Schedule Lua code
dev_process:schedule_aos_call(Process, <<"return 42">>).
%% Compute at slot
{ok, State} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 0}, #{}).
%% Get latest
{ok, Latest} = hb_ao:resolve(Process, <<"now/results/data">>, #{}).
%% Dryrun (no state change)
{ok, Dry} = hb_ao:resolve(Process, #{
<<"path">> => <<"compute">>,
<<"method">> => <<"POST">>,
<<"dryrun">> => #{<<"data">> => Code}
}, #{}).
%% With worker
{ok, State} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 0}, #{
process_workers => true, spawn_worker => true
}).
%% === SCHEDULER DEVICE ===
dev_scheduler:start().
%% Schedule message
{ok, Assignment} = dev_scheduler:schedule(#{}, #{
<<"method">> => <<"POST">>,
<<"body">> => SignedMessage
}, Opts).
%% Get slot info
{ok, SlotInfo} = dev_scheduler:slot(#{}, #{<<"process">> => ProcessID}, #{}).
%% Get schedule range
{ok, Schedule} = dev_scheduler:schedule(#{}, #{
<<"method">> => <<"GET">>,
<<"process">> => ProcessID,
<<"from">> => 0, <<"to">> => 100
}, #{}).
%% Scheduler status
{ok, Status} = dev_scheduler:status(#{}, #{}, #{}).
%% === PUSH DEVICE ===
%% Push slot
{ok, Result} = hb_ao:resolve(Process, #{<<"path">> => <<"push">>, <<"slot">> => 0}, #{}).
%% Push message
{ok, Result} = hb_ao:resolve(Process, #{
<<"path">> => <<"push">>,
<<"method">> => <<"POST">>,
<<"body">> => SignedMessage
}, #{}).
%% Async push
{ok, _} = hb_ao:resolve(Process, #{
<<"path">> => <<"push">>,
<<"slot">> => 0,
<<"push-mode">> => <<"async">>
}, #{}).
%% === CRON DEVICE ===
%% One-time
{ok, TaskID} = dev_cron:once(#{}, #{<<"cron-path">> => <<"/path">>}, #{}).
%% Recurring
{ok, TaskID} = dev_cron:every(#{}, #{
<<"cron-path">> => <<"/path">>,
<<"interval">> => <<"30-seconds">>
}, #{}).
%% Stop task
{ok, _} = dev_cron:stop(#{}, #{<<"task">> => TaskID}, #{}).
What's Next?
You now understand the computation layer:
| Device | Purpose | Key Feature |
|---|---|---|
| dev_process | Execution coordinator | schedule/compute/now/push |
| dev_scheduler | Message ordering | Slot assignment |
| dev_push | Message delivery | Recursive outbox processing |
| dev_cron | Time-based execution | once/every intervals |
Sub-devices
| Sub-device | Purpose |
|---|---|
| dev_process_worker | Persistent workers |
| dev_process_cache | State caching |
| dev_scheduler_server | Local scheduling |
| dev_scheduler_cache | Assignment cache |
| dev_scheduler_registry | Process registry |
| dev_scheduler_formats | Format conversion |
Going Further
- Store — Data persistence and caching (Tutorial)
- Runtimes — Execute WASM and Lua code (Tutorial)
- Payment — Metering and economics (Tutorial)