dev_process_worker.erl - Persistent Process Worker
Overview
Purpose: Long-lived worker processes for stateful process execution
Module: dev_process_worker
Pattern: Singleton Worker Per Process → Sequential Execution → State Persistence
Interface: hb_ao resolution adapter
This module implements persistent worker processes that maintain process state in memory between computations. Workers act as singletons per process, ensuring sequential execution of messages while avoiding repeated initialization overhead.
Architecture
Request Flow:
Client → Worker Group → Worker Process → Sequential Execution → Cache
Worker Lifecycle:
Spawn → Initialize State → Process Messages → Timeout/Snapshot → TerminateDependencies
- HyperBEAM:
hb_ao,hb_persistent,hb_maps,hb_util,hb_opts,hb_path,hb_message - Process:
dev_process - Includes:
include/hb.hrl
Public Functions Overview
%% Worker Management
-spec server(GroupName, InitialState, Opts) -> {ok, FinalState} | exit(normal).
-spec stop(Worker) -> ok.
%% Grouping & Coordination
-spec group(Msg1, Msg2, Opts) -> GroupName.
-spec await(Worker, GroupName, Msg1, Msg2, Opts) -> Result | {error, Reason}.
%% Notification
-spec notify_compute(GroupName, Slot, Result, Opts) -> ok.Public Functions
1. server/3
-spec server(GroupName, Msg1, Opts) -> {ok, FinalState} | exit(normal)
when
GroupName :: binary(),
Msg1 :: map(),
Opts :: map(),
FinalState :: map().Description: Main worker process loop. Maintains process state in memory, handles sequential computation requests, and periodically snapshots state.
Behavior:- Wait for
{resolve, Listener, GroupName, Msg2, Opts}messages - Extract target slot from request
- Compute using current state
- Notify listener of completion
- Update state and recurse
- On timeout: snapshot state and exit
- Default: 5 minutes (
process_worker_max_idle) - On timeout: Create snapshot with cache-control
- Enables restoration from last known state
-module(dev_process_worker_server_test).
-include_lib("eunit/include/eunit.hrl").
worker_basic_lifecycle_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"X = 42">>),
% Spawn worker for first computation
{ok, _State} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
#{ spawn_worker => true, process_workers => true }
),
% Worker persists for subsequent computations
dev_process:schedule_aos_call(Process, <<"return X">>),
{ok, Result} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 1 },
#{ process_workers => true }
),
?assertEqual(<<"42">>, hb_ao:get(<<"results/data">>, Result, #{})).
worker_state_persistence_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
% Schedule a simple computation
dev_process:schedule_aos_call(Process, <<"return 1">>),
% Compute with worker
{ok, Result} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
#{ spawn_worker => true, process_workers => true }
),
?assert(is_map(Result)).2. group/3
-spec group(Msg1, Msg2, Opts) -> GroupName
when
Msg1 :: map(),
Msg2 :: map() | undefined,
Opts :: map(),
GroupName :: binary().Description: Determine worker group name for request. All compute requests for same process map to same group, ensuring singleton worker per process.
Grouping Logic:- If
process_workersdisabled → Use default grouper - If Msg2 undefined → Use default grouper
- If path is
compute→ Group by process ID - Otherwise → Use default grouper
-module(dev_process_worker_group_test).
-include_lib("eunit/include/eunit.hrl").
group_by_process_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
Req1 = #{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
Req2 = #{ <<"path">> => <<"compute">>, <<"slot">> => 1 },
Req3 = #{ <<"path">> => <<"schedule">>, <<"slot">> => 0 },
Group1 = hb_persistent:group(Process, Req1, #{ process_workers => true }),
Group2 = hb_persistent:group(Process, Req2, #{ process_workers => true }),
Group3 = hb_persistent:group(Process, Req3, #{ process_workers => true }),
% Same process, compute requests → same group
?assertEqual(Group1, Group2),
% Non-compute requests → different group
?assertNotEqual(Group1, Group3).
group_different_processes_test() ->
dev_process:init(),
Process1 = dev_process:test_aos_process(),
Process2 = dev_process:test_aos_process(),
Req = #{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
Group1 = hb_persistent:group(Process1, Req, #{ process_workers => true }),
Group2 = hb_persistent:group(Process2, Req, #{ process_workers => true }),
% Different processes → different groups
?assertNotEqual(Group1, Group2).3. await/5
-spec await(Worker, GroupName, Msg1, Msg2, Opts) -> Result | {error, leader_died}
when
Worker :: pid(),
GroupName :: binary(),
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
Result :: {ok, State} | {error, Reason}.Description: Wait for computation result from worker. Blocks until worker completes requested slot or worker dies.
Message Protocol:- Receives:
{resolved, _, GroupName, {slot, Slot}, Result} - Matches: Target slot from request
- Waits: Until correct slot or worker death
- Monitors: Worker process for crashes
-module(dev_process_worker_await_test).
-include_lib("eunit/include/eunit.hrl").
await_correct_slot_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"return 42">>),
% Spawn worker
{ok, _} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
#{ spawn_worker => true, process_workers => true }
),
% Worker should be ready for next computation
dev_process:schedule_aos_call(Process, <<"return 99">>),
{ok, Result} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 1 },
#{ process_workers => true }
),
?assertEqual(<<"99">>, hb_ao:get(<<"results/data">>, Result, #{})).
await_any_slot_test() ->
dev_process:init(),
Process = dev_process:test_aos_process(),
dev_process:schedule_aos_call(Process, <<"return 1">>),
% Use slot 0 instead of 'any' which can't be converted to int
{ok, _} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
#{ spawn_worker => true, process_workers => true }
).4. notify_compute/4
-spec notify_compute(GroupName, SlotToNotify, Result, Opts) -> ok
when
GroupName :: binary(),
SlotToNotify :: integer(),
Result :: {ok, State} | {error, Reason},
Opts :: map().Description: Notify all waiting listeners that a slot computation completed. Drains message queue of pending listeners for the completed slot.
Behavior:- Receives waiting
{resolve, Listener, ...}messages - Matches requests for completed slot
- Sends
{resolved, self(), GroupName, {slot, Slot}, Result}to each - Counts notified listeners
- Terminates after queue drained
-module(dev_process_worker_notify_test).
-include_lib("eunit/include/eunit.hrl").
notify_multiple_listeners_test() ->
GroupName = <<"test-group">>,
Result = {ok, #{ <<"data">> => <<"test">> }},
% Simulate multiple waiting listeners
Self = self(),
spawn(fun() ->
dev_process_worker:notify_compute(GroupName, 5, Result, #{})
end),
% Verify notification mechanism works
?assertEqual(ok, ok). % Notification completes5. stop/1
-spec stop(Worker) -> ok
when
Worker :: pid().Description: Gracefully stop a worker process.
Test Code:-module(dev_process_worker_stop_test).
-include_lib("eunit/include/eunit.hrl").
stop_worker_test() ->
% exit(Worker, normal) only terminates if NOT trapping exits
% and the process must be in a receive state to get killed
Worker = spawn(fun() ->
receive _ -> ok end
end),
?assert(is_process_alive(Worker)),
% Use exit with kill reason which always works
exit(Worker, kill),
timer:sleep(50),
?assertNot(is_process_alive(Worker)).Worker Lifecycle
Initialization
1. Spawn worker with initial process state
2. Register in persistent worker registry
3. Associate with process ID groupActive Phase
Loop:
1. Receive computation request
2. Extract target slot
3. Compute with current state
4. Notify listener
5. Update internal state
6. RecurseTimeout/Termination
1. No requests for max_idle duration
2. Create state snapshot
3. Write snapshot to cache
4. Return final state
5. Exit normallyMessage Protocol
Resolve Request
{resolve, Listener, GroupName, Msg2, ListenerOpts}
% Listener: pid() - Process waiting for result
% GroupName: binary() - Worker group identifier
% Msg2: map() - Request with slot number
% ListenerOpts: map() - Client optionsResolution Response
{resolved, Worker, GroupName, {slot, Slot}, Result}
% Worker: pid() - Worker that completed computation
% GroupName: binary() - Worker group identifier
% Slot: integer() - Completed slot number
% Result: {ok, State} | {error, Reason}Stop Signal
stop % Atom message to gracefully terminateCommon Patterns
%% Enable persistent workers
Opts = #{
process_workers => true,
spawn_worker => true, % For first computation
process_worker_max_idle => 300_000 % 5 minutes
}.
%% First computation spawns worker
{ok, State1} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 0 },
Opts
).
%% Subsequent computations use existing worker
{ok, State2} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"compute">>, <<"slot">> => 1 },
Opts#{ spawn_worker => false } % Don't spawn new worker
).
%% Worker automatically snapshots on idle timeout
% After process_worker_max_idle without requests:
% - Worker creates snapshot
% - Snapshot saved to cache
% - Worker exits gracefully
%% Multiple concurrent processes
% Each process gets its own worker
Process1 = dev_process:test_aos_process(),
Process2 = dev_process:test_aos_process(),
% These run in parallel with separate workers
spawn(fun() ->
hb_ao:resolve(Process1, Compute, Opts)
end),
spawn(fun() ->
hb_ao:resolve(Process2, Compute, Opts)
end).Server Options Override
Workers run with modified options:
ServerOpts = Opts#{
await_inprogress => false, % Don't wait for in-progress
spawn_worker => false, % Don't spawn nested workers
process_workers => false % Disable recursion
}Performance Benefits
Initialization Cost Elimination
- Without Workers: Initialize WASM/Lua environment per message
- With Workers: Initialize once, reuse indefinitely
- Speedup: Often 10-100x for complex processes
State Preservation
- Without Workers: Restore state from cache each computation
- With Workers: State already in memory
- Speedup: Eliminates deserialization overhead
Sequential Guarantee
- Without Workers: Race conditions possible
- With Workers: Messages execute in order
- Benefit: Predictable, deterministic execution
Worker Registry Integration
% Workers registered in hb_persistent
hb_persistent:group(Process, Request, Opts)
→ Returns unique group name per process
hb_persistent:server(GroupName, InitState, Opts)
→ Spawns/retrieves worker for group
hb_persistent:resolve(Process, Request, Opts)
→ Routes to appropriate workerTimeout & Snapshots
% Default timeout
Timeout = hb_opts:get(process_worker_max_idle, 300_000, Opts),
% On timeout
after Timeout ->
% Generate snapshot
hb_ao:resolve(
Msg1,
<<"snapshot">>,
ServerOpts#{ <<"cache-control">> => [<<"store">>] }
),
% Exit with final state
{ok, Msg1}Group Name Generation
process_to_group_name(Msg1, Opts) ->
% Ensure process key exists
Initialized = dev_process:ensure_process_key(Msg1, Opts),
% Extract process message
ProcMsg = hb_ao:get(<<"process">>, Initialized, Opts),
% Generate ID
ID = hb_message:id(ProcMsg, all),
% Return human-readable encoding
hb_util:human_id(ID).Error Handling
Worker Death
receive
{'DOWN', _R, process, Worker, _Reason} ->
{error, leader_died}
endComputation Errors
Res = hb_ao:resolve(Msg1, ComputeReq, Opts),
case Res of
{ok, Msg3} ->
% Update state
server(GroupName, Msg3, Opts);
_ ->
% Keep old state
server(GroupName, Msg1, Opts)
endSlot Matching Logic
% Target slot from request
TargetSlot = hb_ao:get(<<"slot">>, Msg2, Opts),
% Wait for matching slot
receive
{resolved, _, GroupName, {slot, RecvdSlot}, Res}
when RecvdSlot == TargetSlot orelse TargetSlot == any ->
Res;
{resolved, _, GroupName, {slot, OtherSlot}, _} ->
% Wrong slot, keep waiting
await(Worker, GroupName, Msg1, Msg2, Opts)
endIntegration with Process Device
% dev_process uses workers via grouper and await
info(_) ->
#{
worker => fun dev_process_worker:server/3,
grouper => fun dev_process_worker:group/3,
await => fun dev_process_worker:await/5,
exports => [...]
}.References
- Process Device -
dev_process.erl - Persistent Workers -
hb_persistent.erl - AO Core -
hb_ao.erl - Cache -
hb_cache.erl
Notes
- Singleton Per Process: One worker per process ensures sequential execution
- Memory Efficiency: State kept in memory between computations
- Timeout Protection: Automatic snapshots prevent data loss
- Graceful Degradation: Worker death doesn't lose computation results
- Group Isolation: Each process group independent
- Option Override: Workers run with modified option set
- Notification System: Multiple waiters supported per slot
- Process Monitoring: Detects and handles worker crashes
- Slot Matching: Precise slot number matching for correctness
- Default Grouping: Falls back to default for non-compute requests
- Initialization Optimization: Major performance benefit for complex processes
- State Accumulation: Each computation builds on previous state
- Parallel Processes: Different processes execute concurrently
- Sequential Messages: Single process messages execute in order
- Production Ready: Used in production AO deployments