Skip to content

dev_scheduler_registry.erl - Scheduler Process Registry

Overview

Purpose: Registry for local scheduler processes using the pg process group system
Module: dev_scheduler_registry
Name Service: hb_name (wrapper around pg)
Scope: Local node scheduling services

This module provides a simple registry for local scheduling services in AO. It manages the mapping between AO process IDs and the Erlang processes (pids) of their scheduler servers, enabling efficient lookup and lazy creation of scheduler servers.

Supported Operations

  • Process Lookup: Find scheduler process by AO process ID
  • Process Creation: Lazily create scheduler servers when needed
  • Wallet Access: Get the node's scheduling wallet
  • Process Listing: Enumerate all managed processes

Dependencies

  • HyperBEAM: hb, hb_name, hb_message, hb_test_utils
  • Scheduler: dev_scheduler_server
  • Testing: eunit
  • Includes: include/hb.hrl

Public Functions Overview

%% Registry Control
-spec start() -> ok.
 
%% Process Lookup
-spec find(ProcID) -> pid() | not_found.
-spec find(ProcID, ProcMsgOrFalse) -> pid() | not_found.
-spec find(ProcID, ProcMsgOrFalse, Opts) -> pid() | not_found.
 
%% Information
-spec get_wallet() -> Wallet.
-spec get_processes() -> [ProcID].

Public Functions

1. start/0

-spec start() -> ok.

Description: Initialize the registry by starting the underlying name service (hb_name). Should be called before any other registry operations.

Test Code:
-module(dev_scheduler_registry_start_test).
-include_lib("eunit/include/eunit.hrl").
 
%% start/0 reports success with the bare atom `ok`.
start_test() ->
    StartResult = dev_scheduler_registry:start(),
    ?assertEqual(ok, StartResult).
 
%% Calling start/0 twice in a row must succeed both times.
start_idempotent_test() ->
    lists:foreach(
        fun(_Attempt) ->
            ?assertEqual(ok, dev_scheduler_registry:start())
        end,
        [first, second]
    ).

2. find/1

-spec find(ProcID) -> pid() | not_found
    when
        ProcID :: binary().

Description: Find a scheduler process by AO process ID. If the process is not registered, returns not_found without creating a new one.

Test Code:
-module(dev_scheduler_registry_find1_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Looking up an ID that was never registered must return `not_found`;
%% find/1 never creates a scheduler.
find_non_existent_test() ->
    Opts = #{
        store => hb_test_utils:test_store(),
        priv_wallet => hb:wallet()
    },
    Proc = hb_message:commit(#{
        <<"type">> => <<"Process">>,
        <<"image">> => <<0:(1024*32)>>
    }, Opts),
    dev_scheduler_registry:start(),
    % Derive the ID under the same Opts that were used to commit the message.
    ID = hb_message:id(Proc, all, Opts),
    ?assertEqual(not_found, dev_scheduler_registry:find(ID)).

3. find/2

-spec find(ProcID, ProcMsgOrFalse) -> pid() | not_found
    when
        ProcID :: binary(),
        ProcMsgOrFalse :: map() | false.

Description: Find a scheduler process by AO process ID. If ProcMsgOrFalse is a process message (not false) and no scheduler exists, creates a new scheduler server and returns its pid. If ProcMsgOrFalse is false and no scheduler exists, returns not_found.

Test Code:
-module(dev_scheduler_registry_find2_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Passing `false` as the process message disables lazy creation, so an
%% unregistered ID yields `not_found`.
find_without_creation_test() ->
    Opts = #{
        store => hb_test_utils:test_store(),
        priv_wallet => hb:wallet()
    },
    Proc = hb_message:commit(#{
        <<"type">> => <<"Process">>,
        <<"image">> => <<0:(1024*32)>>
    }, Opts),
    dev_scheduler_registry:start(),
    % Derive the ID under the same Opts that were used to commit the message.
    ID = hb_message:id(Proc, all, Opts),
    ?assertEqual(not_found, dev_scheduler_registry:find(ID, false)).
 
%% find/2 with a process message returns a live scheduler pid, and a second
%% lookup with the same ID returns that same pid.
find_with_creation_test() ->
    Store = hb_test_utils:test_store(),
    Wallet = hb:wallet(),
    Opts = #{store => Store, priv_wallet => Wallet},
    ProcMsg = hb_message:commit(
        #{
            <<"type">> => <<"Process">>,
            <<"image">> => <<0:(1024*32)>>
        },
        Opts
    ),
    dev_scheduler_registry:start(),
    ProcID = hb_message:id(ProcMsg, all, Opts),
    % First lookup supplies the process message, so a scheduler may be created.
    Created = dev_scheduler_registry:find(ProcID, ProcMsg),
    ?assert(is_pid(Created)),
    ?assert(is_process_alive(Created)),
    % Second lookup must hand back the very same scheduler.
    Reused = dev_scheduler_registry:find(ProcID, ProcMsg),
    ?assertEqual(Created, Reused).

4. find/3

-spec find(ProcID, ProcMsgOrFalse, Opts) -> pid() | not_found
    when
        ProcID :: binary(),
        ProcMsgOrFalse :: map() | false,
        Opts :: map().

Description: Same as find/2 but with additional options passed when spawning a new scheduler server.

Test Code:
-module(dev_scheduler_registry_find3_test).
-include_lib("eunit/include/eunit.hrl").
 
%% find/3 accepts an options map alongside the process message and returns
%% a scheduler pid.
find_with_opts_test() ->
    Store = hb_test_utils:test_store(),
    Wallet = hb:wallet(),
    Opts = #{
        store => Store,
        priv_wallet => Wallet,
        scheduling_mode => local_confirmation
    },
    ProcMsg = hb_message:commit(
        #{
            <<"type">> => <<"Process">>,
            <<"image">> => <<0:(1024*32)>>
        },
        Opts
    ),
    dev_scheduler_registry:start(),
    ProcID = hb_message:id(ProcMsg, all, Opts),
    SchedulerPid = dev_scheduler_registry:find(ProcID, ProcMsg, Opts),
    ?assert(is_pid(SchedulerPid)).

5. get_wallet/0

-spec get_wallet() -> Wallet
    when
        Wallet :: {PrivKey, PubKey}.

Description: Get the wallet used by the scheduler. Currently returns the node's main wallet via hb:wallet().

Test Code:
-module(dev_scheduler_registry_wallet_test).
-include_lib("eunit/include/eunit.hrl").
 
%% get_wallet/0 returns a two-element tuple; the first element is a tuple
%% or a binary and the second is a tuple.
get_wallet_test() ->
    Wallet = dev_scheduler_registry:get_wallet(),
    ?assert(is_tuple(Wallet)),
    {PrivPart, PubPart} = Wallet,
    PrivLooksValid = is_tuple(PrivPart) orelse is_binary(PrivPart),
    ?assert(PrivLooksValid),
    ?assert(is_tuple(PubPart)).

6. get_processes/0

-spec get_processes() -> [ProcID]
    when
        ProcID :: binary().

Description: Return a list of all currently registered process IDs managed by this scheduler node.

Test Code:
-module(dev_scheduler_registry_processes_test).
-include_lib("eunit/include/eunit.hrl").
 
%% After schedulers are created for two distinct processes, both process
%% IDs must be listed by get_processes/0.
get_processes_after_creation_test() ->
    Opts = #{
        store => hb_test_utils:test_store(),
        priv_wallet => hb:wallet()
    },
    % Use same image pattern as module's internal tests. The extra
    % <<"test-name">> field makes the two messages differ in content, so the
    % test does not rely on the commitment alone to yield distinct IDs.
    Proc1 = hb_message:commit(#{
        <<"type">> => <<"Process">>,
        <<"image">> => <<0:(1024*32)>>,
        <<"test-name">> => <<"registry-proc-1">>
    }, Opts),
    Proc2 = hb_message:commit(#{
        <<"type">> => <<"Process">>,
        <<"image">> => <<0:(1024*32)>>,
        <<"test-name">> => <<"registry-proc-2">>
    }, Opts),
    dev_scheduler_registry:start(),
    ID1 = hb_message:id(Proc1, all, Opts),
    ID2 = hb_message:id(Proc2, all, Opts),
    % Guard the premise of the test: two different registry entries.
    ?assertNotEqual(ID1, ID2),
    dev_scheduler_registry:find(ID1, Proc1, Opts),
    dev_scheduler_registry:find(ID2, Proc2, Opts),
    Processes = dev_scheduler_registry:get_processes(),
    % Check that at least 2 processes are registered and our IDs are included
    ?assert(length(Processes) >= 2),
    ?assert(lists:member(ID1, Processes)),
    ?assert(lists:member(ID2, Processes)).

Registry Key Format

The registry uses composite keys in the format:

{<<"scheduler@1.0">>, ProcID}

This allows multiple device types to use the same name service without collision.


Common Patterns

%% Start the registry, then look up (or lazily create) a scheduler.
%% Store and Wallet are expected to be bound by the surrounding code.
dev_scheduler_registry:start(),
Opts = #{
    store => Store,
    priv_wallet => Wallet,
    scheduling_mode => local_confirmation
},
SchedulerAddress = hb_util:human_id(ar_wallet:to_address(Wallet)),
Process = hb_message:commit(
    #{
        <<"type">> => <<"Process">>,
        <<"scheduler">> => SchedulerAddress
    },
    Opts
),
ProcID = hb_message:id(Process, all, Opts),
SchedulerPid = dev_scheduler_registry:find(ProcID, Process, Opts).
 
%% Use the local scheduler when one is registered; otherwise fall back to
%% a remote lookup.
case dev_scheduler_registry:find(ProcID) of
    Pid when is_pid(Pid) ->
        {local, Pid};
    not_found ->
        find_remote_scheduler(ProcID)
end.
 
%% List all managed processes
AllProcesses = dev_scheduler_registry:get_processes(),
lists:foreach(
    fun(ProcID) ->
        % find/1 is specified to return pid() | not_found, so an entry may
        % no longer resolve by the time it is looked up. Skip such entries
        % instead of passing not_found to dev_scheduler_server:info/1.
        case dev_scheduler_registry:find(ProcID) of
            not_found ->
                ok;
            Pid when is_pid(Pid) ->
                Info = dev_scheduler_server:info(Pid),
                io:format("Process ~s: slot ~p~n", [ProcID, maps:get(current, Info)])
        end
    end,
    AllProcesses
).

References

  • Scheduler Server - dev_scheduler_server.erl
  • Main Scheduler - dev_scheduler.erl
  • Name Service - hb_name.erl
  • Process Groups - Erlang pg module

Notes

  1. Singleton Pattern: Each process ID maps to at most one scheduler process on the local node
  2. Lazy Creation: A scheduler server is created on the first lookup that supplies a process message
  3. Global Wallet: Currently uses single node wallet for all schedulers
  4. Name Registration: Uses hb_name:register/1 for atomic registration
  5. Process Linking: Scheduler servers are linked to their creators via spawn_link
  6. Idempotent Start: start/0 can be called multiple times safely
  7. False vs Message: Passing false prevents automatic server creation
  8. Options Passthrough: Options are forwarded to dev_scheduler_server:start/3