dev_dedup.erl - Message Deduplication Device

Overview
- Purpose: Deduplicate messages in evaluation streams so that each message is executed only once
- Module: `dev_dedup`
- Pattern: First-pass deduplication with a configurable subject key
- Integration: Typically used in `~process@1.0` evaluation with device stacks

This module implements a deduplication device that tracks seen messages and returns a `skip` status for duplicates. It ensures that a message is executed only once, even if it is assigned multiple times. The device operates only on the first pass of a multi-pass execution and can be configured to deduplicate based on different message keys.

Dependencies
- HyperBEAM: `hb_ao`, `hb_message`
- Device Layer: `dev_message`
- Records: `#tx{}` from `include/hb.hrl`
Public Functions Overview
%% Device Info
-spec info(Msg) -> DeviceInfo.
%% Internal Handler
-spec handle(Key, Msg1, Msg2, Opts) -> {ok, Msg} | {skip, Msg}.

Public Functions
1. info/1
-spec info(Msg) -> DeviceInfo
    when
        Msg :: map(),
        DeviceInfo :: #{
            default => HandlerFun,
            exclude => [atom()]
        }.

Description: Returns the device information structure. It specifies the default handler and the keys that bypass deduplication.

Returns:
#{
    default => fun handle/4,
    exclude => [keys, set, id, commit]
}

-module(dev_dedup_info_test).
-include_lib("eunit/include/eunit.hrl").
%% Checks that info/1 returns a map carrying both a `default` handler and
%% an `exclude` list, and that the exclude list is exactly the four keys
%% that bypass deduplication.
info_test() ->
    DeviceInfo = dev_dedup:info(#{}),
    ?assertMatch(#{default := _, exclude := _}, DeviceInfo),
    #{exclude := ExcludedKeys} = DeviceInfo,
    ?assertEqual([keys, set, id, commit], ExcludedKeys).
%% Checks that the `default` handler exposed by info/1 is a fun of arity 4,
%% matching the handle(Key, Msg1, Msg2, Opts) signature.
info_has_handler_test() ->
    #{default := DefaultHandler} = dev_dedup:info(#{}),
    ?assert(is_function(DefaultHandler, 4)).

2. handle/4
-spec handle(Key, Msg1, Msg2, Opts) -> {ok, UpdatedMsg1} | {skip, Msg1}
    when
        Key :: binary(),
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        UpdatedMsg1 :: map().

Description: Handles deduplication for a key call. The `keys` and `set` keys are forwarded to `dev_message`; all other keys go through the deduplication logic:
- Reads the `dedup-subject` key from Msg1 (defaults to `body`).
- Gets the subject from Msg2 using that subject key.
- Checks whether this is the first pass.
- If the subject has been seen before, returns `{skip, Msg1}`.
- If the subject is new, adds its ID to the `dedup` list and returns `{ok, UpdatedMsg1}`.

Edge cases:
- `SubjectKey = <<"request">>`: the entire request (Msg2) is used as the subject.
- `Pass != 1`: the deduplication check is skipped (not the first pass).
- `Subject = not_found`: the deduplication check is skipped.
-module(dev_dedup_handle_test).
-include_lib("eunit/include/eunit.hrl").

%% Returns the `default` handler exposed by dev_dedup:info/1, so every test
%% exercises the same entry point without repeating the lookup.
handler() ->
    #{default := Handler} = dev_dedup:info(#{}),
    Handler.

%% `keys` is forwarded to dev_message; the result must list the message keys.
handle_keys_test() ->
    Handler = handler(),
    Msg1 = #{<<"key1">> => <<"value1">>, <<"key2">> => <<"value2">>},
    %% NOTE(review): assumes dev_message:keys/1 returns {ok, Keys}; confirm
    %% against dev_message.erl if this assertion starts failing.
    {ok, Keys} = Handler(<<"keys">>, Msg1, #{}, #{}),
    ?assert(lists:member(<<"key1">>, Keys)),
    ?assert(lists:member(<<"key2">>, Keys)).

%% `set` is forwarded to dev_message and applies Msg2's values onto Msg1.
handle_set_test() ->
    Handler = handler(),
    Msg1 = #{<<"data">> => <<"old">>},
    Msg2 = #{<<"data">> => <<"new">>},
    {ok, Result} = Handler(<<"set">>, Msg1, Msg2, #{}),
    ?assertEqual(<<"new">>, maps:get(<<"data">>, Result)).

%% A new subject on the first pass is recorded in the dedup list by its ID.
handle_first_occurrence_test() ->
    Handler = handler(),
    Subject = #{<<"id">> => <<"msg1">>},
    Msg1 = #{<<"dedup">> => [], <<"pass">> => 1},
    Msg2 = #{<<"body">> => Subject},
    {ok, Result} = Handler(<<"compute">>, Msg1, Msg2, #{}),
    ?assertEqual([hb_message:id(Subject, all)], maps:get(<<"dedup">>, Result)).

%% A subject whose ID is already in the dedup list yields {skip, Msg1},
%% with Msg1 returned unchanged.
handle_duplicate_test() ->
    Handler = handler(),
    Subject = #{<<"id">> => <<"msg1">>},
    Msg1 = #{<<"dedup">> => [hb_message:id(Subject, all)], <<"pass">> => 1},
    Msg2 = #{<<"body">> => Subject},
    ?assertEqual({skip, Msg1}, Handler(<<"compute">>, Msg1, Msg2, #{})).

%% Passes other than the first bypass the check and return Msg1 as-is, so
%% the dedup list is not updated.
handle_non_first_pass_test() ->
    Handler = handler(),
    Msg1 = #{<<"dedup">> => [], <<"pass">> => 2},
    Msg2 = #{<<"body">> => #{<<"id">> => <<"msg1">>}},
    ?assertEqual({ok, Msg1}, Handler(<<"compute">>, Msg1, Msg2, #{})).

%% When the subject key is absent from the request, the check is skipped
%% and Msg1 is returned unchanged.
handle_missing_subject_test() ->
    Handler = handler(),
    Msg1 = #{<<"dedup">> => [], <<"pass">> => 1},
    Msg2 = #{<<"action">> => <<"test">>},
    ?assertEqual({ok, Msg1}, Handler(<<"compute">>, Msg1, Msg2, #{})).

%% With dedup-subject = request, the whole request message is the subject.
handle_request_subject_test() ->
    Handler = handler(),
    Msg1 = #{
        <<"dedup">> => [],
        <<"pass">> => 1,
        <<"dedup-subject">> => <<"request">>
    },
    Msg2 = #{<<"action">> => <<"test">>},
    {ok, Result} = Handler(<<"compute">>, Msg1, Msg2, #{}),
    ?assertEqual([hb_message:id(Msg2, all)], maps:get(<<"dedup">>, Result)).

%% Any key named by dedup-subject selects the subject from the request.
handle_custom_subject_key_test() ->
    Handler = handler(),
    Msg1 = #{
        <<"dedup">> => [],
        <<"pass">> => 1,
        <<"dedup-subject">> => <<"custom-key">>
    },
    CustomSubject = #{<<"data">> => <<"unique">>},
    Msg2 = #{<<"custom-key">> => CustomSubject},
    {ok, Result} = Handler(<<"compute">>, Msg1, Msg2, #{}),
    ?assertEqual(
        [hb_message:id(CustomSubject, all)],
        maps:get(<<"dedup">>, Result)
    ).

Common Patterns
%% Basic deduplication in a device stack
Msg = #{
    <<"device">> => <<"stack@1.0">>,
    <<"dedup-subject">> => <<"body">>,
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,
        <<"2">> => <<"your-device@1.0">>
    }
},
%% The body is the dedup subject, so requests are compared by their body.
Request1 = #{<<"body">> => #{<<"data">> => <<"hello">>}},
{ok, Result} = hb_ao:resolve(Msg, Request1, #{}),
%% Same body again: dedup@1.0 sees a duplicate and returns skip.
{ok, Result2} = hb_ao:resolve(Result, Request1, #{}). % Skipped
%% Deduplicate by entire request
Msg = #{
    <<"device">> => <<"stack@1.0">>,
    <<"dedup-subject">> => <<"request">>,
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,
        <<"2">> => <<"processor@1.0">>
    }
},
%% The whole request is the subject, so resending an identical request
%% is detected as a duplicate.
Request = #{<<"data">> => <<"test">>},
{ok, Result} = hb_ao:resolve(Msg, Request, #{}),
{ok, Result2} = hb_ao:resolve(Result, Request, #{}). % Skipped
%% Deduplicate by custom key
%% Requests are compared only by the value under `transaction-id`.
Request = #{<<"transaction-id">> => <<"tx-123">>},
Msg = #{
    <<"device">> => <<"stack@1.0">>,
    <<"dedup-subject">> => <<"transaction-id">>,
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,
        <<"2">> => <<"handler@1.0">>
    }
},
{ok, Result} = hb_ao:resolve(Msg, Request, #{}),
%% The same transaction-id a second time is a duplicate.
{ok, Result2} = hb_ao:resolve(Result, Request, #{}). % Skipped
%% Multipass execution with deduplication
Msg = #{
    <<"device">> => <<"stack@1.0">>,
    <<"dedup-subject">> => <<"request">>,
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,
        <<"2">> => <<"processor@1.0">>,
        <<"3">> => <<"multipass@1.0">>
    },
    <<"passes">> => 2
},
Request = #{<<"data">> => <<"test">>},
% Dedup only checks on first pass
{ok, Result} = hb_ao:resolve(Msg, Request, #{}).

Configuration
Subject Key Options

Default (<<"body">>):
#{
    <<"dedup-subject">> => <<"body">>
}
% Deduplicates based on the message body

Entire request:
#{
    <<"dedup-subject">> => <<"request">>
}
% Deduplicates based on the entire request

Custom key:
#{
    <<"dedup-subject">> => <<"transaction-id">>
}
% Deduplicates based on the transaction-id field

State Structure
Dedup List
#{
    <<"dedup">> => [SubjectID1, SubjectID2, ...]
}
% List of already-seen subject IDs

Pass Tracking
#{
    <<"pass">> => 1 % First pass
}
#{
    <<"pass">> => 2 % Subsequent pass (dedup skipped)
}

Execution Flow
1. Check whether the key is excluded (keys, set, id, commit)
   → If yes: forward to dev_message
2. Extract the dedup-subject key from Msg1 (default: body)
3. Get the subject from Msg2 based on the subject key
   → If the subject key is "request": use the entire Msg2
   → Otherwise: extract the value stored under the subject key
4. Check whether this is the first pass (pass == 1)
   → If not the first pass: return {ok, Msg1} (skip the check)
5. If the subject is not found: return {ok, Msg1}
6. Calculate the subject ID using hb_message:id/2
7. Check whether the subject ID is in the dedup list
   → If yes: return {skip, Msg1}
   → If no: add it to the dedup list and return {ok, UpdatedMsg1}

Integration with Device Stacks
Stack Position
The deduplication device should typically be placed early in the stack:
#{
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,      % First: deduplicate
        <<"2">> => <<"validator@1.0">>,  % Then: validate
        <<"3">> => <<"processor@1.0">>   % Finally: process
    }
}

With Multipass
#{
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,
        <<"2">> => <<"processor@1.0">>,
        <<"3">> => <<"multipass@1.0">>  % Repeats execution
    },
    <<"passes">> => 3
}
% Dedup only checks on pass 1
% Passes 2-3 skip the dedup check

References
- Message Device - dev_message.erl
- Message Handling - hb_message.erl
- AO Resolution - hb_ao.erl
- Device Stacks - dev_stack.erl
- Multipass - dev_multipass.erl
Notes
- First-Pass Only: Deduplication occurs only on the first pass (pass == 1)
- Memory Storage: The dedup list is currently kept in memory (this may move to the cache)
- Subject Configuration: The subject key is selected via dedup-subject
- Request Dedup: Entire requests can be deduplicated with dedup-subject = request
- ID Calculation: Uses hb_message:id(Subject, all) for unique identification
- Skip Status: Returns {skip, Msg} instead of {ok, Msg} for duplicates
- Excluded Keys: keys, set, id and commit bypass deduplication
- Stack Compatible: Designed for use in device stack configurations
- Multipass Support: Automatically skips the check on non-first passes
- Process Integration: Commonly used in ~process@1.0 evaluation flows
- State Preservation: The dedup list persists across multiple requests in the same execution
- Custom Keys: Any message field can be used as the deduplication subject
- Accumulative: The dedup list grows with each unique message
- No Expiration: There is currently no TTL or size limit on the dedup list
- Permissionless: Works with any message structure or format