hb_http_client.erl - HTTP Client Connection Manager
Overview
Purpose: Connection pooling and HTTP client wrapper for Gun/HTTPC
Module: hb_http_client
Behavior: gen_server
Pattern: Connection pool → Gun/HTTPC → Response streaming
This module wraps Gun (and optionally HTTPC) HTTP clients with connection pooling, request management, and Prometheus metrics integration. It maintains persistent connections to peers, handles request/response cycles, and provides detailed performance monitoring.
Origin
This module originates from the Arweave project and has been modified for use in HyperBEAM. The Gun client wrapper provides connection pooling, automatic reconnection, and metrics collection.
Core Responsibilities
- Connection Pooling: Maintain connections to remote peers
- Client Abstraction: Support both Gun and HTTPC backends
- Request Handling: Queue and dispatch HTTP requests
- Response Streaming: Handle chunked transfer encoding
- Metrics Collection: Record request durations and status codes
- Reconnection: Automatic connection reestablishment on failure
- Monitoring: Optional HTTP monitor callback invocation
Dependencies
- HyperBEAM: hb_ao, hb_message, hb_util, hb_maps, hb_opts, hb_singleton
- Rate Limiting: ar_rate_limiter
- HTTP: gun, httpc
- Metrics: prometheus, prometheus_cowboy, prometheus_http
- OTP: gen_server, inet
- Includes: include/hb.hrl
State Record
%% Internal gen_server state of the connection manager.
-record(state, {
pid_by_peer = #{}, % Peer -> connection PID; consulted by the get_connection call to reuse a connection
status_by_pid = #{}, % PID -> status; a freshly opened connection is stored as {up, []}
opts = #{} % Configuration options
}).
Public Functions Overview
%% Supervisor Interface
-spec start_link(Opts) -> {ok, pid()} | {error, Reason}.
%% Request Interface
-spec req(Args, Opts) -> {ok, Status, Headers, Body} | {error, Reason}.
-spec req(Args, ReestablishedConnection, Opts) -> {ok, Status, Headers, Body} | {error, Reason}.
Public Functions
1. start_link/1
-spec start_link(Opts) -> {ok, pid()} | {error, Reason}
when
Opts :: map(),
Reason :: term().
Description: Start the HTTP client gen_server under supervision.
Test Code:
-module(hb_http_client_test).
-include_lib("eunit/include/eunit.hrl").
%% start_link/1 must hand back the pid of a live process. Any instance that
%% is already registered as hb_http_client is stopped first so the fresh
%% start cannot collide with it.
start_link_test() ->
    stop_if_running(whereis(hb_http_client)),
    {ok, Client} = hb_http_client:start_link(#{}),
    ?assert(is_pid(Client)),
    ?assert(is_process_alive(Client)),
    gen_server:stop(Client).

%% Stop a previously registered client, if there is one.
stop_if_running(undefined) -> ok;
stop_if_running(Running) -> gen_server:stop(Running).
2. req/2, req/3
-spec req(Args, Opts) -> {ok, Status, Headers, Body} | {error, Reason}
when
Args :: #{
peer := binary(),
path := binary(),
method := binary(),
headers := map(),
body := binary()
},
Opts :: map(),
Status :: integer(),
Headers :: [{binary(), binary()}],
Body :: binary(),
Reason :: term().
Description: Execute an HTTP request using the Gun or HTTPC backend. Handles connection pooling, retry logic, and metrics recording.
Test Code:
%% Round-trips one GET through the httpc backend against a throwaway TCP
%% listener that answers a single connection with a fixed 200 response.
req_httpc_success_test() ->
    %% Port 0 lets the OS pick a free port; read it back for the peer URL.
    {ok, Listener} = gen_tcp:listen(0, [binary, {active, false}, {reuseaddr, true}]),
    {ok, Port} = inet:port(Listener),
    %% One-shot server: accept, read the request, reply, close everything.
    ServeOnce =
        fun() ->
            {ok, Conn} = gen_tcp:accept(Listener),
            {ok, _Request} = gen_tcp:recv(Conn, 0, 5000),
            gen_tcp:send(Conn, <<"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok">>),
            gen_tcp:close(Conn),
            gen_tcp:close(Listener)
        end,
    spawn(ServeOnce),
    Peer = iolist_to_binary([<<"http://127.0.0.1:">>, integer_to_binary(Port)]),
    Reply = hb_http_client:req(
        #{
            peer => Peer,
            path => <<"/">>,
            method => <<"GET">>,
            headers => #{},
            body => <<>>
        },
        #{http_client => httpc}
    ),
    ?assertMatch({ok, 200, _, <<"ok">>}, Reply).
%% The httpc backend must report an {error, _} tuple when the target port
%% has no server listening on it.
req_httpc_connection_refused_test() ->
    Unreachable = #{
        peer => <<"http://127.0.0.1:59999">>,
        path => <<"/">>,
        method => <<"GET">>,
        headers => #{},
        body => <<>>
    },
    ?assertMatch(
        {error, _},
        hb_http_client:req(Unreachable, #{http_client => httpc})
    ).
Args = #{
peer => <<"http://example.com">>,
path => <<"/api/endpoint">>,
method => <<"GET">> | <<"POST">> | <<"PUT">> | ...,
headers => #{<<"content-type">> => <<"application/json">>},
body => <<"request payload">>,
limit => infinity | Bytes,
is_peer_request => true | false
}
%% @doc Entry point for a request: reads the `http_client' option (default
%% `gun') and hands the request to the matching backend as a first attempt
%% (the re-established-connection flag is `false').
req(Args, Opts) ->
    Client = hb_opts:get(http_client, gun, Opts),
    case Client of
        httpc -> httpc_req(Args, false, Opts);
        gun -> gun_req(Args, false, Opts)
    end.
HTTP Client Implementations
Gun Client (Default)
%% Issue a request over a pooled Gun connection.
%% Asks the ?MODULE gen_server for the peer's connection PID (waiting
%% indefinitely), applies ar_rate_limiter throttling for the peer/path,
%% performs the request via request/3, hands `Details' to record_duration/2,
%% and returns whatever request/3 returned.
%% NOTE(review): `Details' is not bound anywhere in this excerpt, and `Method'
%% and `ReestablishedConnection' are never used — the excerpt looks abbreviated;
%% confirm against the module source before copying it.
gun_req(Args, ReestablishedConnection, Opts) ->
#{peer := Peer, path := Path, method := Method} = Args,
% Fails with badmatch if the pool answers anything other than {ok, PID}.
{ok, PID} = gen_server:call(?MODULE, {get_connection, Args, Opts}, infinity),
ar_rate_limiter:throttle(Peer, Path, Opts),
Response = request(PID, Args, Opts),
record_duration(Details, Opts),
Response.
- Connection pooling via gen_server
- Automatic reconnection on failure
- Rate limiting support
- Streaming response handling
- HTTP/2 support (via Gun)
GunOpts = #{
retry => 0,
retry_timeout => 500,
connect_timeout => 5000,
http_opts => #{keepalive => infinity},
protocols => [http],
transport => tcp | tls
}
HTTPC Client (Fallback)
%% @doc Perform one request with OTP's `httpc' client (no connection pooling).
%%
%% Args must carry `peer', `path', `method', `headers' and `body'. The second
%% argument (the re-established-connection flag) is ignored by this backend.
%%
%% Returns `{ok, Status, Headers, Body}' with binary header pairs, or
%% `{error, Reason}' when httpc reports a failure (for example a refused
%% connection) instead of crashing on a failed match.
httpc_req(Args, _, Opts) ->
    #{peer := Peer, path := Path, method := RawMethod, headers := Headers, body := Body} = Args,
    {Host, Port} = parse_peer(Peer, Opts),
    %% Keep the scheme the peer was given with; fall back to plain HTTP.
    Scheme = maps:get(scheme, uri_string:parse(Peer), <<"http">>),
    %% The port is an integer and must be rendered as text: a raw integer is
    %% not valid iolist content once it exceeds 255.
    URL = binary_to_list(
        iolist_to_binary([Scheme, "://", Host, ":", integer_to_binary(Port), Path])
    ),
    %% httpc takes the content type as a separate request element, so it is
    %% pulled out of the header list. The default is a generic binary type.
    ContentType = hb_util:list(
        hb_maps:get(<<"content-type">>, Headers, <<"application/octet-stream">>, Opts)
    ),
    OtherHeaders = hb_maps:without([<<"content-type">>], Headers, Opts),
    %% httpc expects string header names and values.
    HeaderKV = [
        {hb_util:list(Name), hb_util:list(Value)}
     || {Name, Value} <- prepare_headers(OtherHeaders, Opts)
    ],
    Method = httpc_method(RawMethod),
    Request =
        case Method of
            get -> {URL, HeaderKV};
            _ -> {URL, HeaderKV, ContentType, Body}
        end,
    case httpc:request(Method, Request, [], [{full_result, true}, {body_format, binary}]) of
        {ok, {{_, Status, _}, RespHeaders, RespBody}} ->
            {ok, Status, [{list_to_binary(K), list_to_binary(V)} || {K, V} <- RespHeaders], RespBody};
        {error, Reason} ->
            {error, Reason}
    end.

%% Convert a request method to the lowercase atom httpc expects. Only atoms
%% that already exist are produced, so arbitrary input cannot grow the atom
%% table.
httpc_method(Method) when is_atom(Method) ->
    Method;
httpc_method(Method) when is_binary(Method) ->
    binary_to_existing_atom(string:lowercase(Method), utf8).
- No connection pooling (stateless)
- Simple request/response model
- Fallback option for Gun issues
- Built-in Erlang/OTP client
Connection Management
get_connection Callback
%% @doc gen_server call handler for `{get_connection, Args, Opts}'.
%%
%% Replies `{ok, PID}' with the connection already registered for the peer,
%% or opens a new one, records it in both lookup maps (initial status
%% `{up, []}') and replies with its PID.
%%
%% The state is a `#state{}' record, so it is updated with record syntax;
%% map-update syntax (`State#{...}') on a record fails with `badmap'.
handle_call({get_connection, Args, Opts}, _From, State) ->
    #{peer := Peer} = Args,
    #state{pid_by_peer = PidByPeer, status_by_pid = StatusByPid} = State,
    case maps:get(Peer, PidByPeer, undefined) of
        undefined ->
            %% No connection for this peer yet: open and register one.
            {ok, PID} = open_connection(Peer, Opts),
            NewState = State#state{
                pid_by_peer = maps:put(Peer, PID, PidByPeer),
                status_by_pid = maps:put(PID, {up, []}, StatusByPid)
            },
            {reply, {ok, PID}, NewState};
        PID ->
            %% Reuse the existing connection.
            {reply, {ok, PID}, State}
    end.
Connection Monitoring
%% @doc Handle the 'DOWN' message for a connection process.
%%
%% Looks up the peer that owned the PID, passes the exit reason to the
%% requests still pending on it, and drops the PID from both lookup maps.
%%
%% The state is a `#state{}' record, so it is updated with record syntax;
%% map-update syntax (`State#{...}') on a record fails with `badmap'.
handle_info({'DOWN', _Ref, process, PID, Reason}, State) ->
    #state{pid_by_peer = PidByPeer, status_by_pid = StatusByPid} = State,
    Peer = find_peer_by_pid(PID, PidByPeer),
    %% Answer the pending requests before forgetting the connection.
    Pending = get_pending_requests(PID, StatusByPid),
    reply_error(Pending, Reason),
    NewState = State#state{
        pid_by_peer = maps:remove(Peer, PidByPeer),
        status_by_pid = maps:remove(PID, StatusByPid)
    },
    {noreply, NewState}.
Request Execution
request/3
%% @doc Send one request on the Gun connection `PID' and collect the reply.
%%
%% A timer based on the `http_request_send_timeout' option is started before
%% the request is issued and stopped once await_response/2 has returned; the
%% value returned by await_response/2 is passed back unchanged.
request(PID, Args, Opts) ->
    SendTimeout = hb_opts:get(http_request_send_timeout, no_request_send_timeout, Opts),
    Timer = inet:start_timer(SendTimeout),
    Method = hb_maps:get(method, Args),
    Path = hb_maps:get(path, Args),
    Headers = prepare_headers(hb_maps:get(headers, Args, #{}), Opts),
    Body = hb_maps:get(body, Args, <<>>),
    StreamRef = gun:request(PID, Method, Path, Headers, Body),
    %% Everything await_response/2 needs to follow the stream to its end.
    AwaitArgs = #{
        pid => PID,
        stream_ref => StreamRef,
        timer => Timer,
        limit => hb_maps:get(limit, Args, infinity),
        counter => 0,
        acc => [],
        method => Method,
        path => Path
    },
    Result = await_response(AwaitArgs, Opts),
    inet:stop_timer(Timer),
    Result.
await_response/2
%% @doc Follow a Gun stream until it finishes and assemble the response.
%%
%% Args is the map built by request/3 (`pid', `stream_ref', `timer', `limit',
%% `acc', ...). Once response headers have arrived it also carries `status'
%% and `headers'.
%%
%% Returns `{ok, Status, Headers, Body}', `{error, too_much_data}' when
%% check_limit/3 rejects a chunk, or `{error, Reason}' from gun:await/3 (the
%% stream is cancelled in that case).
%%
%% Map fields are read by pattern matching; `Args#.acc' is not valid Erlang.
%% NOTE(review): `counter' is never advanced here — confirm that
%% check_limit/3 does not rely on it being updated between chunks.
await_response(Args, Opts) ->
    #{pid := PID, stream_ref := Ref, timer := Timer, limit := Limit, acc := Acc} = Args,
    case gun:await(PID, Ref, inet:timeout(Timer)) of
        {response, fin, Status, Headers} ->
            %% Header-only response: there is no body to wait for.
            {ok, Status, Headers, <<>>};
        {response, nofin, Status, Headers} ->
            %% Remember status and headers; the body chunks follow.
            await_response(Args#{status => Status, headers => Headers}, Opts);
        {data, nofin, Data} ->
            case check_limit(Limit, Args, Data) of
                %% [Acc | Data] is a valid iolist: chunks are appended
                %% without copying until the final flattening.
                ok -> await_response(Args#{acc := [Acc | Data]}, Opts);
                {error, too_much_data} -> {error, too_much_data}
            end;
        {data, fin, Data} ->
            #{status := Status, headers := Headers} = Args,
            {ok, Status, Headers, iolist_to_binary([Acc | Data])};
        {error, Reason} ->
            gun:cancel(PID, Ref),
            {error, Reason}
    end.
- Streaming response handling
- Data size limiting
- Timeout management
- Automatic cleanup on error
Header Processing
Cookie Normalization
%% @doc Turn a header map into a list of `{Name, Value}' pairs.
%%
%% The `cookie' entry may be a single binary or a list of lines; every line
%% becomes its own `{<<"cookie">>, Line}' pair, appended after all other
%% headers.
prepare_headers(HeaderMap, Opts) ->
    Plain = hb_maps:to_list(hb_maps:without([<<"cookie">>], HeaderMap, Opts)),
    Cookies =
        case hb_maps:get(<<"cookie">>, HeaderMap, [], Opts) of
            Single when is_binary(Single) -> [Single];
            Many -> Many
        end,
    Plain ++ [{<<"cookie">>, Cookie} || Cookie <- Cookies].
- content-type handled separately for HTTPC
- cookie can be binary or list of binaries
- Multiple cookie headers supported
Metrics Integration
Prometheus Initialization
%% @doc Declare the Prometheus metrics used by the client: one request
%% duration histogram and three counters (Gun requests, uploaded bytes,
%% downloaded bytes). The options argument is not consulted.
init_prometheus(_Opts) ->
    DurationHistogram = [
        {name, http_request_duration_seconds},
        {help, "HTTP request duration in seconds"},
        {labels, [request_method, request_path, status_class]},
        {buckets, [0.1, 0.5, 1, 2, 5, 10, 30]}
    ],
    GunRequests = [
        {name, gun_requests_total},
        {help, "Total Gun HTTP requests"},
        {labels, [method, path, status_class]}
    ],
    UploadedBytes = [
        {name, http_client_uploaded_bytes_total},
        {help, "Total bytes uploaded"},
        {labels, [path]}
    ],
    DownloadedBytes = [
        {name, http_client_downloaded_bytes_total},
        {help, "Total bytes downloaded"},
        {labels, [path]}
    ],
    prometheus_histogram:declare(DurationHistogram),
    prometheus_counter:declare(GunRequests),
    prometheus_counter:declare(UploadedBytes),
    prometheus_counter:declare(DownloadedBytes).
Duration Recording
%% @doc Report a finished request from a separate process: observe its
%% duration in the Prometheus histogram (skipped when
%% application:get_application(prometheus) yields `undefined') and then pass
%% the details, tagged with path `duration', to the HTTP monitor hook.
record_duration(Details, Opts) ->
    Report =
        fun() ->
            observe_duration(application:get_application(prometheus), Details),
            maybe_invoke_monitor(Details#{<<"path">> => <<"duration">>}, Opts)
        end,
    spawn(Report).

%% Observe the duration histogram unless the lookup returned `undefined'.
observe_duration(undefined, _Details) ->
    ok;
observe_duration(_Found, Details) ->
    LabelKeys = [<<"request-method">>, <<"request-path">>, <<"status-class">>],
    Labels = [hb_util:list(maps:get(Key, Details)) || Key <- LabelKeys],
    prometheus_histogram:observe(
        http_request_duration_seconds,
        Labels,
        maps:get(<<"duration">>, Details)
    ).
HTTP Monitoring
%% @doc Forward request details to the configured HTTP monitor, if any.
%%
%% Returns `ok' when no `http_monitor' is found in Opts. Otherwise the
%% details (plus `reference' when `http_reference' is set) are committed as a
%% POST message, placed in the monitor message's `body', and resolved.
maybe_invoke_monitor(Details, Opts) ->
    case hb_ao:get(<<"http_monitor">>, Opts, Opts) of
        not_found ->
            ok;
        Monitor ->
            Reported = with_reference(Details, Opts),
            Body = hb_message:commit(Reported#{<<"method">> => <<"POST">>}, Opts),
            Messages = hb_singleton:from(Monitor#{<<"body">> => Body}, Opts),
            hb_ao:resolve_many(Messages, Opts)
    end.

%% Attach the caller-supplied reference to the details when one is set.
with_reference(Details, Opts) ->
    case hb_ao:get(<<"http_reference">>, Opts, Opts) of
        not_found -> Details;
        Ref -> Details#{<<"reference">> => Ref}
    end.
#{
<<"request-method">> => <<"POST">>,
<<"request-path">> => <<"/api/endpoint">>,
<<"status-class">> => <<"2xx">>,
<<"duration">> => 150, % milliseconds
<<"reference">> => <<"custom-ref">> % optional
}
Status Classification
get_status_class/1
%% @doc Map a request outcome to the label used for the status-class metric.
%%
%% Accepts a successful response tuple (its status code is classified), a
%% known `{error, Reason}' tuple (named after the reason), or a positive
%% integer status code (208 is reported as `already_processed'; other codes
%% go through prometheus_http:status_class/1).
get_status_class({ok, {{Status, _}, _, _, _, _}}) ->
    get_status_class(Status);
get_status_class({error, {shutdown, Why}}) when
    Why =:= timeout; Why =:= econnrefused; Why =:= ehostunreach; Why =:= normal
->
    <<"shutdown_", (atom_to_binary(Why, utf8))/binary>>;
get_status_class({error, {closed, _}}) ->
    <<"closed">>;
get_status_class({error, Why}) when
    Why =:= connection_closed;
    Why =:= connect_timeout;
    Why =:= timeout;
    Why =:= econnrefused;
    Why =:= noproc
->
    atom_to_binary(Why, utf8);
get_status_class(208) ->
    <<"already_processed">>;
get_status_class(Code) when is_integer(Code), Code > 0 ->
    hb_util:bin(prometheus_http:status_class(Code)).
- 1xx - Informational
- 2xx - Success
- 3xx - Redirection
- 4xx - Client Error
- 5xx - Server Error
- Custom error classifications for network issues
Method Conversion
%% @doc Upper-case binary name of an HTTP method atom. Anything that is not
%% one of the nine known method atoms yields `<<"unknown">>'.
method_to_bin(Method) ->
    Names = #{
        get => <<"GET">>,
        post => <<"POST">>,
        put => <<"PUT">>,
        head => <<"HEAD">>,
        delete => <<"DELETE">>,
        connect => <<"CONNECT">>,
        options => <<"OPTIONS">>,
        trace => <<"TRACE">>,
        patch => <<"PATCH">>
    },
    maps:get(Method, Names, <<"unknown">>).
Peer Parsing
%% @doc Split a peer address into `{Host, Port}', with Host as a string.
%%
%% - An explicit port in the URI is used as-is.
%% - Without a port, `https' peers get 443 and all others the `port' option
%%   (default 8734).
%% - A bare host name with no scheme (e.g. `example.com') is accepted too.
%%   uri_string:parse/1 reports such input as a `path' with no `host', which
%%   previously fell through every clause and crashed with `case_clause'.
%%
%% Input that has a scheme but no host still fails with `case_clause'.
parse_peer(Peer, Opts) ->
    case uri_string:parse(Peer) of
        #{host := Host, port := Port} ->
            {hb_util:list(Host), Port};
        URI = #{host := Host} ->
            {
                hb_util:list(Host),
                case hb_maps:get(scheme, URI, undefined, Opts) of
                    <<"https">> -> 443;
                    _ -> hb_opts:get(port, 8734, Opts)
                end
            };
        %% Scheme-less peer: the whole input is the host name. The scheme
        %% check keeps "host:port" style input (which parses as scheme plus
        %% path) from being misread as a host.
        URI = #{path := BareHost} when
            not is_map_key(scheme, URI), BareHost =/= <<>>, BareHost =/= ""
        ->
            {hb_util:list(BareHost), hb_opts:get(port, 8734, Opts)}
    end.
- http://example.com:8080 → {"example.com", 8080}
- https://example.com → {"example.com", 443}
- example.com → {"example.com", 8734} (default port)
Common Patterns
%% Basic request with Gun
Args = #{
peer => <<"http://example.com">>,
path => <<"/api/data">>,
method => <<"GET">>,
headers => #{},
body => <<>>
},
{ok, Status, Headers, Body} = hb_http_client:req(Args, #{}).
%% POST with JSON body
Args = #{
peer => <<"http://api.example.com">>,
path => <<"/v1/endpoint">>,
method => <<"POST">>,
headers => #{
<<"content-type">> => <<"application/json">>
},
body => <<"{\"key\":\"value\"}">>
},
{ok, 200, Headers, Response} = hb_http_client:req(Args, #{}).
%% Request with size limit
Args = #{
peer => <<"http://example.com">>,
path => <<"/large-file">>,
method => <<"GET">>,
headers => #{},
body => <<>>,
limit => 1048576 % 1MB limit
},
case hb_http_client:req(Args, #{}) of
{ok, _, _, Data} -> process(Data);
{error, too_much_data} -> handle_oversized()
end.
%% Use HTTPC instead of Gun
Opts = #{http_client => httpc},
{ok, Status, Headers, Body} = hb_http_client:req(Args, Opts).
%% With HTTP monitor
Opts = #{
http_monitor => #{
<<"device">> => <<"logger@1.0">>,
<<"path">> => <<"/log">>
},
http_reference => <<"request-123">>
},
{ok, _, _, _} = hb_http_client:req(Args, Opts).
Configuration Options
Opts = #{
% Client selection
http_client => gun | httpc, % Default: gun
% Timeouts
http_request_send_timeout => Milliseconds, % Request timeout
connect_timeout => 5000, % Connection timeout
% Size limits
limit => Bytes | infinity, % Response size limit
% Network
port => 8734, % Default port when not specified
% Monitoring
http_monitor => MonitorMessage, % HTTP monitor configuration
http_reference => Reference, % Custom reference for monitoring
% Metrics
prometheus => true | false, % Enable Prometheus (default: not test mode)
% Connection
http_opts => #{keepalive => infinity},
retry => 0,
retry_timeout => 500
}.
Error Handling
% Successful responses
{ok, Status, Headers, Body}
% Network errors
{error, connection_closed}
{error, connect_timeout}
{error, timeout}
{error, econnrefused}
% Shutdown errors
{error, {shutdown, normal}}
{error, {shutdown, timeout}}
{error, {shutdown, econnrefused}}
{error, {shutdown, ehostunreach}}
% Process errors
{error, noproc}
{error, client_error}
% Data size errors
{error, too_much_data}
Performance Features
- Connection Pooling: Reuse TCP connections across requests
- Parallel Requests: Multiple concurrent requests per connection
- Streaming: Efficient memory usage for large responses
- Rate Limiting: Integrated with ar_rate_limiter
- Metrics: Detailed Prometheus instrumentation
- Reconnection: Automatic on connection failure
- Timeout Management: Per-request timeout control
Metrics Collected
Histograms
- http_request_duration_seconds - Request duration by method, path, status class
Counters
- gun_requests_total - Total requests by method, path, status class
- http_client_uploaded_bytes_total - Bytes uploaded by path
- http_client_downloaded_bytes_total - Bytes downloaded by path
Labels
- request_method - HTTP method (GET, POST, etc.)
- request_path - Request path
- status_class - Response status classification
- method - Method atom (for Gun requests)
- path - Path string
References
- HTTP Core - hb_http.erl
- Multi-Request - hb_http_multi.erl
- Supervisor - hb_http_client_sup.erl
- Rate Limiting - ar_rate_limiter.erl
- Gun Client - Gun HTTP/2 client
Notes
- Origin: Adapted from Arweave project
- Default Client: Gun is preferred over HTTPC
- Connection Pool: Maintained per peer in gen_server state
- Reconnection: Automatic with one retry attempt
- Streaming: Gun provides efficient chunked transfer encoding
- Metrics: Prometheus integration optional (disabled in tests)
- Monitor Callback: Async invocation in separate process
- HTTP/2: Supported via Gun (not HTTPC)
- Size Limits: Enforced during streaming to prevent memory exhaustion
- Cookie Handling: Supports multiple cookie header lines
- Timeout: Per-request timer with configurable duration
- Error Classification: Detailed status classes for monitoring
- Process Monitoring: Automatic cleanup on connection death
- Rate Limiting: Throttling before request execution
- Async Metrics: Recorded in spawned process to avoid blocking