hb_persistent.erl - Long-Lived AO-Core Resolution Processes

Overview

Purpose: Create and manage persistent worker processes for expensive or serialized message resolutions
Module: hb_persistent
Pattern: Process group-based execution deduplication with PG (distributed process groups)

This module creates long-lived Erlang processes for AO-Core message resolution. It's useful for expensive computations that should be shared across multiple requests, or when executions must be deliberately serialized to avoid parallel processing conflicts. Built on Erlang's pg module for distributed process group management.

Dependencies

  • HyperBEAM: hb_ao, hb_ao_device, hb_name, hb_maps, hb_opts
  • Erlang/OTP: pg (process groups), timer
  • Records: #tx{} from include/hb.hrl

Public Functions Overview

%% Monitor Management
-spec start_monitor() -> PID.
-spec start_monitor(Group) -> PID.
-spec stop_monitor(PID) -> stop.
 
%% Execution Registration
-spec find_or_register(Msg1, Msg2, Opts) -> {leader | wait | infinite_recursion, Term}.
-spec unregister_notify(GroupName, Msg2, Msg3, Opts) -> ok.
 
%% Waiting and Notification
-spec await(Worker, Msg1, Msg2, Opts) -> Result.
-spec notify(GroupName, Msg2, Msg3, Opts) -> ok.
 
%% Worker Management
-spec start_worker(Msg1, Opts) -> PID.
-spec start_worker(WorkerFun, Msg1, Opts) -> PID.
-spec forward_work(Msg, Opts) -> ok.
 
%% Group Management
-spec group(Msg1, Msg2, Opts) -> GroupName.
-spec default_grouper(Msg1, Msg2, Opts) -> GroupName.
-spec default_worker(GroupName, Msg1, Opts) -> never_returns.
-spec default_await(Worker, GroupName, Msg1, Msg2, Opts) -> Result.

Public Functions

1. start_monitor/0, start_monitor/1, stop_monitor/1

-spec start_monitor() -> PID
    when
        PID :: pid().
 
-spec start_monitor(Group) -> PID
    when
        Group :: global | term(),
        PID :: pid().
 
-spec stop_monitor(PID) -> stop
    when
        PID :: pid().

Description: Start/stop a monitoring process that prints process group statistics every second. Useful for debugging and observing worker activity. Note: stop_monitor/1 returns the atom stop (the message sent to the monitor process). The monitor checks for stop messages after each 1-second sleep interval.

Monitor Output:
== Sitrep ==> 5 named processes. 2 changes.
[my_process: <0.123.0>] #M: 3
[other_process: <0.124.0>] #M: 0
Test Code:
-module(hb_persistent_monitor_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Shared fixture for the test snippets in this document (defined once and
%% reused by the concatenated tests). Builds a minimal inline device whose
%% `info` fun advertises only a custom grouper; that grouper hashes the base
%% message alone and ignores the request message and the options.
test_device() ->
    Grouper = fun(Base, _Request, _Opts) -> erlang:phash2(Base) end,
    Info = fun() -> #{grouper => Grouper} end,
    #{info => Info}.
 
%% Starts the sitrep monitor, asks it to stop, and verifies that it terminates.
%% Termination is observed through a process monitor with a bounded wait rather
%% than a fixed sleep, so the test finishes as soon as the monitor exits and does
%% not depend on the exact length of the monitor's polling interval.
start_stop_monitor_test() ->
    Monitor = hb_persistent:start_monitor(),
    ?assert(is_pid(Monitor)),
    ?assert(erlang:is_process_alive(Monitor)),
    Ref = erlang:monitor(process, Monitor),
    % stop_monitor/1 returns the message sent (stop), not ok
    ?assertEqual(stop, hb_persistent:stop_monitor(Monitor)),
    % The monitor only looks for the stop message between its 1-second sleeps,
    % so allow several intervals before declaring failure.
    receive
        {'DOWN', Ref, process, Monitor, _Reason} -> ok
    after 5000 ->
        erlang:error(monitor_did_not_stop)
    end,
    ?assertNot(erlang:is_process_alive(Monitor)).
 
%% Starts a monitor scoped to one named group and checks that a PID comes back;
%% the monitor is then asked to stop so it does not outlive the test.
start_monitor_specific_group_test() ->
    MonitorPid = hb_persistent:start_monitor(my_group),
    ?assert(is_pid(MonitorPid)),
    hb_persistent:stop_monitor(MonitorPid).

2. find_or_register/3

-spec find_or_register(Msg1, Msg2, Opts) -> {Status, GroupName}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        Status :: leader | wait | infinite_recursion,
        % NOTE: when Status is `wait`, this slot carries the existing leader's
        % PID rather than a group name (see "Return Values" below).
        GroupName :: term().

Description: Register as the leader for an execution if none exists, otherwise return the existing leader to wait for. Prevents duplicate execution of expensive operations.

Return Values:
  • {leader, GroupName}: This process should execute
  • {wait, LeaderPID}: Another process is executing, wait for result
  • {infinite_recursion, GroupName}: This process is already the leader (recursion detected)
Test Code:
-module(hb_persistent_find_test).
-include_lib("eunit/include/eunit.hrl").
 
%% With no execution registered for this message pair, the caller is expected to
%% be made leader.
find_or_register_leader_test() ->
    Opts = #{await_inprogress => named},
    Msg2 = #{<<"path">> => <<"test">>},
    % The <<"id">> value is unique to this test so it should not collide with
    % groups registered by other tests.
    Msg1 = #{<<"id">> => <<"find_leader_test">>, <<"device">> => test_device()},
    {Status, GroupName} = hb_persistent:find_or_register(Msg1, Msg2, Opts),
    ?assertEqual(leader, Status),
    % Give up leadership again so later tests do not find this process as leader.
    hb_persistent:unregister_notify(GroupName, Msg2, {ok, #{}}, Opts).
 
%% With await_inprogress switched off, the caller is expected to be told to lead
%% unconditionally.
find_or_register_disabled_test() ->
    Opts = #{await_inprogress => false},
    Msg2 = #{<<"path">> => <<"test">>},
    Msg1 = #{<<"id">> => <<"find_disabled_test">>, <<"device">> => test_device()},
    {Status, _GroupName} = hb_persistent:find_or_register(Msg1, Msg2, Opts),
    ?assertEqual(leader, Status).

3. unregister_notify/4

-spec unregister_notify(GroupName, Msg2, Msg3, Opts) -> ok
    when
        GroupName :: term() | ungrouped_exec,
        Msg2 :: map(),
        Msg3 :: term(),
        Opts :: map().

Description: Unregister as the leader for an execution and notify all waiting processes of completion. Called after execution finishes.

Test Code:
-module(hb_persistent_unregister_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Registers as leader, then unregisters while publishing a result; the
%% unregister call is expected to return ok.
unregister_notify_test() ->
    Opts = #{await_inprogress => named},
    Msg2 = #{<<"path">> => <<"test">>},
    % The <<"id">> value is unique to this test so it should not collide with
    % groups registered by other tests.
    Msg1 = #{<<"id">> => <<"unregister_test">>, <<"device">> => test_device()},
    Outcome = {ok, #{<<"result">> => <<"success">>}},
    {leader, GroupName} = hb_persistent:find_or_register(Msg1, Msg2, Opts),
    ok = hb_persistent:unregister_notify(GroupName, Msg2, Outcome, Opts).
 
%% Unregistering the ungrouped_exec pseudo-group is expected to be a no-op that
%% returns ok.
unregister_ungrouped_test() ->
    Outcome = hb_persistent:unregister_notify(ungrouped_exec, #{}, {ok, #{}}, #{}),
    ?assertEqual(ok, Outcome).

4. await/4

-spec await(Worker, Msg1, Msg2, Opts) -> Result
    when
        Worker :: pid(),
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        Result :: term().

Description: Wait for a worker process to complete execution. Registers with the worker and blocks until result is received or worker dies.

Test Code:
-module(hb_persistent_await_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Awaiting a worker that is already dead is expected to yield
%% {error, leader_died}.
await_worker_death_test() ->
    Msg1 = #{<<"device">> => test_device(), <<"id">> => <<"await_death_test">>},
    Msg2 = #{<<"path">> => <<"test">>},

    % Spawn a worker that exits at once and wait for its 'DOWN' message, so the
    % worker is known to be dead before await/4 runs (a short sleep would only
    % make that likely, not certain).
    {Worker, Ref} = spawn_monitor(fun() -> exit(normal) end),
    receive
        {'DOWN', Ref, process, Worker, _Reason} -> ok
    after 1000 ->
        erlang:error(worker_still_alive)
    end,

    Result = hb_persistent:await(Worker, Msg1, Msg2, #{}),
    ?assertEqual({error, leader_died}, Result).

5. notify/4

-spec notify(GroupName, Msg2, Msg3, Opts) -> ok
    when
        GroupName :: term(),
        Msg2 :: map(),
        Msg3 :: term(),
        Opts :: map().

Description: Check inbox for waiting processes and send them the execution result. Called automatically by unregister_notify/4.

Test Code:
-module(hb_persistent_notify_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Plays both roles in one process: queues a resolve request in its own mailbox
%% (as a waiter would), calls notify/4 (as the leader would), and then expects
%% the matching resolved message to arrive.
notify_waiting_processes_test() ->
    Group = notify_test_group,
    Request = #{<<"path">> => <<"test">>},
    Outcome = {ok, #{<<"result">> => <<"done">>}},

    % The listener named in the request is this process, so the reply comes
    % back to our own mailbox.
    self() ! {resolve, self(), Group, Request, #{}},

    % notify/4 scans the inbox for {resolve, Listener, ...} and answers each
    % Listener with {resolved, ...}.
    ok = hb_persistent:notify(Group, Request, Outcome, #{}),

    receive
        {resolved, _From, Group, Request, Outcome} ->
            ok
    after 100 ->
        ?assert(false)
    end.

6. start_worker/2, start_worker/3

-spec start_worker(Msg1, Opts) -> PID
    when
        Msg1 :: map(),
        Opts :: map(),
        PID :: pid().
 
-spec start_worker(WorkerFun, Msg1, Opts) -> PID
    when
        WorkerFun :: function(),
        Msg1 :: map(),
        Opts :: map(),
        PID :: pid().

Description: Spawn a persistent worker process. The 2-arity version uses the device's worker function or default_worker/3. The 3-arity version uses a custom worker function.

Worker Behavior:
  • Registers itself with a group name
  • Waits for incoming execution requests
  • Processes requests and notifies waiters
  • Re-registers for next execution (if configured)
Test Code:
-module(hb_persistent_worker_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Starts a static worker for a message and checks that a live process is
%% returned.
start_worker_basic_test() ->
    WorkerOpts = #{static_worker => true},
    Msg1 = #{<<"id">> => <<"worker_basic_test">>, <<"device">> => test_device()},
    Pid = hb_persistent:start_worker(Msg1, WorkerOpts),
    ?assert(is_pid(Pid)),
    ?assert(erlang:is_process_alive(Pid)).
 
%% Starts a worker with a caller-supplied worker function and checks that a PID
%% is returned. The custom worker blocks until it receives `stop`, so the test
%% sends that message at the end; otherwise the process would outlive the test.
start_worker_custom_function_test() ->
    CustomWorker = fun(_GroupName, _Msg, _Opts) ->
        receive stop -> ok end
    end,
    Msg1 = #{<<"device">> => test_device(), <<"id">> => <<"worker_custom_test">>},
    Worker = hb_persistent:start_worker(CustomWorker, Msg1, #{}),
    ?assert(is_pid(Worker)),
    % Clean up: release the worker from its receive so it can terminate.
    Worker ! stop,
    ok.

7. forward_work/2

-spec forward_work(Msg, Opts) -> ok
    when
        Msg :: map(),
        Opts :: map().

Description: Forward work to the appropriate worker if spawn_worker option is set. Creates worker processes dynamically.

Test Code:
-module(hb_persistent_forward_test).
-include_lib("eunit/include/eunit.hrl").
 
%% With spawn_worker set to false, forward_work/2 is expected to do nothing and
%% return ok.
forward_work_noop_test() ->
    NoSpawn = #{spawn_worker => false},
    Msg = #{<<"id">> => <<"forward_noop_test">>, <<"device">> => test_device()},
    ok = hb_persistent:forward_work(Msg, NoSpawn).

8. group/3

-spec group(Msg1, Msg2, Opts) -> GroupName
    when
        Msg1 :: map(),
        Msg2 :: map() | undefined,
        Opts :: map(),
        GroupName :: term().

Description: Calculate the group name for a message pair. Uses the device's custom grouper function if available, otherwise uses default_grouper/3.

Test Code:
-module(hb_persistent_group_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Computes a group name for a message that uses test_device(). That device
%% carries a grouper returning erlang:phash2/1 of the base message, so the
%% resulting group name is expected to be an integer.
group_default_test() ->
    Opts = #{await_inprogress => true},
    Msg2 = #{<<"path">> => <<"execute">>},
    Msg1 = #{<<"id">> => <<"group_default_test">>, <<"device">> => test_device()},
    GroupName = hb_persistent:group(Msg1, Msg2, Opts),
    ?assert(is_integer(GroupName)).
 
%% Calls default_grouper/3 directly (test_device() would substitute its own
%% grouper) and expects ungrouped_exec when await_inprogress is false.
group_disabled_test() ->
    Opts = #{await_inprogress => false},
    Msg2 = #{<<"path">> => <<"execute">>},
    Msg1 = #{<<"id">> => <<"group_disabled_test">>},
    GroupName = hb_persistent:default_grouper(Msg1, Msg2, Opts),
    ?assertEqual(ungrouped_exec, GroupName).
 
%% Supplies a device whose grouper tags the base message as {custom, Msg1} and
%% expects group/3 to return a term of that shape.
group_custom_grouper_test() ->
    Grouper = fun(M1, _M2, _Opts) -> {custom, M1} end,
    CustomDevice = #{info => fun() -> #{grouper => Grouper} end},
    Msg2 = #{<<"path">> => <<"test">>},
    Msg1 = #{<<"id">> => <<"group_custom_test">>, <<"device">> => CustomDevice},
    GroupName = hb_persistent:group(Msg1, Msg2, #{}),
    ?assertMatch({custom, _}, GroupName).

9. default_grouper/3

-spec default_grouper(Msg1, Msg2, Opts) -> GroupName
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        GroupName :: integer() | ungrouped_exec.

Description: Default group name generator using phash2 hash of message pair (excluding priv fields). Returns ungrouped_exec when await_inprogress is disabled.

Warning: Hash collisions possible - not suitable for production without larger hash range.

Test Code:
-module(hb_persistent_grouper_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Two calls with the same message pair and options are expected to produce the
%% same group name.
default_grouper_deterministic_test() ->
    Opts = #{await_inprogress => true},
    Msg1 = #{<<"key">> => <<"value1">>},
    Msg2 = #{<<"path">> => <<"test">>},
    First = hb_persistent:default_grouper(Msg1, Msg2, Opts),
    Second = hb_persistent:default_grouper(Msg1, Msg2, Opts),
    ?assertEqual(First, Second).
 
%% The same base message paired with two different request messages is expected
%% to produce two different group names.
default_grouper_different_messages_test() ->
    Opts = #{await_inprogress => true},
    Msg1 = #{<<"key">> => <<"value1">>},
    RequestA = #{<<"path">> => <<"test1">>},
    RequestB = #{<<"path">> => <<"test2">>},
    GroupA = hb_persistent:default_grouper(Msg1, RequestA, Opts),
    GroupB = hb_persistent:default_grouper(Msg1, RequestB, Opts),
    ?assertNotEqual(GroupA, GroupB).
 
%% Two base messages that differ only under their <<"priv">> key are expected to
%% land in the same group.
default_grouper_ignores_priv_test() ->
    Opts = #{await_inprogress => true},
    Msg2 = #{<<"path">> => <<"test">>},
    Shared = #{<<"key">> => <<"value">>},
    BaseA = Shared#{<<"priv">> => #{<<"data">> => <<"a">>}},
    BaseB = Shared#{<<"priv">> => #{<<"data">> => <<"b">>}},
    GroupA = hb_persistent:default_grouper(BaseA, Msg2, Opts),
    GroupB = hb_persistent:default_grouper(BaseB, Msg2, Opts),
    ?assertEqual(GroupA, GroupB).

10. default_worker/3

-spec default_worker(GroupName, Msg1, Opts) -> never_returns
    when
        GroupName :: term(),
        Msg1 :: map(),
        Opts :: map().

Description: Default worker server function. Waits for incoming requests, executes them, notifies waiters, and either re-registers for the same group (static) or new group (dynamic).

Behavior:
  1. Wait for {resolve, Listener, GroupName, Msg2, ListenerOpts} message
  2. Execute hb_ao:resolve(Msg1, Msg2, Opts)
  3. Send result to listener
  4. Notify all waiting processes
  5. Re-register and loop (or exit after timeout)
Configuration:
  • static_worker => true: Re-register with same GroupName
  • static_worker => false: Re-register with new Msg3-based GroupName
  • worker_timeout: Milliseconds before unregistering (default 10000)
Test Code:
-module(hb_persistent_default_worker_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Runs default_worker/3 with a very short worker_timeout and expects the
%% process to terminate on its own. Termination is observed with a process
%% monitor and a generous upper bound instead of a fixed sleep, so a slow
%% scheduler cannot make the test fail spuriously.
default_worker_timeout_test() ->
    Msg1 = #{<<"device">> => test_device(), <<"id">> => <<"default_worker_timeout_test">>},
    GroupName = erlang:phash2(Msg1),

    % Start the worker with a short timeout (it is expected to time out and exit).
    {Worker, Ref} = spawn_monitor(fun() ->
        hb_persistent:default_worker(GroupName, Msg1, #{
            worker_timeout => 50,
            error_strategy => ignore
        })
    end),

    ?assert(is_pid(Worker)),
    receive
        {'DOWN', Ref, process, Worker, _Reason} -> ok
    after 2000 ->
        erlang:error(worker_did_not_time_out)
    end,
    ?assertNot(erlang:is_process_alive(Worker)).

11. default_await/5

-spec default_await(Worker, GroupName, Msg1, Msg2, Opts) -> Result
    when
        Worker :: pid(),
        GroupName :: term(),
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        Result :: term().

Description: Default await function that blocks until receiving result or worker death notification. Used by devices that don't provide custom await logic. Note: Caller must set up a monitor on the Worker process before calling this function (done by await/4).

Test Code:
-module(hb_persistent_default_await_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Drives default_await/5 against a stand-in worker that answers one resolve
%% request, and expects the worker's result to be returned.
default_await_success_test() ->
    Group = default_await_test_group,
    Request = #{<<"path">> => <<"test">>},
    Base = #{<<"id">> => <<"await_success_test">>, <<"device">> => test_device()},

    % Stand-in worker: replies to the first resolve request it receives.
    Responder = fun() ->
        receive
            {resolve, Listener, GN, M2, _ListenerOpts} ->
                Listener ! {resolved, self(), GN, M2, {ok, <<"result">>}}
        end
    end,
    Worker = spawn(Responder),

    % await/4 normally sets up the monitor and sends the resolve request; this
    % test does both by hand because it calls default_await/5 directly.
    erlang:monitor(process, Worker),
    Worker ! {resolve, self(), Group, Request, #{}},

    Outcome = hb_persistent:default_await(Worker, Group, Base, Request, #{}),
    ?assertEqual({ok, <<"result">>}, Outcome).
 
%% Drives default_await/5 against a worker that dies without replying and
%% expects {error, leader_died}.
default_await_worker_death_test() ->
    Dying = fun() -> timer:sleep(50), exit(killed) end,
    Worker = spawn(Dying),
    Base = #{<<"id">> => <<"await_death_test2">>, <<"device">> => test_device()},
    Request = #{<<"path">> => <<"test">>},
    % await/4 normally installs this monitor; it is done by hand here because
    % default_await/5 is called directly.
    erlang:monitor(process, Worker),
    Outcome = hb_persistent:default_await(Worker, default_await_death_group, Base, Request, #{}),
    ?assertEqual({error, leader_died}, Outcome).

Common Patterns

%% Deduplicated execution
%% Only one process (the leader) runs the resolution; any process that arrives
%% while it is running waits for the leader's result instead of recomputing it.
Msg1 = #{<<"device">> => <<"Counter@1.0">>},
Msg2 = #{<<"path">> => <<"increment">>},
Opts = #{await_inprogress => named},
 
case hb_persistent:find_or_register(Msg1, Msg2, Opts) of
    {leader, GroupName} ->
        % Execute the work, then release leadership and publish the result
        Result = hb_ao:resolve(Msg1, Msg2, Opts),
        hb_persistent:unregister_notify(GroupName, Msg2, Result, Opts),
        Result;
    {wait, LeaderPID} ->
        % Wait for leader to finish
        hb_persistent:await(LeaderPID, Msg1, Msg2, Opts);
    {infinite_recursion, _GroupName} ->
        % This process is already the leader for this group, so waiting would
        % mean waiting on itself. Report it instead of crashing on an
        % unmatched case clause.
        {error, infinite_recursion}
end.
 
%% Persistent worker for repeated executions
Msg1 = #{<<"device">> => <<"Process@1.0">>, <<"process">> => ProcID},
Worker = hb_persistent:start_worker(Msg1, #{
    static_worker => true,
    worker_timeout => 60000  % 1 minute
}),
 
% Later, send requests to the worker
Msg2 = #{<<"action">> => <<"cron">>},
Result = hb_persistent:await(Worker, Msg1, Msg2, #{}).
 
%% Custom grouper for fine-grained control
CustomDevice = #{
    info => fun() ->
        #{
            grouper => fun(Msg1, _Msg2, _Opts) ->
                % Group by process ID only
                maps:get(<<"process">>, Msg1)
            end
        }
    end
},
Msg1 = #{<<"device">> => CustomDevice, <<"process">> => <<"MyProc">>}.
 
%% Monitor all workers
Monitor = hb_persistent:start_monitor(),
% ... do work ...
hb_persistent:stop_monitor(Monitor).
 
%% Spawn worker after first execution
Msg1 = #{<<"device">> => <<"Process@1.0">>},
Msg2 = #{<<"action">> => <<"init">>},
Result = hb_ao:resolve(Msg1, Msg2, #{
    spawn_worker => true,
    static_worker => false  % Creates new workers for each state
}),
% Worker now handles subsequent requests.

Execution Deduplication

How It Works

% Process A starts execution
{leader, Group1} = find_or_register(Msg1, Msg2, Opts),
% Group1 = 123456789
 
% Process B arrives while A is executing
{wait, ProcessA} = find_or_register(Msg1, Msg2, Opts),
% Waits for Process A to finish
 
% Process A completes
unregister_notify(Group1, Msg2, Result, Opts),
% Sends Result to Process B
 
% Both A and B have the same result

Benefits

  1. Performance: Expensive operations executed once
  2. Consistency: All waiters get identical results
  3. Resource Management: Prevents parallel execution storms
  4. Serialization: Enforces sequential processing when needed

Worker Lifecycle

Static Worker (Re-registers with same group)

Start → Register(Group1) → Wait → Receive(Msg2) → Execute 
    → Notify → Re-register(Group1) → Wait → ...

Dynamic Worker (Re-registers with new group)

Start → Register(Group1) → Wait → Receive(Msg2) → Execute(→Msg3)
    → Notify → Re-register(Group3) → Wait → ...

Timeout

Start → Register(Group) → Wait(10s) → Timeout → Unregister → Exit

Configuration Options

#{
    % NOTE: this is a catalogue of accepted values, not a literal Opts map.
    % Keys are repeated once per alternative; a real Opts map carries a single
    % value for each key it sets.

    % Enable/disable execution waiting
    await_inprogress => false,     % Never wait
    await_inprogress => named,     % Wait for named processes
    await_inprogress => true,      % Wait for all processes
    
    % Worker behavior
    static_worker => true,         % Re-use same group name
    static_worker => false,        % Create new group after each execution
    
    % Worker timeout
    worker_timeout => 10000,       % Milliseconds (default 10s)
    
    % Spawn workers automatically
    spawn_worker => true,          % Create worker after first execution
    spawn_worker => false,         % Manual worker creation
    
    % Error handling
    error_strategy => ignore,      % Continue on errors
    error_strategy => throw,       % Throw on errors
    
    % Worker context
    is_worker => true,             % Running inside worker
    allow_infinite => true         % Allow infinite recursion detection
}

Custom Device Integration

% Custom grouper function
#{
    info => fun() ->
        #{
            grouper => fun(Msg1, Msg2, Opts) ->
                % Custom group name logic
                ProcessID = maps:get(<<"process">>, Msg1),
                Action = maps:get(<<"action">>, Msg2),
                {ProcessID, Action}
            end
        }
    end
}
 
% Custom worker function
#{
    info => fun() ->
        #{
            worker => fun(GroupName, Msg1, Opts) ->
                % Custom worker logic
                custom_worker_loop(GroupName, Msg1, Opts)
            end
        }
    end
}
 
% Custom await function
#{
    info => fun() ->
        #{
            await => fun(Worker, GroupName, Msg1, Msg2, Opts) ->
                % Custom await logic with timeout
                receive
                    {resolved, _, GroupName, Msg2, Res} -> Res
                after 5000 ->
                    {error, timeout}
                end
            end
        }
    end
}

Performance Characteristics

Deduplication Benefits

Without Deduplication:
Request A: 1000ms execution
Request B: 1000ms execution (parallel)
Request C: 1000ms execution (parallel)
Total Wall Time: ~1000ms
Total CPU Time: 3000ms
With Deduplication:
Request A: 1000ms execution (leader)
Request B: Waits for A
Request C: Waits for A
Total Wall Time: ~1000ms
Total CPU Time: 1000ms
Results: All identical

Worker Persistence Benefits

Without Workers:
Request 1: Load state (100ms) + Execute (50ms) = 150ms
Request 2: Load state (100ms) + Execute (50ms) = 150ms
Request 3: Load state (100ms) + Execute (50ms) = 150ms
Total: 450ms
With Workers:
Request 1: Load state (100ms) + Execute (50ms) = 150ms
Request 2: Execute (50ms) = 50ms (state in memory)
Request 3: Execute (50ms) = 50ms (state in memory)
Total: 250ms
Speedup: 1.8x

References

  • Process Groups - Erlang pg module
  • AO-Core Resolution - hb_ao.erl
  • Device System - hb_ao_device.erl
  • Name Registration - hb_name.erl
  • Configuration - hb_opts.erl

Notes

  1. Process Groups: Uses Erlang's pg module for distributed process registration
  2. Hash Collisions: Default grouper uses phash2 which has collision risk
  3. Recursion Detection: Detects when a process tries to wait for itself
  4. Worker Lifecycle: Workers timeout after configurable period of inactivity
  5. Static vs Dynamic: Static workers stay on same group, dynamic transition to new groups
  6. Monitor Output: Monitor prints process stats every second for debugging
  7. Notification: Workers notify all waiting processes when execution completes
  8. Error Handling: Configurable strategy for handling execution errors
  9. Performance: Significant benefits for expensive or repeated operations
  10. Distribution: Works across distributed Erlang nodes via pg