Skip to content

Process & Scheduling

A beginner's guide to stateful computation units in HyperBEAM


What You'll Learn

By the end of this tutorial, you'll understand:

  1. dev_process — The core AO process execution coordinator
  2. dev_scheduler — Message ordering and slot assignment
  3. dev_push — Recursive message delivery between processes
  4. dev_cron — Time-based scheduled execution
  5. Sub-devices — Workers, caching, and registry helpers

These devices form the computation layer that enables stateful, deterministic execution.


The Big Picture

Processes are the fundamental unit of computation in HyperBEAM:

                    ┌─────────────────────────────────────────────┐
                    │               Process Lifecycle             │
                    │                                             │
   Message ──→ Scheduler ──→ Compute ──→ Results ──→ Push        │
                 │              │           │          │          │
              Assign         Execute     Cache      Deliver       │
              Slot           Code        State      Outbox        │
                 │              │           │          │          │
              ┌──┴──┐        ┌──┴──┐     ┌──┴──┐    ┌──┴──┐      │
              │Slot │        │WASM │     │Cache│    │Next │      │
              │ 0-N │        │Lua  │     │Store│    │Proc │      │
              └─────┘        └─────┘     └─────┘    └─────┘      │
                    └─────────────────────────────────────────────┘

Think of it like a factory assembly line:

  • dev_scheduler = Ticket dispenser (assigns order numbers)
  • dev_process = Assembly line manager (coordinates everything)
  • dev_push = Delivery truck (sends outputs to other processes)
  • dev_cron = Timer (triggers actions at intervals)

Let's build each piece.


Part 1: The Process Device

📖 Reference: dev_process

dev_process is the coordinator for AO process execution. It orchestrates scheduling, computation, caching, and message pushing.

Process Definition

Process = #{
    <<"device">> => <<"process@1.0">>,
    <<"scheduler-device">> => <<"scheduler@1.0">>,
    <<"execution-device">> => <<"stack@1.0">>,
    <<"execution-stack">> => [
        <<"lua@5.3a">>,
        <<"patch@1.0">>
    ],
    <<"push-device">> => <<"push@1.0">>
}.

Creating a Test Process

%% Initialize the process system
dev_process:init(),
 
%% Create a Lua process for testing
Process = dev_process:test_aos_process().
 
%% Or with custom options
Process = dev_process:test_aos_process(#{
    process_cache_frequency => 1  % Cache every slot
}).
 
%% Or with custom execution stack
Process = dev_process:test_aos_process(#{}, [
    <<"wasi@1.0">>,
    <<"json-iface@1.0">>,
    <<"wasm-64@1.0">>
]).

Scheduling Messages

%% Schedule Lua code for execution
dev_process:schedule_aos_call(Process, <<"X = 42">>),
dev_process:schedule_aos_call(Process, <<"X = X + 1">>),
dev_process:schedule_aos_call(Process, <<"return X">>).

Computing State

GET Mode — Normal Execution:
%% Compute state at specific slot
{ok, State0} = hb_ao:resolve(
    Process,
    #{<<"path">> => <<"compute">>, <<"slot">> => 0},
    #{}
).
 
%% Compute multiple slots
{ok, State1} = hb_ao:resolve(
    Process,
    #{<<"path">> => <<"compute">>, <<"slot">> => 1},
    #{}
).
 
%% Get results
Result = hb_ao:get(<<"results/data">>, State1, #{}).
%% => <<"43">>
POST Mode — Dryrun (no state change):
%% Simulate execution without advancing state
{ok, DryResult} = hb_ao:resolve(
    Process,
    #{
        <<"path">> => <<"compute">>,
        <<"method">> => <<"POST">>,
        <<"dryrun">> => #{<<"data">> => <<"return 'test'">>}
    },
    #{}
).

Getting Latest Results

%% Get most recent computed state
{ok, Latest} = hb_ao:resolve(Process, <<"now/results/data">>, #{}).

Process API Endpoints

EndpointMethodDescription
/ProcessID/schedulePOSTAdd message to schedule
/ProcessID/scheduleGETView scheduled messages
/ProcessID/compute?slot=NGETCompute state at slot N
/ProcessID/computePOSTDryrun with message body
/ProcessID/nowGETLatest computed results
/ProcessID/slotGETCurrent slot information
/ProcessID/snapshotGETState snapshot
/ProcessID/pushPOSTPush outbox messages

Part 2: The Scheduler Device

📖 Reference: dev_scheduler

dev_scheduler assigns monotonically increasing slot numbers to messages, ensuring deterministic ordering.

How Scheduling Works

Message 1 → Slot 0
Message 2 → Slot 1
Message 3 → Slot 2
...

Scheduling a Message

%% Start scheduler
dev_scheduler:start(),
 
%% Create and schedule a process
Process = dev_scheduler:test_process(),
SignedProcess = hb_message:commit(Process, #{priv_wallet => Wallet}),
 
{ok, Assignment} = dev_scheduler:schedule(
    #{},
    #{<<"method">> => <<"POST">>, <<"body">> => SignedProcess},
    #{priv_wallet => Wallet}
).
%% => #{<<"slot">> => 0, <<"timestamp">> => ..., <<"block-height">> => ...}

Scheduling Messages to a Process

%% Schedule a message to target process
Message = hb_message:commit(#{
    <<"type">> => <<"Message">>,
    <<"target">> => ProcessID,
    <<"action">> => <<"Eval">>,
    <<"data">> => <<"return 42">>
}, #{priv_wallet => Wallet}),
 
{ok, Assignment} = dev_scheduler:schedule(
    #{},
    #{<<"method">> => <<"POST">>, <<"body">> => Message},
    #{priv_wallet => Wallet}
).

Getting Slot Information

{ok, SlotInfo} = dev_scheduler:slot(
    #{},
    #{<<"process">> => ProcessID},
    #{}
).
%% => #{
%%     <<"process">> => ProcessID,
%%     <<"current">> => 42,
%%     <<"timestamp">> => 1234567890,
%%     <<"block-height">> => 1000000,
%%     <<"block-hash">> => <<"hash...">>
%% }

Getting the Schedule

%% Get assignments in a range
{ok, Schedule} = dev_scheduler:schedule(
    #{},
    #{
        <<"method">> => <<"GET">>,
        <<"process">> => ProcessID,
        <<"from">> => 0,
        <<"to">> => 100
    },
    #{}
).

Scheduler Status

{ok, Status} = dev_scheduler:status(#{}, #{}, #{}).
%% => #{
%%     <<"address">> => SchedulerAddress,
%%     <<"processes">> => [ProcessID1, ProcessID2, ...]
%% }

Sub-devices

Sub-devicePurpose
dev_scheduler_serverLocal schedule management
dev_scheduler_cacheAssignment caching
dev_scheduler_registryProcess registration
dev_scheduler_formatsResponse format conversion

Part 3: The Push Device

📖 Reference: dev_push

dev_push delivers messages from one process's outbox to other processes, recursively processing the entire message tree.

How Pushing Works

Process A computes → Outbox has messages for B, C

Push extracts outbox

Schedules messages to B, C

Recursively pushes B's outbox, C's outbox

Continues until no more messages

Push a Computed Slot

%% Push all outbox messages from slot 0
{ok, PushResult} = hb_ao:resolve(
    Process,
    #{<<"path">> => <<"push">>, <<"slot">> => 0},
    #{}
).

Push with Initial Message

%% Push a message directly
{ok, Result} = hb_ao:resolve(
    Process,
    #{
        <<"path">> => <<"push">>,
        <<"method">> => <<"POST">>,
        <<"body">> => hb_message:commit(#{
            <<"target">> => TargetProcessID,
            <<"action">> => <<"Transfer">>,
            <<"amount">> => <<"100">>
        }, #{priv_wallet => Wallet})
    },
    #{}
).

Recursive Ping-Pong Example

%% Process that sends messages to itself
PingScript = <<"
    Handlers.add('Ping',
        { Action = 'Ping' },
        function(m)
            Count = tonumber(m.Count) or 0
            if Count < 10 then
                Send({ 
                    Target = ao.id, 
                    Action = 'Ping', 
                    Count = Count + 1 
                })
            end
        end
    )
    Send({ Target = ao.id, Action = 'Ping', Count = 1 })
">>,
 
dev_process:schedule_aos_call(Process, PingScript),
{ok, Result} = hb_ao:resolve(
    Process,
    #{<<"path">> => <<"push">>, <<"slot">> => 0},
    #{}
).
%% Recursively processes all 10 messages

Push Modes

ModeBehaviorUse Case
syncWait for completion, return full treeInteractive requests
asyncFire and forget, return immediatelyBackground processing
%% Async push (returns immediately)
{ok, _} = hb_ao:resolve(
    Process,
    #{
        <<"path">> => <<"push">>,
        <<"slot">> => 0,
        <<"push-mode">> => <<"async">>
    },
    #{}
).

Result Depth Control

%% Only tree structure (minimal data)
{ok, TreeOnly} = hb_ao:resolve(
    Process,
    #{
        <<"path">> => <<"push">>,
        <<"slot">> => 0,
        <<"result-depth">> => 0
    },
    #{}
).
 
%% Full results for first level
{ok, WithResults} = hb_ao:resolve(
    Process,
    #{
        <<"path">> => <<"push">>,
        <<"slot">> => 0,
        <<"result-depth">> => 1
    },
    #{}
).

Part 4: The Cron Device

📖 Reference: dev_cron

dev_cron enables time-based scheduling — processes can call themselves at intervals without external triggers.

One-Time Execution

%% Execute path once
{ok, TaskID} = dev_cron:once(
    #{},
    #{
        <<"cron-path">> => <<"/process/notify">>,
        <<"message">> => <<"Hello">>
    },
    #{}
).

Recurring Execution

%% Execute every 30 seconds
{ok, TaskID} = dev_cron:every(
    #{},
    #{
        <<"cron-path">> => <<"/process/heartbeat">>,
        <<"interval">> => <<"30-seconds">>
    },
    #{}
).

Interval Formats

FormatMillisecondsExample
N-millisecondsN100-milliseconds
N-secondsN × 1,00030-seconds
N-minutesN × 60,0005-minutes
N-hoursN × 3,600,0002-hours
N-daysN × 86,400,0001-day

Stopping a Task

%% Stop by task ID
{ok, Response} = dev_cron:stop(
    #{},
    #{<<"task">> => TaskID},
    #{}
).

Common Use Cases

%% Periodic health check
dev_cron:every(#{}, #{
    <<"cron-path">> => <<"/process/health-check">>,
    <<"interval">> => <<"5-minutes">>
}, #{}).
 
%% Daily cleanup
dev_cron:every(#{}, #{
    <<"cron-path">> => <<"/storage/cleanup">>,
    <<"interval">> => <<"1-day">>
}, #{}).
 
%% Heartbeat/keep-alive
dev_cron:every(#{}, #{
    <<"cron-path">> => <<"/connection/heartbeat">>,
    <<"interval">> => <<"30-seconds">>
}, #{}).

Part 5: Workers and Caching

Process Workers

📖 Reference: dev_process_worker

Workers are persistent Erlang processes that maintain state in memory, avoiding repeated initialization.

%% Enable workers for computation
{ok, State} = hb_ao:resolve(
    Process,
    #{<<"path">> => <<"compute">>, <<"slot">> => 0},
    #{spawn_worker => true, process_workers => true}
).
Benefits:
  • Avoid repeated WASM/Lua initialization
  • Keep execution context loaded in memory
  • Guarantee sequential message execution
  • Enable concurrent process computation

Process Cache

📖 Reference: dev_process_cache

Process state is cached by slot number and message ID:

%% Cache paths
/computed/{ProcessID}/slot/{SlotNumber}
/computed/{ProcessID}/{MessageID}
/computed/{ProcessID}/snapshot/{Slot}
Cache Options:
#{
    %% Time-based snapshots (production)
    process_snapshot_time => 60,  % Every 60 seconds
    
    %% Slot-based snapshots (testing)
    process_snapshot_slots => 1,  % Every slot
    
    %% Async caching
    process_async_cache => true
}

Try It: Complete Workflow

%%% File: test_dev4.erl
-module(test_dev4).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
 
%% Run with: rebar3 eunit --module=test_dev4
 
process_basic_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    
    %% Schedule some computations
    dev_process:schedule_aos_call(Process, <<"X = 10">>),
    dev_process:schedule_aos_call(Process, <<"X = X * 2">>),
    dev_process:schedule_aos_call(Process, <<"return X">>),
    
    %% Compute each slot
    {ok, _} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 0}, #{}),
    {ok, _} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 1}, #{}),
    {ok, State2} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 2}, #{}),
    
    ?assertEqual(<<"20">>, hb_ao:get(<<"results/data">>, State2, #{})),
    ?debugFmt("Process compute: X=10, X*2, return X => ~s", [<<"20">>]).
 
process_now_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    
    dev_process:schedule_aos_call(Process, <<"return 'hello'">>),
    
    {ok, Result} = hb_ao:resolve(Process, <<"now/results/data">>, #{}),
    ?assertEqual(<<"hello">>, Result),
    ?debugFmt("Process now: OK", []).
 
process_dryrun_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    
    %% Dryrun doesn't advance state
    {ok, DryResult} = hb_ao:resolve(
        Process,
        #{
            <<"path">> => <<"compute">>,
            <<"method">> => <<"POST">>,
            <<"dryrun">> => #{<<"data">> => <<"return 99">>}
        },
        #{}
    ),
    ?assert(is_map(DryResult)),
    ?debugFmt("Dryrun: OK", []).
 
scheduler_test() ->
    dev_scheduler:start(),
    Opts = #{priv_wallet => hb:wallet(), store => hb_opts:get(store)},
    
    %% Create and schedule a process
    Process = dev_scheduler:test_process(),
    SignedProcess = hb_message:commit(Process, Opts),
    
    {ok, Assignment} = dev_scheduler:schedule(
        #{},
        #{<<"method">> => <<"POST">>, <<"body">> => SignedProcess},
        Opts
    ),
    ?assert(maps:is_key(<<"slot">>, Assignment)),
    ?debugFmt("Scheduler assignment: slot=~p", [maps:get(<<"slot">>, Assignment)]).
 
scheduler_status_test() ->
    dev_scheduler:start(),
    {ok, Status} = dev_scheduler:status(#{}, #{}, #{}),
    ?assert(maps:is_key(<<"address">>, Status)),
    ?assert(maps:is_key(<<"processes">>, Status)),
    ?debugFmt("Scheduler status: OK", []).
 
cron_once_test() ->
    {ok, TaskID} = dev_cron:once(
        #{},
        #{<<"cron-path">> => <<"/test/path">>},
        #{}
    ),
    ?assert(is_binary(TaskID)),
    
    %% Stop it
    {ok, _} = dev_cron:stop(#{}, #{<<"task">> => TaskID}, #{}),
    ?debugFmt("Cron once: OK", []).
 
cron_every_test() ->
    {ok, TaskID} = dev_cron:every(
        #{},
        #{
            <<"cron-path">> => <<"/test/heartbeat">>,
            <<"interval">> => <<"500-milliseconds">>
        },
        #{}
    ),
    ?assert(is_binary(TaskID)),
    
    %% Let it run briefly
    timer:sleep(100),
    
    %% Stop it
    {ok, _} = dev_cron:stop(#{}, #{<<"task">> => TaskID}, #{}),
    ?debugFmt("Cron every: OK", []).
 
complete_workflow_test() ->
    ?debugFmt("=== Complete Process Workflow ===", []),
    
    %% 1. Initialize
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    ?debugFmt("1. Created AOS process", []),
    
    %% 2. Schedule a handler that sends messages
    Script = <<"
        Counter = Counter or 0
        Counter = Counter + 1
        return 'Count: ' .. Counter
    ">>,
    dev_process:schedule_aos_call(Process, Script),
    dev_process:schedule_aos_call(Process, Script),
    dev_process:schedule_aos_call(Process, Script),
    ?debugFmt("2. Scheduled 3 computations", []),
    
    %% 3. Compute slots
    {ok, _} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 0}, #{}),
    {ok, _} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 1}, #{}),
    {ok, State} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 2}, #{}),
    ?debugFmt("3. Computed all slots", []),
    
    %% 4. Check result
    Result = hb_ao:get(<<"results/data">>, State, #{}),
    ?assertEqual(<<"Count: 3">>, Result),
    ?debugFmt("4. Result: ~s", [Result]),
    
    ?debugFmt("=== All tests passed! ===", []).

Run the Tests

rebar3 eunit --module=test_dev4

Common Patterns

Pattern 1: Stateful Counter

Process = dev_process:test_aos_process(),
 
%% Initialize counter
dev_process:schedule_aos_call(Process, <<"
    Counter = 0
    Handlers.add('Increment',
        { Action = 'Increment' },
        function(m)
            Counter = Counter + 1
            return Counter
        end
    )
">>),
 
%% Increment multiple times
dev_process:schedule_aos_call(Process, <<"return Counter">>).  % 0
%% ... send Increment action messages

Pattern 2: Inter-Process Communication

%% Process A sends to Process B
ScriptA = <<"
    Handlers.add('Trigger',
        { Action = 'Trigger' },
        function(m)
            Send({
                Target = '", ProcessB_ID/binary, "',
                Action = 'DoWork',
                Data = 'from A'
            })
        end
    )
">>,
 
dev_process:schedule_aos_call(ProcessA, ScriptA),
 
%% Push will deliver to ProcessB
{ok, _} = hb_ao:resolve(ProcessA, #{<<"path">> => <<"push">>, <<"slot">> => 0}, #{}).

Pattern 3: Scheduled Cleanup

%% Setup cleanup cron job
{ok, TaskID} = dev_cron:every(
    #{},
    #{
        <<"cron-path">> => <<"/", ProcessID/binary, "~process@1.0/compute">>,
        <<"interval">> => <<"1-hour">>
    },
    #{}
),
 
%% The process has a cleanup handler
CleanupScript = <<"
    Handlers.add('Cleanup',
        function(m) return true end,
        function(m)
            -- Remove old entries
            OldEntries = {}
            for k, v in pairs(State) do
                if v.timestamp < (os.time() - 86400) then
                    OldEntries[k] = nil
                end
            end
        end
    )
">>.

Pattern 4: Worker-Optimized Batch

%% Process many messages with persistent worker
dev_process:init(),
Process = dev_process:test_aos_process(),
 
%% Schedule 100 messages
lists:foreach(fun(I) ->
    Code = iolist_to_binary([<<"N = ">>, integer_to_binary(I)]),
    dev_process:schedule_aos_call(Process, Code)
end, lists:seq(1, 100)),
 
%% Compute with worker (stays in memory)
lists:foreach(fun(Slot) ->
    hb_ao:resolve(
        Process,
        #{<<"path">> => <<"compute">>, <<"slot">> => Slot},
        #{process_workers => true}
    )
end, lists:seq(0, 99)).

Quick Reference Card

📖 Reference: dev_process | dev_scheduler | dev_push | dev_cron

%% === PROCESS DEVICE ===
%% Create test process
dev_process:init(),
Process = dev_process:test_aos_process().
 
%% Schedule Lua code
dev_process:schedule_aos_call(Process, <<"return 42">>).
 
%% Compute at slot
{ok, State} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 0}, #{}).
 
%% Get latest
{ok, Latest} = hb_ao:resolve(Process, <<"now/results/data">>, #{}).
 
%% Dryrun (no state change)
{ok, Dry} = hb_ao:resolve(Process, #{
    <<"path">> => <<"compute">>,
    <<"method">> => <<"POST">>,
    <<"dryrun">> => #{<<"data">> => Code}
}, #{}).
 
%% With worker
{ok, State} = hb_ao:resolve(Process, #{<<"path">> => <<"compute">>, <<"slot">> => 0}, #{
    process_workers => true, spawn_worker => true
}).
 
%% === SCHEDULER DEVICE ===
dev_scheduler:start().
 
%% Schedule message
{ok, Assignment} = dev_scheduler:schedule(#{}, #{
    <<"method">> => <<"POST">>,
    <<"body">> => SignedMessage
}, Opts).
 
%% Get slot info
{ok, SlotInfo} = dev_scheduler:slot(#{}, #{<<"process">> => ProcessID}, #{}).
 
%% Get schedule range
{ok, Schedule} = dev_scheduler:schedule(#{}, #{
    <<"method">> => <<"GET">>,
    <<"process">> => ProcessID,
    <<"from">> => 0, <<"to">> => 100
}, #{}).
 
%% Scheduler status
{ok, Status} = dev_scheduler:status(#{}, #{}, #{}).
 
%% === PUSH DEVICE ===
%% Push slot
{ok, Result} = hb_ao:resolve(Process, #{<<"path">> => <<"push">>, <<"slot">> => 0}, #{}).
 
%% Push message
{ok, Result} = hb_ao:resolve(Process, #{
    <<"path">> => <<"push">>,
    <<"method">> => <<"POST">>,
    <<"body">> => SignedMessage
}, #{}).
 
%% Async push
{ok, _} = hb_ao:resolve(Process, #{
    <<"path">> => <<"push">>,
    <<"slot">> => 0,
    <<"push-mode">> => <<"async">>
}, #{}).
 
%% === CRON DEVICE ===
%% One-time
{ok, TaskID} = dev_cron:once(#{}, #{<<"cron-path">> => <<"/path">>}, #{}).
 
%% Recurring
{ok, TaskID} = dev_cron:every(#{}, #{
    <<"cron-path">> => <<"/path">>,
    <<"interval">> => <<"30-seconds">>
}, #{}).
 
%% Stop task
{ok, _} = dev_cron:stop(#{}, #{<<"task">> => TaskID}, #{}).

What's Next?

You now understand the computation layer:

DevicePurposeKey Feature
dev_processExecution coordinatorschedule/compute/now/push
dev_schedulerMessage orderingSlot assignment
dev_pushMessage deliveryRecursive outbox processing
dev_cronTime-based executiononce/every intervals

Sub-devices

Sub-devicePurpose
dev_process_workerPersistent workers
dev_process_cacheState caching
dev_scheduler_serverLocal scheduling
dev_scheduler_cacheAssignment cache
dev_scheduler_registryProcess registry
dev_scheduler_formatsFormat conversion

Going Further

  1. Store — Data persistence and caching (Tutorial)
  2. Runtimes — Execute WASM and Lua code (Tutorial)
  3. Payment — Metering and economics (Tutorial)

Resources

HyperBEAM Documentation

Related Tutorials