hb_http_client.erl - HTTP Client Connection Manager

Overview

Purpose: Connection pooling and HTTP client wrapper for Gun/HTTPC
Module: hb_http_client
Behavior: gen_server
Pattern: Connection pool → Gun/HTTPC → Response streaming

This module wraps Gun (and optionally HTTPC) HTTP clients with connection pooling, request management, and Prometheus metrics integration. It maintains persistent connections to peers, handles request/response cycles, and provides detailed performance monitoring.
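A minimal usage sketch (the peer URL and path are illustrative; see Common Patterns below for fuller examples):

%% Illustrative only: start the client, then issue a GET through it.
{ok, _Pid} = hb_http_client:start_link(#{}),
{ok, Status, Headers, Body} =
    hb_http_client:req(
        #{
            peer => <<"http://127.0.0.1:8734">>,
            path => <<"/">>,
            method => <<"GET">>,
            headers => #{},
            body => <<>>
        },
        #{}
    ).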

Origin

This module originates from the Arweave project and has been modified for use in HyperBEAM. The Gun client wrapper provides connection pooling, automatic reconnection, and metrics collection.

Core Responsibilities

  • Connection Pooling: Maintain connections to remote peers
  • Client Abstraction: Support both Gun and HTTPC backends
  • Request Handling: Queue and dispatch HTTP requests
  • Response Streaming: Handle chunked transfer encoding
  • Metrics Collection: Record request durations and status codes
  • Reconnection: Automatic connection reestablishment on failure
  • Monitoring: Optional HTTP monitor callback invocation

Dependencies

  • HyperBEAM: hb_ao, hb_message, hb_util, hb_maps, hb_opts, hb_singleton
  • Rate Limiting: ar_rate_limiter
  • HTTP: gun, httpc
  • Metrics: prometheus, prometheus_cowboy, prometheus_http
  • OTP: gen_server, inet
  • Includes: include/hb.hrl

State Record

-record(state, {
    pid_by_peer = #{},      % Peer -> Connection PID mapping
    status_by_pid = #{},    % PID -> Status mapping
    opts = #{}              % Configuration options
}).
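An illustrative snapshot of the state after connecting to two peers (the {up, Pending} status shape follows the get_connection callback shown later; exact contents are implementation details):

%% Illustrative values only.
#state{
    pid_by_peer = #{
        <<"http://peer-a:8734">> => PidA,
        <<"http://peer-b:8734">> => PidB
    },
    status_by_pid = #{
        PidA => {up, []},          % connected, no pending requests
        PidB => {up, [StreamRef]}  % connected, one in-flight request
    },
    opts = #{http_client => gun}
}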

Public Functions Overview

%% Supervisor Interface
-spec start_link(Opts) -> {ok, pid()} | {error, Reason}.
 
%% Request Interface
-spec req(Args, Opts) -> {ok, Status, Headers, Body} | {error, Reason}.
-spec req(Args, ReestablishedConnection, Opts) -> {ok, Status, Headers, Body} | {error, Reason}.

Public Functions

1. start_link/1

-spec start_link(Opts) -> {ok, pid()} | {error, Reason}
    when
        Opts :: map(),
        Reason :: term().

Description: Start HTTP client gen_server under supervision.

Test Code:
-module(hb_http_client_test).
-include_lib("eunit/include/eunit.hrl").
 
start_link_test() ->
    %% Stop if already running
    case whereis(hb_http_client) of
        undefined -> ok;
        ExistingPid -> gen_server:stop(ExistingPid)
    end,
    
    {ok, Pid} = hb_http_client:start_link(#{}),
    ?assert(is_pid(Pid)),
    ?assert(is_process_alive(Pid)),
    gen_server:stop(Pid).

2. req/2, req/3

-spec req(Args, Opts) -> {ok, Status, Headers, Body} | {error, Reason}
    when
        Args :: #{
            peer := binary(),
            path := binary(),
            method := binary(),
            headers := map(),
            body := binary()
        },
        Opts :: map(),
        Status :: integer(),
        Headers :: [{binary(), binary()}],
        Body :: binary(),
        Reason :: term().

Description: Execute HTTP request using Gun or HTTPC backend. Handles connection pooling, retry logic, and metrics recording.

Test Code:
req_httpc_success_test() ->
    %% Start a simple HTTP server
    {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}, {reuseaddr, true}]),
    {ok, Port} = inet:port(ListenSock),
    
    %% Spawn server handler
    spawn(fun() ->
        {ok, Sock} = gen_tcp:accept(ListenSock),
        {ok, _Request} = gen_tcp:recv(Sock, 0, 5000),
        Response = <<"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok">>,
        gen_tcp:send(Sock, Response),
        gen_tcp:close(Sock),
        gen_tcp:close(ListenSock)
    end),
    
    %% Make request
    Args = #{
        peer => iolist_to_binary([<<"http://127.0.0.1:">>, integer_to_binary(Port)]),
        path => <<"/">>,
        method => <<"GET">>,
        headers => #{},
        body => <<>>
    },
    Result = hb_http_client:req(Args, #{http_client => httpc}),
    ?assertMatch({ok, 200, _, <<"ok">>}, Result).
 
req_httpc_connection_refused_test() ->
    %% Test httpc backend - connection to non-existent server
    Args = #{
        peer => <<"http://127.0.0.1:59999">>,
        path => <<"/">>,
        method => <<"GET">>,
        headers => #{},
        body => <<>>
    },
    Result = hb_http_client:req(Args, #{http_client => httpc}),
    ?assertMatch({error, _}, Result).
Args Structure:
Args = #{
    peer => <<"http://example.com">>,
    path => <<"/api/endpoint">>,
    method => <<"GET">> | <<"POST">> | <<"PUT">> | ...,
    headers => #{<<"content-type">> => <<"application/json">>},
    body => <<"request payload">>,
    limit => infinity | Bytes,
    is_peer_request => true | false
}
Client Selection:
req(Args, Opts) ->
    case hb_opts:get(http_client, gun, Opts) of
        gun -> gun_req(Args, false, Opts);
        httpc -> httpc_req(Args, false, Opts)
    end.

HTTP Client Implementations

Gun Client (Default)

gun_req(Args, ReestablishedConnection, Opts) ->
    #{peer := Peer, path := Path, method := Method} = Args,
    {ok, PID} = gen_server:call(?MODULE, {get_connection, Args, Opts}, infinity),
    ar_rate_limiter:throttle(Peer, Path, Opts),
    Response = request(PID, Args, Opts),
    %% Details (method, path, status class, duration) are derived from
    %% Args and Response; elided here for brevity.
    record_duration(Details, Opts),
    Response.
Features:
  • Connection pooling via gen_server
  • Automatic reconnection on failure
  • Rate limiting support
  • Streaming response handling
  • HTTP/2 support (via Gun)
Connection Options:
GunOpts = #{
    retry => 0,
    retry_timeout => 500,
    connect_timeout => 5000,
    http_opts => #{keepalive => infinity},
    protocols => [http],
    transport => tcp | tls
}
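A minimal sketch of how a new connection might be opened with these options (the open_connection/2 helper name comes from the get_connection callback below; monitoring the connection PID is what enables the 'DOWN' cleanup shown later):

%% Sketch only, not the module's exact implementation.
open_connection(Peer, Opts) ->
    {Host, Port} = parse_peer(Peer, Opts),
    GunOpts = #{
        retry => 0,
        connect_timeout => hb_opts:get(connect_timeout, 5000, Opts),
        http_opts => #{keepalive => infinity},
        protocols => [http],
        transport => tcp
    },
    {ok, PID} = gun:open(Host, Port, GunOpts),
    %% Monitor so the gen_server is notified if the connection process dies.
    _MonitorRef = erlang:monitor(process, PID),
    {ok, PID}.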

HTTPC Client (Fallback)

httpc_req(Args, _, Opts) ->
    #{peer := Peer, path := Path, method := Method, headers := Headers, body := Body} = Args,
    {Host, Port} = parse_peer(Peer, Opts),
    %% Scheme, HeaderKV (headers as a key/value list) and ContentType are
    %% derived from Peer and Headers; elided here for brevity.
    URL = binary_to_list(iolist_to_binary([Scheme, "://", Host, ":", integer_to_binary(Port), Path])),
    
    Request = case Method of
        get -> {URL, HeaderKV};
        _ -> {URL, HeaderKV, ContentType, Body}
    end,
    
    {ok, {{_, Status, _}, RespHeaders, RespBody}} = 
        httpc:request(Method, Request, [], [{full_result, true}, {body_format, binary}]),
    
    {ok, Status, RespHeaders, RespBody}.
Features:
  • No connection pooling (stateless)
  • Simple request/response model
  • Fallback option for Gun issues
  • Built-in Erlang/OTP client

Connection Management

get_connection Callback

handle_call({get_connection, Args, Opts}, From, State) ->
    #{peer := Peer} = Args,
    case maps:get(Peer, State#state.pid_by_peer, undefined) of
        undefined ->
            % Open new connection
            {ok, PID} = open_connection(Peer, Opts),
            NewState = State#state{
                pid_by_peer = maps:put(Peer, PID, State#state.pid_by_peer),
                status_by_pid = maps:put(PID, {up, []}, State#state.status_by_pid)
            },
            {reply, {ok, PID}, NewState};
        PID ->
            % Reuse existing connection
            {reply, {ok, PID}, State}
    end.

Connection Monitoring

handle_info({'DOWN', _Ref, process, PID, Reason}, State) ->
    % Find peer for this PID
    Peer = find_peer_by_pid(PID, State#state.pid_by_peer),
    
    % Reply error to pending requests
    Pending = get_pending_requests(PID, State#state.status_by_pid),
    reply_error(Pending, Reason),
    
    % Remove from state
    NewState = State#state{
        pid_by_peer = maps:remove(Peer, State#state.pid_by_peer),
        status_by_pid = maps:remove(PID, State#state.status_by_pid)
    },
    {noreply, NewState}.
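find_peer_by_pid/2 and get_pending_requests/2 are not shown above; a minimal sketch of the reverse lookup, assuming the state shape described earlier:

%% Sketch: find the peer whose pooled connection is PID.
find_peer_by_pid(PID, PidByPeer) ->
    case [Peer || {Peer, P} <- maps:to_list(PidByPeer), P =:= PID] of
        [Peer | _] -> Peer;
        [] -> undefined
    end.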

Request Execution

request/3

request(PID, Args, Opts) ->
    Timer = inet:start_timer(hb_opts:get(http_request_send_timeout, no_request_send_timeout, Opts)),
    Method = hb_maps:get(method, Args),
    Path = hb_maps:get(path, Args),
    Headers = prepare_headers(hb_maps:get(headers, Args, #{}), Opts),
    Body = hb_maps:get(body, Args, <<>>),
    
    Ref = gun:request(PID, Method, Path, Headers, Body),
    Response = await_response(#{
        pid => PID,
        stream_ref => Ref,
        timer => Timer,
        limit => hb_maps:get(limit, Args, infinity),
        counter => 0,
        acc => [],
        method => Method,
        path => Path
    }, Opts),
    
    inet:stop_timer(Timer),
    Response.

await_response/2

await_response(Args, Opts) ->
    #{pid := PID, stream_ref := Ref, timer := Timer, limit := Limit, acc := Acc} = Args,
    case gun:await(PID, Ref, inet:timeout(Timer)) of
        {response, fin, Status, Headers} ->
            {ok, Status, Headers, <<>>};
        
        {response, nofin, Status, Headers} ->
            await_response(Args#{status => Status, headers => Headers}, Opts);
        
        {data, nofin, Data} ->
            % Accumulate data chunk, enforcing the configured size limit
            case check_limit(Limit, Args, Data) of
                ok -> await_response(Args#{acc := [Acc | Data]}, Opts);
                {error, too_much_data} -> {error, too_much_data}
            end;
        
        {data, fin, Data} ->
            % Final data chunk: flatten the accumulator into the response body
            FinData = iolist_to_binary([Acc | Data]),
            {ok, maps:get(status, Args), maps:get(headers, Args), FinData};
        
        {error, Reason} ->
            gun:cancel(PID, Ref),
            {error, Reason}
    end.
Features:
  • Streaming response handling
  • Data size limiting
  • Timeout management
  • Automatic cleanup on error
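check_limit/3 (referenced in await_response/2 above) is not shown in the excerpt; a plausible sketch of the guard, with the counter bookkeeping simplified:

%% Sketch: reject the stream once the accumulated body would exceed Limit.
check_limit(infinity, _Args, _Data) ->
    ok;
check_limit(Limit, #{counter := Counter}, Data) when is_integer(Limit) ->
    case Counter + byte_size(Data) > Limit of
        true -> {error, too_much_data};
        false -> ok
    end.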

Header Processing

Cookie Normalization

prepare_headers(HeaderMap, Opts) ->
    HeadersWithoutCookie = hb_maps:to_list(
        hb_maps:without([<<"cookie">>], HeaderMap, Opts)
    ),
    
    CookieLines = case hb_maps:get(<<"cookie">>, HeaderMap, [], Opts) of
        BinCookieLine when is_binary(BinCookieLine) -> [BinCookieLine];
        CookieLinesList -> CookieLinesList
    end,
    
    CookieHeaders = [{<<"cookie">>, Line} || Line <- CookieLines],
    HeadersWithoutCookie ++ CookieHeaders.
Special Cases:
  • content-type handled separately for HTTPC
  • cookie can be binary or list of binaries
  • Multiple cookie headers supported
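For example, a header map carrying two cookie lines expands into two separate cookie headers (illustrative values; ordering of the non-cookie headers may vary):

prepare_headers(
    #{
        <<"accept">> => <<"application/json">>,
        <<"cookie">> => [<<"session=abc">>, <<"theme=dark">>]
    },
    #{}
).
%% => [{<<"accept">>, <<"application/json">>},
%%     {<<"cookie">>, <<"session=abc">>},
%%     {<<"cookie">>, <<"theme=dark">>}]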

Metrics Integration

Prometheus Initialization

init_prometheus(Opts) ->
    % Request duration histogram
    prometheus_histogram:declare([
        {name, http_request_duration_seconds},
        {help, "HTTP request duration in seconds"},
        {labels, [request_method, request_path, status_class]},
        {buckets, [0.1, 0.5, 1, 2, 5, 10, 30]}
    ]),
    
    % Request counters
    prometheus_counter:declare([
        {name, gun_requests_total},
        {help, "Total Gun HTTP requests"},
        {labels, [method, path, status_class]}
    ]),
    
    % Upload/download bytes
    prometheus_counter:declare([
        {name, http_client_uploaded_bytes_total},
        {help, "Total bytes uploaded"},
        {labels, [path]}
    ]),
    
    prometheus_counter:declare([
        {name, http_client_downloaded_bytes_total},
        {help, "Total bytes downloaded"},
        {labels, [path]}
    ]).

Duration Recording

record_duration(Details, Opts) ->
    spawn(fun() ->
        % Record to Prometheus
        case application:get_application(prometheus) of
            undefined -> ok;
            _ ->
                prometheus_histogram:observe(
                    http_request_duration_seconds,
                    [
                        hb_util:list(maps:get(<<"request-method">>, Details)),
                        hb_util:list(maps:get(<<"request-path">>, Details)),
                        hb_util:list(maps:get(<<"status-class">>, Details))
                    ],
                    maps:get(<<"duration">>, Details)
                )
        end,
        
        % Invoke HTTP monitor if configured
        maybe_invoke_monitor(Details#{<<"path">> => <<"duration">>}, Opts)
    end).

HTTP Monitoring

maybe_invoke_monitor(Details, Opts) ->
    case hb_ao:get(<<"http_monitor">>, Opts, Opts) of
        not_found -> ok;
        Monitor ->
            MaybeWithReference = case hb_ao:get(<<"http_reference">>, Opts, Opts) of
                not_found -> Details;
                Ref -> Details#{<<"reference">> => Ref}
            end,
            
            Req = Monitor#{
                <<"body">> => hb_message:commit(
                    MaybeWithReference#{<<"method">> => <<"POST">>},
                    Opts
                )
            },
            
            ReqMsgs = hb_singleton:from(Req, Opts),
            hb_ao:resolve_many(ReqMsgs, Opts)
    end.
Monitor Message Structure:
#{
    <<"request-method">> => <<"POST">>,
    <<"request-path">> => <<"/api/endpoint">>,
    <<"status-class">> => <<"2xx">>,
    <<"duration">> => 150,  % milliseconds
    <<"reference">> => <<"custom-ref">>  % optional
}

Status Classification

get_status_class/1

get_status_class({ok, {{Status, _}, _, _, _, _}}) ->
    get_status_class(Status);
get_status_class({error, connection_closed}) -> <<"connection_closed">>;
get_status_class({error, connect_timeout}) -> <<"connect_timeout">>;
get_status_class({error, timeout}) -> <<"timeout">>;
get_status_class({error, {shutdown, timeout}}) -> <<"shutdown_timeout">>;
get_status_class({error, econnrefused}) -> <<"econnrefused">>;
get_status_class({error, {shutdown, econnrefused}}) -> <<"shutdown_econnrefused">>;
get_status_class({error, {shutdown, ehostunreach}}) -> <<"shutdown_ehostunreach">>;
get_status_class({error, {shutdown, normal}}) -> <<"shutdown_normal">>;
get_status_class({error, {closed, _}}) -> <<"closed">>;
get_status_class({error, noproc}) -> <<"noproc">>;
get_status_class(208) -> <<"already_processed">>;
get_status_class(Status) when is_integer(Status), Status > 0 ->
    hb_util:bin(prometheus_http:status_class(Status)).
Status Classes:
  • 1xx - Informational
  • 2xx - Success
  • 3xx - Redirection
  • 4xx - Client Error
  • 5xx - Server Error
  • Custom error classifications for network issues
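Example classifications, taken directly from the clauses above (integer statuses other than 208 are delegated to prometheus_http:status_class/1):

get_status_class(208).                          %% => <<"already_processed">>
get_status_class({error, connect_timeout}).     %% => <<"connect_timeout">>
get_status_class({error, {shutdown, normal}}).  %% => <<"shutdown_normal">>
get_status_class(404).                          %% => hb_util:bin(prometheus_http:status_class(404))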

Method Conversion

method_to_bin(get) -> <<"GET">>;
method_to_bin(post) -> <<"POST">>;
method_to_bin(put) -> <<"PUT">>;
method_to_bin(head) -> <<"HEAD">>;
method_to_bin(delete) -> <<"DELETE">>;
method_to_bin(connect) -> <<"CONNECT">>;
method_to_bin(options) -> <<"OPTIONS">>;
method_to_bin(trace) -> <<"TRACE">>;
method_to_bin(patch) -> <<"PATCH">>;
method_to_bin(_) -> <<"unknown">>.

Peer Parsing

parse_peer(Peer, Opts) ->
    Parsed = uri_string:parse(Peer),
    case Parsed of
        #{host := Host, port := Port} ->
            {hb_util:list(Host), Port};
        URI = #{host := Host} ->
            {
                hb_util:list(Host),
                case hb_maps:get(scheme, URI, undefined, Opts) of
                    <<"https">> -> 443;
                    _ -> hb_opts:get(port, 8734, Opts)
                end
            }
    end.
URI Formats Supported:
  • http://example.com:8080 → {"example.com", 8080}
  • https://example.com → {"example.com", 443}
  • example.com → {"example.com", 8734} (default port)

Common Patterns

%% Basic request with Gun
Args = #{
    peer => <<"http://example.com">>,
    path => <<"/api/data">>,
    method => <<"GET">>,
    headers => #{},
    body => <<>>
},
{ok, Status, Headers, Body} = hb_http_client:req(Args, #{}).
 
%% POST with JSON body
Args = #{
    peer => <<"http://api.example.com">>,
    path => <<"/v1/endpoint">>,
    method => <<"POST">>,
    headers => #{
        <<"content-type">> => <<"application/json">>
    },
    body => <<"{\"key\":\"value\"}">>
},
{ok, 200, Headers, Response} = hb_http_client:req(Args, #{}).
 
%% Request with size limit
Args = #{
    peer => <<"http://example.com">>,
    path => <<"/large-file">>,
    method => <<"GET">>,
    headers => #{},
    body => <<>>,
    limit => 1048576  % 1MB limit
},
case hb_http_client:req(Args, #{}) of
    {ok, _, _, Data} -> process(Data);
    {error, too_much_data} -> handle_oversized()
end.
 
%% Use HTTPC instead of Gun
Opts = #{http_client => httpc},
{ok, Status, Headers, Body} = hb_http_client:req(Args, Opts).
 
%% With HTTP monitor
Opts = #{
    http_monitor => #{
        <<"device">> => <<"logger@1.0">>,
        <<"path">> => <<"/log">>
    },
    http_reference => <<"request-123">>
},
{ok, _, _, _} = hb_http_client:req(Args, Opts).

Configuration Options

Opts = #{
    % Client selection
    http_client => gun | httpc,  % Default: gun
    
    % Timeouts
    http_request_send_timeout => Milliseconds,  % Request timeout
    connect_timeout => 5000,  % Connection timeout
    
    % Size limits
    limit => Bytes | infinity,  % Response size limit
    
    % Network
    port => 8734,  % Default port when not specified
    
    % Monitoring
    http_monitor => MonitorMessage,  % HTTP monitor configuration
    http_reference => Reference,  % Custom reference for monitoring
    
    % Metrics
    prometheus => true | false,  % Enable Prometheus (default: not test mode)
    
    % Connection
    http_opts => #{keepalive => infinity},
    retry => 0,
    retry_timeout => 500
}.

Error Handling

% Successful responses
{ok, Status, Headers, Body}
 
% Network errors
{error, connection_closed}
{error, connect_timeout}
{error, timeout}
{error, econnrefused}
 
% Shutdown errors
{error, {shutdown, normal}}
{error, {shutdown, timeout}}
{error, {shutdown, econnrefused}}
{error, {shutdown, ehostunreach}}
 
% Process errors
{error, noproc}
{error, client_error}
 
% Data size errors
{error, too_much_data}

Performance Features

  1. Connection Pooling: Reuse TCP connections across requests
  2. Parallel Requests: Multiple concurrent requests per connection
  3. Streaming: Efficient memory usage for large responses
  4. Rate Limiting: Integrated with ar_rate_limiter
  5. Metrics: Detailed Prometheus instrumentation
  6. Reconnection: Automatic on connection failure
  7. Timeout Management: Per-request timeout control

Metrics Collected

Histograms

  • http_request_duration_seconds - Request duration by method, path, status class

Counters

  • gun_requests_total - Total requests by method, path, status class
  • http_client_uploaded_bytes_total - Bytes uploaded by path
  • http_client_downloaded_bytes_total - Bytes downloaded by path

Labels

  • request_method - HTTP method (GET, POST, etc.)
  • request_path - Request path
  • status_class - Response status classification
  • method - Method atom (for Gun requests)
  • path - Path string
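Metrics can be inspected at runtime with the standard prometheus.erl accessors; a hedged example (the label value shown is illustrative):

%% Illustrative: read back the bytes-downloaded counter for one path label.
prometheus_counter:value(http_client_downloaded_bytes_total, ["/api/data"]).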

References

  • HTTP Core - hb_http.erl
  • Multi-Request - hb_http_multi.erl
  • Supervisor - hb_http_client_sup.erl
  • Rate Limiting - ar_rate_limiter.erl
  • Gun Client - Gun HTTP/2 client

Notes

  1. Origin: Adapted from Arweave project
  2. Default Client: Gun is preferred over HTTPC
  3. Connection Pool: Maintained per peer in gen_server state
  4. Reconnection: Automatic with one retry attempt
  5. Streaming: Gun provides efficient chunked transfer encoding
  6. Metrics: Prometheus integration optional (disabled in tests)
  7. Monitor Callback: Async invocation in separate process
  8. HTTP/2: Supported via Gun (not HTTPC)
  9. Size Limits: Enforced during streaming to prevent memory exhaustion
  10. Cookie Handling: Supports multiple cookie header lines
  11. Timeout: Per-request timer with configurable duration
  12. Error Classification: Detailed status classes for monitoring
  13. Process Monitoring: Automatic cleanup on connection death
  14. Rate Limiting: Throttling before request execution
  15. Async Metrics: Recorded in spawned process to avoid blocking