hb_process_monitor.erl - Process Cron Monitoring
Overview
Purpose: Monitor and execute scheduled cron tasks for AO processes
Module: hb_process_monitor
Pattern: Ticker-based polling with message pushing
This module provides a lightweight monitoring system for AO process cron jobs. It periodically polls for scheduled tasks (cron messages) and pushes them to the message unit (MU) for execution. Built with a simple server-ticker architecture.
Dependencies
- HyperBEAM: hb_opts, hb_client, hb_logger, dev_mu
- Erlang/OTP: timer
- Records: internal state record
Public Functions Overview
%% Monitor Management
%% Each argument and result is annotated with its type, so no type variable
%% is left as an unconstrained singleton (which the compiler warns about).
-spec start(ProcID :: binary()) ->
    {MonitorPID :: pid(), LoggerPID :: pid()}.
-spec start(ProcID :: binary(), Rate :: integer()) ->
    {MonitorPID :: pid(), LoggerPID :: pid()}.
-spec start(ProcID :: binary(), Rate :: integer(),
            Cursor :: binary() | undefined) ->
    {MonitorPID :: pid(), LoggerPID :: pid()}.
-spec stop(PID :: pid()) -> stop.
Public Functions
1. start/1, start/2, start/3
%% start/1: monitor ProcID using the default rate and initial cursor.
-spec start(ProcID :: binary()) ->
    {MonitorPID :: pid(), LoggerPID :: pid()}.
%% start/2: monitor ProcID, polling every Rate milliseconds.
-spec start(ProcID :: binary(), Rate :: integer()) ->
    {MonitorPID :: pid(), LoggerPID :: pid()}.
%% start/3: monitor ProcID at Rate ms, starting from Cursor.
-spec start(ProcID :: binary(), Rate :: integer(),
            Cursor :: binary() | undefined) ->
    {MonitorPID :: pid(), LoggerPID :: pid()}.
Description: Start monitoring a process for cron messages. Creates a server process and a ticker that polls at the specified rate.
Parameters:
- ProcID: ID of the process to monitor (required)
- Rate: Polling interval in milliseconds (defaults to the default_cron_rate option)
- Cursor: Starting cursor for cron pagination (defaults to the value returned by hb_client:cron_cursor/1)

Returns: Tuple of {MonitorPID, LoggerPID} for control and logging
-module(hb_process_monitor_start_test).
-include_lib("eunit/include/eunit.hrl").

%% Every test goes through start/3 with an explicit cursor. start/1 and
%% start/2 look the cursor up via hb_client:cron_cursor/1, which may not be
%% available in a unit-test environment.

%% start/3 with a concrete cursor returns two pids, both alive.
start_with_cursor_test() ->
    {Mon, Log} =
        hb_process_monitor:start(<<"test-process-3">>, 1000, <<"initial-cursor">>),
    ?assert(is_pid(Mon)),
    ?assert(is_pid(Log)),
    ?assert(erlang:is_process_alive(Mon)),
    ?assert(erlang:is_process_alive(Log)),
    hb_process_monitor:stop(Mon).

%% An undefined cursor is accepted by start/3.
start_with_undefined_cursor_test() ->
    {Mon, Log} = hb_process_monitor:start(<<"test-process-4">>, 2000, undefined),
    ?assert(is_pid(Mon)),
    ?assert(is_pid(Log)),
    hb_process_monitor:stop(Mon).

%% Two start/3 calls yield distinct monitor and logger pids.
start_multiple_monitors_test() ->
    StartOne = fun(Id) -> hb_process_monitor:start(Id, 1000, <<"cursor">>) end,
    {MonA, LogA} = StartOne(<<"process-1">>),
    {MonB, LogB} = StartOne(<<"process-2">>),
    ?assertNotEqual(MonA, MonB),
    ?assertNotEqual(LogA, LogB),
    ?assert(erlang:is_process_alive(MonA)),
    ?assert(erlang:is_process_alive(MonB)),
    hb_process_monitor:stop(MonA),
    hb_process_monitor:stop(MonB).

%% All three start arities are exported.
start_exports_test() ->
    code:ensure_loaded(hb_process_monitor),
    [?assert(erlang:function_exported(hb_process_monitor, start, Arity))
     || Arity <- [1, 2, 3]].
2. stop/1
%% stop/1: ask the monitor server identified by PID to exit.
-spec stop(PID :: pid()) -> stop.
Description: Stop a monitor process. Sends a stop message to the monitor server, causing it to exit gracefully. Returns the stop atom (the message sent).
-module(hb_process_monitor_stop_test).
-include_lib("eunit/include/eunit.hrl").

%% Wait up to Timeout ms for Pid to terminate, returning ok, or fail the
%% test with {still_alive, Pid}. Waiting on a monitor's 'DOWN' message is
%% deterministic. A fixed timer:sleep/1 is either too short (flaky) or
%% longer than needed (slow). If Pid is already dead, the monitor fires
%% immediately (reason noproc).
await_exit(Pid, Timeout) ->
    Ref = erlang:monitor(process, Pid),
    receive
        {'DOWN', Ref, process, Pid, _Reason} -> ok
    after Timeout ->
        erlang:demonitor(Ref, [flush]),
        erlang:error({still_alive, Pid})
    end.

%% stop/1 terminates a running monitor.
stop_monitor_test() ->
    ProcID = <<"test-process">>,
    Rate = 1000,
    Cursor = <<"cursor">>,
    {Monitor, _Logger} = hb_process_monitor:start(ProcID, Rate, Cursor),
    ?assert(erlang:is_process_alive(Monitor)),
    stop = hb_process_monitor:stop(Monitor),
    ok = await_exit(Monitor, 1000),
    ?assertNot(erlang:is_process_alive(Monitor)).

%% Calling stop/1 on an already-stopped monitor does not crash.
stop_idempotent_test() ->
    ProcID = <<"test-process">>,
    Rate = 1000,
    Cursor = <<"cursor">>,
    {Monitor, _Logger} = hb_process_monitor:start(ProcID, Rate, Cursor),
    stop = hb_process_monitor:stop(Monitor),
    ok = await_exit(Monitor, 1000),
    % Stopping again should not crash (message goes to dead process)
    stop = hb_process_monitor:stop(Monitor).

%% Despite its name, this checks that stop/1 returns the atom `stop`
%% (the message it sent), not `ok`.
stop_returns_ok_test() ->
    ProcID = <<"test-process-stop">>,
    Rate = 1000,
    {Monitor, _Logger} = hb_process_monitor:start(ProcID, Rate, <<"cursor">>),
    Result = hb_process_monitor:stop(Monitor),
    ?assertEqual(stop, Result).
Internal Architecture
State Record
%% State threaded through server/1 for a single monitor.
-record(state, {
    %% ID of the process being monitored (passed to hb_client:cron/2).
    proc_id,
    %% Current pagination cursor; replaced after each successful fetch.
    cursor,
    %% Logger process PID, used when a fetch fails.
    logger
}).
Server Loop
%% Main monitor loop.
%% - `tick' (sent by the ticker) runs one round of polling via
%%   handle_crons/1 and loops with the resulting state.
%% - `stop' returns ok, so the process exits normally.
%% - Any other message is discarded. A selective receive leaves unmatched
%%   messages in the mailbox forever, so without this clause stray
%%   messages would pile up and every later receive would rescan them.
server(State) ->
    receive
        stop ->
            ok;
        tick ->
            server(handle_crons(State));
        _Unexpected ->
            server(State)
    end.
Ticker Loop
%% Drive the monitor's polling. While Monitor is alive, sleep Rate ms and
%% then send it a `tick'. Returns ok the first time Monitor is found to be
%% dead. Liveness is checked before each sleep, so the ticker can outlive
%% Monitor by roughly one Rate interval.
ticker(Monitor, Rate) ->
    Alive = erlang:is_process_alive(Monitor),
    if
        not Alive ->
            ok;
        true ->
            timer:sleep(Rate),
            Monitor ! tick,
            ticker(Monitor, Rate)
    end.
Cron Handling Flow
handle_crons/1
%% Fetch pending cron messages for the monitored process and push each one
%% to the MU via dev_mu:push/2. Returns the updated state.
%%
%% On {ok, HasNextPage, Results, NewCursor}, the cursor advances to
%% NewCursor. While HasNextPage is true (more pages available), the next
%% page is fetched immediately. Otherwise control returns to server/1
%% until the next tick.
%%
%% Any other result is logged to the state's logger and the state,
%% including the cursor, is returned unchanged. The same cursor is
%% therefore retried on the next tick.
handle_crons(State) ->
    case hb_client:cron(State#state.proc_id, State#state.cursor) of
        {ok, HasNextPage, Results, NewCursor} ->
            % Pushing is done purely for its side effect, so foreach, not map.
            % NOTE(review): the monitor's #state{} record is passed as
            % dev_mu:push/2's second argument; confirm dev_mu expects that.
            lists:foreach(
                fun(Res) -> dev_mu:push(#{message => Res}, State) end,
                Results
            ),
            NewState = State#state{cursor = NewCursor},
            case HasNextPage of
                % More pages are waiting: drain them now instead of spending
                % a full Rate interval per page.
                true -> handle_crons(NewState);
                % Caught up: wait for the next tick.
                false -> NewState
            end;
        Error ->
            hb_logger:log(State#state.logger, Error),
            State
    end.
Flow Diagram
Start
↓
Ticker sends 'tick' every Rate ms
↓
Server receives 'tick'
↓
Call hb_client:cron(ProcID, Cursor)
↓
Receive {ok, HasNextPage, Results, NewCursor}
↓
For each Result:
Push to dev_mu
↓
Update State with NewCursor
↓
If HasNextPage = true (more pages available):
Recursively call handle_crons (fetch the next page immediately)
↓
Return to server loop (wait for the next tick)

Common Patterns
%% Start monitoring a process with default settings
{Monitor, Logger} = hb_process_monitor:start(<<"MyProcess-ID">>),
% Monitor runs in background, pushing cron messages
% Logger captures events and errors
% Later, stop the monitor
hb_process_monitor:stop(Monitor).
%% The examples below bind fresh variable names. In one shell session (or
%% function body), re-matching an already-bound Monitor/Logger against a
%% new pid fails with a badmatch. Each example also stops the monitor it
%% starts, so no pollers are left running.
%% Custom polling rate (every 30 seconds)
Rate = 30000, % milliseconds
{CustomMon, _CustomLog} = hb_process_monitor:start(<<"MyProcess-ID">>, Rate),
hb_process_monitor:stop(CustomMon),
%% Resume from specific cursor
Cursor = <<"saved-cursor-from-previous-run">>,
{ResumedMon, _ResumedLog} = hb_process_monitor:start(
    <<"MyProcess-ID">>,
    10000, % 10 seconds
    Cursor
),
hb_process_monitor:stop(ResumedMon),
%% Monitor multiple processes
Processes = [<<"Proc1">>, <<"Proc2">>, <<"Proc3">>],
Monitors = lists:map(
    fun(ProcID) ->
        {Mon, Log} = hb_process_monitor:start(ProcID),
        {ProcID, Mon, Log}
    end,
    Processes
),
% Stop all monitors
lists:foreach(
    fun({_ProcID, Mon, _Log}) ->
        hb_process_monitor:stop(Mon)
    end,
    Monitors
).
%% Check monitor status
{StatusMon, _StatusLog} = hb_process_monitor:start(<<"MyProcess">>),
case erlang:is_process_alive(StatusMon) of
    true -> io:format("Monitor running~n");
    false -> io:format("Monitor stopped~n")
end,
hb_process_monitor:stop(StatusMon).
Configuration
Default Cron Rate
The default polling rate is retrieved from options:
Rate = hb_opts:get(default_cron_rate),

% In config.json or config.flat
#{
    default_cron_rate => 10000 % 10 seconds
}

Cursor Management
The initial cursor is fetched from the client:
Cursor = hb_client:cron_cursor(ProcID),

This allows resuming from the last processed cron message.
Integration with Other Modules
hb_client
Used to fetch cron messages:
{ok, HasNextPage, Results, NewCursor} = hb_client:cron(ProcID, Cursor),

- HasNextPage: true if more pages are available, false otherwise
- Results: List of cron messages to process
- NewCursor: Updated cursor for the next request
dev_mu
Used to push messages for execution:
dev_mu:push(#{message => CronMessage}, State),

This hands the cron message to the message unit for processing.
hb_logger
Used for event logging:
% Create the logger process
Logger = hb_logger:start(),
% Register the monitor and ticker processes with hb_logger
hb_logger:register(Monitor),
hb_logger:register(Ticker),
% Record a lifecycle event.
% NOTE(review): the monitor pid, not the logger pid, is the first argument
% here, whereas handle_crons/1 passes the logger pid; confirm which is
% intended.
hb_logger:log(Monitor, {ok, started_monitor, {ProcID, Rate, Cursor}}),
% Record an error term
hb_logger:log(Logger, Error),
Error Handling
Cron Fetch Errors
case hb_client:cron(State#state.proc_id, State#state.cursor) of
    {ok, _HasNextPage, _Results, NewCursor} ->
        % Process normally: the results are pushed to the MU and the cursor
        % advances to NewCursor (result pushing and pagination elided here;
        % see handle_crons/1)
        State#state{cursor = NewCursor};
    Error ->
        % Log the error and keep the old state, so the same cursor is
        % retried on the next tick
        hb_logger:log(State#state.logger, Error),
        State
end
Errors that take this path may include:
- Network errors from
hb_client
- Invalid process ID
- Malformed cursor
- Rate limiting
Ticker Exit Behavior
Alive = erlang:is_process_alive(Monitor),
if
    not Alive ->
        % Monitor is gone: return ok so the ticker exits cleanly
        ok;
    true ->
        % Monitor still running: wait, send a tick, and loop
        timer:sleep(Rate),
        Monitor ! tick,
        ticker(Monitor, Rate)
end

The ticker stops on its first liveness check after the monitor process dies, which happens within roughly one Rate interval.
Performance Considerations
Polling Rate Selection
Fast Polling (1-5 seconds):
- Pros: Low latency for cron execution
- Cons: Higher network/CPU usage
- Use Case: Time-sensitive processes

Moderate Polling (e.g. the 10-second default):
- Pros: Balanced performance
- Cons: Moderate latency
- Use Case: Most processes (recommended)

Slow Polling (e.g. 60 seconds or more):
- Pros: Minimal resource usage
- Cons: Higher latency
- Use Case: Low-priority background tasks
Pagination
The monitor keeps fetching pages recursively while HasNextPage = true (more pages remain):
case HasNextPage of
    true ->
        % More pages available, fetch immediately
        handle_crons(NewState);
    false ->
        % Done for now, wait for next tick
        NewState
end

This ensures all pending cron messages are processed before waiting.
Resource Usage
Per Monitor:
- 3 processes: server, ticker, and a dedicated logger (each start call returns its own LoggerPID)
- Minimal memory (just the state record)
- Network calls at the configured rate

Scaling:
- Linear scaling with the number of monitored processes
- Independent operation (no shared state)
- Each monitor has its own ticker
Use Cases
1. Scheduled Task Execution
% Each example binds its own variable names, so the examples can be run one
% after another in a single shell session without badmatch errors.
% Process with scheduled tasks every hour
{ScheduledMon, _} = hb_process_monitor:start(
    <<"ScheduledProcess">>,
    60000 % Check every minute
),
2. Event Stream Processing
% Process consuming event stream
{ConsumerMon, _ConsumerLog} = hb_process_monitor:start(
    <<"EventConsumer">>,
    5000 % Fast polling for real-time events
),
3. Background Job Processing
% Background job processor
{JobMon, _} = hb_process_monitor:start(
    <<"JobQueue">>,
    30000 % Check every 30 seconds
),
4. Multi-Process Coordination
% Monitor multiple coordinated processes
Monitors = [
    hb_process_monitor:start(<<"Process1">>, 10000),
    hb_process_monitor:start(<<"Process2">>, 10000),
    hb_process_monitor:start(<<"Process3">>, 10000)
],
% Stop all when done
lists:foreach(
    fun({Mon, _}) -> hb_process_monitor:stop(Mon) end,
    Monitors
).
Logging and Monitoring
Startup Logging
% NOTE(review): the monitor pid, not the logger pid, is passed as the first
% argument here, unlike handle_crons/1; confirm which is intended.
hb_logger:log(Monitor, {ok, started_monitor, {ProcID, Rate, Cursor}}),
% Example of the logged term:
% {ok, started_monitor, {<<"MyProcess">>, 10000, <<"cursor-123">>}}
Error Logging
% handle_crons/1 logs whatever non-{ok, ...} term hb_client:cron/2 returned,
% verbatim and without wrapping it in a tagged tuple:
hb_logger:log(State#state.logger, Error),
Process Registration
Both monitor and ticker register with the logger:
hb_logger:register(Monitor),
hb_logger:register(Ticker),

This enables centralized log collection and debugging.
State Management
Cursor Evolution
Initial: Cursor = hb_client:cron_cursor(ProcID)
↓
Tick 1: Fetch with Cursor → Get NewCursor1
↓
Tick 2: Fetch with NewCursor1 → Get NewCursor2
↓
Tick 3: Fetch with NewCursor2 → Get NewCursor3
↓
...

The cursor tracks the pagination position across all cron messages.
State Immutability
State is updated functionally:
% Existing state, holding the previous cursor
State = #state{cursor = OldCursor, ...},
% Record-update syntax builds a copy of State with only the cursor replaced
NewState = State#state{cursor = NewCursor},
% The copy is passed on to the next iteration of the server loop
server(NewState)
Testing Considerations
Mock Cron Response
% Mock hb_client:cron/2 to return one page of two messages and report that no
% further pages remain (HasNextPage = false). Stub dev_mu:push/2 so the
% pushes can be observed.
meck:new(hb_client, [passthrough]),
meck:new(dev_mu, [passthrough]),
meck:expect(hb_client, cron, fun(_ProcID, _Cursor) ->
    {ok, false, [
        #{<<"message">> => <<"task1">>},
        #{<<"message">> => <<"task2">>}
    ], <<"next-cursor">>}
end),
meck:expect(dev_mu, push, fun(_Msg, _State) -> ok end),
% Use start/3 with an explicit cursor and a short Rate. start/1 would call
% the unmocked hb_client:cron_cursor/1 and poll at the default rate (10 s),
% so no tick would fire during a short test.
{Monitor, _} = hb_process_monitor:start(<<"TestProc">>, 10, <<"initial-cursor">>),
% Verify messages pushed to MU: wait (up to 1 s) for both pushes instead of
% sleeping a fixed time
meck:wait(2, dev_mu, push, '_', 1000),
hb_process_monitor:stop(Monitor),
meck:unload([hb_client, dev_mu]),
Monitor Lifecycle
% Start. start/3 with an explicit cursor avoids start/1's
% hb_client:cron_cursor/1 lookup. A long Rate keeps any tick (and a real
% hb_client:cron/2 call) from firing during the test.
{Monitor, _Logger} = hb_process_monitor:start(<<"Test">>, 60000, <<"cursor">>),
?assert(erlang:is_process_alive(Monitor)),
% Operate
timer:sleep(1000),
?assert(erlang:is_process_alive(Monitor)),
% Stop: wait for the 'DOWN' message rather than sleeping a fixed time
Ref = erlang:monitor(process, Monitor),
hb_process_monitor:stop(Monitor),
receive
    {'DOWN', Ref, process, Monitor, _Reason} -> ok
after 1000 ->
    erlang:error(monitor_did_not_stop)
end,
?assertNot(erlang:is_process_alive(Monitor)).
References
- Client Module - hb_client.erl
- Message Unit - dev_mu.erl
- Logging - hb_logger.erl
- Configuration - hb_opts.erl
- Process System - dev_process.erl
Notes
- Simple Architecture: Just server + ticker, minimal overhead
- Automatic Cleanup: Ticker stops when monitor dies
- Recursive Pagination: Fetches all available pages before waiting
- Error Resilience: Continues operation on fetch errors
- Independent Monitors: Each process has separate monitor
- Cursor Tracking: Maintains pagination state across ticks
- Logger Integration: All events captured for debugging
- Graceful Shutdown: stop/1 sends a stop message for a clean exit
- No Shared State: Monitors operate independently
- Minimal Dependencies: Only requires client, MU, and logger modules