dev_copycat_graphql.erl - GraphQL Indexing Engine

Overview

Purpose: Fetch data from GraphQL endpoints for replication
Module: dev_copycat_graphql
Device: ~copycat@1.0 (engine)
Pattern: Query → Parse → Cache → Paginate

This engine fetches data from GraphQL endpoints (typically Arweave gateways) for replication. It supports flexible filtering, automatic pagination, and batch processing of results.

Supported Filters

  • query - Direct GraphQL query string
  • tag / tags - Tag-based filtering
  • owner - Owner address filter
  • recipient - Recipient address filter
  • all - All transactions

Dependencies

  • HyperBEAM: hb_gateway_client, hb_cache, hb_util, hb_maps
  • Testing: eunit
  • Includes: include/hb.hrl

Public Functions Overview

-spec graphql(Base, Req, Opts) -> {ok, TotalIndexed} | {error, Reason}.

Public Functions

graphql/3

-spec graphql(Base, Req, Opts) -> {ok, TotalIndexed} | {error, Reason}
    when
        Base :: map(),
        Req :: map(),
        Opts :: map(),
        TotalIndexed :: integer(),
        Reason :: term().

Description: Takes a GraphQL query, iterates through the messages it returns, and indexes them into the node's cache. Pagination is followed automatically, and the return value carries the total number of messages indexed.

Request Parameters:
  • <<"query">> - Raw GraphQL query string (binary), or a map that is treated as a tags filter
  • <<"tag">> + <<"value">> - Single tag filter
  • <<"tags">> - Multiple tag filters (map)
  • <<"owner">> - Owner address filter
  • <<"recipient">> - Recipient address filter
  • <<"all">> - Fetch all transactions
  • <<"operationName">> - GraphQL operation name
  • <<"variables">> - Query variables map
Options:
  • <<"node">> - Gateway URL to query (optional; if unset, the gateway client falls back to its configured default)
Test Code:
-module(dev_copycat_graphql_test).
-include_lib("eunit/include/eunit.hrl").
 
graphql_custom_query_test() ->
    % Integration test: performs a live request against the gateway.
    Query = <<"query { transactions { edges { node { id } } } }">>,
    Req = #{<<"query">> => Query},
    Opts = #{<<"node">> => <<"https://arweave.net">>},
    Result = dev_copycat_graphql:graphql(#{}, Req, Opts),
    % Accepts {ok, Total} or {error, Reason}; only the shape is checked.
    ?assert(is_tuple(Result)).
 
graphql_tag_filter_test() ->
    % <<"tag">> names the tag; <<"value">> is the value to match.
    Req = #{
        <<"tag">> => <<"App-Name">>,
        <<"value">> => <<"MyApp">>
    },
    Result = dev_copycat_graphql:graphql(#{}, Req, #{}),
    ?assert(is_tuple(Result)).

Internal Functions

parse_query/3

-spec parse_query(Base, Req, Opts) -> {ok, Query} | {error, Reason}
    when
        Base :: map(),
        Req :: map(),
        Opts :: map(),
        Query :: binary(),
        Reason :: term().

Description: Finds or builds a GraphQL query from the base and request messages. The two maps are merged, and the filter is then chosen according to which supported keys are present.

Supported Filters:
-define(SUPPORTED_FILTERS,
    [<<"query">>, <<"tag">>, <<"owner">>, <<"recipient">>, <<"all">>]
).
Priority Order:
  1. query - Direct GraphQL query (highest priority)
  2. tag - Single tag filter
  3. owner - Owner filter
  4. recipient - Recipient filter
  5. all - All transactions
Query Field Types:
  • Map: Treated as tags filter, generates tag query
  • Binary: Used as raw GraphQL query
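
The priority rule above can be illustrated with a small, self-contained sketch. It is not the module's implementation: the module name filter_priority_sketch, the function select_filter/2, and the error atom are hypothetical, and letting the request override the base on key conflicts is an assumption.

```erlang
-module(filter_priority_sketch).
-export([select_filter/2]).

%% Filters in priority order, copied from the SUPPORTED_FILTERS list above.
-define(SUPPORTED_FILTERS,
    [<<"query">>, <<"tag">>, <<"owner">>, <<"recipient">>, <<"all">>]
).

%% Hypothetical helper: merge Base and Req (assumption: Req wins on
%% conflicts) and return the highest-priority filter key that is present.
select_filter(Base, Req) ->
    Merged = maps:merge(Base, Req),
    Present = [K || K <- ?SUPPORTED_FILTERS, maps:is_key(K, Merged)],
    case Present of
        [Key | _] -> {ok, Key, maps:get(Key, Merged)};
        [] -> {error, no_supported_filter}
    end.
```

With this sketch, a request that carries both <<"owner">> and <<"query">> resolves to <<"query">>, because that key comes first in the list.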

index_graphql/6

-spec index_graphql(Total, Query, Vars, Node, OpName, Opts) -> 
    {ok, FinalTotal} | {error, Reason}
    when
        Total :: integer(),
        Query :: binary(),
        Vars :: map(),
        Node :: binary() | undefined,
        OpName :: binary() | undefined,
        Opts :: map(),
        FinalTotal :: integer(),
        Reason :: term().

Description: Recursively index GraphQL query results with automatic pagination.

Process:
  1. Execute GraphQL query via hb_gateway_client:query/5
  2. Extract transaction nodes from response
  3. Parse each node to HyperBEAM message format
  4. Write parsed messages to cache
  5. Check for next page (pageInfo/hasNextPage)
  6. If has next page, recurse with new cursor
  7. Return total indexed count

Note: This function is internal and not exported. Testing is done through the main graphql/3 function.
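
The recursive process described above can be sketched without a gateway by passing the page fetcher in as a fun. Everything here is illustrative: paginate_sketch, index_all/3, FetchPage, and StoreFun are hypothetical names, and the early return on an empty page is a choice made for the sketch so that lists:last/1 is never applied to an empty list.

```erlang
-module(paginate_sketch).
-export([index_all/3]).

%% FetchPage :: fun((Cursor) -> {ok, Edges, HasNextPage} | {error, Reason})
%% StoreFun  :: fun((Edge) -> boolean()), true when the edge was stored.
%% Both funs are hypothetical stand-ins for the gateway query and cache write.
index_all(FetchPage, StoreFun, Cursor) ->
    index_all(0, FetchPage, StoreFun, Cursor).

index_all(Total, FetchPage, StoreFun, Cursor) ->
    case FetchPage(Cursor) of
        {error, Reason} ->
            {error, Reason};
        {ok, [], _HasNextPage} ->
            %% Nothing returned: stop instead of reading a cursor.
            {ok, Total};
        {ok, Edges, HasNextPage} ->
            %% Count only the edges that were stored successfully.
            Written = length([E || E <- Edges, StoreFun(E)]),
            NewTotal = Total + Written,
            case HasNextPage of
                true ->
                    %% The cursor of the last edge selects the next page.
                    #{<<"cursor">> := Next} = lists:last(Edges),
                    index_all(NewTotal, FetchPage, StoreFun, Next);
                false ->
                    {ok, NewTotal}
            end
    end.
```

Injecting the fetcher keeps the recursion testable with canned pages, which is useful because the real function is not exported.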


default_query/3

-spec default_query(FilterType, Data, Opts) -> {ok, Query}
    when
        FilterType :: binary(),
        Data :: term(),
        Opts :: map(),
        Query :: binary().

Description: Generate a default GraphQL query for a given filter type.

Filter Type → Query Generation:

Tags (Map):
% Input
#{<<"App-Name">> => <<"MyApp">>, <<"Version">> => <<"1.0">>}
 
% Generated Query
<<"query($after: String) {
    transactions(after: $after, tags: [
        {name: \"App-Name\", values: [\"MyApp\"]},
        {name: \"Version\", values: [\"1.0\"]}
    ]) {
        edges { ... }
        pageInfo { hasNextPage }
    }
}">>
Tag (Single):
% Input
{<<"App-Name">>, <<"MyApp">>}
 
% Generated Query
<<"query($after: String) {
    transactions(after: $after, tags: [
        {name: \"App-Name\", values: [\"MyApp\"]}
    ]) {
        edges { ... }
        pageInfo { hasNextPage }
    }
}">>
Owner:
% Generated Query
<<"query($after: String) {
    transactions(after: $after, owner: \"ADDRESS\") {
        edges { ... }
        pageInfo { hasNextPage }
    }
}">>
Recipient:
% Generated Query
<<"query($after: String) {
    transactions(after: $after, recipients: [\"ADDRESS\"]) {
        edges { ... }
        pageInfo { hasNextPage }
    }
}">>
All:
% Generated Query
<<"query($after: String) {
    transactions(after: $after) {
        edges { ... }
        pageInfo { hasNextPage }
    }
}">>

Note: All queries use hb_gateway_client:item_spec() for the edge node specification.
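
As a rough illustration of the Tags (Map) case, the following sketch turns a tag map into the text of the tags argument. It is an assumption about how such a string could be assembled, not the module's code: tags_query_sketch and tags_arg/1 are hypothetical, keys are sorted only to make the output deterministic, and no escaping of quotes inside names or values is attempted.

```erlang
-module(tags_query_sketch).
-export([tags_arg/1]).

%% Hypothetical helper: render #{Name => Value} as a GraphQL `tags: [...]`
%% argument. Names and values are assumed to be binaries without quotes.
tags_arg(Tags) when is_map(Tags) ->
    Pairs = lists:sort(maps:to_list(Tags)),
    Items = [
        [<<"{name: \"">>, Name, <<"\", values: [\"">>, Value, <<"\"]}">>]
     || {Name, Value} <- Pairs
    ],
    iolist_to_binary([<<"tags: [">>, lists:join(<<", ">>, Items), <<"]">>]).
```

A production version would also need to escape user-supplied values, or pass them as GraphQL variables, before embedding them in a query string.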


Batch Processing

Per-Request Flow

1. Execute Query (default 100 items)

2. Extract Transaction Nodes

3. Parse to HyperBEAM Messages

4. Filter Parse Failures

5. Write to Cache

6. Filter Write Failures

7. Track Success/Failure Counts

8. Check Pagination

9. Recurse with Cursor or Complete

Error Handling

Parse Errors:
try
    % Excerpt: Struct and NodeStruct are bound by the enclosing code.
    {ok, ParsedMsg} =
        hb_gateway_client:result_to_message(Struct, Opts),
    {true, ParsedMsg}
catch
    error:Reason ->
        ?event(warning,
            {indexer_graphql_parse_failed,
                {struct, NodeStruct},
                {reason, Reason}
            }
        ),
        false  % Skip this item
end
Write Errors:
try
    {ok, _} = hb_cache:write(ParsedMsg, Opts),
    true
catch
    error:Reason ->
        ?event(warning,
            {indexer_graphql_write_failed,
                {reason, Reason},
                {msg, ParsedMsg}
            }
        ),
        false  % Skip this item
end
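
The two fragments above return {true, Value} or false, which is the shape lists:filtermap/2 expects. The sketch below shows one way such fragments could be combined into a batch step that skips failures and reports counts. It is an assumption-laden illustration: batch_sketch, process_batch/3, ParseFun, and WriteFun are hypothetical, and event logging is omitted.

```erlang
-module(batch_sketch).
-export([process_batch/3]).

%% ParseFun and WriteFun are hypothetical stand-ins for message parsing and
%% cache writes; either may raise an error, which causes the item to be skipped.
process_batch(Items, ParseFun, WriteFun) ->
    Parsed = lists:filtermap(
        fun(Item) ->
            try {true, ParseFun(Item)}
            catch error:_ -> false   % Parse failed: drop this item
            end
        end,
        Items
    ),
    Written = lists:filter(
        fun(Msg) ->
            try WriteFun(Msg), true
            catch error:_ -> false   % Write failed: drop this item
            end
        end,
        Parsed
    ),
    #{
        written => length(Written),
        parse_failures => length(Items) - length(Parsed),
        write_failures => length(Parsed) - length(Written)
    }.
```

Because each item is wrapped in its own try, one malformed transaction does not abort the rest of the batch.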

Pagination

Automatic Continuation

% Extract pagination info
HasNextPage = hb_util:deep_get(<<"pageInfo/hasNextPage">>, Res, false, Opts),
 
case HasNextPage of
    true ->
        % Get last cursor from results
        {ok, Cursor} =
            hb_maps:find(
                <<"cursor">>,
                lists:last(NodeStructs),
                Opts
            ),
        % Recurse with new cursor
        index_graphql(
            NewTotal,
            Query,
            Vars#{<<"after">> => Cursor},
            Node,
            OpName,
            Opts
        );
    false ->
        % No more pages
        {ok, NewTotal}
end

Common Patterns

%% Query by application tag
dev_copycat_graphql:graphql(
    #{},
    #{
        <<"tag">> => <<"App-Name">>,
        <<"value">> => <<"ArConnect">>
    },
    #{<<"node">> => <<"https://arweave.net">>}
).
 
%% Query by multiple tags (as map)
dev_copycat_graphql:graphql(
    #{},
    #{
        <<"tags">> => #{
            <<"App-Name">> => <<"MyApp">>,
            <<"Version">> => <<"1.0">>,
            <<"Type">> => <<"Message">>
        }
    },
    #{}
).
 
%% Query by owner
dev_copycat_graphql:graphql(
    #{},
    #{<<"owner">> => <<"wallet-address-here">>},
    #{}
).
 
%% Query by recipient
dev_copycat_graphql:graphql(
    #{},
    #{<<"recipient">> => <<"recipient-address">>},
    #{}
).
 
%% Custom query with variables
dev_copycat_graphql:graphql(
    #{},
    #{
        <<"query">> => <<"
            query GetAppTransactions($after: String, $appName: String!) {
                transactions(
                    first: 100,
                    after: $after,
                    tags: [{name: \"App-Name\", values: [$appName]}]
                ) {
                    edges {
                        node { id tags { name value } }
                        cursor
                    }
                    pageInfo { hasNextPage }
                }
            }
        ">>,
        <<"variables">> => #{<<"appName">> => <<"MyApp">>},
        <<"operationName">> => <<"GetAppTransactions">>
    },
    #{}
).
 
%% All transactions (no filter)
dev_copycat_graphql:graphql(
    #{},
    #{<<"all">> => true},
    #{}
).
 
%% Query from Base and Request merge
dev_copycat_graphql:graphql(
    #{<<"tag">> => <<"App-Name">>},
    #{<<"value">> => <<"MyApp">>},
    #{}
).
 
%% Via copycat device
dev_copycat:graphql(
    #{},
    #{
        <<"tag">> => <<"Process">>,
        <<"value">> => ProcessID
    },
    #{}
).

Event Monitoring

Events Emitted

% Query execution
?event({graphql_run_called,
    {query, {string, Query}},
    {operation, OpName},
    {variables, Vars}
})
 
% Response received
?event({graphql_request_returned_items, Count})
 
% Indexing progress
?event({graphql_indexing_responses,
    {query, {string, Query}},
    {variables, Vars},
    {result, Res}
})
 
% Messages parsed
?event({graphql_parsed_msgs, Count})
 
% Write progress
?event(copycat_short,
    {indexer_graphql_wrote,
        {total, Total},
        {batch, BatchSize},
        {batch_failures, Failures}
    }
)
 
% Parse failure (warning)
?event(warning,
    {indexer_graphql_parse_failed,
        {struct, NodeStruct},
        {reason, Reason}
    }
)
 
% Write failure (warning)
?event(warning,
    {indexer_graphql_write_failed,
        {reason, Reason},
        {msg, ParsedMsg}
    }
)
 
% Tags query generation
?event({tags_query,
    {message, Message},
    {binary_pairs, BinaryPairs},
    {tags_query_str, {string, TagsQueryStr}}
})

Performance Characteristics

Batch Size

  • Set by the first: N argument of the GraphQL query
  • The gateway client uses the default from item_spec()
  • Typical: 100 transactions per request

Pagination

  • Cursor-based (efficient)
  • Automatic continuation
  • Accumulates total count

Parallel Processing

  • Sequential batch processing
  • Individual item error handling
  • Continues on failures
  • No inter-request parallelization

Gateway Integration

hb_gateway_client Integration

Execute Query:
{ok, RawRes} = hb_gateway_client:query(
    Query,
    Variables,
    Node,
    OperationName,
    Opts
)
Parse Result:
{ok, ParsedMsg} = hb_gateway_client:result_to_message(
    Struct,
    Opts
)

Item Specification: All generated queries use hb_gateway_client:item_spec() for consistent edge node structure.


Response Structure

Expected GraphQL Response

{
  "data": {
    "transactions": {
      "edges": [
        {
          "node": {
            "id": "...",
            "owner": { "address": "..." },
            "tags": [
              { "name": "...", "value": "..." }
            ]
          },
          "cursor": "..."
        }
      ],
      "pageInfo": {
        "hasNextPage": true
      }
    }
  }
}

Data Extraction

% Get transactions object
Res = hb_util:deep_get(<<"data/transactions">>, RawRes, #{}, Opts)
 
% Get edges array
NodeStructs = hb_util:deep_get(<<"edges">>, Res, [], Opts)
 
% Get pagination info
HasNextPage = hb_util:deep_get(<<"pageInfo/hasNextPage">>, Res, false, Opts)
 
% Get cursor from last item
{ok, Cursor} = hb_maps:find(<<"cursor">>, lists:last(NodeStructs), Opts)
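
To make the extraction steps concrete without the HyperBEAM helpers, here is a minimal sketch over plain Erlang maps. The deep_get/3 below is a hypothetical stand-in written for this example (it is not hb_util:deep_get/4 and takes no Opts), and page/1 assumes the decoded response uses binary keys as in the JSON structure shown earlier.

```erlang
-module(extract_sketch).
-export([deep_get/3, page/1]).

%% Hypothetical helper: follow a "/"-separated path through nested maps and
%% return Default as soon as a segment is missing or a non-map is reached.
deep_get(Path, Map, Default) ->
    Keys = binary:split(Path, <<"/">>, [global]),
    walk(Keys, Map, Default).

walk([], Value, _Default) ->
    Value;
walk([Key | Rest], Map, Default) when is_map(Map) ->
    case maps:find(Key, Map) of
        {ok, Next} -> walk(Rest, Next, Default);
        error -> Default
    end;
walk(_Keys, _NotAMap, Default) ->
    Default.

%% Pull the edges, the pagination flag, and the last cursor out of a
%% decoded response. An empty edge list yields an undefined cursor.
page(RawRes) ->
    Res = deep_get(<<"data/transactions">>, RawRes, #{}),
    Edges = deep_get(<<"edges">>, Res, []),
    HasNext = deep_get(<<"pageInfo/hasNextPage">>, Res, false),
    Cursor =
        case Edges of
            [] -> undefined;
            _ -> maps:get(<<"cursor">>, lists:last(Edges), undefined)
        end,
    {Edges, HasNext, Cursor}.
```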

Use Cases

1. Application-Specific Indexing

dev_copycat_graphql:graphql(
    #{},
    #{
        <<"tag">> => <<"App-Name">>,
        <<"value">> => <<"ArConnect">>
    },
    #{}
).

2. Process Message Indexing

dev_copycat_graphql:graphql(
    #{},
    #{
        <<"tags">> => #{
            <<"Process">> => ProcessID,
            <<"Type">> => <<"Message">>
        }
    },
    #{}
).

3. Wallet Transaction History

dev_copycat_graphql:graphql(
    #{},
    #{<<"owner">> => WalletAddress},
    #{}
).

4. Recipient Indexing

dev_copycat_graphql:graphql(
    #{},
    #{<<"recipient">> => RecipientAddress},
    #{}
).

5. Complete Gateway Sync

% Warning: This indexes ALL transactions from gateway
dev_copycat_graphql:graphql(
    #{},
    #{<<"all">> => true},
    #{}
).

Error Scenarios

Missing Query

{error, #{
    <<"body">> => <<"No query found in the request.">>
}}

No Supported Filter

{error, #{
    <<"body">> => <<"No supported filter fields found. Supported filters: \"query\", \"tag\", \"owner\", \"recipient\", \"all\"">>
}}

GraphQL Query Error

{error, Reason}  % From hb_gateway_client:query/5
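
A caller might distinguish these three result shapes with a single function, as in the sketch below. The module result_sketch and describe/1 are hypothetical, and the message wording is invented for illustration; only the tuple shapes come from the scenarios above.

```erlang
-module(result_sketch).
-export([describe/1]).

%% Hypothetical helper: turn a graphql/3 result into a short binary.
describe({ok, Total}) when is_integer(Total) ->
    iolist_to_binary(io_lib:format("indexed ~b messages", [Total]));
describe({error, #{<<"body">> := Body}}) when is_binary(Body) ->
    %% Errors carrying a <<"body">> key: missing query or no supported filter.
    <<"request rejected: ", Body/binary>>;
describe({error, Reason}) ->
    %% Any other reason, for example one passed through from the gateway query.
    iolist_to_binary(io_lib:format("gateway error: ~p", [Reason])).
```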

Comparison with Arweave Engine

GraphQL Engine Advantages

  • Flexible filtering
  • Efficient for targeted queries
  • Tag-based search
  • Owner/recipient filtering
  • Fast for recent transactions

GraphQL Engine Disadvantages

  • Depends on gateway availability
  • Limited to indexed data
  • Query complexity limits
  • May miss unindexed transactions

When to Use

  • Targeted message retrieval
  • Tag-based filtering
  • Recent transaction indexing
  • Application-specific data
  • Owner/recipient queries

References

  • Copycat Device - dev_copycat.erl
  • Gateway Client - hb_gateway_client.erl
  • Cache System - hb_cache.erl
  • Utilities - hb_util.erl, hb_maps.erl

Notes

  1. Flexible Querying: Multiple filter types supported
  2. Auto-Pagination: Handles cursor-based pagination automatically
  3. Batch Processing: Processes results in batches
  4. Error Resilient: Continues on individual item failures
  5. Event-Driven: Comprehensive event emission
  6. Gateway-Based: Works with Arweave gateways
  7. Cache Integration: Direct cache writes
  8. Parse Robustness: Catches and logs parse errors
  9. Write Robustness: Catches and logs write errors
  10. Progress Tracking: Total count accumulation
  11. Custom Queries: Supports raw GraphQL
  12. Default Queries: Auto-generates for common filters
  13. Variable Support: GraphQL variables for dynamic queries
  14. Operation Naming: Supports named operations
  15. Completion Detection: Returns when no more pages