Skip to content

dev_push.erl - Recursive Message Pushing Device

Overview

Purpose: Evaluate and recursively push messages between processes
Module: dev_push
Pattern: Compute → Extract Outbox → Push Downstream → Recurse
Device: Push@1.0

This device evaluates process messages, extracts outgoing messages from the outbox, and recursively pushes them to their target processes. Pushing continues until no messages remain, which enables complex multi-process workflows.

Architecture

Flow:
Process Compute → Extract Outbox → For Each Message:
  ├─ Resolve Target Process
  ├─ Schedule Message
  ├─ Push Recursively
  └─ Aggregate Results

Dependencies

  • HyperBEAM: hb_ao, hb_cache, hb_util, hb_maps, hb_private, hb_path, hb_message, hb_http, hb_http_server
  • Process: dev_process
  • Arweave: ar_bundles
  • Includes: include/hb.hrl

Public Functions Overview

%% Main Push Function
-spec push(Base, Req, Opts) -> {ok, PushResult} | {error, Reason}.

Public Function

push/3

%% Push a message or slot number, recursively processing downstream messages.
%% Returns {ok, PushResult} with the result map, or {error, Reason} on failure.
%% Every type variable is constrained below; an unconstrained variable that
%% appears only once in a spec is reported by the Erlang linter.
-spec push(Base, Req, Opts) -> {ok, PushResult} | {error, Reason}
    when
        Base :: map(),
        Req :: map(),
        Opts :: map(),
        PushResult :: map(),
        Reason :: term().

Description: Push a message or a slot number, recursively processing downstream messages until completion.

Two Modes:
1. Initial Message Mode:
  • Request has body with process or message
  • Schedules message to process
  • If target is process definition → initialize it
  • Otherwise → push scheduled message
2. Slot Mode:
  • Request has slot number
  • Computes existing slot
  • Pushes resulting outbox messages
Optional Parameters:
  • result-depth - Depth of full results to include (default: 1)
  • push-mode - Sync or async pushing (default: sync)
Test Code:
-module(dev_push_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Installs a 'Test' handler on an AOS test process through a scheduled call,
%% then pushes slot 0 and checks that the push result is a map.
push_simple_message_test() ->
    dev_process:init(),
    Process = dev_process:test_aos_process(),
    {ok, _} = hb_cache:write(Process, #{}),
    
    % Lua handler: on Action = 'Test', sends a 'Reply' message back to this process.
    Script = <<"
        Handlers.add('Test',
            { Action = 'Test' },
            function(m)
                Send({ Target = ao.id, Action = 'Reply', Data = 'OK' })
            end
        )
    ">>,
    
    {ok, _} = dev_process:schedule_aos_call(Process, Script),
    
    % Push slot 0 (slot mode: the request carries a slot number, not a body)
    {ok, PushResult} = hb_ao:resolve(
        Process,
        #{
            <<"path">> => <<"push">>,
            <<"slot">> => 0
        },
        #{}
    ),
    
    % Only the shape of the result is asserted here, not its contents.
    ?assert(is_map(PushResult)).
 
%% Schedules a self-pinging script on an AOS test process and pushes slot 0.
%% The test asserts only that the push returns a map.
push_recursive_test() ->
    dev_process:init(),
    Proc = dev_process:test_aos_process(),
    {ok, _} = hb_cache:write(Proc, #{}),
    % Lua handler: re-sends 'Ping' to itself while Count is below 3; the final
    % Send kicks off the chain with Count = 1.
    Script = <<"
        Handlers.add('Ping',
            { Action = 'Ping' },
            function(m)
                C = tonumber(m.Count) or 0
                if C < 3 then
                    Send({ Target = ao.id, Action = 'Ping', Count = C + 1 })
                end
            end
        )
        Send({ Target = ao.id, Action = 'Ping', Count = 1 })
    ">>,
    {ok, _} = dev_process:schedule_aos_call(Proc, Script),
    PushReq = #{ <<"path">> => <<"push">>, <<"slot">> => 0 },
    {ok, Pushed} = hb_ao:resolve(Proc, PushReq, #{}),
    ?assert(is_map(Pushed)).

Push Modes

Synchronous (Default)

%% Look up the configured push mode. The candidates are the request's
%% `push-mode`, the process's `push-mode`, and the process's
%% `process/push-mode`, listed in that order; <<"sync">> is passed as the default.
%% Note: despite the `is_` prefix, the value returned is whatever
%% hb_ao:get_first/3 yields (the mode), not a boolean.
is_async(Process, Req, Opts) ->
    Candidates = [
        {Req, <<"push-mode">>},
        {Process, <<"push-mode">>},
        {Process, <<"process/push-mode">>}
    ],
    hb_ao:get_first(Candidates, <<"sync">>, Opts).
Behavior:
  • Waits for push completion
  • Returns complete result tree
  • Blocks until all downstream pushes done

Asynchronous

%% Run the push according to the mode reported by is_async/3.
%%   <<"sync">>  - call do_push/3 in the caller and return its result.
%%   <<"async">> - run do_push/3 in a newly spawned process; the value returned
%%                 is the spawned pid and the push result is not collected.
%% Any other mode value matches no clause and raises case_clause.
push_with_mode(Process, Req, Opts) ->
    Mode = is_async(Process, Req, Opts),
    case Mode of
        <<"sync">> ->
            do_push(Process, Req, Opts);
        <<"async">> ->
            Worker = fun() -> do_push(Process, Req, Opts) end,
            spawn(Worker)
    end.
Behavior:
  • Spawns separate process
  • Returns immediately
  • No result collection

Push Flow

1. Compute Results

{Status, Result} = hb_ao:resolve(
    {as, <<"process@1.0">>, PrimaryProcess},
    #{ <<"path">> => <<"compute/results">>, <<"slot">> => Slot },
    Opts
).

2. Extract Outbox

case hb_ao:get(<<"outbox">>, Result, #{}, Opts) of
    NoResults when ?IS_EMPTY_MESSAGE(NoResults) ->
        {ok, #{<<"slot">> => Slot, <<"process">> => ID}};
    Outbox ->
        % Process each outbox message
        ...
end.

3. Process Each Message

% Simplified excerpt: map over every outbox entry and push it to its target.
% NOTE(review): `Target` is not bound anywhere in this excerpt, and the trailing
% arguments of push_result_message are elided (`...`).
hb_maps:map(
    fun(Key, RawMsgToPush) ->
        % NOTE(review): maybe_evaluate_message/2 is shown elsewhere in this
        % document returning {ok, Msg} or an error term; here the raw return
        % value is bound as-is — confirm whether it should be unwrapped.
        MsgToPush = maybe_evaluate_message(RawMsgToPush, Opts),
        % The target process must be readable from the cache to be pushed to.
        case hb_cache:read(Target, Opts) of
            {ok, DownstreamProcess} ->
                push_result_message(DownstreamProcess, MsgToPush, ...);
            not_found ->
                % Unknown target: record a 404 entry instead of pushing.
                #{ <<"response">> => <<"error">>, <<"status">> => 404 }
        end
    end,
    Outbox,
    Opts
).

4. Push Downstream

push_result_message(DownstreamProcess, MsgToPush, Metadata, Opts) ->
    % Schedule message
    {ok, Assignment} = schedule_message(...),
    % Recursively push
    push(DownstreamProcess, Assignment, Opts).

Message Evaluation

Path-Based Evaluation

%% Optionally evaluate an outbox message before it is pushed.
%%
%% When the message has a `path` key, it is resolved with `target` removed and
%% the resolution result (with `target` re-attached) is returned as {ok, Result}.
%% Any non-ok return of hb_ao:resolve/2 is passed through unchanged.
%% When the message has no `path` key there is nothing to evaluate, so the
%% message itself is returned as {ok, Message}.
maybe_evaluate_message(Message = #{ <<"path">> := _Path }, Opts) ->
    % Set the target aside so it is not part of the resolved request; it is
    % re-attached to the result below. Crashes (badkey) if `target` is absent.
    Target = maps:get(<<"target">>, Message),
    WithoutTarget = maps:without([<<"target">>], Message),
    case hb_ao:resolve(WithoutTarget, Opts) of
        {ok, Result} ->
            {ok, Result#{ <<"target">> => Target }};
        Error -> Error
    end;
maybe_evaluate_message(Message, _Opts) ->
    % No path to evaluate: hand the message back untouched, in the same
    % {ok, _} shape callers match on.
    {ok, Message}.
Use Case:
% Outbox message with evaluation
#{
    <<"target">> => ProcessID,
    <<"path">> => <<"/compute-value">>,
    <<"params">> => #{ <<"x">> => 42 }
}
 
% Evaluates path, then pushes result to target

Result Depth Control

% Requested depth of full results; 1 is passed as the default.
IncludeDepth = hb_ao:get(<<"result-depth">>, Assignment, 1, Opts),
% While depth remains, contribute the full result; otherwise contribute an
% empty map so that only the tree structure is kept.
AdditionalRes =
    if
        IncludeDepth > 0 -> Result;
        true -> #{}
    end.
Depth Levels:
  • 0 - Only push tree (slot, process IDs)
  • 1 - First level full results + tree
  • 2+ - Multiple levels of full results

Metadata Propagation

Metadata = #{
    <<"process">> => SourceProcessID,
    <<"slot">> => SourceSlot,
    <<"outbox-key">> => OutboxKey,
    <<"result-depth">> => IncludeDepth - 1,
    <<"from-base">> => BaseID,
    <<"from-uncommitted">> => UncommittedID,
    <<"from-scheduler">> => SchedulerDevice,
    <<"from-authority">> => AuthorityDevice
}.
Propagated Information:
  • Source process and slot
  • Outbox message key
  • Remaining result depth
  • Process identifiers
  • Scheduler configuration
  • Authority configuration

Common Patterns

%% Push a computed slot
{ok, Result} = hb_ao:resolve(
    Process,
    #{ <<"path">> => <<"push">>, <<"slot">> => 5 },
    #{}
).
 
%% Push with initial message
{ok, Result} = hb_ao:resolve(
    Process,
    #{
        <<"path">> => <<"push">>,
        <<"method">> => <<"POST">>,
        <<"body">> => hb_message:commit(#{
            <<"target">> => TargetProcessID,
            <<"action">> => <<"Eval">>,
            <<"data">> => <<"return 42">>
        }, Wallet)
    },
    #{}
).
 
%% Async push
{ok, _Spawned} = hb_ao:resolve(
    Process,
    #{
        <<"path">> => <<"push">>,
        <<"slot">> => 0,
        <<"push-mode">> => <<"async">>
    },
    #{}
).
 
%% Limit result depth
{ok, TreeOnly} = hb_ao:resolve(
    Process,
    #{
        <<"path">> => <<"push">>,
        <<"slot">> => 0,
        <<"result-depth">> => 0
    },
    #{}
).
 
%% Recursive ping-pong
PingPongScript = <<"
    Handlers.add('Ping',
        { Action = 'Ping' },
        function(m)
            Count = tonumber(m.Count) or 0
            if Count < 10 then
                Send({ 
                    Target = ao.id, 
                    Action = 'Ping', 
                    Count = Count + 1 
                })
            end
        end
    )
    Send({ Target = ao.id, Action = 'Ping', Count = 1 })
">>,
 
dev_process:schedule_aos_call(Process, PingPongScript),
{ok, Result} = hb_ao:resolve(
    Process,
    #{ <<"path">> => <<"push">>, <<"slot">> => 0 },
    #{}
).
% Recursively pushes all 10 messages

Process Initialization

case find_type(hb_ao:get(<<"body">>, Assignment, Opts), Opts) of
    <<"Process">> ->
        % Target is a process definition
        % Schedule it (initializes)
        {ok, Assignment};
    _ ->
        % Regular message, push it
        push_with_mode(Process, Assignment, Opts)
end.
Process Detection:
  • Looks up the body's type key, then its path key (see find_type/2)
  • If Process → just schedule (init)
  • Otherwise → schedule and push

Message Scheduling

%% Schedule the request's `body` on the process by resolving a POST to the
%% `schedule` path through the process@1.0 device. Returns whatever
%% hb_ao:resolve/3 returns.
schedule_initial_message(Process, Req, Opts) ->
    ScheduleReq = #{
        <<"path">> => <<"schedule">>,
        <<"method">> => <<"POST">>,
        <<"body">> => hb_ao:get(<<"body">>, Req, Opts)
    },
    hb_ao:resolve({as, <<"process@1.0">>, Process}, ScheduleReq, Opts).

Target Resolution

case hb_cache:read(Target, Opts) of
    {ok, DownstreamProcess} ->
        % Process found, push message
        push_result_message(...);
    not_found ->
        % Process not found, return error
        #{
            <<"response">> => <<"error">>,
            <<"status">> => 404,
            <<"target">> => Target,
            <<"reason">> => <<"Could not access target process!">>
        }
end.

Error Handling

Missing Target

(Key, Msg = #{ }) when not is_map_key(<<"target">>, Msg) ->
    #{
        <<"response">> => <<"error">>,
        <<"status">> => 404,
        <<"outbox-index">> => Key,
        <<"reason">> => <<"Target process not available.">>,
        <<"message">> => Msg
    }

Evaluation Errors

% Evaluate the raw outbox message; on success use the evaluated message,
% on any other return build a status-400 error entry describing the failure.
MsgToPush = case maybe_evaluate_message(RawMsgToPush, Opts) of
    {ok, R} -> R;
    Err ->
        % NOTE(review): this entry is tagged with <<"resolve">>, while the other
        % error maps in this document use <<"response">> — confirm the key is
        % intentional.
        #{
            <<"resolve">> => <<"error">>,
            <<"target">> => ID,
            <<"status">> => 400,
            <<"outbox-index">> => Key,
            <<"reason">> => Err,
            <<"source">> => RawMsgToPush
        }
end.

Compute Errors

case {Status, hb_ao:get(<<"outbox">>, Result, #{}, Opts)} of
    {ok, Outbox} -> 
        % Process outbox
        ...;
    {Err, Error} when Err == error; Err == failure ->
        {error, Error}
end.

Result Structure

Successful Push

{ok, #{
    <<"slot">> => 5,
    <<"process">> => ProcessID,
    % Downstream results (if result-depth > 0)
    <<"1">> => #{
        <<"slot">> => 0,
        <<"process">> => TargetProcessID,
        <<"resulted-in">> => SlotNumber
    },
    <<"2">> => #{
        <<"slot">> => 1,
        <<"process">> => OtherProcessID
    }
}}

Empty Outbox

{ok, #{
    <<"slot">> => 5,
    <<"process">> => ProcessID
}}

With Errors

{ok, #{
    <<"slot">> => 5,
    <<"process">> => ProcessID,
    <<"1">> => #{
        <<"response">> => <<"error">>,
        <<"status">> => 404,
        <<"target">> => UnknownProcessID,
        <<"reason">> => <<"Could not access target process!">>
    }
}}

Base ID Calculation

%% Compute the `all`-commitments ID used as the base ID for a push.
%% If the given message has a `process` key, the ID is taken from that nested
%% value; otherwise it is taken from the message itself.
calculate_base_id(Process, Opts) ->
    IdSource =
        case hb_ao:get(<<"process">>, Process, not_found, Opts) of
            not_found -> Process;
            ProcessDef -> ProcessDef
        end,
    hb_message:id(IdSource, all, Opts).

Purpose: Track original process for nested pushes


Outbox Normalization

hb_util:lower_case_key_map(
    hb_ao:normalize_keys(
        hb_private:reset(Outbox)
    ),
    Opts
)
Steps:
  1. Reset private fields
  2. Normalize key format
  3. Convert to lowercase keys

Integration Examples

With Process Device

Process = #{
    <<"device">> => <<"process@1.0">>,
    <<"push-device">> => <<"push@1.0">>,
    <<"push-mode">> => <<"sync">>
}.
 
% Automatic pushing after computation
{ok, _} = hb_ao:resolve(
    Process,
    #{ <<"path">> => <<"push">>, <<"slot">> => 0 },
    #{}
).

With HTTP Server

Node = hb_http_server:start_node(#{
    node_processes => #{
        <<"my-process">> => Process
    }
}),
 
{ok, Result} = hb_http:post(
    Node,
    hb_message:commit(#{
        <<"path">> => <<"/my-process~node-process@1.0/push">>,
        <<"slot">> => 0
    }, Wallet),
    #{}
).

Oracle Pattern

OracleScript = <<"
    Handlers.add('Oracle',
        function(m) return true end,
        function(m)
            Send({
                target = ao.id,
                resolve = '/~relay@1.0/call',
                ['relay-path'] = 'https://api.external.com/data'
            })
        end
    )
">>,
 
dev_process:schedule_aos_call(Process, OracleScript),
{ok, Result} = hb_ao:resolve(
    Process,
    #{ <<"path">> => <<"push">>, <<"slot">> => 0 },
    #{}
).
% Resolves external path and pushes result back

Performance Considerations

Synchronous Push

  • Pros: Complete result tree, error handling
  • Cons: Blocking, can be slow for deep trees
  • Use: When results needed immediately

Asynchronous Push

  • Pros: Non-blocking, parallel processing
  • Cons: No results, fire-and-forget
  • Use: Background processing, notifications

Result Depth

  • Depth 0: Minimal data, fast
  • Depth 1: Reasonable balance
  • Depth 2+: Full details, slower

Message Type Detection

%% Determine a message's type by reading it through the message@1.0 device.
%% The `type` key is listed first, then `path`; `undefined` is passed as the
%% default for hb_ao:get_first/3.
find_type(Msg, Opts) ->
    AsMessage = {as, <<"message@1.0">>, Msg},
    Lookups = [
        {AsMessage, <<"type">>},
        {AsMessage, <<"path">>}
    ],
    hb_ao:get_first(Lookups, undefined, Opts).

References

  • Process Device - dev_process.erl
  • AO Core - hb_ao.erl
  • Cache - hb_cache.erl
  • Message - hb_message.erl

Notes

  1. Recursive Nature: Continues pushing until no more outbox messages
  2. Target Resolution: Requires processes in cache
  3. Process Initialization: Detects and initializes new processes
  4. Path Evaluation: Supports dynamic message generation
  5. Metadata Tracking: Full audit trail of push chain
  6. Error Aggregation: Collects all errors in result tree
  7. Depth Control: Configurable result detail level
  8. Mode Flexibility: Sync or async execution
  9. Outbox Format: Normalized map with string keys
  10. ID Calculation: Tracks both signed and unsigned IDs
  11. Private Reset: Clears sensitive data before propagation
  12. Authority Propagation: Maintains authority chain
  13. Scheduler Propagation: Preserves scheduler configuration
  14. Result Merging: Combines metadata with downstream results
  15. HTTP Compatible: Full integration with HTTP interface