dev_copycat.erl - Message Indexing Orchestration Device

Overview

Purpose: Orchestrate indexing of messages from foreign sources into HyperBEAM caches
Module: dev_copycat
Device: ~copycat@1.0
Pattern: External Data → HyperBEAM Cache

This device orchestrates the indexing of messages from external sources (primarily Arweave) into a HyperBEAM node's local caches. It acts as a coordinator, delegating to specific engine implementations for each data source type.

Supported Sources

1. GraphQL Endpoints

  • Engine: dev_copycat_graphql
  • Use Case: Query Arweave gateway GraphQL APIs
  • Features: Pagination, filtering by tags/owner/recipient

2. Arweave Nodes

  • Engine: dev_copycat_arweave
  • Use Case: Fetch blocks directly from Arweave nodes
  • Features: Height-based range fetching, reverse chronological

Dependencies

  • Engines: dev_copycat_graphql, dev_copycat_arweave

Public Functions Overview

%% Source-Specific Functions
-spec graphql(Base, Request, Opts) -> {ok, IndexedCount} | {error, Reason}.
-spec arweave(Base, Request, Opts) -> {ok, FinalHeight} | {error, Reason}.

Public Functions

1. graphql/3

-spec graphql(Base, Request, Opts) -> {ok, IndexedCount} | {error, Reason}
    when
        Base :: map(),
        Request :: map(),
        Opts :: map(),
        IndexedCount :: integer(),
        Reason :: term().

Description: Fetch data from a GraphQL endpoint for replication. Delegates to dev_copycat_graphql:graphql/3.

Request Parameters:
  • <<"query">> - GraphQL query string
  • <<"tag">> with <<"value">> - Filter by a single tag name and value
  • <<"tags">> - Filter by a list of tag name/value pairs
  • <<"owner">> - Filter by transaction owner
  • <<"recipient">> - Filter by recipient
  • <<"node">> (in Opts) - Gateway URL
  • <<"operationName">> - GraphQL operation name
  • <<"variables">> - Query variables
Test Code:
-module(dev_copycat_graphql_test).
-include_lib("eunit/include/eunit.hrl").
 
graphql_with_query_test() ->
    Base = #{},
    Request = #{
        <<"query">> => <<"query { transactions { edges { node { id } } } }">>
    },
    Opts = #{
        <<"node">> => <<"https://arweave.net">>
    },
    % Note: this hits a real endpoint; use a mock in actual tests.
    Result = dev_copycat:graphql(Base, Request, Opts),
    ?assert(is_tuple(Result)).
 
graphql_with_tag_filter_test() ->
    Request = #{
        <<"tag">> => <<"App-Name">>,
        <<"value">> => <<"MyApp">>
    },
    Result = dev_copycat:graphql(#{}, Request, #{}),
    ?assert(is_tuple(Result)).
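
The tag filter above implies that a query is generated from the <<"tag">> and <<"value">> keys. As a rough illustration only, the sketch below builds such a query with a hypothetical helper, tag_query/2, in a hypothetical module, copycat_query_sketch. The query text is an assumption modeled on the custom query shown under Common Patterns; the query that dev_copycat_graphql actually generates may differ. The helper does not escape quotes inside the tag name or value.

```erlang
-module(copycat_query_sketch).
-export([tag_query/2]).

%% Hypothetical helper (not part of HyperBEAM): build a GraphQL query that
%% filters transactions by a single tag name and value.
-spec tag_query(binary(), binary()) -> binary().
tag_query(Name, Value) ->
    iolist_to_binary([
        <<"query($after: String) { transactions(first: 100, after: $after, ">>,
        <<"tags: [{name: \"">>, Name, <<"\", values: [\"">>, Value, <<"\"]}]) ">>,
        <<"{ edges { node { id } cursor } pageInfo { hasNextPage } } }">>
    ]).
```

Under these assumptions, the resulting binary could be passed as the <<"query">> key of a request in place of the <<"tag">> and <<"value">> keys.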

2. arweave/3

-spec arweave(Base, Request, Opts) -> {ok, FinalHeight} | {error, Reason}
    when
        Base :: map(),
        Request :: map(),
        Opts :: map(),
        FinalHeight :: integer(),
        Reason :: term().

Description: Fetch data from an Arweave node for replication. Delegates to dev_copycat_arweave:arweave/3.

Request Parameters:
  • <<"from">> - Starting (highest) block height (default: current height)
  • <<"to">> - Ending (lowest) block height (default: 0, the Genesis block)
Test Code:
-module(dev_copycat_arweave_test).
-include_lib("eunit/include/eunit.hrl").
 
arweave_range_test() ->
    Base = #{},
    Request = #{
        <<"from">> => 1000,
        <<"to">> => 999
    },
    Result = dev_copycat:arweave(Base, Request, #{}),
    ?assert(is_tuple(Result)).
 
arweave_bounded_range_test() ->
    % Use a small bounded range to avoid hanging; omitting <<"from">>
    % would start at the current height.
    Request = #{
        <<"from">> => 1001,
        <<"to">> => 1000
    },
    Result = dev_copycat:arweave(#{}, Request, #{}),
    ?assert(is_tuple(Result)).
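
Because <<"from">> is the higher height and <<"to">> the lower one, it may help to see which heights a request covers. The sketch below is a standalone illustration, not the engine's code: the module copycat_range_sketch, the functions heights/2 and walk/3, and the FetchFun callback are all hypothetical. Returning the <<"to">> height as FinalHeight is an assumption.

```erlang
-module(copycat_range_sketch).
-export([heights/2, walk/3]).

%% Heights covered by a request: From down to To, inclusive.
-spec heights(non_neg_integer(), non_neg_integer()) -> [non_neg_integer()].
heights(From, To) when From >= To ->
    lists:seq(From, To, -1);
heights(_From, _To) ->
    [].  % Assumption: an ascending range leaves nothing to fetch.

%% Apply FetchFun to each height in descending order, then report the
%% lowest height in an {ok, FinalHeight}-shaped tuple.
walk(From, To, FetchFun) when From >= To ->
    lists:foreach(FetchFun, heights(From, To)),
    {ok, To};
walk(_From, _To, _FetchFun) ->
    {error, empty_range}.
```

For the range used in arweave_range_test above, heights(1000, 999) yields [1000, 999], so exactly two blocks would be visited.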

Engine Architecture

Module Pattern

Each data source has its own engine module:

-module(dev_copycat_[engine]).
-export([[engine]/3]).
 
[engine](Base, Request, Opts) ->
    % Engine-specific implementation
    {ok, Result}.

Adding New Engines

To add support for a new data source:

  1. Create dev_copycat_[engine].erl
  2. Implement and export the [engine]/3 function
  3. Add an exported routing function to dev_copycat.erl:
    [engine](Base, Request, Opts) ->
        dev_copycat_[engine]:[engine](Base, Request, Opts).
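
To make the template concrete, here is a minimal sketch of what an engine module could look like. Everything in it is hypothetical: the engine name example, the <<"items">> request key, and the behaviour of counting items rather than fetching and caching messages. It shows only the shape of the three-argument function and its return tuples.

```erlang
%% Hypothetical engine, for illustration only. It "indexes" the items listed
%% under <<"items">> in the request by counting them; a real engine would
%% fetch messages from its source and write them to the cache.
-module(dev_copycat_example).
-export([example/3]).

-spec example(map(), map(), map()) ->
    {ok, non_neg_integer()} | {error, term()}.
example(_Base, Request, _Opts) ->
    case maps:get(<<"items">>, Request, []) of
        Items when is_list(Items) ->
            {ok, length(Items)};
        Other ->
            {error, {invalid_items, Other}}
    end.
```

The matching routing function in dev_copycat.erl would follow step 3, with example substituted for [engine].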

Common Patterns

%% Index from GraphQL with custom query
Result = dev_copycat:graphql(
    #{},
    #{
        <<"query">> => <<"
            query($after: String) {
                transactions(
                    first: 100,
                    after: $after,
                    tags: [{name: \"App-Name\", values: [\"MyApp\"]}]
                ) {
                    edges {
                        node { id owner { address } tags { name value } }
                        cursor
                    }
                    pageInfo { hasNextPage }
                }
            }
        ">>,
        <<"variables">> => #{}
    },
    #{<<"node">> => <<"https://arweave.net">>}
).
% Returns: {ok, IndexedCount}
 
%% Index by tag filter (auto-generates query)
dev_copycat:graphql(
    #{},
    #{
        <<"tag">> => <<"App-Name">>,
        <<"value">> => <<"MyApp">>
    },
    #{}
).
 
%% Index by owner
dev_copycat:graphql(
    #{},
    #{<<"owner">> => <<"wallet-address-here">>},
    #{}
).
 
%% Fetch Arweave blocks in a range
dev_copycat:arweave(
    #{},
    #{
        <<"from">> => 1500000,
        <<"to">> => 1499000
    },
    #{}
).
% Returns: {ok, FinalHeight}
 
%% Fetch all blocks from current to target
dev_copycat:arweave(
    #{},
    #{<<"to">> => 1000000},
    #{}
).
 
%% Fetch with defaults (start at the current height)
dev_copycat:arweave(#{}, #{}, #{}).
% Walks from the latest block all the way down to Genesis

Use Cases

1. Initial Node Sync

Populate a new HyperBEAM node with historical data:

% Start from current block, index backwards
dev_copycat:arweave(#{}, #{}, #{}).

2. Process-Specific Indexing

Index only messages for a specific process:

dev_copycat:graphql(
    #{},
    #{
        <<"tag">> => <<"Process">>,
        <<"value">> => ProcessID
    },
    #{}
).

3. Owner-Based Indexing

Index all transactions from a specific wallet:

dev_copycat:graphql(
    #{},
    #{<<"owner">> => WalletAddress},
    #{}
).

4. Gap Filling

Fill missing blocks in specific height ranges:

% <<"from">> is the upper height; blocks are fetched in descending order
dev_copycat:arweave(
    #{},
    #{
        <<"from">> => MissingEnd,
        <<"to">> => MissingStart
    },
    #{}
).

5. Application-Specific Indexing

Index transactions for a specific application:

dev_copycat:graphql(
    #{},
    #{
        <<"tags">> => [
            #{<<"name">> => <<"App-Name">>, <<"value">> => <<"MyApp">>},
            #{<<"name">> => <<"App-Version">>, <<"value">> => <<"1.0">>}
        ]
    },
    #{}
).

Engine Characteristics

GraphQL Engine (dev_copycat_graphql)

Strengths:
  • Flexible filtering
  • Efficient pagination
  • Tag-based queries
  • Owner/recipient filtering
Limitations:
  • Depends on gateway availability
  • Limited to indexed data
  • Query complexity limits
Best For:
  • Targeted message retrieval
  • Tag-based filtering
  • Recent transaction indexing

Arweave Engine (dev_copycat_arweave)

Strengths:
  • Direct block access
  • Complete data availability
  • No gateway dependency
  • Reliable for historical data
Limitations:
  • Sequential block fetching
  • Slower than GraphQL for filtered queries
  • Network-intensive
Best For:
  • Complete node sync
  • Historical block indexing
  • Gap filling in block ranges

Error Handling

Both engines return standardized result tuples:

{ok, Result}        % Success with result
{error, Reason}     % Failure with reason

Common error scenarios:

  • Network failures
  • Invalid queries
  • Missing blocks
  • Rate limiting
  • Parse errors
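
Transient failures such as network errors or rate limiting are often handled by retrying the call. The sketch below is one possible caller-side wrapper, not something the device provides: the module copycat_retry_sketch and the function with_retry/2 are hypothetical. It retries immediately, so a real caller would probably add a delay between attempts.

```erlang
-module(copycat_retry_sketch).
-export([with_retry/2]).

%% Run Fun (a zero-arity fun returning {ok, _} or {error, _}) up to
%% Attempts times. Returns the first success, or the last error seen.
-spec with_retry(fun(() -> {ok, term()} | {error, term()}), pos_integer()) ->
    {ok, term()} | {error, term()}.
with_retry(Fun, Attempts) when is_function(Fun, 0), Attempts >= 1 ->
    case Fun() of
        {ok, Result} ->
            {ok, Result};
        {error, _Reason} when Attempts > 1 ->
            with_retry(Fun, Attempts - 1);
        {error, Reason} ->
            {error, Reason}
    end.
```

A call might look like with_retry(fun() -> dev_copycat:graphql(#{}, Request, Opts) end, 3), where Request and Opts are built as in the examples above.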

Integration with Cache System

The copycat device works closely with HyperBEAM's cache:

  1. Check Cache: Before fetching, checks if data exists
  2. Fetch: Retrieves data from external source
  3. Parse: Converts to HyperBEAM message format
  4. Write: Stores in local cache
  5. Continue: Proceeds to next item

This ensures efficient replication without duplicating existing data.
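
The five steps can be sketched as a fold over item IDs. This is a simplified model rather than the actual implementation: a plain map stands in for the node's cache, the FetchFun callback and the message shape are assumptions, and the module copycat_cache_sketch is hypothetical.

```erlang
-module(copycat_cache_sketch).
-export([index/3]).

%% Cache0 is a plain map standing in for the node's cache (an assumption).
%% FetchFun(Id) must return {ok, Raw} or {error, Reason}.
%% Returns {Written, NewCache}, where Written counts newly stored items.
index(Ids, FetchFun, Cache0) ->
    lists:foldl(
        fun(Id, {Written, Cache}) ->
            case maps:is_key(Id, Cache) of
                true ->
                    %% 1. Check cache: already present, nothing to do.
                    {Written, Cache};
                false ->
                    %% 2. Fetch from the external source.
                    case FetchFun(Id) of
                        {ok, Raw} ->
                            %% 3. Parse into a message (shape is illustrative).
                            Msg = #{<<"id">> => Id, <<"data">> => Raw},
                            %% 4. Write to the cache.
                            {Written + 1, Cache#{Id => Msg}};
                        {error, _Reason} ->
                            %% 5. Continue with the next item.
                            {Written, Cache}
                    end
            end
        end,
        {0, Cache0},
        Ids
    ).
```

Running index/3 twice over the same IDs writes nothing the second time, which is the property the cache check is meant to provide.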


Performance Considerations

GraphQL

  • Batch size: 100 transactions per request (typical)
  • Pagination: Automatic with cursor-based continuation
  • Rate limits: Depends on gateway
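
Cursor-based continuation can be pictured as a loop that keeps requesting pages until no cursor is returned. The sketch below is an illustration only: the module copycat_page_sketch, the FetchPage callback, and its return shape are assumptions, not the gateway client's API.

```erlang
-module(copycat_page_sketch).
-export([index_all/1]).

%% FetchPage(Cursor) must return {ok, Items, NextCursor} or {error, Reason}.
%% The first call receives `undefined`; a NextCursor of `undefined` marks
%% the last page. Returns {ok, TotalItems} or the first error encountered.
index_all(FetchPage) ->
    index_all(FetchPage, undefined, 0).

index_all(FetchPage, Cursor, Total) ->
    case FetchPage(Cursor) of
        {ok, Items, undefined} ->
            {ok, Total + length(Items)};
        {ok, Items, Next} ->
            index_all(FetchPage, Next, Total + length(Items));
        {error, Reason} ->
            {error, Reason}
    end.
```

With a typical batch size of 100, indexing 250 matching transactions would take three page requests under this model.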

Arweave

  • Sequential: One block at a time
  • Caching: Skips already-indexed blocks
  • Direction: Reverse chronological (newest first)

Monitoring

Both engines emit events for monitoring:

% GraphQL events
?event({graphql_parsed_msgs, Count})
?event({indexer_graphql_wrote, {total, Total}})
 
% Arweave events
?event({arweave_block_cached, {height, Height}})
?event({arweave_block_not_found, {height, Height}})
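
If these event terms are collected somewhere, they can be reduced to simple counters. The sketch below assumes the events are available as a list of terms with the shapes shown above; how they are captured is outside its scope, and the module copycat_event_sketch is hypothetical.

```erlang
-module(copycat_event_sketch).
-export([summarize/1]).

%% Fold a list of event terms into counters. Unknown events are ignored.
summarize(Events) ->
    Init = #{cached => 0, not_found => 0, wrote => 0},
    lists:foldl(fun count/2, Init, Events).

count({arweave_block_cached, {height, _}}, Acc) ->
    maps:update_with(cached, fun(N) -> N + 1 end, Acc);
count({arweave_block_not_found, {height, _}}, Acc) ->
    maps:update_with(not_found, fun(N) -> N + 1 end, Acc);
count({indexer_graphql_wrote, {total, Total}}, Acc) ->
    %% Keep the most recent running total reported by the GraphQL engine.
    Acc#{wrote := Total};
count(_Other, Acc) ->
    Acc.
```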

References

  • GraphQL Engine - dev_copycat_graphql.erl
  • Arweave Engine - dev_copycat_arweave.erl
  • Cache System - hb_cache.erl
  • Gateway Client - hb_gateway_client.erl
  • Arweave Device - ~arweave@2.9-pre

Notes

  1. Orchestrator Pattern: Delegates to specialized engines
  2. Extensible: Easy to add new data sources
  3. Cache-Aware: Avoids duplicate indexing
  4. Event-Driven: Emits events for monitoring
  5. Async-Capable: Can run indexing in background
  6. Gateway Integration: Works with Arweave gateways
  7. Direct Node Access: Can fetch directly from nodes
  8. Flexible Filtering: Multiple query options for GraphQL
  9. Range Support: Block height ranges for Arweave
  10. Automatic Pagination: Handles multi-page GraphQL results
  11. Error Resilient: Continues on individual item failures
  12. Simple API: Two main functions for different sources
  13. Stateless: No persistent state in device itself
  14. Cache-First: Leverages existing cached data
  15. Modular Design: Clean separation of concerns per engine