dev_monitor.erl - Process Execution Monitoring
Overview
Purpose: Flexible monitoring of process execution state
Module: dev_monitor
Pattern: Observer pattern with automatic cleanup
Lifecycle: Monitor functions can signal completion
This device enables non-intrusive monitoring of process execution by invoking registered monitor functions during each pass. Monitors observe state without mutating it and can remove themselves when their observation is complete.
Dependencies
- HyperBEAM: None (minimal dependencies)
- Includes:
include/hb.hrl
Public Functions Overview
%% Device Lifecycle
-spec init(State, Req, InitState) -> {ok, StateWithMonitors}.
-spec execute(Message, State) -> {ok, State}.
-spec end_of_schedule(State) -> {ok, State}.
%% Monitor Management
-spec add_monitor(Monitor, State) -> {ok, StateWithMonitor}.
%% Device Metadata
-spec uses() -> all.Public Functions
1. init/3
-spec init(State, Req, InitState) -> {ok, StateWithMonitors}
when
State :: map(),
Req :: map(),
InitState :: [MonitorFun],
MonitorFun :: fun((State, Signal) -> term()),
StateWithMonitors :: map().Description: Initialize the monitor device with a list of monitor functions. Stores monitors in the state for later invocation.
Test Code:-module(dev_monitor_init_test).
-include_lib("eunit/include/eunit.hrl").
init_with_monitors_test() ->
Monitor1 = fun(_State, _Signal) -> ok end,
Monitor2 = fun(_State, _Signal) -> ok end,
InitState = [Monitor1, Monitor2],
{ok, State} = dev_monitor:init(#{}, #{}, InitState),
?assert(maps:is_key(<<"monitors">>, State)),
Monitors = maps:get(<<"monitors">>, State),
?assertEqual(2, length(Monitors)).
init_empty_test() ->
{ok, State} = dev_monitor:init(#{}, #{}, []),
?assertEqual([], maps:get(<<"monitors">>, State)).2. execute/2
-spec execute(Message, State) -> {ok, State}
when
Message :: map(),
State :: map().Description: Execute monitors when on the final pass of a multi-pass execution. Signals monitors with {message, Message}.
- Only runs when
pass == passes(final pass) - Silently returns state on non-final passes
-module(dev_monitor_execute_test).
-include_lib("eunit/include/eunit.hrl").
execute_final_pass_test() ->
Parent = self(),
Monitor = fun(_State, {message, Msg}) ->
Parent ! {monitored, Msg},
ok
end,
State = #{
<<"monitors">> => [Monitor],
<<"pass">> => 2,
<<"passes">> => 2
},
Message = #{ <<"data">> => <<"test">> },
{ok, _NewState} = dev_monitor:execute(Message, State),
receive
{monitored, Msg} -> ?assertEqual(Message, Msg)
after 100 -> error(monitor_not_called)
end.
execute_non_final_pass_test() ->
Parent = self(),
Monitor = fun(_State, _Signal) ->
Parent ! should_not_be_called,
ok
end,
State = #{
<<"monitors">> => [Monitor],
<<"pass">> => 1,
<<"passes">> => 2
},
{ok, _NewState} = dev_monitor:execute(#{}, State),
receive
should_not_be_called -> error(monitor_called_on_non_final_pass)
after 50 -> ok
end.3. end_of_schedule/1
-spec end_of_schedule(State) -> {ok, State}
when
State :: map().Description: Signal monitors that the schedule has ended. Called after all messages in a schedule have been processed.
Test Code:-module(dev_monitor_end_of_schedule_test).
-include_lib("eunit/include/eunit.hrl").
end_of_schedule_test() ->
Parent = self(),
Monitor = fun(_State, end_of_schedule) ->
Parent ! schedule_ended,
ok
end,
State = #{ <<"monitors">> => [Monitor] },
{ok, _NewState} = dev_monitor:end_of_schedule(State),
receive
schedule_ended -> ok
after 100 -> error(monitor_not_called)
end.
end_of_schedule_cleanup_test() ->
Parent = self(),
Monitor = fun(_State, end_of_schedule) ->
Parent ! cleaned_up,
done % Signal completion
end,
State = #{ <<"monitors">> => [Monitor] },
{ok, NewState} = dev_monitor:end_of_schedule(State),
receive
cleaned_up -> ok
after 100 -> error(monitor_not_called)
end,
% Monitor should be removed
?assertEqual([], maps:get(<<"monitors">>, NewState)).4. add_monitor/2
-spec add_monitor(Monitor, State) -> {ok, StateWithMonitor}
when
Monitor :: fun((State, Signal) -> term()),
State :: map(),
StateWithMonitor :: map().Description: Add a monitor function to the state. Monitors are prepended to the list (LIFO order).
Test Code:-module(dev_monitor_add_test).
-include_lib("eunit/include/eunit.hrl").
add_monitor_test() ->
Monitor1 = fun(_State, _Signal) -> ok end,
State = #{ <<"monitors">> => [] },
{ok, NewState} = dev_monitor:add_monitor(Monitor1, State),
?assertEqual(1, length(maps:get(<<"monitors">>, NewState))).
add_multiple_monitors_test() ->
Monitor1 = fun(_State, _Signal) -> ok end,
Monitor2 = fun(_State, _Signal) -> ok end,
State0 = #{ <<"monitors">> => [] },
{ok, State1} = dev_monitor:add_monitor(Monitor1, State0),
{ok, State2} = dev_monitor:add_monitor(Monitor2, State1),
?assertEqual(2, length(maps:get(<<"monitors">>, State2))).5. uses/0
-spec uses() -> all.Description: Indicates the device uses all available keys. This ensures monitors have access to complete process state.
Test Code:-module(dev_monitor_uses_test).
-include_lib("eunit/include/eunit.hrl").
uses_test() ->
?assertEqual(all, dev_monitor:uses()).Monitor Function Specification
Function Signature
-spec MonitorFun(State, Signal) -> Result
when
State :: map(),
Signal :: {message, Message} | end_of_schedule,
Message :: map(),
Result :: ok | done | term().Return Values
ok or any term except done:
- Monitor continues (remains in list)
- Will be called on next signal
done:
- Monitor completes (removed from list)
- Will not be called again
Signal Types
1. Message Signal
{message, Message}When: During final pass of execution
Contains: The message being processed
Use Case: Observe message processing, collect statistics
2. End of Schedule Signal
end_of_scheduleWhen: After all messages processed
Contains: Just the atom
Use Case: Cleanup, final reporting, state finalization
Monitor Patterns
One-Shot Monitor
Monitor = fun(_State, {message, _Msg}) ->
% Do something once
io:format("First message processed~n"),
done % Remove after first execution
end.Continuous Monitor
Monitor = fun(_State, Signal) ->
% Log all signals
io:format("Signal: ~p~n", [Signal]),
ok % Keep monitoring
end.Counting Monitor
Monitor = fun(State, {message, _Msg}) ->
Count = hb_ao:get(<<"message-count">>, State, 0, #{}),
io:format("Processed ~p messages~n", [Count + 1]),
if
Count + 1 >= 100 -> done; % Stop after 100 messages
true -> ok % Continue
end
end.State Validator
Monitor = fun(State, {message, _Msg}) ->
Balance = hb_ao:get(<<"now/balance">>, State, 0, #{}),
Supply = hb_ao:get(<<"now/supply">>, State, 0, #{}),
case Balance > Supply of
true ->
error({invariant_violation, balance_exceeds_supply});
false ->
ok
end
end.Performance Monitor
Monitor = fun(State, Signal) ->
Start = erlang:monotonic_time(),
% Monitor executes...
End = erlang:monotonic_time(),
Duration = erlang:convert_time_unit(End - Start, native, microsecond),
io:format("Monitor duration: ~p μs~n", [Duration]),
case Signal of
end_of_schedule -> done;
_ -> ok
end
end.Common Patterns
%% Initialize with monitors
Monitors = [
fun(State, {message, Msg}) ->
io:format("Message: ~p~n", [Msg]),
ok
end,
fun(State, end_of_schedule) ->
io:format("Schedule complete~n"),
done
end
],
{ok, State} = dev_monitor:init(BaseState, #{}, Monitors).
%% Add monitor dynamically
NewMonitor = fun(_State, {message, Msg}) ->
case hb_ao:get(<<"action">>, Msg, #{}#{}) of
<<"Transfer">> -> io:format("Transfer detected~n");
_ -> ok
end,
ok
end,
{ok, UpdatedState} = dev_monitor:add_monitor(NewMonitor, State).
%% Cleanup monitor
CleanupMonitor = fun(_State, end_of_schedule) ->
% Perform cleanup
io:format("Cleaning up~n"),
done % Remove after cleanup
end.
%% Statistics monitor
StatsMonitor = fun(State, Signal) ->
case Signal of
{message, _Msg} ->
Count = hb_ao:get(<<"stats/message-count">>, State, 0, #{}),
io:format("Messages processed: ~p~n", [Count + 1]),
ok;
end_of_schedule ->
FinalCount = hb_ao:get(<<"stats/message-count">>, State, 0, #{}),
io:format("Final count: ~p~n", [FinalCount]),
done
end
end.
%% Conditional monitor
ConditionalMonitor = fun(State, {message, Msg}) ->
case hb_ao:get(<<"debug-mode">>, State, false, #{}) of
true ->
io:format("Debug: ~p~n", [Msg]),
ok;
false ->
done % Stop monitoring when debug disabled
end
end.Integration with Multi-Pass Execution
The monitor device integrates with dev_multipass for multi-pass execution:
Pass Tracking
State = #{
<<"pass">> => 1,
<<"passes">> => 3,
<<"monitors">> => Monitors
}Execution Flow
- Pass 1: Monitors not executed
- Pass 2: Monitors not executed
- Pass 3 (final): Monitors executed with
{message, Msg} - After all passes:
end_of_schedulesignal sent
Monitor Cleanup
Monitors are automatically cleaned up when they return done:
signal(State = #{ <<"monitors">> := StartingMonitors }, Signal) ->
RemainingMonitors =
lists:filter(
fun(Mon) ->
case Mon(State, Signal) of
done -> false; % Remove monitor
_ -> true % Keep monitor
end
end,
StartingMonitors
),
{ok, State#{ <<"monitors">> := RemainingMonitors }}.Testing Monitors
Test Helper Pattern
test_monitor(TestPid, ExpectedSignal) ->
fun(_State, Signal) ->
case Signal of
ExpectedSignal ->
TestPid ! signal_received,
done;
_ ->
ok
end
end.
monitor_test() ->
Monitor = test_monitor(self(), {message, test_msg}),
State = #{ <<"monitors">> => [Monitor] },
dev_monitor:execute(test_msg, State),
receive
signal_received -> ok
after 100 -> error(timeout)
end.Performance Considerations
Monitor Efficiency
- Keep Monitors Light: Monitors run on every signal
- Avoid Heavy Computation: Use sampling for expensive operations
- Remove When Done: Return
doneto cleanup - Batch Operations: Collect data, process at
end_of_schedule
Sampling Monitor
SamplingMonitor = fun(State, {message, _Msg}) ->
% Only monitor every 10th message
Count = hb_ao:get(<<"monitor-count">>, State, 0, #{}),
case Count rem 10 of
0 ->
% Perform monitoring
ok;
_ ->
ok
end
end.References
- dev_multipass.erl - Multi-pass execution device
- hb_ao.erl - AO-Core resolution and state access
- Process Devices - Process execution framework
Notes
- Non-Intrusive: Monitors observe but don't modify state
- Final Pass Only: Execute monitors only run on final pass
- Automatic Cleanup: Monitors returning
doneare removed - LIFO Order: New monitors prepended to list
- Full State Access: Monitors receive complete process state
- Signal Types: Two signal types (message and end_of_schedule)
- Flexible Return: Any return value except
donekeeps monitor - Dynamic Addition: Monitors can be added during execution
- Event Logging: Logs remaining monitor count after cleanup
- Simple API: Minimal interface for maximum flexibility
- Performance: Lightweight - minimal overhead
- Testing Friendly: Easy to test with simple function mocks
- Composable: Multiple monitors work independently
- Error Isolation: Monitor errors don't affect process
- Lifecycle Hooks: Both execution and cleanup hooks available