Skip to content

dev_dedup.erl - Message Deduplication Device

Overview

Purpose: Deduplicate messages in evaluation streams to ensure single execution
Module: dev_dedup
Pattern: First-pass deduplication with subject-key configuration
Integration: Typically used in ~process@1.0 evaluation with device stacks

This module implements a deduplication device that tracks seen messages and returns skip status for duplicates. It ensures that messages are only executed once, even if assigned multiple times. The device operates on the first pass of multi-pass executions and can be configured to deduplicate based on different message keys.

Dependencies

  • HyperBEAM: hb_ao, hb_message
  • Device Layer: dev_message
  • Records: #tx{} from include/hb.hrl

Public Functions Overview

%% Device Info
-spec info(Msg) -> DeviceInfo.
 
%% Internal Handler
-spec handle(Key, Msg1, Msg2, Opts) -> {ok, Msg} | {skip, Msg}.

Public Functions

1. info/1

-spec info(Msg) -> DeviceInfo
    when
        Msg :: map(),
        DeviceInfo :: #{
            default => HandlerFun,
            exclude => [atom()]
        }.

Description: Return device information structure. Specifies default handler and excluded keys.

Returns:
#{
    default => fun handle/4,
    exclude => [keys, set, id, commit]
}
Test Code:
-module(dev_dedup_info_test).
-include_lib("eunit/include/eunit.hrl").
 
%% info/1 must return a map that carries both a default handler and the list
%% of excluded keys, with the excluded keys in the documented order.
info_test() ->
    DeviceInfo = dev_dedup:info(#{}),
    ?assert(is_map(DeviceInfo)),
    % Both top-level entries have to be present before their values are read.
    lists:foreach(
        fun(Field) -> ?assert(maps:is_key(Field, DeviceInfo)) end,
        [default, exclude]
    ),
    Excluded = maps:get(exclude, DeviceInfo),
    ?assertEqual([keys, set, id, commit], Excluded).
 
%% The default entry returned by info/1 must be a fun of arity 4
%% (Key, Msg1, Msg2, Opts).
info_has_handler_test() ->
    DefaultHandler = maps:get(default, dev_dedup:info(#{})),
    ?assert(is_function(DefaultHandler, 4)).

2. handle/4

-spec handle(Key, Msg1, Msg2, Opts) -> {ok, UpdatedMsg1} | {skip, Msg1}
    when
        Key :: binary(),
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        UpdatedMsg1 :: map().

Description: Handle deduplication for a key call. Forwards keys and set to dev_message, handles all other keys with deduplication logic.

Deduplication Logic:
  1. Extracts dedup-subject key (defaults to body)
  2. Gets subject from Msg2 based on subject key
  3. Checks if this is first pass
  4. If subject seen before: returns {skip, Msg1}
  5. If new subject: adds to dedup list and returns {ok, UpdatedMsg1}
Special Cases:
  • SubjectKey = <<"request">> - Use entire request as subject
  • Pass != 1 - Deduplication check is bypassed and {ok, Msg1} is returned unchanged (not first pass)
  • Subject = not_found - Deduplication check is bypassed and {ok, Msg1} is returned unchanged
Test Code:
-module(dev_dedup_handle_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Calling the default handler with the <<"keys">> key must produce a result in
%% one of the accepted shapes: {ok, List}, a bare list, or a map.
%% NOTE(review): the exact return shape of dev_message:keys is not pinned down
%% here -- confirm it and tighten this test to a single expected shape.
handle_keys_test() ->
    % The handler is taken from info/1 rather than called by name.
    Handler = maps:get(default, dev_dedup:info(#{})),
    State = #{<<"key1">> => <<"value1">>, <<"key2">> => <<"value2">>},
    Outcome = Handler(<<"keys">>, State, #{}, #{}),
    Accepted =
        case Outcome of
            {ok, Listed} -> is_list(Listed);
            Listed when is_list(Listed) -> true;
            Other -> is_map(Other)
        end,
    ?assert(Accepted).
 
%% The <<"set">> key is forwarded to the message device: a value present in
%% the request replaces the value stored under the same key in the state.
handle_set_test() ->
    Handler = maps:get(default, dev_dedup:info(#{})),
    Base = #{<<"data">> => <<"old">>},
    Patch = #{<<"data">> => <<"new">>},
    {ok, Updated} = Handler(<<"set">>, Base, Patch, #{}),
    ?assertEqual(<<"new">>, maps:get(<<"data">>, Updated)).
 
%% A subject seen for the first time on pass 1 is accepted with {ok, _} and
%% exactly one entry is recorded in the dedup list.
handle_first_occurrence_test() ->
    Handler = maps:get(default, dev_dedup:info(#{})),
    EmptyState = #{<<"pass">> => 1, <<"dedup">> => []},
    Request = #{<<"body">> => #{<<"id">> => <<"msg1">>}},
    {ok, NewState} = Handler(<<"compute">>, EmptyState, Request, #{}),
    ?assertEqual(1, length(maps:get(<<"dedup">>, NewState))).
 
%% A subject whose ID is already in the dedup list must be reported as a
%% duplicate on the first pass: the handler answers {skip, Msg1} and hands the
%% state message back untouched.
handle_duplicate_test() ->
    Info = dev_dedup:info(#{}),
    Handler = maps:get(default, Info),
    FirstSubject = #{<<"id">> => <<"msg1">>},
    % Pre-seed the dedup list with the ID the handler is documented to compute.
    SubjectID = hb_message:id(FirstSubject, all),
    Msg1 = #{<<"dedup">> => [SubjectID], <<"pass">> => 1},
    Msg2 = #{<<"body">> => FirstSubject},
    % Assert on the whole result instead of a bare match followed by
    % ?assert(true): a failure then reports expected vs. actual values, and the
    % "state is returned unchanged" part of the contract is verified as well.
    ?assertEqual({skip, Msg1}, Handler(<<"compute">>, Msg1, Msg2, #{})).
 
%% On any pass other than the first, the handler answers {ok, _} and leaves
%% the dedup list as it was.
handle_non_first_pass_test() ->
    Handler = maps:get(default, dev_dedup:info(#{})),
    SecondPassState = #{<<"pass">> => 2, <<"dedup">> => []},
    Request = #{<<"body">> => #{<<"id">> => <<"msg1">>}},
    {ok, NewState} = Handler(<<"compute">>, SecondPassState, Request, #{}),
    % Nothing may have been recorded, because this is not pass 1.
    ?assertEqual([], maps:get(<<"dedup">>, NewState)).
 
%% With dedup-subject set to <<"request">>, the whole request message is the
%% deduplication subject, so a first call records exactly one entry.
handle_request_subject_test() ->
    Handler = maps:get(default, dev_dedup:info(#{})),
    State = #{
        <<"dedup-subject">> => <<"request">>,
        <<"pass">> => 1,
        <<"dedup">> => []
    },
    WholeRequest = #{<<"action">> => <<"test">>},
    {ok, NewState} = Handler(<<"compute">>, State, WholeRequest, #{}),
    ?assertEqual(1, length(maps:get(<<"dedup">>, NewState))).
 
%% With dedup-subject naming an arbitrary request key, the value stored under
%% that key is the deduplication subject; a first call records one entry.
handle_custom_subject_key_test() ->
    Handler = maps:get(default, dev_dedup:info(#{})),
    State = #{
        <<"dedup-subject">> => <<"custom-key">>,
        <<"pass">> => 1,
        <<"dedup">> => []
    },
    Request = #{<<"custom-key">> => #{<<"data">> => <<"unique">>}},
    {ok, NewState} = Handler(<<"compute">>, State, Request, #{}),
    ?assertEqual(1, length(maps:get(<<"dedup">>, NewState))).

Common Patterns

%% Basic deduplication in a device stack
%% The dedup device sits first in the stack and uses the request's <<"body">>
%% value as the deduplication subject.
%% NOTE(review): Request1 is not bound in this snippet; it is assumed to be a
%% request message carrying a <<"body">> key -- bind it before running this.
Msg = #{
    <<"device">> => <<"stack@1.0">>,
    <<"dedup-subject">> => <<"body">>,
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,
        <<"2">> => <<"your-device@1.0">>
    }
},
% First resolution: the subject is new and its ID is recorded in the result.
{ok, Result} = hb_ao:resolve(Msg, Request1, #{}),
% Second resolution runs against Result, which carries the recorded ID.
{ok, Result2} = hb_ao:resolve(Result, Request1, #{}).  % Skipped as a duplicate
 
%% Deduplicate by entire request
%% With dedup-subject set to <<"request">>, the whole request message is the
%% deduplication subject rather than the value of a single key.
Msg = #{
    <<"device">> => <<"stack@1.0">>,
    <<"dedup-subject">> => <<"request">>,
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,
        <<"2">> => <<"processor@1.0">>
    }
},
% The same request message is sent twice; the second call resolves against
% Result so that the recorded dedup state is carried over.
{ok, Result} = hb_ao:resolve(Msg, #{<<"data">> => <<"test">>}, #{}),
{ok, Result2} = hb_ao:resolve(Result, #{<<"data">> => <<"test">>}, #{}).  % Skipped as a duplicate
 
%% Deduplicate by custom key
%% dedup-subject names the request key whose value is the deduplication
%% subject; here that is the request's <<"transaction-id">> value.
Msg = #{
    <<"device">> => <<"stack@1.0">>,
    <<"dedup-subject">> => <<"transaction-id">>,
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,
        <<"2">> => <<"handler@1.0">>
    }
},
Request = #{<<"transaction-id">> => <<"tx-123">>},
% The second call resolves against Result, which carries the recorded ID.
{ok, Result} = hb_ao:resolve(Msg, Request, #{}),
{ok, Result2} = hb_ao:resolve(Result, Request, #{}).  % Skipped as a duplicate
 
%% Multipass execution with deduplication
%% NOTE(review): Request is not bound within this snippet -- bind a request
%% message before running it.
Msg = #{
    <<"device">> => <<"stack@1.0">>,
    <<"dedup-subject">> => <<"request">>,
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,
        <<"2">> => <<"processor@1.0">>,
        <<"3">> => <<"multipass@1.0">>
    },
    <<"passes">> => 2
},
% Dedup only checks on the first pass; on the second pass the handler returns
% {ok, Msg1} without consulting or updating the dedup list.
{ok, Result} = hb_ao:resolve(Msg, Request, #{}).

Configuration

Subject Key Options

Default (<<"body">>)
#{
    <<"dedup-subject">> => <<"body">>
}
% Deduplicates based on message body
Request Subject
#{
    <<"dedup-subject">> => <<"request">>
}
% Deduplicates based on entire request
Custom Key
#{
    <<"dedup-subject">> => <<"transaction-id">>
}
% Deduplicates based on transaction-id field

State Structure

Dedup List

#{
    <<"dedup">> => [SubjectID1, SubjectID2, ...]
}
% List of already-seen subject IDs

Pass Tracking

#{
    <<"pass">> => 1  % First pass
}
#{
    <<"pass">> => 2  % Subsequent pass (dedup skipped)
}

Execution Flow

1. Check if key is excluded (keys, set, id, commit)
   → If yes: Forward to dev_message
   
2. Extract dedup-subject key (default: body)
 
3. Get subject from Msg2 based on subject-key
   → If subject-key == "request": use entire Msg2
   → Otherwise: extract value of subject-key
 
4. Check if first pass (pass == 1)
   → If not first pass: return {ok, Msg1} (skip check)
 
5. If subject not found: return {ok, Msg1}
 
6. Calculate subject ID using hb_message:id/2
 
7. Check if subject ID in dedup list
   → If yes: return {skip, Msg1}
   → If no: add to dedup list and return {ok, UpdatedMsg1}

Integration with Device Stacks

Stack Position

Deduplication device should typically be placed early in the stack:

#{
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,      % First: deduplicate
        <<"2">> => <<"validator@1.0">>,   % Then: validate
        <<"3">> => <<"processor@1.0">>    % Finally: process
    }
}

With Multipass

#{
    <<"device-stack">> => #{
        <<"1">> => <<"dedup@1.0">>,
        <<"2">> => <<"processor@1.0">>,
        <<"3">> => <<"multipass@1.0">>  % Repeats execution
    },
    <<"passes">> => 3
}
% Dedup only checks on pass 1
% Passes 2-3 skip dedup check

References

  • Message Device - dev_message.erl
  • Message Handling - hb_message.erl
  • AO Resolution - hb_ao.erl
  • Device Stacks - dev_stack.erl
  • Multipass - dev_multipass.erl

Notes

  1. First-Pass Only: Deduplication only occurs on first pass (pass == 1)
  2. Memory Storage: Currently keeps the dedup list in memory, inside the state message under the <<"dedup">> key (may move to cache)
  3. Subject Configuration: Flexible subject key selection via dedup-subject
  4. Request Dedup: Can deduplicate entire requests by setting dedup-subject to <<"request">>
  5. ID Calculation: Uses hb_message:id(Subject, all) for unique identification
  6. Skip Status: Returns {skip, Msg} instead of {ok, Msg} for duplicates
  7. Excluded Keys: keys, set, id, commit bypass deduplication
  8. Stack Compatible: Designed for use in device stack configurations
  9. Multipass Support: Automatically skips checks on non-first passes
  10. Process Integration: Commonly used in ~process@1.0 evaluation flows
  11. State Preservation: The dedup list persists across multiple requests within the same execution
  12. Custom Keys: Any message field can be used as deduplication subject
  13. Accumulative: Dedup list grows with each unique message
  14. No Expiration: Currently no TTL or size limit on dedup list
  15. Permissionless: Works with any message structure or format