hb_persistent.erl - Long-Lived AO-Core Resolution Processes
Overview
Purpose: Create and manage persistent worker processes for expensive or serialized message resolutions
Module: hb_persistent
Pattern: Process group-based execution deduplication with PG (distributed process groups)
This module creates long-lived Erlang processes for AO-Core message resolution. It's useful for expensive computations that should be shared across multiple requests, or when executions must be deliberately serialized to avoid parallel processing conflicts. Built on Erlang's pg module for distributed process group management.
Dependencies
- HyperBEAM: `hb_ao`, `hb_ao_device`, `hb_name`, `hb_maps`, `hb_opts`
- Erlang/OTP: `pg` (process groups), `timer`
- Records: `#tx{}` from `include/hb.hrl`
Public Functions Overview
%% Overview of the public API. Type variables here are informal; the
%% per-function sections give their precise types.
%% Monitor Management
-spec start_monitor() -> PID.
-spec start_monitor(Group) -> PID.
-spec stop_monitor(PID) -> stop.
%% Execution Registration
%% `wait' carries the current leader's pid; `leader' and
%% `infinite_recursion' carry the group name.
-spec find_or_register(Msg1, Msg2, Opts) ->
    {leader, GroupName} | {wait, LeaderPID} | {infinite_recursion, GroupName}.
-spec unregister_notify(GroupName, Msg2, Msg3, Opts) -> ok.
%% Waiting and Notification
-spec await(Worker, Msg1, Msg2, Opts) -> Result.
-spec notify(GroupName, Msg2, Msg3, Opts) -> ok.
%% Worker Management
-spec start_worker(Msg1, Opts) -> PID.
-spec start_worker(WorkerFun, Msg1, Opts) -> PID.
-spec forward_work(Msg, Opts) -> ok.
%% Group Management
-spec group(Msg1, Msg2, Opts) -> GroupName.
-spec default_grouper(Msg1, Msg2, Opts) -> GroupName.
-spec default_worker(GroupName, Msg1, Opts) -> never_returns.
-spec default_await(Worker, GroupName, Msg1, Msg2, Opts) -> Result.

Public Functions
1. start_monitor/0, start_monitor/1, stop_monitor/1
-spec start_monitor() -> PID when
    PID :: pid().
-spec start_monitor(Group) -> PID when
    Group :: global | term(),
    PID :: pid().
-spec stop_monitor(PID) -> stop when
    PID :: pid().

Description: Start/stop a monitoring process that prints process group statistics every second. Useful for debugging and observing worker activity. Note: stop_monitor/1 returns the atom stop (the message sent to the monitor process). The monitor checks for stop messages after each 1-second sleep interval.
Example monitor output:

    == Sitrep ==> 5 named processes. 2 changes.
    [my_process: <0.123.0>] #M: 3
    [other_process: <0.124.0>] #M: 0

-module(hb_persistent_monitor_test).
-include_lib("eunit/include/eunit.hrl").
%% Test helper: a minimal inline device whose `info' fun exposes a `grouper'
%% that hashes only Msg1 with erlang:phash2/1. Msg2 and Opts are ignored, so
%% every request against the same base message falls into the same group.
%% NOTE: each test section in this document declares its own -module, so
%% this helper is only in scope inside hb_persistent_monitor_test. The other
%% test modules need their own copy (or a shared include) to compile.
test_device() ->
    HashBase = fun(M1, _M2, _Opts) -> erlang:phash2(M1) end,
    #{info => fun() -> #{grouper => HashBase} end}.
%% Starts the default monitor and checks that stop_monitor/1 really ends it.
%% stop_monitor/1 returns the message it sent (`stop'), not `ok'.
%% The monitor only looks for `stop' after each 1000 ms sleep. A fixed sleep
%% races that interval, so we watch the process and wait (bounded) for its
%% 'DOWN' message instead.
start_stop_monitor_test() ->
    Monitor = hb_persistent:start_monitor(),
    ?assert(is_pid(Monitor)),
    ?assert(erlang:is_process_alive(Monitor)),
    % Monitor before stopping so the 'DOWN' message cannot be missed.
    Ref = erlang:monitor(process, Monitor),
    stop = hb_persistent:stop_monitor(Monitor),
    receive
        {'DOWN', Ref, process, Monitor, _Reason} -> ok
    after 3000 ->
        erlang:error({monitor_still_alive, Monitor})
    end,
    ?assertNot(erlang:is_process_alive(Monitor)).
%% A monitor scoped to a single group spawns like the default one; we only
%% check it is a pid, then ask it to stop.
start_monitor_specific_group_test() ->
    Pid = hb_persistent:start_monitor(my_group),
    ?assertMatch(P when is_pid(P), Pid),
    _ = hb_persistent:stop_monitor(Pid).

2. find_or_register/3
%% `wait' carries the pid of the process already executing (the leader), not
%% a group name. `leader' and `infinite_recursion' carry the group name.
-spec find_or_register(Msg1, Msg2, Opts) ->
        {leader, GroupName}
      | {wait, LeaderPID}
      | {infinite_recursion, GroupName}
    when
    Msg1 :: map(),
    Msg2 :: map(),
    Opts :: map(),
    GroupName :: term(),
    LeaderPID :: pid().

Description: Register as the leader for an execution if none exists, otherwise return the existing leader to wait for. Prevents duplicate execution of expensive operations.
Return Values:
- `{leader, GroupName}`: this process should execute the work.
- `{wait, LeaderPID}`: another process is already executing; wait for its result.
- `{infinite_recursion, GroupName}`: this process is already the leader for GroupName (recursion detected).
-module(hb_persistent_find_test).
-include_lib("eunit/include/eunit.hrl").
%% A group with a unique <<"id">> has no leader yet, so the caller is
%% elected leader. We unregister afterwards so later tests start clean.
find_or_register_leader_test() ->
    Opts = #{await_inprogress => named},
    Base = #{<<"device">> => test_device(), <<"id">> => <<"find_leader_test">>},
    Req = #{<<"path">> => <<"test">>},
    {Status, Group} = hb_persistent:find_or_register(Base, Req, Opts),
    ?assertEqual(leader, Status),
    hb_persistent:unregister_notify(Group, Req, {ok, #{}}, Opts).

%% With await_inprogress => false the caller always becomes leader.
find_or_register_disabled_test() ->
    Base = #{<<"device">> => test_device(), <<"id">> => <<"find_disabled_test">>},
    Req = #{<<"path">> => <<"test">>},
    Reply = hb_persistent:find_or_register(Base, Req, #{await_inprogress => false}),
    ?assertMatch({leader, _}, Reply).

3. unregister_notify/4
-spec unregister_notify(GroupName, Msg2, Msg3, Opts) -> ok when
    GroupName :: term() | ungrouped_exec,
    Msg2 :: map(),
    Msg3 :: term(),
    Opts :: map().

Description: Unregister as the leader for an execution and notify all waiting processes of completion. Called after execution finishes.
Test Code:

-module(hb_persistent_unregister_test).
-include_lib("eunit/include/eunit.hrl").
%% Become leader for a group with a unique <<"id">>, then hand back a
%% result. unregister_notify/4 must answer `ok'.
unregister_notify_test() ->
    Opts = #{await_inprogress => named},
    Base = #{<<"device">> => test_device(), <<"id">> => <<"unregister_test">>},
    Req = #{<<"path">> => <<"test">>},
    {leader, Group} = hb_persistent:find_or_register(Base, Req, Opts),
    Outcome = {ok, #{<<"result">> => <<"success">>}},
    ok = hb_persistent:unregister_notify(Group, Req, Outcome, Opts).

%% Unregistering the ungrouped_exec sentinel is a no-op returning `ok'.
unregister_ungrouped_test() ->
    Reply = hb_persistent:unregister_notify(ungrouped_exec, #{}, {ok, #{}}, #{}),
    ?assertEqual(ok, Reply).

4. await/4
-spec await(Worker, Msg1, Msg2, Opts) -> Result when
    Worker :: pid(),
    Msg1 :: map(),
    Msg2 :: map(),
    Opts :: map(),
    Result :: term().

Description: Wait for a worker process to complete execution. Registers with the worker and blocks until result is received or worker dies.
Test Code:

-module(hb_persistent_await_test).
-include_lib("eunit/include/eunit.hrl").
%% await/4 on a leader that has already died must return
%% {error, leader_died} instead of blocking.
%% Instead of sleeping and hoping the worker is gone, we wait for its 'DOWN'
%% message. The worker is then guaranteed dead before await/4 sets up its
%% own monitor; monitoring a dead pid yields an immediate 'DOWN' (noproc).
await_worker_death_test() ->
    Msg1 = #{<<"device">> => test_device(), <<"id">> => <<"await_death_test">>},
    Msg2 = #{<<"path">> => <<"test">>},
    {Worker, Ref} = spawn_monitor(fun() -> exit(normal) end),
    receive
        {'DOWN', Ref, process, Worker, _Reason} -> ok
    after 1000 ->
        erlang:error({worker_did_not_exit, Worker})
    end,
    Result = hb_persistent:await(Worker, Msg1, Msg2, #{}),
    ?assertEqual({error, leader_died}, Result).

5. notify/4
-spec notify(GroupName, Msg2, Msg3, Opts) -> ok when
    GroupName :: term(),
    Msg2 :: map(),
    Msg3 :: term(),
    Opts :: map().

Description: Check inbox for waiting processes and send them the execution result. Called automatically by unregister_notify/4.
-module(hb_persistent_notify_test).
-include_lib("eunit/include/eunit.hrl").
%% Simulate a waiter: put a {resolve, Listener, ...} request into our own
%% mailbox with ourselves as Listener. notify/4 should answer it with
%% {resolved, _, GroupName, Msg2, Result}.
notify_waiting_processes_test() ->
    Self = self(),
    Group = notify_test_group,
    Req = #{<<"path">> => <<"test">>},
    Outcome = {ok, #{<<"result">> => <<"done">>}},
    Self ! {resolve, Self, Group, Req, #{}},
    ok = hb_persistent:notify(Group, Req, Outcome, #{}),
    Received =
        receive
            {resolved, _, Group, Req, Outcome} -> ok
        after 100 -> timeout
        end,
    ?assertEqual(ok, Received).

6. start_worker/2, start_worker/3
-spec start_worker(Msg1, Opts) -> PID when
    Msg1 :: map(),
    Opts :: map(),
    PID :: pid().
-spec start_worker(WorkerFun, Msg1, Opts) -> PID when
    WorkerFun :: function(),
    Msg1 :: map(),
    Opts :: map(),
    PID :: pid().

Description: Spawn a persistent worker process. The 2-arity version uses the device's worker function or default_worker/3. The 3-arity version uses a custom worker function.
- Registers itself with a group name
- Waits for incoming execution requests
- Processes requests and notifies waiters
- Re-registers for next execution (if configured)
-module(hb_persistent_worker_test).
-include_lib("eunit/include/eunit.hrl").
%% start_worker/2 returns the pid of a live worker.
%% Without cleanup the static worker would keep idling until its
%% worker_timeout expires, leaking into later tests, so we kill it.
start_worker_basic_test() ->
    Msg1 = #{<<"device">> => test_device(), <<"id">> => <<"worker_basic_test">>},
    Worker = hb_persistent:start_worker(Msg1, #{static_worker => true}),
    ?assert(is_pid(Worker)),
    ?assert(erlang:is_process_alive(Worker)),
    exit(Worker, kill).
%% start_worker/3 with a caller-supplied worker fun.
%% CustomWorker blocks until it receives `stop', and the original test never
%% sent one. We kill the worker at the end so it does not outlive the test.
%% NOTE(review): this only checks that a pid comes back; it does not prove
%% CustomWorker was invoked. Confirm that the first argument of
%% hb_persistent:start_worker/3 really is the worker fun before relying on
%% this test for that behaviour.
start_worker_custom_function_test() ->
    CustomWorker = fun(_GroupName, _Msg, _Opts) ->
        receive stop -> ok end
    end,
    Msg1 = #{<<"device">> => test_device(), <<"id">> => <<"worker_custom_test">>},
    Worker = hb_persistent:start_worker(CustomWorker, Msg1, #{}),
    ?assert(is_pid(Worker)),
    exit(Worker, kill).

7. forward_work/2
-spec forward_work(Msg, Opts) -> ok when
    Msg :: map(),
    Opts :: map().

Description: Forward work to the appropriate worker if spawn_worker option is set. Creates worker processes dynamically.
-module(hb_persistent_forward_test).
-include_lib("eunit/include/eunit.hrl").
%% With spawn_worker => false, forward_work/2 is a no-op returning `ok'.
forward_work_noop_test() ->
    Opts = #{spawn_worker => false},
    Req = #{<<"device">> => test_device(), <<"id">> => <<"forward_noop_test">>},
    ?assertEqual(ok, hb_persistent:forward_work(Req, Opts)).

8. group/3
-spec group(Msg1, Msg2, Opts) -> GroupName when
    Msg1 :: map(),
    Msg2 :: map() | undefined,
    Opts :: map(),
    GroupName :: term().

Description: Calculate the group name for a message pair. Uses the device's custom grouper function if available, otherwise uses default_grouper/3.
-module(hb_persistent_group_test).
-include_lib("eunit/include/eunit.hrl").
%% test_device()'s grouper hashes Msg1 with phash2, so group/3 yields an
%% integer here.
group_default_test() ->
    Base = #{<<"device">> => test_device(), <<"id">> => <<"group_default_test">>},
    Req = #{<<"path">> => <<"execute">>},
    Group = hb_persistent:group(Base, Req, #{await_inprogress => true}),
    ?assert(is_integer(Group)).

%% default_grouper/3 is called directly (test_device() has its own grouper).
%% With waiting disabled it must return the ungrouped_exec sentinel.
group_disabled_test() ->
    Base = #{<<"id">> => <<"group_disabled_test">>},
    Req = #{<<"path">> => <<"execute">>},
    Group = hb_persistent:default_grouper(Base, Req, #{await_inprogress => false}),
    ?assertEqual(ungrouped_exec, Group).

%% A device-supplied grouper decides the group name.
group_custom_grouper_test() ->
    Tagger = fun(M1, _M2, _Opts) -> {custom, M1} end,
    CustomDevice = #{info => fun() -> #{grouper => Tagger} end},
    Base = #{<<"device">> => CustomDevice, <<"id">> => <<"group_custom_test">>},
    ?assertMatch({custom, _},
                 hb_persistent:group(Base, #{<<"path">> => <<"test">>}, #{})).

9. default_grouper/3
-spec default_grouper(Msg1, Msg2, Opts) -> GroupName when
    Msg1 :: map(),
    Msg2 :: map(),
    Opts :: map(),
    GroupName :: integer() | ungrouped_exec.

Description: Default group name generator using phash2 hash of message pair (excluding priv fields). Returns ungrouped_exec when await_inprogress is disabled.
Warning: Hash collisions possible - not suitable for production without larger hash range.
Test Code:

-module(hb_persistent_grouper_test).
-include_lib("eunit/include/eunit.hrl").
%% The same message pair must always map to the same group.
default_grouper_deterministic_test() ->
    Opts = #{await_inprogress => true},
    Base = #{<<"key">> => <<"value1">>},
    Req = #{<<"path">> => <<"test">>},
    ?assertEqual(hb_persistent:default_grouper(Base, Req, Opts),
                 hb_persistent:default_grouper(Base, Req, Opts)).

%% Changing only Msg2's path must change the group.
default_grouper_different_messages_test() ->
    Opts = #{await_inprogress => true},
    Base = #{<<"key">> => <<"value1">>},
    [GroupA, GroupB] =
        [hb_persistent:default_grouper(Base, #{<<"path">> => Path}, Opts)
         || Path <- [<<"test1">>, <<"test2">>]],
    ?assertNotEqual(GroupA, GroupB).

%% Data under <<"priv">> is excluded from the hash.
default_grouper_ignores_priv_test() ->
    Opts = #{await_inprogress => true},
    Req = #{<<"path">> => <<"test">>},
    WithPriv = fun(Data) ->
        #{<<"key">> => <<"value">>, <<"priv">> => #{<<"data">> => Data}}
    end,
    ?assertEqual(hb_persistent:default_grouper(WithPriv(<<"a">>), Req, Opts),
                 hb_persistent:default_grouper(WithPriv(<<"b">>), Req, Opts)).

10. default_worker/3
-spec default_worker(GroupName, Msg1, Opts) -> never_returns when
    GroupName :: term(),
    Msg1 :: map(),
    Opts :: map().

Description: Default worker server function. Waits for incoming requests, executes them, notifies waiters, and either re-registers for the same group (static) or new group (dynamic).
Behavior:
1. Wait for a `{resolve, Listener, GroupName, Msg2, ListenerOpts}` message.
2. Execute `hb_ao:resolve(Msg1, Msg2, Opts)`.
3. Send the result to the listener.
4. Notify all waiting processes.
5. Re-register and loop (or exit after the timeout).

Options:
- `static_worker => true`: re-register with the same GroupName.
- `static_worker => false`: re-register with a new GroupName derived from Msg3.
- `worker_timeout`: milliseconds before the worker unregisters (default 10000).
-module(hb_persistent_default_worker_test).
-include_lib("eunit/include/eunit.hrl").
%% With a 50 ms worker_timeout and no incoming requests, default_worker/3
%% gives up and its hosting process terminates.
%% We wait on a monitor (bounded at 2000 ms) rather than a fixed 100 ms
%% sleep, which could flake on a loaded machine.
default_worker_timeout_test() ->
    Msg1 = #{<<"device">> => test_device(), <<"id">> => <<"default_worker_timeout_test">>},
    GroupName = erlang:phash2(Msg1),
    {Worker, Ref} = spawn_monitor(fun() ->
        hb_persistent:default_worker(GroupName, Msg1, #{
            worker_timeout => 50,
            error_strategy => ignore
        })
    end),
    ?assert(is_pid(Worker)),
    receive
        {'DOWN', Ref, process, Worker, _Reason} -> ok
    after 2000 ->
        exit(Worker, kill),
        erlang:error({worker_still_alive_after_timeout, Worker})
    end,
    ?assertNot(erlang:is_process_alive(Worker)).

11. default_await/5
-spec default_await(Worker, GroupName, Msg1, Msg2, Opts) -> Result when
    Worker :: pid(),
    GroupName :: term(),
    Msg1 :: map(),
    Msg2 :: map(),
    Opts :: map(),
    Result :: term().

Description: Default await function that blocks until receiving result or worker death notification. Used by devices that don't provide custom await logic. Note: Caller must set up a monitor on the Worker process before calling this function (done by await/4).
-module(hb_persistent_default_await_test).
-include_lib("eunit/include/eunit.hrl").
%% Happy path: a fake worker answers the {resolve, ...} request with a
%% {resolved, ...} message, and default_await/5 returns its payload.
default_await_success_test() ->
    Self = self(),
    Group = default_await_test_group,
    Req = #{<<"path">> => <<"test">>},
    FakeWorker = fun() ->
        receive
            {resolve, Listener, GN, M2, _} ->
                Listener ! {resolved, self(), GN, M2, {ok, <<"result">>}}
        end
    end,
    Worker = spawn(FakeWorker),
    Base = #{<<"device">> => test_device(), <<"id">> => <<"await_success_test">>},
    % await/4 normally sets up this monitor and sends the request itself.
    erlang:monitor(process, Worker),
    Worker ! {resolve, Self, Group, Req, #{}},
    ?assertEqual({ok, <<"result">>},
                 hb_persistent:default_await(Worker, Group, Base, Req, #{})).

%% A worker that dies before answering yields {error, leader_died}.
default_await_worker_death_test() ->
    Worker = spawn(fun() -> timer:sleep(50), exit(killed) end),
    Base = #{<<"device">> => test_device(), <<"id">> => <<"await_death_test2">>},
    Req = #{<<"path">> => <<"test">>},
    % Normally set up by await/4.
    erlang:monitor(process, Worker),
    Outcome = hb_persistent:default_await(Worker, default_await_death_group, Base, Req, #{}),
    ?assertEqual({error, leader_died}, Outcome).

Common Patterns
%% Deduplicated execution: the first caller for a group becomes leader and
%% resolves; concurrent callers for the same group wait for its result.
Msg1 = #{<<"device">> => <<"Counter@1.0">>},
Msg2 = #{<<"path">> => <<"increment">>},
Opts = #{await_inprogress => named},
case hb_persistent:find_or_register(Msg1, Msg2, Opts) of
    {leader, GroupName} ->
        % Execute the work, then release the group and hand the result to
        % every waiter.
        Result = hb_ao:resolve(Msg1, Msg2, Opts),
        hb_persistent:unregister_notify(GroupName, Msg2, Result, Opts),
        Result;
    {wait, LeaderPID} ->
        % Wait for leader to finish
        hb_persistent:await(LeaderPID, Msg1, Msg2, Opts);
    {infinite_recursion, GroupName} ->
        % This process is already the leader for GroupName, so waiting
        % would mean waiting on itself. Handle it explicitly instead of
        % crashing with case_clause; how to recover is the caller's choice.
        {error, {infinite_recursion, GroupName}}
end.
%% Persistent worker for repeated executions: keep Msg1 resident in a
%% long-lived worker so later requests reuse it.
%% (ProcID is assumed to be bound to the target process id.)
WorkerOpts = #{
    static_worker => true,   % keep serving the same group after each request
    worker_timeout => 60000  % 1 minute before it unregisters
},
Msg1 = #{<<"device">> => <<"Process@1.0">>, <<"process">> => ProcID},
Worker = hb_persistent:start_worker(Msg1, WorkerOpts),
% Later, route a request to that worker and block for the answer.
Msg2 = #{<<"action">> => <<"cron">>},
Result = hb_persistent:await(Worker, Msg1, Msg2, #{}).
%% Custom grouper for fine-grained control: every request against the same
%% <<"process">> value shares one group, whatever Msg2 contains.
GroupByProcess = fun(Base, _Req, _Opts) ->
    % Group by process ID only (maps:get/2 raises badkey if it is absent)
    maps:get(<<"process">>, Base)
end,
CustomDevice = #{info => fun() -> #{grouper => GroupByProcess} end},
Msg1 = #{<<"device">> => CustomDevice, <<"process">> => <<"MyProc">>}.
%% Monitor all workers: prints group statistics every second until stopped.
Monitor = hb_persistent:start_monitor(),
% ... do work ...
_ = hb_persistent:stop_monitor(Monitor).

%% Spawn worker after first execution
ResolveOpts = #{
    spawn_worker => true,
    static_worker => false  % Creates new workers for each state
},
Msg1 = #{<<"device">> => <<"Process@1.0">>},
Msg2 = #{<<"action">> => <<"init">>},
Result = hb_ao:resolve(Msg1, Msg2, ResolveOpts),
% Worker now handles subsequent requests.

Execution Deduplication
How It Works
% Process A arrives first and becomes leader for the group.
{leader, Group1} = find_or_register(Msg1, Msg2, Opts),
% e.g. Group1 = 123456789
% Process B (a different process) arrives while A is still executing...
{wait, ProcessA} = find_or_register(Msg1, Msg2, Opts),
% ...and blocks until A finishes.
% A completes and publishes its result:
unregister_notify(Group1, Msg2, Result, Opts),
% B is sent the same Result, so A and B end up with identical output.

Benefits
- Performance: Expensive operations executed once
- Consistency: All waiters get identical results
- Resource Management: Prevents parallel execution storms
- Serialization: Enforces sequential processing when needed
Worker Lifecycle

Static Worker (re-registers with the same group)

    Start → Register(Group1) → Wait → Receive(Msg2) → Execute
    → Notify → Re-register(Group1) → Wait → ...

Dynamic Worker (re-registers with a new group)

    Start → Register(Group1) → Wait → Receive(Msg2) → Execute(→Msg3)
    → Notify → Re-register(Group3) → Wait → ...

Timeout

    Start → Register(Group) → Wait(10s) → Timeout → Unregister → Exit

Configuration Options
%% Option reference. Each key appears once with an example value; the
%% alternatives are listed in the comments. A map literal that repeated a key
%% would keep only its last value, so copying a listing with duplicates would
%% silently apply e.g. await_inprogress => true.
#{
    % Enable/disable execution waiting:
    %   false - never wait
    %   named - wait for named processes
    %   true  - wait for all processes
    await_inprogress => named,
    % Worker behavior:
    %   true  - re-use same group name
    %   false - create new group after each execution
    static_worker => true,
    % Worker timeout in milliseconds (default 10000 = 10s)
    worker_timeout => 10000,
    % Spawn workers automatically:
    %   true  - create worker after first execution
    %   false - manual worker creation
    spawn_worker => false,
    % Error handling:
    %   ignore - continue on errors
    %   throw  - throw on errors
    error_strategy => ignore,
    % Worker context: running inside worker
    is_worker => true,
    % Allow infinite recursion detection
    allow_infinite => true
}

Custom Device Integration
% Custom grouper function: one group per {process, action} pair
#{
    info => fun() ->
        #{
            grouper => fun(Msg1, Msg2, _Opts) ->
                % Custom group name logic
                ProcessID = maps:get(<<"process">>, Msg1),
                Action = maps:get(<<"action">>, Msg2),
                {ProcessID, Action}
            end
        }
    end
}
% Custom worker function
#{
    info => fun() ->
        #{
            worker => fun(GroupName, Msg1, Opts) ->
                % Custom worker logic (custom_worker_loop/3 stands for your
                % own receive loop)
                custom_worker_loop(GroupName, Msg1, Opts)
            end
        }
    end
}
% Custom await function: like the default, but gives up after 5 seconds
#{
    info => fun() ->
        #{
            await => fun(Worker, GroupName, _Msg1, Msg2, _Opts) ->
                receive
                    {resolved, _, GroupName, Msg2, Res} ->
                        Res;
                    % The caller monitors Worker before invoking the await
                    % fun, so a crashed leader arrives as 'DOWN'. Without
                    % this clause we would sit out the whole timeout.
                    {'DOWN', _Ref, process, Worker, _Reason} ->
                        {error, leader_died}
                after 5000 ->
                    {error, timeout}
                end
            end
        }
    end
}

Performance Characteristics
Deduplication Benefits

Without deduplication:

    Request A: 1000ms execution
    Request B: 1000ms execution (parallel)
    Request C: 1000ms execution (parallel)
    Total Wall Time: ~1000ms
    Total CPU Time: 3000ms

With deduplication:

    Request A: 1000ms execution (leader)
    Request B: Waits for A
    Request C: Waits for A
    Total Wall Time: ~1000ms
    Total CPU Time: 1000ms
    Results: All identical

Worker Persistence Benefits

Without workers:

    Request 1: Load state (100ms) + Execute (50ms) = 150ms
    Request 2: Load state (100ms) + Execute (50ms) = 150ms
    Request 3: Load state (100ms) + Execute (50ms) = 150ms
    Total: 450ms

With a persistent worker:

    Request 1: Load state (100ms) + Execute (50ms) = 150ms
    Request 2: Execute (50ms) = 50ms (state in memory)
    Request 3: Execute (50ms) = 50ms (state in memory)
    Total: 250ms
    Speedup: 1.8x (450ms / 250ms)

References
- Process Groups: Erlang `pg` module
- AO-Core Resolution: `hb_ao.erl`
- Device System: `hb_ao_device.erl`
- Name Registration: `hb_name.erl`
- Configuration: `hb_opts.erl`
Notes
- Process Groups: Uses Erlang's `pg` module for distributed process registration.
- Hash Collisions: The default grouper uses `phash2`, which has a collision risk.
- Recursion Detection: Detects when a process tries to wait for itself.
- Worker Lifecycle: Workers time out after a configurable period of inactivity.
- Static vs Dynamic: Static workers stay on the same group; dynamic workers transition to new groups.
- Monitor Output: The monitor prints process stats every second for debugging.
- Notification: Workers notify all waiting processes when execution completes.
- Error Handling: A configurable strategy controls how execution errors are handled.
- Performance: Significant benefits for expensive or repeated operations.
- Distribution: Works across distributed Erlang nodes via `pg`.