Skip to content

hb_process_monitor.erl - Process Cron Monitoring

Overview

Purpose: Monitor and execute scheduled cron tasks for AO processes
Module: hb_process_monitor
Pattern: Ticker-based polling with message pushing

This module provides a lightweight monitoring system for AO process cron jobs. It periodically polls for scheduled tasks (cron messages) and pushes them to the message unit (MU) for execution. Built with a simple server-ticker architecture.

Dependencies

  • HyperBEAM: hb_opts, hb_client, hb_logger, dev_mu
  • Erlang/OTP: timer
  • Records: Internal state record

Public Functions Overview

%% Monitor Management
-spec start(ProcID) -> {MonitorPID, LoggerPID}.
-spec start(ProcID, Rate) -> {MonitorPID, LoggerPID}.
-spec start(ProcID, Rate, Cursor) -> {MonitorPID, LoggerPID}.
-spec stop(PID) -> stop.

Public Functions

1. start/1, start/2, start/3

-spec start(ProcID) -> {MonitorPID, LoggerPID}
    when
        ProcID :: binary(),
        MonitorPID :: pid(),
        LoggerPID :: pid().
 
-spec start(ProcID, Rate) -> {MonitorPID, LoggerPID}
    when
        ProcID :: binary(),
        Rate :: integer(),
        MonitorPID :: pid(),
        LoggerPID :: pid().
 
-spec start(ProcID, Rate, Cursor) -> {MonitorPID, LoggerPID}
    when
        ProcID :: binary(),
        Rate :: integer(),
        Cursor :: binary() | undefined,
        MonitorPID :: pid(),
        LoggerPID :: pid().

Description: Start monitoring a process for cron messages. Creates a server process and a ticker that polls at the specified rate.

Parameters:
  • ProcID: Process ID to monitor (required)
  • Rate: Polling interval in milliseconds (default from default_cron_rate)
  • Cursor: Starting cursor for cron pagination (default from hb_client:cron_cursor/1)

Returns: Tuple of {MonitorPID, LoggerPID} for control and logging

Test Code:
-module(hb_process_monitor_start_test).
-include_lib("eunit/include/eunit.hrl").
 
%% start/1 and start/2 call hb_client:cron_cursor/1 which may not be available
%% Use start/3 with explicit cursor for reliable testing
 
start_with_cursor_test() ->
    ProcID = <<"test-process-3">>,
    Rate = 1000,
    Cursor = <<"initial-cursor">>,
    {Monitor, Logger} = hb_process_monitor:start(ProcID, Rate, Cursor),
    ?assert(is_pid(Monitor)),
    ?assert(is_pid(Logger)),
    ?assert(erlang:is_process_alive(Monitor)),
    ?assert(erlang:is_process_alive(Logger)),
    hb_process_monitor:stop(Monitor).
 
start_with_undefined_cursor_test() ->
    ProcID = <<"test-process-4">>,
    Rate = 2000,
    {Monitor, Logger} = hb_process_monitor:start(ProcID, Rate, undefined),
    ?assert(is_pid(Monitor)),
    ?assert(is_pid(Logger)),
    hb_process_monitor:stop(Monitor).
 
start_multiple_monitors_test() ->
    ProcID1 = <<"process-1">>,
    ProcID2 = <<"process-2">>,
    Rate = 1000,
    Cursor = <<"cursor">>,
    {Mon1, Log1} = hb_process_monitor:start(ProcID1, Rate, Cursor),
    {Mon2, Log2} = hb_process_monitor:start(ProcID2, Rate, Cursor),
    ?assertNotEqual(Mon1, Mon2),
    ?assertNotEqual(Log1, Log2),
    ?assert(erlang:is_process_alive(Mon1)),
    ?assert(erlang:is_process_alive(Mon2)),
    hb_process_monitor:stop(Mon1),
    hb_process_monitor:stop(Mon2).
 
start_exports_test() ->
    code:ensure_loaded(hb_process_monitor),
    ?assert(erlang:function_exported(hb_process_monitor, start, 1)),
    ?assert(erlang:function_exported(hb_process_monitor, start, 2)),
    ?assert(erlang:function_exported(hb_process_monitor, start, 3)).

2. stop/1

-spec stop(PID) -> stop
    when
        PID :: pid().

Description: Stop a monitor process. Sends a stop message to the monitor server, causing it to exit gracefully. Returns the stop atom (the message sent).

Test Code:
-module(hb_process_monitor_stop_test).
-include_lib("eunit/include/eunit.hrl").
 
stop_monitor_test() ->
    ProcID = <<"test-process">>,
    Rate = 1000,
    Cursor = <<"cursor">>,
    {Monitor, _Logger} = hb_process_monitor:start(ProcID, Rate, Cursor),
    ?assert(erlang:is_process_alive(Monitor)),
    
    stop = hb_process_monitor:stop(Monitor),
    timer:sleep(100),
    
    ?assertNot(erlang:is_process_alive(Monitor)).
 
stop_idempotent_test() ->
    ProcID = <<"test-process">>,
    Rate = 1000,
    Cursor = <<"cursor">>,
    {Monitor, _Logger} = hb_process_monitor:start(ProcID, Rate, Cursor),
    
    stop = hb_process_monitor:stop(Monitor),
    timer:sleep(50),
    % Stopping again should not crash (message goes to dead process)
    stop = hb_process_monitor:stop(Monitor).
 
stop_returns_ok_test() ->
    ProcID = <<"test-process-stop">>,
    Rate = 1000,
    {Monitor, _Logger} = hb_process_monitor:start(ProcID, Rate, <<"cursor">>),
    
    % stop/1 sends message and returns the message (stop atom)
    Result = hb_process_monitor:stop(Monitor),
    ?assertEqual(stop, Result).

Internal Architecture

State Record

-record(state, {
    proc_id,   % Process ID being monitored
    cursor,    % Current pagination cursor
    logger     % Logger process PID
}).

Server Loop

server(State) ->
    receive
        stop -> 
            ok;  % Exit gracefully
        tick ->
            server(handle_crons(State))  % Process cron messages
    end.

Ticker Loop

ticker(Monitor, Rate) ->
    case erlang:is_process_alive(Monitor) of
        true ->
            timer:sleep(Rate),
            Monitor ! tick,
            ticker(Monitor, Rate);
        false ->
            ok  % Monitor died, exit
    end.

Cron Handling Flow

handle_crons/1

handle_crons(State) ->
    case hb_client:cron(State#state.proc_id, State#state.cursor) of
        {ok, HasNextPage, Results, NewCursor} ->
            % Push each result to MU
            lists:map(
                fun(Res) ->
                    dev_mu:push(#{message => Res}, State)
                end,
                Results
            ),
            NewState = State#state{cursor = NewCursor},
            case HasNextPage of
                true -> NewState;           % Done for now
                false -> handle_crons(NewState)  % Recursively fetch more
            end;
        Error ->
            hb_logger:log(State#state.logger, Error),
            State  % Keep original state on error
    end.

Flow Diagram

Start

Ticker sends 'tick' every Rate ms

Server receives 'tick'

Call hb_client:cron(ProcID, Cursor)

Receive {ok, HasNextPage, Results, NewCursor}

For each Result:
  Push to dev_mu

Update State with NewCursor

If HasNextPage = false:
  Recursively call handle_crons (fetch more pages)

Return to server loop

Common Patterns

%% Start monitoring a process with default settings
{Monitor, Logger} = hb_process_monitor:start(<<"MyProcess-ID">>),
 
% Monitor runs in background, pushing cron messages
% Logger captures events and errors
 
% Later, stop the monitor
hb_process_monitor:stop(Monitor).
 
%% Custom polling rate (every 30 seconds)
Rate = 30000,  % milliseconds
{Monitor, Logger} = hb_process_monitor:start(<<"MyProcess-ID">>, Rate),
 
%% Resume from specific cursor
Cursor = <<"saved-cursor-from-previous-run">>,
{Monitor, Logger} = hb_process_monitor:start(
    <<"MyProcess-ID">>,
    10000,  % 10 seconds
    Cursor
),
 
%% Monitor multiple processes
Processes = [<<"Proc1">>, <<"Proc2">>, <<"Proc3">>],
Monitors = lists:map(
    fun(ProcID) ->
        {Mon, Log} = hb_process_monitor:start(ProcID),
        {ProcID, Mon, Log}
    end,
    Processes
),
 
% Stop all monitors
lists:foreach(
    fun({_ProcID, Mon, _Log}) ->
        hb_process_monitor:stop(Mon)
    end,
    Monitors
).
 
%% Check monitor status
{Monitor, Logger} = hb_process_monitor:start(<<"MyProcess">>),
case erlang:is_process_alive(Monitor) of
    true -> io:format("Monitor running~n");
    false -> io:format("Monitor stopped~n")
end.

Configuration

Default Cron Rate

The default polling rate is retrieved from options:

Rate = hb_opts:get(default_cron_rate),
Setting in config:
% In config.json or config.flat
#{
    default_cron_rate => 10000  % 10 seconds
}

Cursor Management

The initial cursor is fetched from the client:

Cursor = hb_client:cron_cursor(ProcID),

This allows resuming from the last processed cron message.


Integration with Other Modules

hb_client

Used to fetch cron messages:

{ok, HasNextPage, Results, NewCursor} = hb_client:cron(ProcID, Cursor),
Response:
  • HasNextPage: true if more pages available, false otherwise
  • Results: List of cron messages to process
  • NewCursor: Updated cursor for next request

dev_mu

Used to push messages for execution:

dev_mu:push(#{message => CronMessage}, State),

This queues the cron message for processing by the message unit.

hb_logger

Used for event logging:

% Start logger
Logger = hb_logger:start(),
 
% Register processes with logger
hb_logger:register(Monitor),
hb_logger:register(Ticker),
 
% Log events
hb_logger:log(Monitor, {ok, started_monitor, {ProcID, Rate, Cursor}}),
 
% Log errors
hb_logger:log(Logger, Error),

Error Handling

Cron Fetch Errors

case hb_client:cron(ProcID, Cursor) of
    {ok, HasNextPage, Results, NewCursor} ->
        % Process normally
        handle_results(Results, NewState);
    Error ->
        % Log error and continue with old state
        hb_logger:log(Logger, Error),
        State  % Don't update cursor on error
end
Error Types:
  • Network errors from hb_client
  • Invalid process ID
  • Malformed cursor
  • Rate limiting

Ticker Exit Behavior

case erlang:is_process_alive(Monitor) of
    true ->
        % Continue ticking
        timer:sleep(Rate),
        Monitor ! tick,
        ticker(Monitor, Rate);
    false ->
        % Monitor died, clean exit
        ok
end

The ticker automatically stops when the monitor process dies.


Performance Considerations

Polling Rate Selection

Fast Polling (1-5 seconds):
  • Pros: Low latency for cron execution
  • Cons: Higher network/CPU usage
  • Use Case: Time-sensitive processes
Medium Polling (10-30 seconds):
  • Pros: Balanced performance
  • Cons: Moderate latency
  • Use Case: Most processes (recommended)
Slow Polling (60+ seconds):
  • Pros: Minimal resource usage
  • Cons: Higher latency
  • Use Case: Low-priority background tasks

Pagination

The monitor recursively fetches all available pages when HasNextPage = false:

case HasNextPage of
    true -> 
        % Done for now, wait for next tick
        NewState;
    false -> 
        % More pages available, fetch immediately
        handle_crons(NewState)
end

This ensures all pending cron messages are processed before waiting.

Resource Usage

Per Monitor:
  • 2 processes (server + ticker)
  • Minimal memory (just state record)
  • Network calls at configured rate
Multiple Monitors:
  • Linear scaling with number of processes
  • Independent operation (no shared state)
  • Each monitor has own ticker

Use Cases

1. Scheduled Task Execution

% Process with scheduled tasks every hour
{Monitor, _} = hb_process_monitor:start(
    <<"ScheduledProcess">>,
    60000  % Check every minute
),

2. Event Stream Processing

% Process consuming event stream
{Monitor, Logger} = hb_process_monitor:start(
    <<"EventConsumer">>,
    5000  % Fast polling for real-time events
),

3. Background Job Processing

% Background job processor
{Monitor, _} = hb_process_monitor:start(
    <<"JobQueue">>,
    30000  % Check every 30 seconds
),

4. Multi-Process Coordination

% Monitor multiple coordinated processes
Monitors = [
    hb_process_monitor:start(<<"Process1">>, 10000),
    hb_process_monitor:start(<<"Process2">>, 10000),
    hb_process_monitor:start(<<"Process3">>, 10000)
],
 
% Stop all when done
lists:foreach(
    fun({Mon, _}) -> hb_process_monitor:stop(Mon) end,
    Monitors
).

Logging and Monitoring

Startup Logging

hb_logger:log(Monitor, {ok, started_monitor, {ProcID, Rate, Cursor}}),
Output:
{ok, started_monitor, {<<"MyProcess">>, 10000, <<"cursor-123">>}}

Error Logging

hb_logger:log(State#state.logger, {error, cron_fetch_failed, Reason}),

Process Registration

Both monitor and ticker register with the logger:

hb_logger:register(Monitor),
hb_logger:register(Ticker),

This enables centralized log collection and debugging.


State Management

Cursor Evolution

Initial: Cursor = hb_client:cron_cursor(ProcID)

Tick 1: Fetch with Cursor → Get NewCursor1

Tick 2: Fetch with NewCursor1 → Get NewCursor2

Tick 3: Fetch with NewCursor2 → Get NewCursor3

...

The cursor tracks pagination position across all cron messages.

State Immutability

State is updated functionally:

% Old state
State = #state{cursor = OldCursor, ...},
 
% Create new state with updated cursor
NewState = State#state{cursor = NewCursor},
 
% Return new state to server loop
server(NewState)

Testing Considerations

Mock Cron Response

% Mock hb_client:cron/2 response
meck:expect(hb_client, cron, fun(ProcID, Cursor) ->
    {ok, false, [
        #{<<"message">> => <<"task1">>},
        #{<<"message">> => <<"task2">>}
    ], <<"next-cursor">>}
end),
 
{Monitor, _} = hb_process_monitor:start(<<"TestProc">>),
timer:sleep(100),
% Verify messages pushed to MU

Monitor Lifecycle

% Start
{Monitor, Logger} = hb_process_monitor:start(<<"Test">>),
?assert(erlang:is_process_alive(Monitor)),
 
% Operate
timer:sleep(1000),
?assert(erlang:is_process_alive(Monitor)),
 
% Stop
hb_process_monitor:stop(Monitor),
timer:sleep(100),
?assertNot(erlang:is_process_alive(Monitor)).

References

  • Client Module - hb_client.erl
  • Message Unit - dev_mu.erl
  • Logging - hb_logger.erl
  • Configuration - hb_opts.erl
  • Process System - dev_process.erl

Notes

  1. Simple Architecture: Just server + ticker, minimal overhead
  2. Automatic Cleanup: Ticker stops when monitor dies
  3. Recursive Pagination: Fetches all available pages before waiting
  4. Error Resilience: Continues operation on fetch errors
  5. Independent Monitors: Each process has separate monitor
  6. Cursor Tracking: Maintains pagination state across ticks
  7. Logger Integration: All events captured for debugging
  8. Graceful Shutdown: stop/1 sends message for clean exit
  9. No Shared State: Monitors operate independently
  10. Minimal Dependencies: Only requires client, MU, and logger modules