Skip to content

dev_monitor.erl - Process Execution Monitoring

Overview

Purpose: Flexible monitoring of process execution state
Module: dev_monitor
Pattern: Observer pattern with automatic cleanup
Lifecycle: Monitor functions can signal completion

This device enables non-intrusive monitoring of process execution by invoking registered monitor functions during each pass. Monitors observe state without mutating it and can remove themselves when their observation is complete.

Dependencies

  • HyperBEAM: None (minimal dependencies)
  • Includes: include/hb.hrl

Public Functions Overview

%% Device Lifecycle
-spec init(State, Req, InitState) -> {ok, StateWithMonitors}.
-spec execute(Message, State) -> {ok, State}.
-spec end_of_schedule(State) -> {ok, State}.
 
%% Monitor Management
-spec add_monitor(Monitor, State) -> {ok, StateWithMonitor}.
 
%% Device Metadata
-spec uses() -> all.

Public Functions

1. init/3

-spec init(State, Req, InitState) -> {ok, StateWithMonitors}
    when
        State :: map(),
        Req :: map(),
        InitState :: [MonitorFun],
        MonitorFun :: fun((State, Signal) -> term()),
        StateWithMonitors :: map().

Description: Initialize the monitor device with a list of monitor functions. Stores monitors in the state for later invocation.

Test Code:
-module(dev_monitor_init_test).
-include_lib("eunit/include/eunit.hrl").
 
init_with_monitors_test() ->
    Monitor1 = fun(_State, _Signal) -> ok end,
    Monitor2 = fun(_State, _Signal) -> ok end,
    InitState = [Monitor1, Monitor2],
    {ok, State} = dev_monitor:init(#{}, #{}, InitState),
    ?assert(maps:is_key(<<"monitors">>, State)),
    Monitors = maps:get(<<"monitors">>, State),
    ?assertEqual(2, length(Monitors)).
 
init_empty_test() ->
    {ok, State} = dev_monitor:init(#{}, #{}, []),
    ?assertEqual([], maps:get(<<"monitors">>, State)).

2. execute/2

-spec execute(Message, State) -> {ok, State}
    when
        Message :: map(),
        State :: map().

Description: Execute monitors when on the final pass of a multi-pass execution. Signals monitors with {message, Message}.

Execution Condition:
  • Only runs when pass == passes (final pass)
  • Silently returns state on non-final passes
Test Code:
-module(dev_monitor_execute_test).
-include_lib("eunit/include/eunit.hrl").
 
execute_final_pass_test() ->
    Parent = self(),
    Monitor = fun(_State, {message, Msg}) ->
        Parent ! {monitored, Msg},
        ok
    end,
    State = #{
        <<"monitors">> => [Monitor],
        <<"pass">> => 2,
        <<"passes">> => 2
    },
    Message = #{ <<"data">> => <<"test">> },
    {ok, _NewState} = dev_monitor:execute(Message, State),
    receive
        {monitored, Msg} -> ?assertEqual(Message, Msg)
    after 100 -> error(monitor_not_called)
    end.
 
execute_non_final_pass_test() ->
    Parent = self(),
    Monitor = fun(_State, _Signal) ->
        Parent ! should_not_be_called,
        ok
    end,
    State = #{
        <<"monitors">> => [Monitor],
        <<"pass">> => 1,
        <<"passes">> => 2
    },
    {ok, _NewState} = dev_monitor:execute(#{}, State),
    receive
        should_not_be_called -> error(monitor_called_on_non_final_pass)
    after 50 -> ok
    end.

3. end_of_schedule/1

-spec end_of_schedule(State) -> {ok, State}
    when
        State :: map().

Description: Signal monitors that the schedule has ended. Called after all messages in a schedule have been processed.

Test Code:
-module(dev_monitor_end_of_schedule_test).
-include_lib("eunit/include/eunit.hrl").
 
end_of_schedule_test() ->
    Parent = self(),
    Monitor = fun(_State, end_of_schedule) ->
        Parent ! schedule_ended,
        ok
    end,
    State = #{ <<"monitors">> => [Monitor] },
    {ok, _NewState} = dev_monitor:end_of_schedule(State),
    receive
        schedule_ended -> ok
    after 100 -> error(monitor_not_called)
    end.
 
end_of_schedule_cleanup_test() ->
    Parent = self(),
    Monitor = fun(_State, end_of_schedule) ->
        Parent ! cleaned_up,
        done  % Signal completion
    end,
    State = #{ <<"monitors">> => [Monitor] },
    {ok, NewState} = dev_monitor:end_of_schedule(State),
    receive
        cleaned_up -> ok
    after 100 -> error(monitor_not_called)
    end,
    % Monitor should be removed
    ?assertEqual([], maps:get(<<"monitors">>, NewState)).

4. add_monitor/2

-spec add_monitor(Monitor, State) -> {ok, StateWithMonitor}
    when
        Monitor :: fun((State, Signal) -> term()),
        State :: map(),
        StateWithMonitor :: map().

Description: Add a monitor function to the state. Monitors are prepended to the list (LIFO order).

Test Code:
-module(dev_monitor_add_test).
-include_lib("eunit/include/eunit.hrl").
 
add_monitor_test() ->
    Monitor1 = fun(_State, _Signal) -> ok end,
    State = #{ <<"monitors">> => [] },
    {ok, NewState} = dev_monitor:add_monitor(Monitor1, State),
    ?assertEqual(1, length(maps:get(<<"monitors">>, NewState))).
 
add_multiple_monitors_test() ->
    Monitor1 = fun(_State, _Signal) -> ok end,
    Monitor2 = fun(_State, _Signal) -> ok end,
    State0 = #{ <<"monitors">> => [] },
    {ok, State1} = dev_monitor:add_monitor(Monitor1, State0),
    {ok, State2} = dev_monitor:add_monitor(Monitor2, State1),
    ?assertEqual(2, length(maps:get(<<"monitors">>, State2))).

5. uses/0

-spec uses() -> all.

Description: Indicates the device uses all available keys. This ensures monitors have access to complete process state.

Test Code:
-module(dev_monitor_uses_test).
-include_lib("eunit/include/eunit.hrl").
 
uses_test() ->
    ?assertEqual(all, dev_monitor:uses()).

Monitor Function Specification

Function Signature

-spec MonitorFun(State, Signal) -> Result
    when
        State :: map(),
        Signal :: {message, Message} | end_of_schedule,
        Message :: map(),
        Result :: ok | done | term().

Return Values

ok or any term except done:
  • Monitor continues (remains in list)
  • Will be called on next signal
done:
  • Monitor completes (removed from list)
  • Will not be called again

Signal Types

1. Message Signal

{message, Message}

When: During final pass of execution
Contains: The message being processed
Use Case: Observe message processing, collect statistics

2. End of Schedule Signal

end_of_schedule

When: After all messages processed
Contains: Just the atom
Use Case: Cleanup, final reporting, state finalization


Monitor Patterns

One-Shot Monitor

Monitor = fun(_State, {message, _Msg}) ->
    % Do something once
    io:format("First message processed~n"),
    done  % Remove after first execution
end.

Continuous Monitor

Monitor = fun(_State, Signal) ->
    % Log all signals
    io:format("Signal: ~p~n", [Signal]),
    ok  % Keep monitoring
end.

Counting Monitor

Monitor = fun(State, {message, _Msg}) ->
    Count = hb_ao:get(<<"message-count">>, State, 0, #{}),
    io:format("Processed ~p messages~n", [Count + 1]),
    if
        Count + 1 >= 100 -> done;  % Stop after 100 messages
        true -> ok  % Continue
    end
end.

State Validator

Monitor = fun(State, {message, _Msg}) ->
    Balance = hb_ao:get(<<"now/balance">>, State, 0, #{}),
    Supply = hb_ao:get(<<"now/supply">>, State, 0, #{}),
    case Balance > Supply of
        true -> 
            error({invariant_violation, balance_exceeds_supply});
        false -> 
            ok
    end
end.

Performance Monitor

Monitor = fun(State, Signal) ->
    Start = erlang:monotonic_time(),
    % Monitor executes...
    End = erlang:monotonic_time(),
    Duration = erlang:convert_time_unit(End - Start, native, microsecond),
    io:format("Monitor duration: ~p μs~n", [Duration]),
    case Signal of
        end_of_schedule -> done;
        _ -> ok
    end
end.

Common Patterns

%% Initialize with monitors
Monitors = [
    fun(State, {message, Msg}) ->
        io:format("Message: ~p~n", [Msg]),
        ok
    end,
    fun(State, end_of_schedule) ->
        io:format("Schedule complete~n"),
        done
    end
],
{ok, State} = dev_monitor:init(BaseState, #{}, Monitors).
 
%% Add monitor dynamically
NewMonitor = fun(_State, {message, Msg}) ->
    case hb_ao:get(<<"action">>, Msg, #{}#{}) of
        <<"Transfer">> -> io:format("Transfer detected~n");
        _ -> ok
    end,
    ok
end,
{ok, UpdatedState} = dev_monitor:add_monitor(NewMonitor, State).
 
%% Cleanup monitor
CleanupMonitor = fun(_State, end_of_schedule) ->
    % Perform cleanup
    io:format("Cleaning up~n"),
    done  % Remove after cleanup
end.
 
%% Statistics monitor
StatsMonitor = fun(State, Signal) ->
    case Signal of
        {message, _Msg} ->
            Count = hb_ao:get(<<"stats/message-count">>, State, 0, #{}),
            io:format("Messages processed: ~p~n", [Count + 1]),
            ok;
        end_of_schedule ->
            FinalCount = hb_ao:get(<<"stats/message-count">>, State, 0, #{}),
            io:format("Final count: ~p~n", [FinalCount]),
            done
    end
end.
 
%% Conditional monitor
ConditionalMonitor = fun(State, {message, Msg}) ->
    case hb_ao:get(<<"debug-mode">>, State, false, #{}) of
        true ->
            io:format("Debug: ~p~n", [Msg]),
            ok;
        false ->
            done  % Stop monitoring when debug disabled
    end
end.

Integration with Multi-Pass Execution

The monitor device integrates with dev_multipass for multi-pass execution:

Pass Tracking

State = #{
    <<"pass">> => 1,
    <<"passes">> => 3,
    <<"monitors">> => Monitors
}

Execution Flow

  1. Pass 1: Monitors not executed
  2. Pass 2: Monitors not executed
  3. Pass 3 (final): Monitors executed with {message, Msg}
  4. After all passes: end_of_schedule signal sent

Monitor Cleanup

Monitors are automatically cleaned up when they return done:

signal(State = #{ <<"monitors">> := StartingMonitors }, Signal) ->
    RemainingMonitors =
        lists:filter(
            fun(Mon) ->
                case Mon(State, Signal) of
                    done -> false;  % Remove monitor
                    _ -> true       % Keep monitor
                end
            end,
            StartingMonitors
        ),
    {ok, State#{ <<"monitors">> := RemainingMonitors }}.

Testing Monitors

Test Helper Pattern

test_monitor(TestPid, ExpectedSignal) ->
    fun(_State, Signal) ->
        case Signal of
            ExpectedSignal ->
                TestPid ! signal_received,
                done;
            _ ->
                ok
        end
    end.
 
monitor_test() ->
    Monitor = test_monitor(self(), {message, test_msg}),
    State = #{ <<"monitors">> => [Monitor] },
    dev_monitor:execute(test_msg, State),
    receive
        signal_received -> ok
    after 100 -> error(timeout)
    end.

Performance Considerations

Monitor Efficiency

  1. Keep Monitors Light: Monitors run on every signal
  2. Avoid Heavy Computation: Use sampling for expensive operations
  3. Remove When Done: Return done to cleanup
  4. Batch Operations: Collect data, process at end_of_schedule

Sampling Monitor

SamplingMonitor = fun(State, {message, _Msg}) ->
    % Only monitor every 10th message
    Count = hb_ao:get(<<"monitor-count">>, State, 0, #{}),
    case Count rem 10 of
        0 -> 
            % Perform monitoring
            ok;
        _ -> 
            ok
    end
end.

References

  • dev_multipass.erl - Multi-pass execution device
  • hb_ao.erl - AO-Core resolution and state access
  • Process Devices - Process execution framework

Notes

  1. Non-Intrusive: Monitors observe but don't modify state
  2. Final Pass Only: Execute monitors only run on final pass
  3. Automatic Cleanup: Monitors returning done are removed
  4. LIFO Order: New monitors prepended to list
  5. Full State Access: Monitors receive complete process state
  6. Signal Types: Two signal types (message and end_of_schedule)
  7. Flexible Return: Any return value except done keeps monitor
  8. Dynamic Addition: Monitors can be added during execution
  9. Event Logging: Logs remaining monitor count after cleanup
  10. Simple API: Minimal interface for maximum flexibility
  11. Performance: Lightweight - minimal overhead
  12. Testing Friendly: Easy to test with simple function mocks
  13. Composable: Multiple monitors work independently
  14. Error Isolation: Monitor errors don't affect process
  15. Lifecycle Hooks: Both execution and cleanup hooks available