dev_process.erl - AO Process Device Implementation
Overview
Purpose: Core AO process execution coordinator
Module: dev_process
Pattern: Schedule → Compute → Cache → Push
Device: Process@1.0
This module implements the complete AO process device, routing requests between scheduling, execution, and pushing components. It manages process state caching, supports persistent workers, and provides immutable references to process interaction history through cache paths.
Architecture
External API:
/ID/Schedule → Schedule messages
/ID/Compute/Slot → Compute state at slot
/ID/Now → Latest results
/ID/Snapshot → State snapshot
Internal Flow:
Schedule Device → Execution Stack → Cache → Push DeviceDependencies
- HyperBEAM:
hb_ao,hb_util,hb_maps,hb_message,hb_cache,hb_private,hb_path,hb_opts,hb_http,hb_http_server,hb_client - Worker:
dev_process_worker,dev_process_cache - Arweave:
ar_wallet,ar_bundles - Devices: Scheduler, execution stack, push devices
- Includes:
include/hb.hrl
Configuration
Process Definition
#{
<<"device">> => <<"process@1.0">>,
<<"scheduler-device">> => <<"scheduler@1.0">>,
<<"execution-device">> => <<"stack@1.0">>,
<<"execution-stack">> => [
<<"scheduler@1.0">>,
<<"cron@1.0">>,
<<"wasm@1.0">>,
<<"poda@1.0">>
],
<<"push-device">> => <<"push@1.0">>,
% Runtime options
<<"process/cache-frequency">> => 10,
<<"process/cache-keys">> => [<<"/results">>, <<"/state">>]
}Runtime Options
Cache Control:process_snapshot_slots- Slots between state snapshots (default: undefined)process_snapshot_time- Seconds between snapshots (default: 60 in prod, undefined in test)process_cache_frequency- Legacy slot-based cachingprocess_async_cache- Async caching (default: true)
process_workers- Enable persistent workers (default: true)spawn_worker- Spawn worker for this computation
Public Functions Overview
%% Core API
-spec info(Msg) -> ProcessInfo.
-spec schedule(Msg1, Msg2, Opts) -> {ok, ScheduleResponse}.
-spec compute(Msg1, Msg2, Opts) -> {ok, ComputedState}.
-spec now(Msg1, Msg2, Opts) -> {ok, LatestResults}.
-spec slot(Msg1, Msg2, Opts) -> {ok, SlotInfo}.
-spec push(Msg1, Msg2, Opts) -> {ok, PushedResults}.
-spec snapshot(Msg1, Msg2, Opts) -> {ok, StateSnapshot}.
%% Device Routing
-spec as(Msg1, Msg2, Opts) -> {ok, MsgWithDevice}.
-spec as_process(Msg, Opts) -> ProcessMsg.
%% Utilities
-spec process_id(Msg1, Msg2, Opts) -> ProcessID.
-spec ensure_process_key(Msg, Opts) -> MsgWithProcess.
%% Test Helpers
-spec test_aos_process() -> ProcessMsg.
-spec test_aos_process(Opts) -> ProcessMsg.
-spec test_aos_process(Opts, Stack) -> ProcessMsg.
-spec test_wasm_process(WASMPath) -> ProcessMsg.
-spec schedule_aos_call(Process, LuaCode) -> ok.
-spec schedule_aos_call(Process, LuaCode, Opts) -> ok.Public Functions
1. info/1
-spec info(Msg) -> #{
worker => WorkerFun,
grouper => GrouperFun,
await => AwaitFun,
exports => [binary()]
}
when
Msg :: map().Description: Return process device metadata including worker functions and exported paths.
Exports:info- Device informationas- Device swappingcompute- State computationnow- Latest resultsschedule- Message schedulingslot- Slot informationsnapshot- State snapshotpush- Message pushing
-module(dev_process_info_test).
-include_lib("eunit/include/eunit.hrl").
info_exports_test() ->
Info = dev_process:info(#{}),
Exports = maps:get(exports, Info),
?assert(lists:member(<<"compute">>, Exports)),
?assert(lists:member(<<"schedule">>, Exports)),
?assert(lists:member(<<"now">>, Exports)).
info_worker_test() ->
Info = dev_process:info(#{}),
?assert(maps:is_key(worker, Info)),
?assert(is_function(maps:get(worker, Info), 3)).2. schedule/3
-spec schedule(Msg1, Msg2, Opts) -> {ok, Response}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
Response :: map().Description: Schedule messages for future execution. Routes to scheduler device.
Paths:POST /ProcessID/schedule- Add message to scheduleGET /ProcessID/schedule- View scheduled messages
-module(dev_process_schedule_test).
-include_lib("eunit/include/eunit.hrl").
schedule_message_test() ->
% schedule requires proper process setup with signed messages
% Verify module exports schedule/3
code:ensure_loaded(dev_process),
?assert(erlang:function_exported(dev_process, schedule, 3)).
schedule_multiple_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"X = 1">>),
dev_process:schedule_aos_call(Process, <<"X = 2">>),
dev_process:schedule_aos_call(Process, <<"return X">>),
{ok, Result} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"now/results/data">> },
#{}
),
?assertEqual(<<"2">>, Result).3. compute/3
-spec compute(Msg1, Msg2, Opts) -> {ok, ComputedState}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
ComputedState :: map().Description: Compute process state at a specific slot or with dryrun message. Main entry point for process execution.
Two Modes: GET Mode (Normal Execution):- Applies scheduled messages to process state
- Advances state permanently
- Caches results
- Path:
GET /ProcessID/compute?slot=N
- Simulates message processing
- Does not advance state
- No caching
- Path:
POST /ProcessID/computewith message body
-module(dev_process_compute_test).
-include_lib("eunit/include/eunit.hrl").
compute_by_slot_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"X = 42">>),
{ok, Result} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
#{}
),
?assert(maps:is_key(<<"results">>, Result)).
compute_dryrun_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
{ok, DryResult} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"compute">>,
<<"method">> => <<"POST">>,
<<"dryrun">> => #{
<<"data">> => <<"return 99">>
}
},
#{}
),
% Result structure may vary - just verify we got a result
?assert(is_map(DryResult)).
compute_sequential_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"X = 1">>),
dev_process:schedule_aos_call(Process, <<"X = X + 1">>),
dev_process:schedule_aos_call(Process, <<"return X">>),
{ok, State0} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
#{}
),
{ok, State1} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 1 },
#{}
),
{ok, State2} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 2 },
#{}
),
?assertEqual(<<"2">>, hb_ao:get(<<"results/data">>, State2, #{})).4. now/3
-spec now(Msg1, Msg2, Opts) -> {ok, LatestResults}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
LatestResults :: map().Description: Return the /results key of the latest computed message. Convenience accessor for most recent state.
Path: GET /ProcessID/now
-module(dev_process_now_test).
-include_lib("eunit/include/eunit.hrl").
now_returns_latest_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"return 1+1">>),
dev_process:schedule_aos_call(Process, <<"return 2+2">>),
{ok, Latest} = hb_ao:resolve(Process, <<"now/results/data">>, #{}),
?assertEqual(<<"4">>, Latest).
now_empty_schedule_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
Result = hb_ao:resolve(Process, <<"now">>, #{}),
?assertMatch({ok, _}, Result).5. slot/3
-spec slot(Msg1, Msg2, Opts) -> {ok, SlotInfo}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
SlotInfo :: map().Description: Get information about a specific slot. Routes to scheduler device.
Test Code:-module(dev_process_slot_test).
-include_lib("eunit/include/eunit.hrl").
slot_query_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"test">>),
{ok, SlotInfo} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"slot">>, <<"slot">> => 0 },
#{}
),
?assert(is_map(SlotInfo)).6. snapshot/3
-spec snapshot(Msg1, Msg2, Opts) -> {ok, StateSnapshot}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
StateSnapshot :: map().Description: Create a restorable snapshot of process state at current slot. Includes cache metadata for storage.
Test Code:-module(dev_process_snapshot_test).
-include_lib("eunit/include/eunit.hrl").
snapshot_creation_test() ->
% snapshot/3 requires complex wasm state setup
% Verify module exports snapshot/3
code:ensure_loaded(dev_process),
?assert(erlang:function_exported(dev_process, snapshot, 3)).7. push/3
-spec push(Msg1, Msg2, Opts) -> {ok, PushedResults}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
PushedResults :: map().Description: Execute push device on results. Routes to push device for external message delivery.
Test Code:-module(dev_process_push_test).
-include_lib("eunit/include/eunit.hrl").
push_results_test() ->
% push/3 requires complex process setup with results
% Verify module exports push/3
code:ensure_loaded(dev_process),
?assert(erlang:function_exported(dev_process, push, 3)).8. as/3
-spec as(Msg1, Msg2, Opts) -> {ok, MsgWithDevice}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
MsgWithDevice :: map().Description: Swap process device for specified component device (scheduler, execution, push). Returns process message with appropriate device and configuration.
Device Keys:scheduler→scheduler-deviceexecution→execution-devicepush→push-device
-module(dev_process_as_test).
-include_lib("eunit/include/eunit.hrl").
as_execution_test() ->
{ok, Module} = file:read_file("test/test.lua"),
Process = #{
<<"device">> => <<"process@1.0">>,
<<"execution-device">> => <<"lua@5.3a">>,
<<"module">> => #{
<<"content-type">> => <<"text/x-lua">>,
<<"body">> => Module
}
},
{ok, AsExecution} = dev_process:as(
Process,
#{ <<"as">> => <<"execution">> },
#{}
),
?assertEqual(<<"lua@5.3a">>, maps:get(<<"device">>, AsExecution)).
as_scheduler_test() ->
Process = #{
<<"device">> => <<"process@1.0">>,
<<"scheduler-device">> => <<"scheduler@1.0">>
},
{ok, AsScheduler} = dev_process:as(
Process,
#{ <<"as">> => <<"scheduler">> },
#{}
),
?assertEqual(<<"scheduler@1.0">>, maps:get(<<"device">>, AsScheduler)).9. process_id/3
-spec process_id(Msg1, Msg2, Opts) -> ProcessID
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
ProcessID :: binary().Description: Extract or compute the process ID from message. Handles commitment specifications.
Test Code:-module(dev_process_id_test).
-include_lib("eunit/include/eunit.hrl").
process_id_extraction_test() ->
Process = dev_process:test_aos_process(),
ID = dev_process:process_id(Process, #{}, #{}),
?assert(is_binary(ID)),
?assertEqual(43, byte_size(ID)).Execution Flow
Normal Compute Flow
1. Load/Initialize Process
├─ Check if initialized
├─ Run init if needed
└─ Ensure process key present
2. Determine Slot to Compute
├─ Extract from request
├─ Or compute next slot
└─ Check cache for existing
3. Restore State
├─ Find latest snapshot
├─ Load from cache
└─ Or start from init
4. Execute Stack
├─ Get scheduled message
├─ Run execution device
├─ Pass through stack
└─ Generate results
5. Cache Results
├─ Write to process cache
├─ Link by slot and ID
└─ Store snapshot if needed
6. Push Results
├─ Run push device
└─ Deliver external messages
7. Return Computed StateDryrun Flow
1. Extract Dryrun Message
2. Restore Latest State
3. Apply Dryrun Message
4. Return Results (no caching)State Caching
Cache Paths
% By slot number
/computed/{ProcessID}/slot/{SlotNumber}
% By message ID
/computed/{ProcessID}/{MessageID}
% Snapshot
/computed/{ProcessID}/snapshot/{Slot}Cache Frequency
Time-based (Production):% Every 60 seconds
process_snapshot_time => 60% Every slot
process_snapshot_slots => 1Cache Write
dev_process_cache:write(ProcessID, Slot, State, Opts)Cache Read
dev_process_cache:read(ProcessID, SlotOrID, Opts)
dev_process_cache:latest(ProcessID, RequiredPath, Limit, Opts)Persistent Workers
Worker Lifecycle
1. Spawn Worker Process
└─ dev_process_worker:server/3
2. Initialize State
└─ Load or create initial state
3. Process Messages
├─ Sequential execution
├─ State accumulation
└─ Result caching
4. Await Results
└─ dev_process_worker:await/5Worker Grouping
% Group computations by process
dev_process_worker:group(ProcessID, Slot, Opts)Worker Benefits
- Performance: Avoid repeated initialization
- State Persistence: Keep execution context loaded
- Sequential Guarantee: Messages execute in order
- Concurrent Processes: Multiple processes in parallel
Test Helpers
test_aos_process/0, /1, /2
-spec test_aos_process() -> ProcessMsg.
-spec test_aos_process(Opts) -> ProcessMsg.
-spec test_aos_process(Opts, Stack) -> ProcessMsg.Description: Create test AO process with Lua execution environment.
Process = dev_process:test_aos_process(),
Process = dev_process:test_aos_process(#{ process_cache_frequency => 1 }),
Process = dev_process:test_aos_process(#{}, [
<<"wasi@1.0">>,
<<"json-iface@1.0">>,
<<"wasm-64@1.0">>,
<<"patch@1.0">>
]).test_wasm_process/1
-spec test_wasm_process(WASMPath) -> ProcessMsg
when
WASMPath :: binary().Description: Create test WASM process.
Process = dev_process:test_wasm_process(<<"test/test-64.wasm">>).schedule_aos_call/2, /3
-spec schedule_aos_call(Process, LuaCode) -> ok.
-spec schedule_aos_call(Process, LuaCode, Opts) -> ok.Description: Helper to schedule Lua code for execution.
dev_process:schedule_aos_call(Process, <<"X = 42">>),
dev_process:schedule_aos_call(Process, <<"return X">>, #{}).Common Patterns
%% Initialize and schedule messages
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"X = 1">>),
dev_process:schedule_aos_call(Process, <<"X = X + 1">>),
dev_process:schedule_aos_call(Process, <<"return X">>).
%% Compute specific slot
{ok, State} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 1 },
#{}
).
%% Get latest results
{ok, Latest} = hb_ao:resolve(Process, <<"now/results">>, #{}).
%% Dryrun without state changes
{ok, DryResult} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"compute">>,
<<"method">> => <<"POST">>,
<<"dryrun">> => #{ <<"data">> => <<"return 'test'">> }
},
#{}
).
%% Access via HTTP
Node = hb_http_server:start_node(#{
node_processes => #{
<<"my-process">> => Process
}
}),
{ok, Result} = hb_http:get(
Node,
<<"/my-process~node-process@1.0/now/results/data">>,
#{}
).
%% With persistent workers
{ok, _} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
#{ spawn_worker => true, process_workers => true }
).
%% State restoration from cache
{ok, Slot, RestoredState} = dev_process_cache:latest(
ProcessID,
<<"/results">>, % Required path
10, % Max slot
#{}
).Default Devices
AO.TN.1 Variant (Genesis)
scheduler-device→scheduler@1.0execution-device→genesis-wasm@1.0push-device→push@1.0
Post-Mainnet
- Must explicitly set
execution-device - No default WASM device
Device Swapping
Process device routes to component devices:
% Original process
Process = #{
<<"device">> => <<"process@1.0">>,
<<"scheduler-device">> => <<"scheduler@1.0">>,
<<"execution-device">> => <<"stack@1.0">>
}
% Swapped to scheduler
AsScheduler = #{
<<"device">> => <<"scheduler@1.0">>,
<<"input-prefix">> => <<"process">>,
% ... process state preserved
}
% Swapped to execution
AsExecution = #{
<<"device">> => <<"stack@1.0">>,
<<"input-prefix">> => <<"process">>,
<<"execution-stack">> => [<<"lua@5.3a">>, <<"patch@1.0">>],
% ... process state preserved
}References
- Process Worker -
dev_process_worker.erl - Process Cache -
dev_process_cache.erl - Scheduler -
dev_scheduler.erl - Stack -
dev_stack.erl - WASM Devices -
dev_wasm*.erl - Push Device -
dev_push.erl - AO Core -
hb_ao.erl - Cache -
hb_cache.erl
Notes
- Immutable References: Process ID provides immutable reference to interaction history
- Cache Control: Multiple strategies for state persistence
- Worker Architecture: Persistent workers for performance
- Two Compute Modes: GET for normal, POST for dryrun
- Device Routing: Coordinates multiple device types
- State Restoration: Smart caching and restoration from snapshots
- Slot Numbering: Zero-indexed slot numbers
- Init Phase: Required before first computation
- Snapshot Storage: Additional hashpaths for cache optimization
- Path Conventions: Uses
/ProcessID/Actionformat - Component Devices: Scheduler, execution, and push are pluggable
- Test Defaults: Different cache frequency in test vs. production
- HTTP Integration: Native support for HTTP server mounting
- Parallel Execution: Multiple processes can compute concurrently
- Sequential Guarantee: Single process messages execute in order