dev_scheduler_registry.erl - Scheduler Process Registry
Overview
Purpose: Registry for local scheduler processes using the pg process group system
Module: dev_scheduler_registry
Name Service: hb_name (wrapper around pg)
Scope: Local node scheduling services
This module provides a simple registry for local scheduling services in AO. It manages the mapping between process IDs and their corresponding Erlang scheduler processes, enabling efficient lookup and lazy creation of scheduler servers.
Supported Operations
- Process Lookup: Find scheduler process by AO process ID
- Process Creation: Lazily create scheduler servers when needed
- Wallet Access: Get the node's scheduling wallet
- Process Listing: Enumerate all managed processes
Dependencies
- HyperBEAM:
hb,hb_name,hb_message,hb_test_utils - Scheduler:
dev_scheduler_server - Testing:
eunit - Includes:
include/hb.hrl
Public Functions Overview
%% Registry Control
-spec start() -> ok.
%% Process Lookup
-spec find(ProcID) -> pid() | not_found.
-spec find(ProcID, ProcMsgOrFalse) -> pid() | not_found.
-spec find(ProcID, ProcMsgOrFalse, Opts) -> pid() | not_found.
%% Information
-spec get_wallet() -> Wallet.
-spec get_processes() -> [ProcID].Public Functions
1. start/0
-spec start() -> ok.Description: Initialize the registry by starting the underlying name service (hb_name). Should be called before any other registry operations.
-module(dev_scheduler_registry_start_test).
-include_lib("eunit/include/eunit.hrl").
start_test() ->
?assertEqual(ok, dev_scheduler_registry:start()).
start_idempotent_test() ->
?assertEqual(ok, dev_scheduler_registry:start()),
?assertEqual(ok, dev_scheduler_registry:start()).2. find/1
-spec find(ProcID) -> pid() | not_found
when
ProcID :: binary().Description: Find a scheduler process by AO process ID. If the process is not registered, returns not_found without creating a new one.
-module(dev_scheduler_registry_find1_test).
-include_lib("eunit/include/eunit.hrl").
find_non_existent_test() ->
Opts = #{
store => hb_test_utils:test_store(),
priv_wallet => hb:wallet()
},
Proc = hb_message:commit(#{
<<"type">> => <<"Process">>,
<<"image">> => <<0:(1024*32)>>
}, Opts),
dev_scheduler_registry:start(),
?assertEqual(not_found, dev_scheduler_registry:find(hb_message:id(Proc, all))).3. find/2
-spec find(ProcID, ProcMsgOrFalse) -> pid() | not_found
when
ProcID :: binary(),
ProcMsgOrFalse :: map() | false.Description: Find a scheduler process by AO process ID. If ProcMsgOrFalse is a process message (not false) and no scheduler exists, creates a new scheduler server.
-module(dev_scheduler_registry_find2_test).
-include_lib("eunit/include/eunit.hrl").
find_without_creation_test() ->
Opts = #{
store => hb_test_utils:test_store(),
priv_wallet => hb:wallet()
},
Proc = hb_message:commit(#{
<<"type">> => <<"Process">>,
<<"image">> => <<0:(1024*32)>>
}, Opts),
dev_scheduler_registry:start(),
ID = hb_message:id(Proc, all),
?assertEqual(not_found, dev_scheduler_registry:find(ID, false)).
find_with_creation_test() ->
Opts = #{
store => hb_test_utils:test_store(),
priv_wallet => hb:wallet()
},
Proc = hb_message:commit(#{
<<"type">> => <<"Process">>,
<<"image">> => <<0:(1024*32)>>
}, Opts),
dev_scheduler_registry:start(),
ID = hb_message:id(Proc, all, Opts),
Pid1 = dev_scheduler_registry:find(ID, Proc),
?assert(is_pid(Pid1)),
?assert(is_process_alive(Pid1)),
% Verify same PID is returned on subsequent find
Pid2 = dev_scheduler_registry:find(ID, Proc),
?assertEqual(Pid1, Pid2).4. find/3
-spec find(ProcID, ProcMsgOrFalse, Opts) -> pid() | not_found
when
ProcID :: binary(),
ProcMsgOrFalse :: map() | false,
Opts :: map().Description: Same as find/2 but with additional options passed when spawning a new scheduler server.
-module(dev_scheduler_registry_find3_test).
-include_lib("eunit/include/eunit.hrl").
find_with_opts_test() ->
Opts = #{
store => hb_test_utils:test_store(),
priv_wallet => hb:wallet(),
scheduling_mode => local_confirmation
},
Proc = hb_message:commit(#{
<<"type">> => <<"Process">>,
<<"image">> => <<0:(1024*32)>>
}, Opts),
dev_scheduler_registry:start(),
ID = hb_message:id(Proc, all, Opts),
Pid = dev_scheduler_registry:find(ID, Proc, Opts),
?assert(is_pid(Pid)).5. get_wallet/0
-spec get_wallet() -> Wallet
when
Wallet :: {PrivKey, PubKey}.Description: Get the wallet used by the scheduler. Currently returns the node's main wallet via hb:wallet().
-module(dev_scheduler_registry_wallet_test).
-include_lib("eunit/include/eunit.hrl").
get_wallet_test() ->
Wallet = dev_scheduler_registry:get_wallet(),
?assert(is_tuple(Wallet)),
{Priv, Pub} = Wallet,
?assert(is_tuple(Priv) orelse is_binary(Priv)),
?assert(is_tuple(Pub)).6. get_processes/0
-spec get_processes() -> [ProcID]
when
ProcID :: binary().Description: Return a list of all currently registered process IDs managed by this scheduler node.
Test Code:-module(dev_scheduler_registry_processes_test).
-include_lib("eunit/include/eunit.hrl").
get_processes_after_creation_test() ->
Opts = #{
store => hb_test_utils:test_store(),
priv_wallet => hb:wallet()
},
% Use same image pattern as module's internal tests
Proc1 = hb_message:commit(#{
<<"type">> => <<"Process">>,
<<"image">> => <<0:(1024*32)>>
}, Opts),
Proc2 = hb_message:commit(#{
<<"type">> => <<"Process">>,
<<"image">> => <<0:(1024*32)>>
}, Opts),
dev_scheduler_registry:start(),
ID1 = hb_message:id(Proc1, all, Opts),
ID2 = hb_message:id(Proc2, all, Opts),
dev_scheduler_registry:find(ID1, Proc1, Opts),
dev_scheduler_registry:find(ID2, Proc2, Opts),
Processes = dev_scheduler_registry:get_processes(),
% Check that at least 2 processes are registered and our IDs are included
?assert(length(Processes) >= 2),
?assert(lists:member(ID1, Processes)),
?assert(lists:member(ID2, Processes)).Registry Key Format
The registry uses composite keys in the format:
{<<"scheduler@1.0">>, ProcID}This allows multiple device types to use the same name service without collision.
Common Patterns
%% Initialize and find/create scheduler
dev_scheduler_registry:start(),
Opts = #{
store => Store,
priv_wallet => Wallet,
scheduling_mode => local_confirmation
},
Process = hb_message:commit(#{
<<"type">> => <<"Process">>,
<<"scheduler">> => hb_util:human_id(ar_wallet:to_address(Wallet))
}, Opts),
ProcID = hb_message:id(Process, all, Opts),
SchedulerPid = dev_scheduler_registry:find(ProcID, Process, Opts).
%% Check if process is being scheduled locally
case dev_scheduler_registry:find(ProcID) of
not_found ->
find_remote_scheduler(ProcID);
Pid when is_pid(Pid) ->
{local, Pid}
end.
%% List all managed processes
AllProcesses = dev_scheduler_registry:get_processes(),
lists:foreach(
fun(ProcID) ->
Pid = dev_scheduler_registry:find(ProcID),
Info = dev_scheduler_server:info(Pid),
io:format("Process ~s: slot ~p~n", [ProcID, maps:get(current, Info)])
end,
AllProcesses
).References
- Scheduler Server -
dev_scheduler_server.erl - Main Scheduler -
dev_scheduler.erl - Name Service -
hb_name.erl - Process Groups - Erlang
pgmodule
Notes
- Singleton Pattern: Each process ID maps to exactly one scheduler process
- Lazy Creation: Scheduler servers are created on first access with process message
- Global Wallet: Currently uses single node wallet for all schedulers
- Name Registration: Uses
hb_name:register/1for atomic registration - Process Linking: Scheduler servers are linked to their creators via
spawn_link - Idempotent Start:
start/0can be called multiple times safely - False vs Message: Passing
falseprevents automatic server creation - Options Passthrough: Options are forwarded to
dev_scheduler_server:start/3