dev_push.erl - Recursive Message Pushing Device
Overview
Purpose: Evaluate and recursively push messages between processes
Module: dev_push
Pattern: Compute → Extract Outbox → Push Downstream → Recurse
Device: Push@1.0
This device evaluates process messages, extracts outgoing messages from the outbox, and recursively pushes them to their target processes. It continues until there are no remaining messages to push, enabling complex multi-process workflows.
Architecture
Flow:
Process Compute → Extract Outbox → For Each Message:
├─ Resolve Target Process
├─ Schedule Message
├─ Push Recursively
└─ Aggregate Results
Dependencies
- HyperBEAM: hb_ao, hb_cache, hb_util, hb_maps, hb_private, hb_path, hb_message, hb_http, hb_http_server
- Process: dev_process
- Arweave: ar_bundles
- Includes: include/hb.hrl
Public Functions Overview
%% Main Push Function
-spec push(Base, Req, Opts) -> {ok, PushResult} | {error, Reason}.
Public Function
push/3
-spec push(Base, Req, Opts) -> {ok, PushResult} | {error, Reason}
when
Base :: map(),
Req :: map(),
Opts :: map(),
PushResult :: map().
Description: Push a message or slot number, recursively processing downstream messages until completion.
Two Modes:
1. Initial Message Mode:
- Request has `body` with process or message
- Schedules message to process
- If target is process definition → initialize it
- Otherwise → push scheduled message
2. Slot Mode:
- Request has `slot` number
- Computes existing slot
- Pushes resulting outbox messages
Options:
- `result-depth` - Depth of full results to include (default: 1)
- `push-mode` - Sync or async pushing (default: sync)
-module(dev_push_test).
-include_lib("eunit/include/eunit.hrl").
%% @doc Smoke test for pushing by slot number: schedule an AOS script that
%% registers a `Test' handler, push slot 0 of the process, and check that the
%% push result comes back as a map.
push_simple_message_test() ->
    dev_process:init(),
    Proc = dev_process:test_aos_process(),
    {ok, _} = hb_cache:write(Proc, #{}),
    %% The Lua source stays flush-left: every byte between the quotes is part
    %% of the script that gets scheduled.
    HandlerSrc = <<"
Handlers.add('Test',
{ Action = 'Test' },
function(m)
Send({ Target = ao.id, Action = 'Reply', Data = 'OK' })
end
)
">>,
    {ok, _} = dev_process:schedule_aos_call(Proc, HandlerSrc),
    %% The request carries a `slot' key (not a `body'), so this pushes the
    %% slot that was just scheduled rather than submitting a new message.
    PushReq = #{ <<"path">> => <<"push">>, <<"slot">> => 0 },
    {ok, Pushed} = hb_ao:resolve(Proc, PushReq, #{}),
    ?assert(is_map(Pushed)).
push_recursive_test() ->
dev_process:init(),
Client = dev_process:test_aos_process(),
{ok, _} = hb_cache:write(Client, #{}),
PingScript = <<"
Handlers.add('Ping',
{ Action = 'Ping' },
function(m)
C = tonumber(m.Count) or 0
if C < 3 then
Send({ Target = ao.id, Action = 'Ping', Count = C + 1 })
end
end
)
Send({ Target = ao.id, Action = 'Ping', Count = 1 })
">>,
{ok, _} = dev_process:schedule_aos_call(Client, PingScript),
{ok, Result} = hb_ao:resolve(
Client,
#{ <<"path">> => <<"push">>, <<"slot">> => 0 },
#{}
),
?assert(is_map(Result)).
Push Modes
Synchronous (Default)
is_async(Process, Req, Opts) ->
hb_ao:get_first(
[
{Req, <<"push-mode">>},
{Process, <<"push-mode">>},
{Process, <<"process/push-mode">>}
],
<<"sync">>,
Opts
).
- Waits for push completion
- Returns complete result tree
- Blocks until all downstream pushes done
Asynchronous
push_with_mode(Process, Req, Opts) ->
case is_async(Process, Req, Opts) of
<<"async">> ->
spawn(fun() -> do_push(Process, Req, Opts) end);
<<"sync">> ->
do_push(Process, Req, Opts)
end.
- Spawns separate process
- Returns immediately
- No result collection
Push Flow
1. Compute Results
{Status, Result} = hb_ao:resolve(
{as, <<"process@1.0">>, PrimaryProcess},
#{ <<"path">> => <<"compute/results">>, <<"slot">> => Slot },
Opts
).
2. Extract Outbox
case hb_ao:get(<<"outbox">>, Result, #{}, Opts) of
NoResults when ?IS_EMPTY_MESSAGE(NoResults) ->
{ok, #{<<"slot">> => Slot, <<"process">> => ID}};
Outbox ->
% Process each outbox message
...
end.
3. Process Each Message
hb_maps:map(
fun(Key, RawMsgToPush) ->
MsgToPush = maybe_evaluate_message(RawMsgToPush, Opts),
case hb_cache:read(Target, Opts) of
{ok, DownstreamProcess} ->
push_result_message(DownstreamProcess, MsgToPush, ...);
not_found ->
#{ <<"response">> => <<"error">>, <<"status">> => 404 }
end
end,
Outbox,
Opts
).
4. Push Downstream
push_result_message(DownstreamProcess, MsgToPush, Metadata, Opts) ->
% Schedule message
{ok, Assignment} = schedule_message(...),
% Recursively push
push(DownstreamProcess, Assignment, Opts).
Message Evaluation
Path-Based Evaluation
%% @doc Evaluate an outbox message that carries a `path' before it is pushed.
%% The `target' key is taken out of the message, the remainder is resolved
%% with hb_ao:resolve/2, and on `{ok, Result}' the target is put back onto the
%% result. Any other return from the resolver is handed back untouched.
%% maps:get/2 raises `{badkey, <<"target">>}' if the message has no target.
maybe_evaluate_message(Message = #{ <<"path">> := _Path }, Opts) ->
    %% Remember the destination; it must not take part in the resolution.
    Target = maps:get(<<"target">>, Message),
    case hb_ao:resolve(maps:remove(<<"target">>, Message), Opts) of
        {ok, Resolved} ->
            %% Restore the destination on the evaluated message.
            {ok, maps:put(<<"target">>, Target, Resolved)};
        Failure ->
            Failure
    end.
% Outbox message with evaluation
#{
<<"target">> => ProcessID,
<<"path">> => <<"/compute-value">>,
<<"params">> => #{ <<"x">> => 42 }
}
% Evaluates path, then pushes result to target
Result Depth Control
IncludeDepth = hb_ao:get(<<"result-depth">>, Assignment, 1, Opts),
AdditionalRes = case IncludeDepth of
X when X > 0 -> Result; % Include full result
_ -> #{} % Only tree structure
end.
- 0 - Only push tree (slot, process IDs)
- 1 - First level full results + tree
- 2+ - Multiple levels of full results
Metadata Propagation
Metadata = #{
<<"process">> => SourceProcessID,
<<"slot">> => SourceSlot,
<<"outbox-key">> => OutboxKey,
<<"result-depth">> => IncludeDepth - 1,
<<"from-base">> => BaseID,
<<"from-uncommitted">> => UncommittedID,
<<"from-scheduler">> => SchedulerDevice,
<<"from-authority">> => AuthorityDevice
}.
- Source process and slot
- Outbox message key
- Remaining result depth
- Process identifiers
- Scheduler configuration
- Authority configuration
Common Patterns
%% Push a computed slot
{ok, Result} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"push">>, <<"slot">> => 5 },
#{}
).
%% Push with initial message
{ok, Result} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"push">>,
<<"method">> => <<"POST">>,
<<"body">> => hb_message:commit(#{
<<"target">> => TargetProcessID,
<<"action">> => <<"Eval">>,
<<"data">> => <<"return 42">>
}, Wallet)
},
#{}
).
%% Async push
{ok, _Spawned} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"push">>,
<<"slot">> => 0,
<<"push-mode">> => <<"async">>
},
#{}
).
%% Limit result depth
{ok, TreeOnly} = hb_ao:resolve(
Process,
#{
<<"path">> => <<"push">>,
<<"slot">> => 0,
<<"result-depth">> => 0
},
#{}
).
%% Recursive ping-pong
PingPongScript = <<"
Handlers.add('Ping',
{ Action = 'Ping' },
function(m)
Count = tonumber(m.Count) or 0
if Count < 10 then
Send({
Target = ao.id,
Action = 'Ping',
Count = Count + 1
})
end
end
)
Send({ Target = ao.id, Action = 'Ping', Count = 1 })
">>,
dev_process:schedule_aos_call(Process, PingPongScript),
{ok, Result} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"push">>, <<"slot">> => 0 },
#{}
).
% Recursively pushes all 10 messages
Process Initialization
case find_type(hb_ao:get(<<"body">>, Assignment, Opts), Opts) of
<<"Process">> ->
% Target is a process definition
% Schedule it (initializes)
{ok, Assignment};
_ ->
% Regular message, push it
push_with_mode(Process, Assignment, Opts)
end.
- Checks `type` tag or field
- If `Process` → just schedule (init)
- Otherwise → schedule and push
Message Scheduling
schedule_initial_message(Process, Req, Opts) ->
Body = hb_ao:get(<<"body">>, Req, Opts),
hb_ao:resolve(
{as, <<"process@1.0">>, Process},
#{
<<"path">> => <<"schedule">>,
<<"method">> => <<"POST">>,
<<"body">> => Body
},
Opts
).
Target Resolution
case hb_cache:read(Target, Opts) of
{ok, DownstreamProcess} ->
% Process found, push message
push_result_message(...);
not_found ->
% Process not found, return error
#{
<<"response">> => <<"error">>,
<<"status">> => 404,
<<"target">> => Target,
<<"reason">> => <<"Could not access target process!">>
}
end.
Error Handling
Missing Target
(Key, Msg = #{ }) when not is_map_key(<<"target">>, Msg) ->
#{
<<"response">> => <<"error">>,
<<"status">> => 404,
<<"outbox-index">> => Key,
<<"reason">> => <<"Target process not available.">>,
<<"message">> => Msg
}
Evaluation Errors
MsgToPush = case maybe_evaluate_message(RawMsgToPush, Opts) of
{ok, R} -> R;
Err ->
#{
<<"resolve">> => <<"error">>,
<<"target">> => ID,
<<"status">> => 400,
<<"outbox-index">> => Key,
<<"reason">> => Err,
<<"source">> => RawMsgToPush
}
end.
Compute Errors
case {Status, hb_ao:get(<<"outbox">>, Result, #{}, Opts)} of
{ok, Outbox} ->
% Process outbox
...;
{Err, Error} when Err == error; Err == failure ->
{error, Error}
end.
Result Structure
Successful Push
{ok, #{
<<"slot">> => 5,
<<"process">> => ProcessID,
% Downstream results (if result-depth > 0)
<<"1">> => #{
<<"slot">> => 0,
<<"process">> => TargetProcessID,
<<"resulted-in">> => SlotNumber
},
<<"2">> => #{
<<"slot">> => 1,
<<"process">> => OtherProcessID
}
}}
Empty Outbox
{ok, #{
<<"slot">> => 5,
<<"process">> => ProcessID
}}
With Errors
{ok, #{
<<"slot">> => 5,
<<"process">> => ProcessID,
<<"1">> => #{
<<"response">> => <<"error">>,
<<"status">> => 404,
<<"target">> => UnknownProcessID,
<<"reason">> => <<"Could not access target process!">>
}
}}
Base ID Calculation
calculate_base_id(Process, Opts) ->
case hb_ao:get(<<"process">>, Process, not_found, Opts) of
not_found ->
hb_message:id(Process, all, Opts);
ProcessDef ->
hb_message:id(ProcessDef, all, Opts)
end.
Purpose: Track original process for nested pushes
Outbox Normalization
hb_util:lower_case_key_map(
hb_ao:normalize_keys(
hb_private:reset(Outbox)
),
Opts
)
- Reset private fields
- Normalize key format
- Convert to lowercase keys
Integration Examples
With Process Device
Process = #{
<<"device">> => <<"process@1.0">>,
<<"push-device">> => <<"push@1.0">>,
<<"push-mode">> => <<"sync">>
}.
% Automatic pushing after computation
{ok, _} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"push">>, <<"slot">> => 0 },
#{}
).
With HTTP Server
Node = hb_http_server:start_node(#{
node_processes => #{
<<"my-process">> => Process
}
}),
{ok, Result} = hb_http:post(
Node,
hb_message:commit(#{
<<"path">> => <<"/my-process~node-process@1.0/push">>,
<<"slot">> => 0
}, Wallet),
#{}
).
Oracle Pattern
OracleScript = <<"
Handlers.add('Oracle',
function(m) return true end,
function(m)
Send({
target = ao.id,
resolve = '/~relay@1.0/call',
['relay-path'] = 'https://api.external.com/data'
})
end
)
">>,
dev_process:schedule_aos_call(Process, OracleScript),
{ok, Result} = hb_ao:resolve(
Process,
#{ <<"path">> => <<"push">>, <<"slot">> => 0 },
#{}
).
% Resolves external path and pushes result back
Performance Considerations
Synchronous Push
- Pros: Complete result tree, error handling
- Cons: Blocking, can be slow for deep trees
- Use: When results needed immediately
Asynchronous Push
- Pros: Non-blocking, parallel processing
- Cons: No results, fire-and-forget
- Use: Background processing, notifications
Result Depth
- Depth 0: Minimal data, fast
- Depth 1: Reasonable balance
- Depth 2+: Full details, slower
Message Type Detection
find_type(Msg, Opts) ->
hb_ao:get_first(
[
{{as, <<"message@1.0">>, Msg}, <<"type">>},
{{as, <<"message@1.0">>, Msg}, <<"path">>}
],
undefined,
Opts
).
References
- Process Device - dev_process.erl
- AO Core - hb_ao.erl
- Cache - hb_cache.erl
- Message - hb_message.erl
Notes
- Recursive Nature: Continues pushing until no more outbox messages
- Target Resolution: Requires processes in cache
- Process Initialization: Detects and initializes new processes
- Path Evaluation: Supports dynamic message generation
- Metadata Tracking: Full audit trail of push chain
- Error Aggregation: Collects all errors in result tree
- Depth Control: Configurable result detail level
- Mode Flexibility: Sync or async execution
- Outbox Format: Normalized map with string keys
- ID Calculation: Tracks both signed and unsigned IDs
- Private Reset: Clears sensitive data before propagation
- Authority Propagation: Maintains authority chain
- Scheduler Propagation: Preserves scheduler configuration
- Result Merging: Combines metadata with downstream results
- HTTP Compatible: Full integration with HTTP interface