Skip to content

L4: Decentralized Data Platform

Build a data storage device with content-addressed storage, bundle creation, tag-based queries, and node management.

What You'll Build

A decentralized data platform with these endpoints:

POST /~dataplatform@1.0/upload           Upload data with tags
GET  /~dataplatform@1.0/upload_status    Check upload status
GET  /~dataplatform@1.0/query            Query by ID
POST /~dataplatform@1.0/query_by_tags    Query by tags
POST /~dataplatform@1.0/create_bundle    Create data bundle
GET  /~dataplatform@1.0/verify_bundle    Verify bundle integrity
GET  /~dataplatform@1.0/nodes            List configured nodes
POST /~dataplatform@1.0/add_node         Add node
POST /~dataplatform@1.0/remove_node      Remove node

What You'll Learn

| Concept | Purpose |
| --- | --- |
| Content addressing | Data identified by hash |
| Tag-based indexing | Query data by metadata |
| Bundle creation | Group multiple items |
| Node management | Configure replication targets |

Prerequisites


Part 1: Device Structure

Create HyperBEAM/src/dev_dataplatform.erl:

%%%-------------------------------------------------------------------
%%% @doc Decentralized Data Platform Device
%%%
%%% Data storage with bundle creation and distributed queries.
%%%
%%% @end
%%%-------------------------------------------------------------------
-module(dev_dataplatform).

%% Public device keys, grouped by concern.
-export([
    %% Device description
    info/3,
    %% Upload operations
    upload/3,
    upload_status/3,
    %% Query operations
    query/3,
    query_by_tags/3,
    %% Bundle operations
    create_bundle/3,
    verify_bundle/3,
    %% Node management
    nodes/3,
    add_node/3,
    remove_node/3
]).

-include("include/hb.hrl").

%% Keys under which each piece of device state is kept inside the
%% <<"priv">> map of the base message (see load_priv/3 and save_priv/3).
%% Map of item ID => item.
-define(ITEMS_KEY, <<"platform-items">>).
%% List of known item IDs.
-define(INDEX_KEY, <<"platform-index">>).
%% Map of item ID => upload status.
-define(STATUS_KEY, <<"platform-status">>).
%% Map of bundle ID => bundle.
-define(BUNDLES_KEY, <<"platform-bundles">>).
%% List of configured node URLs.
-define(NODES_KEY, <<"platform-nodes">>).

Part 2: Device Info

%%====================================================================
%% Device Info
%%====================================================================
 
%% @doc Describe this device: its name, version, a short description and
%% the list of keys it exports. All three arguments are ignored.
info(_M1, _M2, _Opts) ->
    ExportedKeys = [
        <<"upload">>, <<"upload_status">>,
        <<"query">>, <<"query_by_tags">>,
        <<"create_bundle">>, <<"verify_bundle">>,
        <<"nodes">>, <<"add_node">>, <<"remove_node">>
    ],
    Description = #{
        <<"name">> => <<"dataplatform">>,
        <<"version">> => <<"1.0">>,
        <<"description">> => <<"Decentralized data storage with Arweave">>,
        <<"exports">> => ExportedKeys
    },
    {ok, Description}.

Part 3: Content-Addressed Storage

The Key Concept

Every piece of data is identified by its cryptographic hash:

%% The item ID is the SHA-256 digest of the content, passed through
%% hb_util:encode/1.
generate_item_id(Data) ->
    hb_util:encode(crypto:hash(sha256, Data)).

This means:

  • Same data always gets the same ID
  • Data integrity is verifiable
  • Deduplication is automatic

Upload Implementation

%%====================================================================
%% Upload Operations
%%====================================================================
 
%% @doc Store a data item under its content hash.
%%
%% Reads `data' (required, must be a binary) and `tags' (optional, defaults
%% to an empty map) from the request M2. The item is stored in M1, its ID
%% is added to the index and its status is set to `uploaded'.
%%
%% Returns `{ok, State}' where State is the updated M1 merged with the
%% `id', `status' and `size' of the upload, or `{error, Map}' when `data'
%% is missing or is not a binary.
upload(M1, M2, _Opts) ->
    case maps:get(<<"data">>, M2, not_found) of
        not_found ->
            {error, #{<<"error">> => <<"data_required">>}};
        Data when is_binary(Data) ->
            Tags = maps:get(<<"tags">>, M2, #{}),

            %% The ID is derived from the content, so identical data
            %% always maps to the same item.
            ItemId = generate_item_id(Data),
            Item = #{
                <<"id">> => ItemId,
                <<"data">> => Data,
                <<"tags">> => format_tags(Tags),
                <<"created">> => erlang:system_time(second)
            },

            %% Store the item, index it, then record its status.
            M1Stored = store_item(M1, ItemId, Item),
            M1Indexed = update_index(M1Stored, ItemId),
            M1WithStatus = update_status(M1Indexed, ItemId, <<"uploaded">>),

            {ok, maps:merge(M1WithStatus, #{
                <<"id">> => ItemId,
                <<"status">> => <<"uploaded">>,
                <<"size">> => byte_size(Data)
            })};
        _NotBinary ->
            %% byte_size/1 below only accepts binaries; reject anything
            %% else up front instead of crashing on request input.
            {error, #{<<"error">> => <<"data_must_be_binary">>}}
    end.

Upload Status

%% @doc Report the recorded upload status of an item.
%%
%% Reads `id' from the request M2 and returns `{ok, #{id, status}}' with
%% whatever get_status/2 reports for it, or `{error, Map}' when no `id'
%% was supplied.
upload_status(M1, M2, _Opts) ->
    case maps:get(<<"id">>, M2, not_found) of
        not_found ->
            {error, #{<<"error">> => <<"id_required">>}};
        ItemId ->
            Reply = #{
                <<"id">> => ItemId,
                <<"status">> => get_status(M1, ItemId)
            },
            {ok, Reply}
    end.

Part 4: Query Operations

Query by ID

%%====================================================================
%% Query Operations
%%====================================================================
 
%% @doc Fetch a single item by ID.
%%
%% Reads `id' from the request M2. Returns `{ok, #{id, data, tags}}' for a
%% stored item, `{error, Map}' when `id' is missing or the item is unknown.
query(M1, M2, _Opts) ->
    case maps:get(<<"id">>, M2, not_found) of
        not_found ->
            {error, #{<<"error">> => <<"id_required">>}};
        ItemId ->
            query_reply(ItemId, get_item(M1, ItemId))
    end.

%% Turn the result of get_item/2 into the reply for query/3.
query_reply(ItemId, {ok, Item}) ->
    {ok, #{
        <<"id">> => ItemId,
        <<"data">> => maps:get(<<"data">>, Item, <<>>),
        <<"tags">> => maps:get(<<"tags">>, Item, [])
    }};
query_reply(_ItemId, not_found) ->
    {error, #{<<"error">> => <<"not_found">>}}.

Query by Tags

Filter items by their metadata tags:

%% @doc List the indexed items whose tags satisfy a tag query.
%%
%% Reads `tags' (optional, defaults to an empty map, which matches every
%% item) and `limit' (optional, defaults to 100) from the request M2.
%% `limit' may be a non-negative integer or a binary holding one, since
%% request values can arrive as text; anything else is rejected rather
%% than crashing in lists:sublist/2.
%%
%% Returns `{ok, #{results, total, returned}}' where `total' counts every
%% match and `returned' counts the matches left after applying the limit,
%% or `{error, Map}' when `limit' is invalid.
query_by_tags(M1, M2, _Opts) ->
    case parse_limit(maps:get(<<"limit">>, M2, 100)) of
        {ok, Limit} ->
            QueryTags = maps:get(<<"tags">>, M2, #{}),

            %% Walk the index; IDs whose item can no longer be loaded
            %% are skipped by the `{ok, Item}' generator pattern.
            Matches = [
                #{<<"id">> => ItemId, <<"tags">> => ItemTags}
             || ItemId <- get_index(M1),
                {ok, Item} <- [get_item(M1, ItemId)],
                ItemTags <- [maps:get(<<"tags">>, Item, [])],
                matches_tags(ItemTags, QueryTags)
            ],
            Limited = lists:sublist(Matches, Limit),

            {ok, #{
                <<"results">> => Limited,
                <<"total">> => length(Matches),
                <<"returned">> => length(Limited)
            }};
        error ->
            {error, #{<<"error">> => <<"invalid_limit">>}}
    end.

%% Normalise a requested limit to `{ok, NonNegInteger}', or `error' when
%% it is neither a non-negative integer nor a binary spelling one.
parse_limit(Limit) when is_integer(Limit), Limit >= 0 ->
    {ok, Limit};
parse_limit(Limit) when is_binary(Limit) ->
    try binary_to_integer(Limit) of
        Parsed when Parsed >= 0 -> {ok, Parsed};
        _Negative -> error
    catch
        error:badarg -> error
    end;
parse_limit(_Other) ->
    error.

Tag Matching

%% True when every name/value pair of the query map is present among the
%% item's tags. A query that is not a map filters nothing and matches
%% every item.
matches_tags(ItemTags, QueryTags) when is_map(QueryTags) ->
    lists:all(
        fun({Name, Value}) -> has_tag(ItemTags, Name, Value) end,
        maps:to_list(QueryTags)
    );
matches_tags(_ItemTags, _NotAMap) ->
    true.

%% True when the tag list holds a tag with exactly this name and value.
%% A tag missing either key is compared as if it held <<>>.
has_tag(Tags, Name, Value) ->
    Wanted = {Name, Value},
    lists:any(
        fun(Tag) ->
            Found = {
                maps:get(<<"name">>, Tag, <<>>),
                maps:get(<<"value">>, Tag, <<>>)
            },
            Found =:= Wanted
        end,
        Tags
    ).

Part 5: Bundle Operations

Bundles group multiple data items into a single unit.

Create Bundle

%%====================================================================
%% Bundle Operations
%%====================================================================
 
%% @doc Group several data items into one stored bundle.
%%
%% Reads `items' from the request M2; it must be a list of item specs,
%% each optionally carrying `data' (default <<>>) and `tags' (default
%% empty map). Every item gets a content-derived ID and a signature, the
%% bundle ID is derived from the resulting item list, and the bundle is
%% stored in M1.
%%
%% Returns `{ok, State}' where State is the updated M1 merged with
%% `bundle_id', `item_count' and `item_ids', or `{error, Map}' when
%% `items' is missing or is not a list.
create_bundle(M1, M2, _Opts) ->
    case maps:get(<<"items">>, M2, not_found) of
        not_found ->
            {error, #{<<"error">> => <<"items_required">>}};
        Specs when is_list(Specs) ->
            BundleItems = [bundle_item(Spec) || Spec <- Specs],
            BundleId = generate_bundle_id(BundleItems),
            Bundle = #{
                <<"id">> => BundleId,
                <<"items">> => BundleItems,
                <<"created">> => erlang:system_time(second)
            },
            Summary = #{
                <<"bundle_id">> => BundleId,
                <<"item_count">> => length(BundleItems),
                <<"item_ids">> =>
                    lists:map(fun(Entry) -> maps:get(<<"id">>, Entry) end, BundleItems)
            },
            {ok, maps:merge(store_bundle(M1, BundleId, Bundle), Summary)};
        _NotAList ->
            {error, #{<<"error">> => <<"items_must_be_list">>}}
    end.

%% Expand one item spec into the full item kept inside a bundle.
bundle_item(Spec) ->
    Data = maps:get(<<"data">>, Spec, <<>>),
    Tags = maps:get(<<"tags">>, Spec, #{}),
    #{
        <<"id">> => generate_item_id(Data),
        <<"data">> => Data,
        <<"tags">> => format_tags(Tags),
        <<"signature">> => generate_signature(Data)
    }.
 
%% Derive a bundle ID from its items: SHA-256 over the external term
%% format of the item list, passed through hb_util:encode/1.
generate_bundle_id(Items) ->
    Digest = crypto:hash(sha256, term_to_binary(Items)),
    hb_util:encode(Digest).
 
%% Simplified signature for demo: the SHA-256 digest of the data passed
%% through hb_util:encode/1. No key material is involved.
generate_signature(Data) ->
    hb_util:encode(crypto:hash(sha256, Data)).

Verify Bundle

%% @doc Verify the integrity of a stored bundle.
%%
%% Reads `bundle_id' from the request M2 and looks the bundle up in M1.
%% The bundle is valid when every item carries an ID and a signature equal
%% to the values recomputed from that item's data. Checking only that a
%% `signature' key exists would let altered data pass verification.
%%
%% Returns `{ok, #{valid, item_count}}', or `{error, Map}' when
%% `bundle_id' is missing or no such bundle is stored.
verify_bundle(M1, M2, _Opts) ->
    case maps:get(<<"bundle_id">>, M2, not_found) of
        not_found ->
            {error, #{<<"error">> => <<"bundle_id_required">>}};
        BundleId ->
            case get_bundle(M1, BundleId) of
                {ok, Bundle} ->
                    Items = maps:get(<<"items">>, Bundle, []),
                    {ok, #{
                        <<"valid">> => lists:all(fun is_valid_bundle_item/1, Items),
                        <<"item_count">> => length(Items)
                    }};
                not_found ->
                    {error, #{<<"error">> => <<"bundle_not_found">>}}
            end
    end.

%% True when the item's stored ID and signature both match the values
%% recomputed from its data. Items missing any of the three keys, or that
%% are not maps, are invalid.
is_valid_bundle_item(#{
    <<"id">> := Id,
    <<"data">> := Data,
    <<"signature">> := Signature
}) ->
    Signature =:= generate_signature(Data) andalso
        Id =:= generate_item_id(Data);
is_valid_bundle_item(_Malformed) ->
    false.

Part 6: Node Management

Manage a list of nodes for potential replication or querying.

List Nodes

%%====================================================================
%% Node Management
%%====================================================================
 
%% @doc List the node URLs configured in M1 together with their count.
nodes(M1, _M2, _Opts) ->
    Configured = get_nodes(M1),
    Reply = #{
        <<"count">> => length(Configured),
        <<"nodes">> => Configured
    },
    {ok, Reply}.

Add Node

%% @doc Add a node URL to the configured node list.
%%
%% Reads `url' from the request M2. Returns `{error, Map}' when `url' is
%% missing. Otherwise returns `{ok, State}' where State always contains
%% the contents of M1, so the reply can be used as the base of the next
%% request whether or not the URL was new:
%%   - new URL: M1 with the URL prepended to the node list, plus
%%     `message' = `node_added' and `url';
%%   - known URL: M1 unchanged, plus `message' = `node_already_exists'.
add_node(M1, M2, _Opts) ->
    case maps:get(<<"url">>, M2, not_found) of
        not_found ->
            {error, #{<<"error">> => <<"url_required">>}};
        NodeUrl ->
            NodeList = get_nodes(M1),
            case lists:member(NodeUrl, NodeList) of
                true ->
                    %% Keep the existing state in the reply, as every
                    %% other successful mutating reply does; returning
                    %% only the message would drop the stored node list.
                    {ok, maps:merge(M1, #{
                        <<"message">> => <<"node_already_exists">>
                    })};
                false ->
                    M1Updated = save_nodes(M1, [NodeUrl | NodeList]),
                    {ok, maps:merge(M1Updated, #{
                        <<"message">> => <<"node_added">>,
                        <<"url">> => NodeUrl
                    })}
            end
    end.

Remove Node

%% @doc Remove a node URL from the configured node list.
%%
%% Reads `url' from the request M2. The first occurrence of the URL is
%% deleted from the list; the reply is `node_removed' whether or not the
%% URL was present. Returns `{ok, State}' with the updated M1 merged with
%% `message' and `url', or `{error, Map}' when `url' is missing.
remove_node(M1, M2, _Opts) ->
    case maps:get(<<"url">>, M2, not_found) of
        not_found ->
            {error, #{<<"error">> => <<"url_required">>}};
        NodeUrl ->
            Remaining = lists:delete(NodeUrl, get_nodes(M1)),
            Reply = #{
                <<"message">> => <<"node_removed">>,
                <<"url">> => NodeUrl
            },
            {ok, maps:merge(save_nodes(M1, Remaining), Reply)}
    end.

Part 7: Internal Helpers

Storage Functions

%%====================================================================
%% Internal Functions
%%====================================================================
 
%% Content address of an item: the SHA-256 digest of its data, passed
%% through hb_util:encode/1. Equal data always yields an equal ID.
generate_item_id(Data) ->
    Digest = crypto:hash(sha256, Data),
    hb_util:encode(Digest).
 
%% Bring tags into list form. A list is returned untouched; a map is
%% turned into a list of #{<<"name">>, <<"value">>} maps with both sides
%% converted to binaries.
format_tags(Tags) when is_list(Tags) ->
    Tags;
format_tags(Tags) when is_map(Tags) ->
    Prepend = fun(Name, Value, Formatted) ->
        Tag = #{
            <<"name">> => to_binary(Name),
            <<"value">> => to_binary(Value)
        },
        [Tag | Formatted]
    end,
    maps:fold(Prepend, [], Tags).
 
%% Convert a tag name or value to a binary.
%% Binaries pass through; lists, atoms and integers use the matching
%% conversion BIF. Floats are also accepted, so a tag value such as 1.5
%% does not crash the upload with a function_clause error; they are
%% rendered with the `~p' format directive.
to_binary(V) when is_binary(V) -> V;
to_binary(V) when is_list(V) -> list_to_binary(V);
to_binary(V) when is_atom(V) -> atom_to_binary(V, utf8);
to_binary(V) when is_integer(V) -> integer_to_binary(V);
to_binary(V) when is_float(V) -> iolist_to_binary(io_lib:format("~p", [V])).
 
%% Storage helpers using priv map
%% Put Item under ItemId in the stored item map and return the updated
%% message. An existing entry with the same ID is replaced.
store_item(M1, ItemId, Item) ->
    Existing = load_priv(M1, ?ITEMS_KEY, #{}),
    save_priv(M1, ?ITEMS_KEY, Existing#{ItemId => Item}).
 
%% Look up a stored item. Returns `{ok, Item}' or the atom `not_found'.
get_item(M1, ItemId) ->
    Found = maps:get(ItemId, load_priv(M1, ?ITEMS_KEY, #{}), not_found),
    if
        Found =:= not_found -> not_found;
        true -> {ok, Found}
    end.
 
%% Make sure ItemId is listed in the index. A new ID is prepended; when
%% the ID is already present the message is returned unchanged.
update_index(M1, ItemId) ->
    Known = get_index(M1),
    case lists:member(ItemId, Known) of
        false -> save_priv(M1, ?INDEX_KEY, [ItemId | Known]);
        true -> M1
    end.
 
%% The list of indexed item IDs, or [] when nothing has been indexed.
get_index(State) ->
    load_priv(State, ?INDEX_KEY, []).
 
%% Record Status for ItemId in the status map and return the updated
%% message.
update_status(M1, ItemId, Status) ->
    Statuses = load_priv(M1, ?STATUS_KEY, #{}),
    save_priv(M1, ?STATUS_KEY, Statuses#{ItemId => Status}).
 
%% The recorded status of ItemId, or <<"unknown">> when none is recorded.
get_status(M1, ItemId) ->
    maps:get(ItemId, load_priv(M1, ?STATUS_KEY, #{}), <<"unknown">>).
 
%% Put Bundle under BundleId in the stored bundle map and return the
%% updated message.
store_bundle(M1, BundleId, Bundle) ->
    Existing = load_priv(M1, ?BUNDLES_KEY, #{}),
    save_priv(M1, ?BUNDLES_KEY, Existing#{BundleId => Bundle}).
 
%% Look up a stored bundle. Returns `{ok, Bundle}' or the atom `not_found'.
get_bundle(M1, BundleId) ->
    Stored = load_priv(M1, ?BUNDLES_KEY, #{}),
    case maps:get(BundleId, Stored, not_found) of
        Bundle when Bundle =/= not_found -> {ok, Bundle};
        not_found -> not_found
    end.
 
%% The list of configured node URLs, or [] when none are configured.
get_nodes(State) ->
    load_priv(State, ?NODES_KEY, []).
 
%% Replace the configured node list and return the updated message.
save_nodes(State, NodeUrls) ->
    save_priv(State, ?NODES_KEY, NodeUrls).
 
%% Read Key from the map stored under <<"priv">> in M1. Default is
%% returned when M1 has no <<"priv">> entry, when that entry is not a map,
%% or when the map lacks Key.
load_priv(M1, Key, Default) ->
    case maps:find(<<"priv">>, M1) of
        {ok, Priv} when is_map(Priv) -> maps:get(Key, Priv, Default);
        _ -> Default
    end.
 
%% Write Data under Key in the map stored under <<"priv">> in M1,
%% creating that map when M1 has none, and return the updated message.
save_priv(M1, Key, Data) ->
    Current = maps:get(<<"priv">>, M1, #{}),
    maps:put(<<"priv">>, maps:put(Key, Data, Current), M1).

Part 8: Testing

Test using hb_ao:resolve with the {as, Module, Msg} pattern.

Test Device Info

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
 
%% Resolving the `info' path against an empty base message must report
%% the device name.
info_test() ->
    application:ensure_all_started(hb),
    Base = {as, dev_dataplatform, #{}},
    Request = #{<<"path">> => <<"info">>},
    {ok, Info} = hb_ao:resolve(Base, Request, #{}),
    ?assertEqual(<<"dataplatform">>, maps:get(<<"name">>, Info)).

Test Upload

%% Uploading data with a tag map must yield an ID and the `uploaded'
%% status.
upload_test() ->
    application:ensure_all_started(hb),
    Base = {as, dev_dataplatform, #{}},
    Request = #{
        <<"path">> => <<"upload">>,
        <<"data">> => <<"Hello Arweave">>,
        <<"tags">> => #{<<"Content-Type">> => <<"text/plain">>}
    },
    {ok, Reply} = hb_ao:resolve(Base, Request, #{}),
    ?assertMatch(#{<<"id">> := _, <<"status">> := <<"uploaded">>}, Reply).

Test Query

%% An uploaded item must be retrievable by the ID the upload returned,
%% using the upload reply as the base of the query.
query_test() ->
    application:ensure_all_started(hb),

    %% Upload first.
    UploadReq = #{<<"path">> => <<"upload">>, <<"data">> => <<"Test data">>},
    {ok, Uploaded} = hb_ao:resolve({as, dev_dataplatform, #{}}, UploadReq, #{}),
    ItemId = maps:get(<<"id">>, Uploaded),

    %% Then query against the state the upload produced.
    QueryReq = #{<<"path">> => <<"query">>, <<"id">> => ItemId},
    {ok, Found} = hb_ao:resolve({as, dev_dataplatform, Uploaded}, QueryReq, #{}),
    ?assertEqual(ItemId, maps:get(<<"id">>, Found)),
    ?assertEqual(<<"Test data">>, maps:get(<<"data">>, Found)).

Test Query by Tags

%% A tag query must find the single item uploaded with a matching tag.
query_by_tags_test() ->
    application:ensure_all_started(hb),
    Tags = #{<<"category">> => <<"test">>},

    %% Upload one tagged item.
    UploadReq = #{
        <<"path">> => <<"upload">>,
        <<"data">> => <<"Tagged data">>,
        <<"tags">> => Tags
    },
    {ok, Uploaded} = hb_ao:resolve({as, dev_dataplatform, #{}}, UploadReq, #{}),

    %% Query with the same tags against the resulting state.
    QueryReq = #{<<"path">> => <<"query_by_tags">>, <<"tags">> => Tags},
    {ok, Found} = hb_ao:resolve({as, dev_dataplatform, Uploaded}, QueryReq, #{}),
    ?assertEqual(1, maps:get(<<"total">>, Found)).

Test Bundle Operations

%% Bundling two items must report two items and return a bundle ID.
create_bundle_test() ->
    application:ensure_all_started(hb),
    Specs = [
        #{<<"data">> => <<"Item 1">>, <<"tags">> => #{}},
        #{<<"data">> => <<"Item 2">>, <<"tags">> => #{}}
    ],
    Request = #{<<"path">> => <<"create_bundle">>, <<"items">> => Specs},
    {ok, Created} = hb_ao:resolve({as, dev_dataplatform, #{}}, Request, #{}),
    ?assertEqual(2, maps:get(<<"item_count">>, Created)),
    ?assertMatch(#{<<"bundle_id">> := _}, Created).
 
%% A freshly created bundle must verify as valid and report its item
%% count.
verify_bundle_test() ->
    application:ensure_all_started(hb),

    %% Create a bundle of two items.
    CreateReq = #{
        <<"path">> => <<"create_bundle">>,
        <<"items">> => [
            #{<<"data">> => <<"Item A">>},
            #{<<"data">> => <<"Item B">>}
        ]
    },
    {ok, Created} = hb_ao:resolve({as, dev_dataplatform, #{}}, CreateReq, #{}),
    BundleId = maps:get(<<"bundle_id">>, Created),

    %% Verify it against the state the creation produced.
    VerifyReq = #{<<"path">> => <<"verify_bundle">>, <<"bundle_id">> => BundleId},
    {ok, Verified} = hb_ao:resolve({as, dev_dataplatform, Created}, VerifyReq, #{}),
    ?assertEqual(true, maps:get(<<"valid">>, Verified)),
    ?assertEqual(2, maps:get(<<"item_count">>, Verified)).

Test Node Management

%% Walk the node list through its life cycle: empty, one node added,
%% node removed, empty again.
node_management_test() ->
    application:ensure_all_started(hb),
    Url = <<"http://node1.example">>,
    Resolve = fun(Base, Request) ->
        hb_ao:resolve({as, dev_dataplatform, Base}, Request, #{})
    end,
    ListReq = #{<<"path">> => <<"nodes">>},

    %% Initially empty.
    {ok, Listed0} = Resolve(#{}, ListReq),
    ?assertEqual(0, maps:get(<<"count">>, Listed0)),

    %% Add a node.
    {ok, Added} = Resolve(#{}, #{<<"path">> => <<"add_node">>, <<"url">> => Url}),
    ?assertEqual(<<"node_added">>, maps:get(<<"message">>, Added)),

    %% The node is now listed.
    {ok, Listed1} = Resolve(Added, ListReq),
    ?assertEqual(1, maps:get(<<"count">>, Listed1)),

    %% Remove it again.
    {ok, Removed} = Resolve(Added, #{<<"path">> => <<"remove_node">>, <<"url">> => Url}),
    ?assertEqual(<<"node_removed">>, maps:get(<<"message">>, Removed)),

    %% The list is empty once more.
    {ok, Listed2} = Resolve(Removed, ListReq),
    ?assertEqual(0, maps:get(<<"count">>, Listed2)).
 
-endif.

Run Tests

rebar3 eunit --module=dev_dataplatform

Part 9: Device Registration

Register the dataplatform device to use with the ~device@version URL syntax.

Add to sys.config

%% Configuration entry for the hb application: lists the module
%% dev_dataplatform under the device name <<"dataplatform@1.0">>.
%% NOTE(review): this is a fragment; it presumably belongs inside the
%% top-level list of an existing sys.config. Confirm against your file.
{hb, [
    {preloaded_devices, [
        %% ... existing devices ...
        #{name => <<"dataplatform@1.0">>, module => dev_dataplatform}
    ]}
]}

Or Register at Runtime

%% Pass the same device entry (name <<"dataplatform@1.0">>, module
%% dev_dataplatform) in an options map at startup.
%% NOTE(review): confirm that hb:init accepts an options map in the
%% HyperBEAM version you are running.
hb:init(#{
    preloaded_devices => [
        #{name => <<"dataplatform@1.0">>, module => dev_dataplatform}
    ]
}).

Verify Registration

GET  http://localhost:8734/~dataplatform@1.0/info
POST http://localhost:8734/~dataplatform@1.0/upload
GET  http://localhost:8734/~dataplatform@1.0/query

Key Concepts

| Concept | Implementation |
| --- | --- |
| Content addressing | `crypto:hash(sha256, Data)` |
| Tag format | List of `#{<<"name">> => N, <<"value">> => V}` |
| Tag matching | Check all query tags exist in item |
| Bundle integrity | All items have signatures |
| Node list | Simple URL list for replication |

Next Steps


Resources