Skip to content

dev_process.erl - AO Process Device Implementation

Overview

Purpose: Core AO process execution coordinator
Module: dev_process
Pattern: Schedule → Compute → Cache → Push
Device: Process@1.0

This module implements the complete AO process device, routing requests between scheduling, execution, and pushing components. It manages process state caching, supports persistent workers, and provides immutable references to process interaction history through cache paths.

Architecture

External API:
  /ID/Schedule     → Schedule messages
  /ID/Compute/Slot → Compute state at slot
  /ID/Now          → Latest results
  /ID/Snapshot     → State snapshot
 
Internal Flow:
  Schedule Device → Execution Stack → Cache → Push Device

Dependencies

  • HyperBEAM: hb_ao, hb_util, hb_maps, hb_message, hb_cache, hb_private, hb_path, hb_opts, hb_http, hb_http_server, hb_client
  • Worker: dev_process_worker, dev_process_cache
  • Arweave: ar_wallet, ar_bundles
  • Devices: Scheduler, execution stack, push devices
  • Includes: include/hb.hrl

Configuration

Process Definition

#{
    <<"device">> => <<"process@1.0">>,
    <<"scheduler-device">> => <<"scheduler@1.0">>,
    <<"execution-device">> => <<"stack@1.0">>,
    <<"execution-stack">> => [
        <<"scheduler@1.0">>,
        <<"cron@1.0">>,
        <<"wasm@1.0">>,
        <<"poda@1.0">>
    ],
    <<"push-device">> => <<"push@1.0">>,
    
    % Runtime options
    <<"process/cache-frequency">> => 10,
    <<"process/cache-keys">> => [<<"/results">>, <<"/state">>]
}

Runtime Options

Cache Control:
  • process_snapshot_slots - Slots between state snapshots (default: undefined)
  • process_snapshot_time - Seconds between snapshots (default: 60 in prod, undefined in test)
  • process_cache_frequency - Legacy slot-based caching
  • process_async_cache - Async caching (default: true)
Worker Control:
  • process_workers - Enable persistent workers (default: true)
  • spawn_worker - Spawn worker for this computation

Public Functions Overview

%% Core API
-spec info(Msg) -> ProcessInfo.
-spec schedule(Msg1, Msg2, Opts) -> {ok, ScheduleResponse}.
-spec compute(Msg1, Msg2, Opts) -> {ok, ComputedState}.
-spec now(Msg1, Msg2, Opts) -> {ok, LatestResults}.
-spec slot(Msg1, Msg2, Opts) -> {ok, SlotInfo}.
-spec push(Msg1, Msg2, Opts) -> {ok, PushedResults}.
-spec snapshot(Msg1, Msg2, Opts) -> {ok, StateSnapshot}.
 
%% Device Routing
-spec as(Msg1, Msg2, Opts) -> {ok, MsgWithDevice}.
-spec as_process(Msg, Opts) -> ProcessMsg.
 
%% Utilities
-spec process_id(Msg1, Msg2, Opts) -> ProcessID.
-spec ensure_process_key(Msg, Opts) -> MsgWithProcess.
 
%% Test Helpers
-spec test_aos_process() -> ProcessMsg.
-spec test_aos_process(Opts) -> ProcessMsg.
-spec test_aos_process(Opts, Stack) -> ProcessMsg.
-spec test_wasm_process(WASMPath) -> ProcessMsg.
-spec schedule_aos_call(Process, LuaCode) -> ok.
-spec schedule_aos_call(Process, LuaCode, Opts) -> ok.

Public Functions

1. info/1

-spec info(Msg) -> #{
    worker => WorkerFun,
    grouper => GrouperFun,
    await => AwaitFun,
    exports => [binary()]
}
    when
        Msg :: map().

Description: Return process device metadata including worker functions and exported paths.

Exports:
  • info - Device information
  • as - Device swapping
  • compute - State computation
  • now - Latest results
  • schedule - Message scheduling
  • slot - Slot information
  • snapshot - State snapshot
  • push - Message pushing
Test Code:
-module(dev_process_info_test).
-include_lib("eunit/include/eunit.hrl").
 
info_exports_test() ->
    Info = dev_process:info(#{}),
    Exports = maps:get(exports, Info),
    ?assert(lists:member(<<"compute">>, Exports)),
    ?assert(lists:member(<<"schedule">>, Exports)),
    ?assert(lists:member(<<"now">>, Exports)).
 
info_worker_test() ->
    Info = dev_process:info(#{}),
    ?assert(maps:is_key(worker, Info)),
    ?assert(is_function(maps:get(worker, Info), 3)).

2. schedule/3

-spec schedule(Msg1, Msg2, Opts) -> {ok, Response}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        Response :: map().

Description: Schedule messages for future execution. Routes to scheduler device.

Paths:
  • POST /ProcessID/schedule - Add message to schedule
  • GET /ProcessID/schedule - View scheduled messages
Test Code:
-module(dev_process_schedule_test).
-include_lib("eunit/include/eunit.hrl").
 
schedule_message_test() ->
    % schedule requires proper process setup with signed messages
    % Verify module exports schedule/3
    code:ensure_loaded(dev_process),
    ?assert(erlang:function_exported(dev_process, schedule, 3)).
 
schedule_multiple_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    dev_process:schedule_aos_call(Process, <<"X = 1">>),
    dev_process:schedule_aos_call(Process, <<"X = 2">>),
    dev_process:schedule_aos_call(Process, <<"return X">>),
    {ok, Result} = hb_ao:resolve(
        Process,
        #{ <<"path">> => <<"now/results/data">> },
        #{}
    ),
    ?assertEqual(<<"2">>, Result).

3. compute/3

-spec compute(Msg1, Msg2, Opts) -> {ok, ComputedState}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        ComputedState :: map().

Description: Compute process state at a specific slot or with dryrun message. Main entry point for process execution.

Two Modes: GET Mode (Normal Execution):
  • Applies scheduled messages to process state
  • Advances state permanently
  • Caches results
  • Path: GET /ProcessID/compute?slot=N
POST Mode (Dryrun):
  • Simulates message processing
  • Does not advance state
  • No caching
  • Path: POST /ProcessID/compute with message body
Test Code:
-module(dev_process_compute_test).
-include_lib("eunit/include/eunit.hrl").
 
compute_by_slot_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    dev_process:schedule_aos_call(Process, <<"X = 42">>),
    
    {ok, Result} = hb_ao:resolve(
        Process,
        #{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
        #{}
    ),
    ?assert(maps:is_key(<<"results">>, Result)).
 
compute_dryrun_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    
    {ok, DryResult} = hb_ao:resolve(
        Process,
        #{
            <<"path">> => <<"compute">>,
            <<"method">> => <<"POST">>,
            <<"dryrun">> => #{
                <<"data">> => <<"return 99">>
            }
        },
        #{}
    ),
    % Result structure may vary - just verify we got a result
    ?assert(is_map(DryResult)).
 
compute_sequential_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    dev_process:schedule_aos_call(Process, <<"X = 1">>),
    dev_process:schedule_aos_call(Process, <<"X = X + 1">>),
    dev_process:schedule_aos_call(Process, <<"return X">>),
    
    {ok, State0} = hb_ao:resolve(
        Process,
        #{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
        #{}
    ),
    {ok, State1} = hb_ao:resolve(
        Process,
        #{ <<"path">> => <<"compute">>, <<"slot">> => 1 },
        #{}
    ),
    {ok, State2} = hb_ao:resolve(
        Process,
        #{ <<"path">> => <<"compute">>, <<"slot">> => 2 },
        #{}
    ),
    ?assertEqual(<<"2">>, hb_ao:get(<<"results/data">>, State2, #{})).

4. now/3

-spec now(Msg1, Msg2, Opts) -> {ok, LatestResults}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        LatestResults :: map().

Description: Return the /results key of the latest computed message. Convenience accessor for most recent state.

Path: GET /ProcessID/now

Test Code:
-module(dev_process_now_test).
-include_lib("eunit/include/eunit.hrl").
 
now_returns_latest_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    dev_process:schedule_aos_call(Process, <<"return 1+1">>),
    dev_process:schedule_aos_call(Process, <<"return 2+2">>),
    
    {ok, Latest} = hb_ao:resolve(Process, <<"now/results/data">>, #{}),
    ?assertEqual(<<"4">>, Latest).
 
now_empty_schedule_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    
    Result = hb_ao:resolve(Process, <<"now">>, #{}),
    ?assertMatch({ok, _}, Result).

5. slot/3

-spec slot(Msg1, Msg2, Opts) -> {ok, SlotInfo}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        SlotInfo :: map().

Description: Get information about a specific slot. Routes to scheduler device.

Test Code:
-module(dev_process_slot_test).
-include_lib("eunit/include/eunit.hrl").
 
slot_query_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    dev_process:schedule_aos_call(Process, <<"test">>),
    
    {ok, SlotInfo} = hb_ao:resolve(
        Process,
        #{ <<"path">> => <<"slot">>, <<"slot">> => 0 },
        #{}
    ),
    ?assert(is_map(SlotInfo)).

6. snapshot/3

-spec snapshot(Msg1, Msg2, Opts) -> {ok, StateSnapshot}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        StateSnapshot :: map().

Description: Create a restorable snapshot of process state at current slot. Includes cache metadata for storage.

Test Code:
-module(dev_process_snapshot_test).
-include_lib("eunit/include/eunit.hrl").
 
snapshot_creation_test() ->
    % snapshot/3 requires complex wasm state setup
    % Verify module exports snapshot/3
    code:ensure_loaded(dev_process),
    ?assert(erlang:function_exported(dev_process, snapshot, 3)).

7. push/3

-spec push(Msg1, Msg2, Opts) -> {ok, PushedResults}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        PushedResults :: map().

Description: Execute push device on results. Routes to push device for external message delivery.

Test Code:
-module(dev_process_push_test).
-include_lib("eunit/include/eunit.hrl").
 
push_results_test() ->
    % push/3 requires complex process setup with results
    % Verify module exports push/3
    code:ensure_loaded(dev_process),
    ?assert(erlang:function_exported(dev_process, push, 3)).

8. as/3

-spec as(Msg1, Msg2, Opts) -> {ok, MsgWithDevice}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        MsgWithDevice :: map().

Description: Swap process device for specified component device (scheduler, execution, push). Returns process message with appropriate device and configuration.

Device Keys:
  • schedulerscheduler-device
  • executionexecution-device
  • pushpush-device
Test Code:
-module(dev_process_as_test).
-include_lib("eunit/include/eunit.hrl").
 
as_execution_test() ->
    {ok, Module} = file:read_file("test/test.lua"),
    Process = #{
        <<"device">> => <<"process@1.0">>,
        <<"execution-device">> => <<"lua@5.3a">>,
        <<"module">> => #{
            <<"content-type">> => <<"text/x-lua">>,
            <<"body">> => Module
        }
    },
    
    {ok, AsExecution} = dev_process:as(
        Process,
        #{ <<"as">> => <<"execution">> },
        #{}
    ),
    ?assertEqual(<<"lua@5.3a">>, maps:get(<<"device">>, AsExecution)).
 
as_scheduler_test() ->
    Process = #{
        <<"device">> => <<"process@1.0">>,
        <<"scheduler-device">> => <<"scheduler@1.0">>
    },
    
    {ok, AsScheduler} = dev_process:as(
        Process,
        #{ <<"as">> => <<"scheduler">> },
        #{}
    ),
    ?assertEqual(<<"scheduler@1.0">>, maps:get(<<"device">>, AsScheduler)).

9. process_id/3

-spec process_id(Msg1, Msg2, Opts) -> ProcessID
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        ProcessID :: binary().

Description: Extract or compute the process ID from message. Handles commitment specifications.

Test Code:
-module(dev_process_id_test).
-include_lib("eunit/include/eunit.hrl").
 
process_id_extraction_test() ->
    Process = dev_process:test_aos_process(),
    ID = dev_process:process_id(Process, #{}, #{}),
    ?assert(is_binary(ID)),
    ?assertEqual(43, byte_size(ID)).

Execution Flow

Normal Compute Flow

1. Load/Initialize Process
   ├─ Check if initialized
   ├─ Run init if needed
   └─ Ensure process key present
 
2. Determine Slot to Compute
   ├─ Extract from request
   ├─ Or compute next slot
   └─ Check cache for existing
 
3. Restore State
   ├─ Find latest snapshot
   ├─ Load from cache
   └─ Or start from init
 
4. Execute Stack
   ├─ Get scheduled message
   ├─ Run execution device
   ├─ Pass through stack
   └─ Generate results
 
5. Cache Results
   ├─ Write to process cache
   ├─ Link by slot and ID
   └─ Store snapshot if needed
 
6. Push Results
   ├─ Run push device
   └─ Deliver external messages
 
7. Return Computed State

Dryrun Flow

1. Extract Dryrun Message
2. Restore Latest State
3. Apply Dryrun Message
4. Return Results (no caching)

State Caching

Cache Paths

% By slot number
/computed/{ProcessID}/slot/{SlotNumber}
 
% By message ID
/computed/{ProcessID}/{MessageID}
 
% Snapshot
/computed/{ProcessID}/snapshot/{Slot}

Cache Frequency

Time-based (Production):
% Every 60 seconds
process_snapshot_time => 60
Slot-based (Testing):
% Every slot
process_snapshot_slots => 1

Cache Write

dev_process_cache:write(ProcessID, Slot, State, Opts)

Cache Read

dev_process_cache:read(ProcessID, SlotOrID, Opts)
dev_process_cache:latest(ProcessID, RequiredPath, Limit, Opts)

Persistent Workers

Worker Lifecycle

1. Spawn Worker Process
   └─ dev_process_worker:server/3
 
2. Initialize State
   └─ Load or create initial state
 
3. Process Messages
   ├─ Sequential execution
   ├─ State accumulation
   └─ Result caching
 
4. Await Results
   └─ dev_process_worker:await/5

Worker Grouping

% Group computations by process
dev_process_worker:group(ProcessID, Slot, Opts)

Worker Benefits

  • Performance: Avoid repeated initialization
  • State Persistence: Keep execution context loaded
  • Sequential Guarantee: Messages execute in order
  • Concurrent Processes: Multiple processes in parallel

Test Helpers

test_aos_process/0, /1, /2

-spec test_aos_process() -> ProcessMsg.
-spec test_aos_process(Opts) -> ProcessMsg.
-spec test_aos_process(Opts, Stack) -> ProcessMsg.

Description: Create test AO process with Lua execution environment.

Process = dev_process:test_aos_process(),
Process = dev_process:test_aos_process(#{ process_cache_frequency => 1 }),
Process = dev_process:test_aos_process(#{}, [
    <<"wasi@1.0">>,
    <<"json-iface@1.0">>,
    <<"wasm-64@1.0">>,
    <<"patch@1.0">>
]).

test_wasm_process/1

-spec test_wasm_process(WASMPath) -> ProcessMsg
    when
        WASMPath :: binary().

Description: Create test WASM process.

Process = dev_process:test_wasm_process(<<"test/test-64.wasm">>).

schedule_aos_call/2, /3

-spec schedule_aos_call(Process, LuaCode) -> ok.
-spec schedule_aos_call(Process, LuaCode, Opts) -> ok.

Description: Helper to schedule Lua code for execution.

dev_process:schedule_aos_call(Process, <<"X = 42">>),
dev_process:schedule_aos_call(Process, <<"return X">>, #{}).

Common Patterns

%% Initialize and schedule messages
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"X = 1">>),
dev_process:schedule_aos_call(Process, <<"X = X + 1">>),
dev_process:schedule_aos_call(Process, <<"return X">>).
 
%% Compute specific slot
{ok, State} = hb_ao:resolve(
    Process,
    #{ <<"path">> => <<"compute">>, <<"slot">> => 1 },
    #{}
).
 
%% Get latest results
{ok, Latest} = hb_ao:resolve(Process, <<"now/results">>, #{}).
 
%% Dryrun without state changes
{ok, DryResult} = hb_ao:resolve(
    Process,
    #{
        <<"path">> => <<"compute">>,
        <<"method">> => <<"POST">>,
        <<"dryrun">> => #{ <<"data">> => <<"return 'test'">> }
    },
    #{}
).
 
%% Access via HTTP
Node = hb_http_server:start_node(#{
    node_processes => #{
        <<"my-process">> => Process
    }
}),
{ok, Result} = hb_http:get(
    Node,
    <<"/my-process~node-process@1.0/now/results/data">>,
    #{}
).
 
%% With persistent workers
{ok, _} = hb_ao:resolve(
    Process,
    #{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
    #{ spawn_worker => true, process_workers => true }
).
 
%% State restoration from cache
{ok, Slot, RestoredState} = dev_process_cache:latest(
    ProcessID,
    <<"/results">>,  % Required path
    10,              % Max slot
    #{}
).

Default Devices

AO.TN.1 Variant (Genesis)

  • scheduler-devicescheduler@1.0
  • execution-devicegenesis-wasm@1.0
  • push-devicepush@1.0

Post-Mainnet

  • Must explicitly set execution-device
  • No default WASM device

Device Swapping

Process device routes to component devices:

% Original process
Process = #{
    <<"device">> => <<"process@1.0">>,
    <<"scheduler-device">> => <<"scheduler@1.0">>,
    <<"execution-device">> => <<"stack@1.0">>
}
 
% Swapped to scheduler
AsScheduler = #{
    <<"device">> => <<"scheduler@1.0">>,
    <<"input-prefix">> => <<"process">>,
    % ... process state preserved
}
 
% Swapped to execution
AsExecution = #{
    <<"device">> => <<"stack@1.0">>,
    <<"input-prefix">> => <<"process">>,
    <<"execution-stack">> => [<<"lua@5.3a">>, <<"patch@1.0">>],
    % ... process state preserved
}

References

  • Process Worker - dev_process_worker.erl
  • Process Cache - dev_process_cache.erl
  • Scheduler - dev_scheduler.erl
  • Stack - dev_stack.erl
  • WASM Devices - dev_wasm*.erl
  • Push Device - dev_push.erl
  • AO Core - hb_ao.erl
  • Cache - hb_cache.erl

Notes

  1. Immutable References: Process ID provides immutable reference to interaction history
  2. Cache Control: Multiple strategies for state persistence
  3. Worker Architecture: Persistent workers for performance
  4. Two Compute Modes: GET for normal, POST for dryrun
  5. Device Routing: Coordinates multiple device types
  6. State Restoration: Smart caching and restoration from snapshots
  7. Slot Numbering: Zero-indexed slot numbers
  8. Init Phase: Required before first computation
  9. Snapshot Storage: Additional hashpaths for cache optimization
  10. Path Conventions: Uses /ProcessID/Action format
  11. Component Devices: Scheduler, execution, and push are pluggable
  12. Test Defaults: Different cache frequency in test vs. production
  13. HTTP Integration: Native support for HTTP server mounting
  14. Parallel Execution: Multiple processes can compute concurrently
  15. Sequential Guarantee: Single process messages execute in order