dev_copycat_graphql.erl - GraphQL Indexing Engine
Overview
Purpose: Fetch data from GraphQL endpoints for replication
Module: dev_copycat_graphql
Device: ~copycat@1.0 (engine)
Pattern: Query → Parse → Cache → Paginate
This engine fetches data from GraphQL endpoints (typically Arweave gateways) for replication. It supports flexible filtering, automatic pagination, and batch processing of results.
Supported Filters
- query - Direct GraphQL query string
- tag/tags - Tag-based filtering
- owner - Owner address filter
- recipient - Recipient address filter
- all - All transactions
Dependencies
- HyperBEAM: hb_gateway_client, hb_cache, hb_util, hb_maps
- Testing: eunit
- Includes: include/hb.hrl
Public Functions Overview
-spec graphql(Base, Req, Opts) -> {ok, TotalIndexed} | {error, Reason}.
Public Functions
graphql/3
-spec graphql(Base, Req, Opts) -> {ok, TotalIndexed} | {error, Reason}
    when
        Base :: map(),
        Req :: map(),
        Opts :: map(),
        TotalIndexed :: integer(),
        Reason :: term().
Description: Takes a GraphQL query and iterates through the messages it returns, indexing them into the node's caches. Pagination is handled automatically.
Request Parameters:
- <<"query">> - Raw GraphQL query string or query parameters map
- <<"tag">> + <<"value">> - Single tag filter
- <<"tags">> - Multiple tag filters (map)
- <<"owner">> - Owner address filter
- <<"recipient">> - Recipient address filter
- <<"all">> - Fetch all transactions
- <<"operationName">> - GraphQL operation name
- <<"variables">> - Query variables map
- <<"node">> - Gateway URL (default: from options)
-module(dev_copycat_graphql_test).
-include_lib("eunit/include/eunit.hrl").

graphql_custom_query_test() ->
    Query = <<"query { transactions { edges { node { id } } } }">>,
    Req = #{<<"query">> => Query},
    Opts = #{<<"node">> => <<"https://arweave.net">>},
    Result = dev_copycat_graphql:graphql(#{}, Req, Opts),
    ?assert(is_tuple(Result)).

graphql_tag_filter_test() ->
    Req = #{
        <<"tag">> => <<"App-Name">>,
        <<"value">> => <<"MyApp">>
    },
    Result = dev_copycat_graphql:graphql(#{}, Req, #{}),
    ?assert(is_tuple(Result)).
Internal Functions
parse_query/3
-spec parse_query(Base, Req, Opts) -> {ok, Query} | {error, Reason}
    when
        Base :: map(),
        Req :: map(),
        Opts :: map(),
        Query :: binary(),
        Reason :: term().
Description: Finds or creates a GraphQL query from the base and request. Merges the base and request maps, then determines which filter to use based on the keys available.
Supported Filters:
-define(SUPPORTED_FILTERS,
    [<<"query">>, <<"tag">>, <<"owner">>, <<"recipient">>, <<"all">>]
).
- query - Direct GraphQL query (highest priority)
- tag - Single tag filter
- owner - Owner filter
- recipient - Recipient filter
- all - All transactions
Handling of the query value by type:
- Map: Treated as a tags filter; generates a tag query
- Binary: Used as a raw GraphQL query
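The merge-then-select behaviour described for parse_query/3 can be sketched in plain Erlang. This is a minimal illustration rather than the module's implementation: the module name filter_select_sketch, the function select_filter/2, and the no_supported_filter error atom are hypothetical, and letting request keys override base keys is an assumption.

```erlang
-module(filter_select_sketch).
-export([select_filter/2]).

%% Supported filter keys in priority order, as listed in this section.
-define(SUPPORTED_FILTERS,
    [<<"query">>, <<"tag">>, <<"owner">>, <<"recipient">>, <<"all">>]
).

%% Merge Base and Req (assumption: request keys win on conflict), then
%% return the first supported filter key present, with its value.
select_filter(Base, Req) ->
    Merged = maps:merge(Base, Req),
    find_first(?SUPPORTED_FILTERS, Merged).

%% Walk the priority list until a key is found in the merged map.
find_first([], _Merged) ->
    %% Hypothetical error term; the real module returns a map with a body.
    {error, no_supported_filter};
find_first([Key | Rest], Merged) ->
    case maps:find(Key, Merged) of
        {ok, Value} -> {ok, Key, Value};
        error -> find_first(Rest, Merged)
    end.
```

Because the list is walked in order, a request that carries both query and owner resolves to query.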
index_graphql/6
-spec index_graphql(Total, Query, Vars, Node, OpName, Opts) ->
    {ok, FinalTotal} | {error, Reason}
    when
        Total :: integer(),
        Query :: binary(),
        Vars :: map(),
        Node :: binary() | undefined,
        OpName :: binary() | undefined,
        Opts :: map(),
        FinalTotal :: integer(),
        Reason :: term().
Description: Recursively indexes GraphQL query results with automatic pagination.
Process:
- Execute GraphQL query via hb_gateway_client:query/5
- Extract transaction nodes from response
- Parse each node to HyperBEAM message format
- Write parsed messages to cache
- Check for next page (pageInfo/hasNextPage)
- If there is a next page, recurse with the new cursor
- Return total indexed count
Note: This function is internal and not exported. Testing is done through the main graphql/3 function.
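The recursion described above can be illustrated without a gateway by injecting the page fetcher. This is a sketch under stated assumptions: paginate_sketch, index_all/2, and the FetchPage fun with its return shape are hypothetical, and every returned edge is counted, whereas the real function counts only messages it managed to write.

```erlang
-module(paginate_sketch).
-export([index_all/2]).

%% FetchPage is a hypothetical fun(Cursor | undefined) ->
%%     {ok, Edges, HasNextPage} | {error, Reason},
%% where each edge is a map carrying a <<"cursor">> key.
index_all(FetchPage, Total) ->
    index_all(FetchPage, undefined, Total).

index_all(FetchPage, Cursor, Total) ->
    case FetchPage(Cursor) of
        {ok, [], _HasNext} ->
            %% An empty page has no last cursor, so stop here.
            {ok, Total};
        {ok, Edges, true} ->
            %% Continue from the cursor of the last edge on this page.
            #{<<"cursor">> := Next} = lists:last(Edges),
            index_all(FetchPage, Next, Total + length(Edges));
        {ok, Edges, false} ->
            %% Last page: return the accumulated count.
            {ok, Total + length(Edges)};
        {error, Reason} ->
            {error, Reason}
    end.
```

The empty-page clause is a design choice of the sketch: lists:last/1 fails on an empty list, so a page that reports hasNextPage but returns no edges would otherwise crash the loop.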
default_query/3
-spec default_query(FilterType, Data, Opts) -> {ok, Query}
    when
        FilterType :: binary(),
        Data :: term(),
        Opts :: map(),
        Query :: binary().
Description: Generates a default GraphQL query for a given filter type.
Filter Type → Query Generation:
Tags (Map):
% Input
#{<<"App-Name">> => <<"MyApp">>, <<"Version">> => <<"1.0">>}
% Generated Query
<<"query($after: String) {
    transactions(after: $after, tags: [
        {name: \"App-Name\", values: [\"MyApp\"]},
        {name: \"Version\", values: [\"1.0\"]}
    ]) {
        edges { ... }
        pageInfo { hasNextPage }
    }
}">>
Tag (Tuple):
% Input
{<<"App-Name">>, <<"MyApp">>}
% Generated Query
<<"query($after: String) {
    transactions(after: $after, tags: [
        {name: \"App-Name\", values: [\"MyApp\"]}
    ]) {
        edges { ... }
        pageInfo { hasNextPage }
    }
}">>
Owner:
% Generated Query
<<"query($after: String) {
    transactions(after: $after, owner: \"ADDRESS\") {
        edges { ... }
        pageInfo { hasNextPage }
    }
}">>
Recipient:
% Generated Query
<<"query($after: String) {
    transactions(after: $after, recipients: [\"ADDRESS\"]) {
        edges { ... }
        pageInfo { hasNextPage }
    }
}">>
All:
% Generated Query
<<"query($after: String) {
    transactions(after: $after) {
        edges { ... }
        pageInfo { hasNextPage }
    }
}">>
Note: All queries use hb_gateway_client:item_spec() for the edge node specification.
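As an illustration of how a tag map could be turned into the tags argument shown in the generated queries, here is a minimal sketch. The module tags_query_sketch and the function tags_fragment/1 are hypothetical names, the sorting step is only there to make the output deterministic, and the sketch assumes tag names and values contain no characters that need escaping inside a GraphQL string.

```erlang
-module(tags_query_sketch).
-export([tags_fragment/1]).

%% Build the `tags: [...]` argument from a map of binary tag names to
%% binary tag values. Assumption: no escaping is needed for the inputs.
tags_fragment(Tags) when is_map(Tags) ->
    %% Sort so the generated fragment does not depend on map ordering.
    Pairs = lists:sort(maps:to_list(Tags)),
    Entries = [
        <<"{name: \"", Name/binary, "\", values: [\"", Value/binary, "\"]}">>
        || {Name, Value} <- Pairs
    ],
    Joined = lists:join(<<", ">>, Entries),
    iolist_to_binary([<<"tags: [">>, Joined, <<"]">>]).
```

The resulting binary would then be spliced into the transactions(after: $after, ...) argument list alongside the edge specification.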
Batch Processing
Per-Request Flow
1. Execute Query (default 100 items)
↓
2. Extract Transaction Nodes
↓
3. Parse to HyperBEAM Messages
↓
4. Filter Parse Failures
↓
5. Write to Cache
↓
6. Filter Write Failures
↓
7. Track Success/Failure Counts
↓
8. Check Pagination
↓
9. Recurse with Cursor or Complete
Error Handling
Parse Errors:
try
    {ok, ParsedMsg} =
        hb_gateway_client:result_to_message(Struct, Opts),
    {true, ParsedMsg}
catch
    error:Reason ->
        ?event(warning,
            {indexer_graphql_parse_failed,
                {struct, NodeStruct},
                {reason, Reason}
            }
        ),
        false % Skip this item
end
Write Errors:
try
    {ok, _} = hb_cache:write(ParsedMsg, Opts),
    true
catch
    error:Reason ->
        ?event(warning,
            {indexer_graphql_write_failed,
                {reason, Reason},
                {msg, ParsedMsg}
            }
        ),
        false % Skip this item
end
Pagination
Automatic Continuation
% Extract pagination info
HasNextPage = hb_util:deep_get(<<"pageInfo/hasNextPage">>, Res, false, Opts),
case HasNextPage of
    true ->
        % Get last cursor from results
        {ok, Cursor} =
            hb_maps:find(
                <<"cursor">>,
                lists:last(NodeStructs),
                Opts
            ),
        % Recurse with new cursor
        index_graphql(
            NewTotal,
            Query,
            Vars#{<<"after">> => Cursor},
            Node,
            OpName,
            Opts
        );
    false ->
        % No more pages
        {ok, NewTotal}
end
Common Patterns
%% Query by application tag
dev_copycat_graphql:graphql(
    #{},
    #{
        <<"tag">> => <<"App-Name">>,
        <<"value">> => <<"ArConnect">>
    },
    #{<<"node">> => <<"https://arweave.net">>}
).
%% Query by multiple tags (as map)
dev_copycat_graphql:graphql(
    #{},
    #{
        <<"tags">> => #{
            <<"App-Name">> => <<"MyApp">>,
            <<"Version">> => <<"1.0">>,
            <<"Type">> => <<"Message">>
        }
    },
    #{}
).
%% Query by owner
dev_copycat_graphql:graphql(
    #{},
    #{<<"owner">> => <<"wallet-address-here">>},
    #{}
).
%% Query by recipient
dev_copycat_graphql:graphql(
    #{},
    #{<<"recipient">> => <<"recipient-address">>},
    #{}
).
%% Custom query with variables
%% The operation is named so that it matches <<"operationName">> below.
dev_copycat_graphql:graphql(
    #{},
    #{
        <<"query">> => <<"
            query GetAppTransactions($after: String, $appName: String!) {
                transactions(
                    first: 100,
                    after: $after,
                    tags: [{name: \"App-Name\", values: [$appName]}]
                ) {
                    edges {
                        node { id tags { name value } }
                        cursor
                    }
                    pageInfo { hasNextPage }
                }
            }
        ">>,
        <<"variables">> => #{<<"appName">> => <<"MyApp">>},
        <<"operationName">> => <<"GetAppTransactions">>
    },
    #{}
).
%% All transactions (no filter)
dev_copycat_graphql:graphql(
    #{},
    #{<<"all">> => true},
    #{}
).
%% Query from Base and Request merge
dev_copycat_graphql:graphql(
    #{<<"tag">> => <<"App-Name">>},
    #{<<"value">> => <<"MyApp">>},
    #{}
).
%% Via copycat device
dev_copycat:graphql(
    #{},
    #{
        <<"tag">> => <<"Process">>,
        <<"value">> => ProcessID
    },
    #{}
).
Event Monitoring
Events Emitted
% Query execution
?event({graphql_run_called,
    {query, {string, Query}},
    {operation, OpName},
    {variables, Vars}
})
% Response received
?event({graphql_request_returned_items, Count})
% Indexing progress
?event({graphql_indexing_responses,
    {query, {string, Query}},
    {variables, Vars},
    {result, Res}
})
% Messages parsed
?event({graphql_parsed_msgs, Count})
% Write progress
?event(copycat_short,
    {indexer_graphql_wrote,
        {total, Total},
        {batch, BatchSize},
        {batch_failures, Failures}
    }
)
% Parse failure (warning)
?event(warning,
    {indexer_graphql_parse_failed,
        {struct, NodeStruct},
        {reason, Reason}
    }
)
% Write failure (warning)
?event(warning,
    {indexer_graphql_write_failed,
        {reason, Reason},
        {msg, ParsedMsg}
    }
)
% Tags query generation
?event({tags_query,
    {message, Message},
    {binary_pairs, BinaryPairs},
    {tags_query_str, {string, TagsQueryStr}}
})
Performance Characteristics
Batch Size
- Depends on the first parameter of the GraphQL query
- Gateway Client uses default from item_spec()
- Typical: 100 transactions per request
Pagination
- Cursor-based (efficient)
- Automatic continuation
- Accumulates total count
Parallel Processing
- Sequential batch processing
- Individual item error handling
- Continues on failures
- No inter-request parallelization
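The sequential, failure-tolerant handling of a batch can be sketched with lists:filtermap/2 and lists:filter/2. This is an illustration only: batch_sketch and process_batch/3 are hypothetical names, and the Parse and Write funs stand in for hb_gateway_client:result_to_message/2 and hb_cache:write/2, which are not called here. The sketch omits the warning events that the real code emits on failure.

```erlang
-module(batch_sketch).
-export([process_batch/3]).

%% Parse and Write are hypothetical funs that return {ok, Term} on success.
%% Any other return value causes a badmatch error, which is caught so the
%% item is skipped and the rest of the batch continues.
process_batch(Items, Parse, Write) ->
    %% Step 1: parse each item, dropping the ones that fail.
    Parsed = lists:filtermap(
        fun(Item) ->
            try
                {ok, Msg} = Parse(Item),
                {true, Msg}
            catch
                error:_Reason -> false
            end
        end,
        Items
    ),
    %% Step 2: write each parsed message, keeping only successful writes.
    Written = lists:filter(
        fun(Msg) ->
            try
                {ok, _} = Write(Msg),
                true
            catch
                error:_Reason -> false
            end
        end,
        Parsed
    ),
    %% Return {SuccessCount, FailureCount} for this batch.
    {length(Written), length(Items) - length(Written)}.
```

Items are processed one at a time within the calling process, which matches the sequential model listed above; parse and write failures are both counted as batch failures.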
Gateway Integration
hb_gateway_client Integration
Execute Query:
{ok, RawRes} = hb_gateway_client:query(
    Query,
    Variables,
    Node,
    OperationName,
    Opts
)
Parse Result:
{ok, ParsedMsg} = hb_gateway_client:result_to_message(
    Struct,
    Opts
)
Item Specification:
All generated queries use hb_gateway_client:item_spec() for consistent edge node structure.
Response Structure
Expected GraphQL Response
{
    "data": {
        "transactions": {
            "edges": [
                {
                    "node": {
                        "id": "...",
                        "owner": { "address": "..." },
                        "tags": [
                            { "name": "...", "value": "..." }
                        ]
                    },
                    "cursor": "..."
                }
            ],
            "pageInfo": {
                "hasNextPage": true
            }
        }
    }
}
Data Extraction
% Get transactions object
Res = hb_util:deep_get(<<"data/transactions">>, RawRes, #{}, Opts)
% Get edges array
NodeStructs = hb_util:deep_get(<<"edges">>, Res, [], Opts)
% Get pagination info
HasNextPage = hb_util:deep_get(<<"pageInfo/hasNextPage">>, Res, false, Opts)
% Get cursor from last item
{ok, Cursor} = hb_maps:find(<<"cursor">>, lists:last(NodeStructs), Opts)
Use Cases
1. Application-Specific Indexing
dev_copycat_graphql:graphql(
    #{},
    #{
        <<"tag">> => <<"App-Name">>,
        <<"value">> => <<"ArConnect">>
    },
    #{}
).
2. Process Message Indexing
dev_copycat_graphql:graphql(
    #{},
    #{
        <<"tags">> => #{
            <<"Process">> => ProcessID,
            <<"Type">> => <<"Message">>
        }
    },
    #{}
).
3. Wallet Transaction History
dev_copycat_graphql:graphql(
    #{},
    #{<<"owner">> => WalletAddress},
    #{}
).
4. Recipient Indexing
dev_copycat_graphql:graphql(
    #{},
    #{<<"recipient">> => RecipientAddress},
    #{}
).
5. Complete Gateway Sync
% Warning: This indexes ALL transactions from the gateway
dev_copycat_graphql:graphql(
    #{},
    #{<<"all">> => true},
    #{}
).
Error Scenarios
Missing Query
{error, #{
    <<"body">> => <<"No query found in the request.">>
}}
No Supported Filter
{error, #{
    <<"body">> => <<"No supported filter fields found. Supported filters: \"query\", \"tag\", \"owner\", \"recipient\", \"all\"">>
}}
GraphQL Query Error
{error, Reason} % From hb_gateway_client:query/5
Comparison with Arweave Engine
GraphQL Engine Advantages
- Flexible filtering
- Efficient for targeted queries
- Tag-based search
- Owner/recipient filtering
- Fast for recent transactions
GraphQL Engine Disadvantages
- Depends on gateway availability
- Limited to indexed data
- Query complexity limits
- May miss unindexed transactions
When to Use
- Targeted message retrieval
- Tag-based filtering
- Recent transaction indexing
- Application-specific data
- Owner/recipient queries
References
- Copycat Device - dev_copycat.erl
- Gateway Client - hb_gateway_client.erl
- Cache System - hb_cache.erl
- Utilities - hb_util.erl, hb_maps.erl
Notes
- Flexible Querying: Multiple filter types supported
- Auto-Pagination: Handles cursor-based pagination automatically
- Batch Processing: Processes results in batches
- Error Resilient: Continues on individual item failures
- Event-Driven: Comprehensive event emission
- Gateway-Based: Works with Arweave gateways
- Cache Integration: Direct cache writes
- Parse Robustness: Catches and logs parse errors
- Write Robustness: Catches and logs write errors
- Progress Tracking: Total count accumulation
- Custom Queries: Supports raw GraphQL
- Default Queries: Auto-generates for common filters
- Variable Support: GraphQL variables for dynamic queries
- Operation Naming: Supports named operations
- Completion Detection: Returns when no more pages