ar_rate_limiter.erl - Request Rate Limiting Server
Overview
Purpose: Peer request rate limiting with configurable thresholds
Module: ar_rate_limiter
Behavior: gen_server
Window: 30 seconds (30,000 ms default)
Strategy: Token bucket with deferred execution
This module implements a GenServer-based rate limiter that throttles requests to peers based on path-specific limits. It uses a sliding window approach with queued timestamps to track request rates and automatically delays requests that would exceed configured limits.
Dependencies
- Erlang/OTP: gen_server, queue
- Arweave: hb_opts, hb_maps, hb_path
- Includes: include/hb.hrl
Public Functions Overview
%% Server Management
-spec start_link(Opts) -> {ok, PID} | {error, Reason}
when
Opts :: map(),
PID :: pid(),
Reason :: term().
%% Rate Limiting
-spec throttle(Peer, Path, Opts) -> ok
when
Peer :: binary() | string(),
Path :: binary() | string(),
Opts :: map().
%% Control
-spec off() -> ok.
-spec on() -> ok.

Architecture
Rate Limiting Model
+-----------------------------------------------------+
| ar_rate_limiter (GenServer) |
| |
| State: #{ |
| traces => #{ |
| {Peer1, Type1} => {Count, Queue[Timestamps]}, |
| {Peer2, Type2} => {Count, Queue[Timestamps]}, |
| ... |
| }, |
| off => boolean(), |
| opts => map() |
| } |
| |
| Sliding Window (30s default): |
| [--------------------|---------] |
| Old (discarded) | Recent (counted) |
+-----------------------------------------------------+

Request Flow
Client Request
|
v
throttle(Peer, Path, Opts)
|
+-> Exempt Peer? --Yes--> Allow immediately
| |
| No
| v
+-> Exempt Path? --Yes--> Allow immediately
| |
| No
| v
+-> gen_server:call({throttle, Peer, Path})
|
v
Check Rate
|
+-> Under 80% of limit? --Yes--> Reply OK
| |
| v
| Update traces
|
+-> Over threshold? --Yes--> Delay 1 second
|
v
Retry throttle

Timestamp Queue
Queue Structure (per Peer+Type):
{Count, Queue} where Queue = [T1, T2, T3, ..., Tn]
Example (30s window, Now = 100000, expiry cutoff = Now - 30000 = 70000):
Timestamps: [65000, 69000, 80000, 90000, 95000]
                 |                  |
    +-- Expired (older than cutoff) +-- Recent (within window)
After cut_trace:
Timestamps: [80000, 90000, 95000]
Count: 3

Public Functions
1. start_link/1
-spec start_link(Opts) -> {ok, PID} | {error, Reason}
when
Opts :: map(),
PID :: pid(),
Reason :: term().

Description: Start the rate limiter GenServer. Registers the server as ar_rate_limiter.
Options:
- throttle_period - Sliding window period in ms (default: 30000)
- throttle_rpm_by_path - Map of path patterns to {Type, RequestsPerMinute}
- throttle_exempt_peers - List of peers exempt from rate limiting
- throttle_exempt_paths - List of path patterns exempt from rate limiting
-module(ar_rate_limiter_start_test).
-include_lib("eunit/include/eunit.hrl").
start_link_basic_test() ->
Opts = #{
throttle_period => 30000,
throttle_rpm_by_path => {api, 60}
},
{ok, PID} = ar_rate_limiter:start_link(Opts),
?assert(is_pid(PID)),
?assert(is_process_alive(PID)),
?assertEqual(PID, whereis(ar_rate_limiter)),
gen_server:stop(PID).
start_link_with_exemptions_test() ->
Opts = #{
throttle_exempt_peers => [<<"trusted-peer">>],
throttle_exempt_paths => [<<"/health">>, <<"/metrics">>]
},
{ok, PID} = ar_rate_limiter:start_link(Opts),
?assert(is_process_alive(PID)),
gen_server:stop(PID).

2. throttle/3
-spec throttle(Peer, Path, Opts) -> ok
when
Peer :: binary() | string(),
Path :: binary() | string(),
Opts :: map().

Description: Throttle requests to a peer for a specific path. Blocks until it's safe to proceed based on rate limits. Automatically exempts configured peers and paths.
Behavior:
- Check if peer is in throttle_exempt_peers → Allow immediately
- Check if path matches throttle_exempt_paths → Allow immediately
- Call GenServer to check rate limit
- If under threshold (80% of limit) → Allow immediately
- If over threshold → Delay 1 second and retry
Blocking: This function blocks the caller until rate limit allows
Test Code:-module(ar_rate_limiter_throttle_test).
-include_lib("eunit/include/eunit.hrl").
throttle_exempt_peer_test() ->
Opts = #{
throttle_exempt_peers => [<<"trusted-peer">>],
throttle_rpm_by_path => {api, 60}
},
{ok, PID} = ar_rate_limiter:start_link(Opts),
% Exempt peer should not be throttled
Start = os:system_time(millisecond),
ok = ar_rate_limiter:throttle(<<"trusted-peer">>, <<"/api/data">>, Opts),
End = os:system_time(millisecond),
?assert((End - Start) < 100), % Should be instant
gen_server:stop(PID).
throttle_exempt_path_test() ->
Opts = #{
throttle_exempt_paths => [<<"/health">>, <<"/metrics">>],
throttle_rpm_by_path => {default, 60}
},
{ok, PID} = ar_rate_limiter:start_link(Opts),
% Exempt path should not be throttled
Start = os:system_time(millisecond),
ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/health">>, Opts),
End = os:system_time(millisecond),
?assert((End - Start) < 100),
gen_server:stop(PID).
throttle_under_limit_test() ->
Opts = #{
throttle_period => 30000,
throttle_rpm_by_path => {api, 60} % 60 requests per minute
},
{ok, PID} = ar_rate_limiter:start_link(Opts),
% First few requests should be instant
lists:foreach(
fun(_) ->
Start = os:system_time(millisecond),
ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts),
End = os:system_time(millisecond),
?assert((End - Start) < 100)
end,
lists:seq(1, 10)
),
gen_server:stop(PID).
throttle_over_limit_test() ->
Opts = #{
throttle_period => 30000,
throttle_rpm_by_path => {api, 10} % 10 requests per minute = 5 per 30s
},
{ok, PID} = ar_rate_limiter:start_link(Opts),
% Make requests up to 80% of half-limit (80% of 5 = 4 requests)
lists:foreach(
fun(_) ->
ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts)
end,
lists:seq(1, 4)
),
% Next request should be delayed (over 80% threshold)
Start = os:system_time(millisecond),
ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts),
End = os:system_time(millisecond),
?assert((End - Start) >= 1000), % Should wait at least 1 second
gen_server:stop(PID).
throttle_different_peers_test() ->
Opts = #{
throttle_rpm_by_path => {api, 60}
},
{ok, PID} = ar_rate_limiter:start_link(Opts),
% Different peers should have independent limits
ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts),
ok = ar_rate_limiter:throttle(<<"peer2">>, <<"/api/data">>, Opts),
ok = ar_rate_limiter:throttle(<<"peer3">>, <<"/api/data">>, Opts),
gen_server:stop(PID).
throttle_no_server_test() ->
% If server not running, should not crash
Opts = #{},
ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts).

3. off/0
-spec off() -> ok.

Description: Turn rate limiting off globally. All subsequent throttle/3 calls will return immediately without checking limits.
-module(ar_rate_limiter_off_test).
-include_lib("eunit/include/eunit.hrl").
off_disables_limiting_test() ->
Opts = #{
throttle_rpm_by_path => {api, 60}
},
{ok, PID} = ar_rate_limiter:start_link(Opts),
% Turn off rate limiting
ok = ar_rate_limiter:off(),
% Make many requests quickly - should all succeed instantly
Start = os:system_time(millisecond),
lists:foreach(
fun(_) ->
ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts)
end,
lists:seq(1, 100)
),
End = os:system_time(millisecond),
?assert((End - Start) < 1000), % Should be very fast
gen_server:stop(PID).

4. on/0
-spec on() -> ok.

Description: Turn rate limiting back on globally after it was turned off with off/0.
-module(ar_rate_limiter_on_test).
-include_lib("eunit/include/eunit.hrl").
on_enables_limiting_test() ->
Opts = #{
throttle_rpm_by_path => {api, 60}
},
{ok, PID} = ar_rate_limiter:start_link(Opts),
% Turn off, then back on
ok = ar_rate_limiter:off(),
ok = ar_rate_limiter:on(),
% Rate limiting should work again
lists:foreach(
fun(_) ->
ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts)
end,
lists:seq(1, 4)
),
gen_server:stop(PID).
on_off_toggle_test() ->
Opts = #{
throttle_rpm_by_path => {api, 60}
},
{ok, PID} = ar_rate_limiter:start_link(Opts),
ok = ar_rate_limiter:off(),
ok = ar_rate_limiter:on(),
ok = ar_rate_limiter:off(),
ok = ar_rate_limiter:on(),
gen_server:stop(PID).

GenServer Callbacks
init/1
-spec init(Opts) -> {ok, State}
when
Opts :: map(),
State :: #state{}.

Description: Initialize rate limiter state with empty traces map.
Initial State:#state{
traces = #{}, % No tracked peers yet
off = false, % Rate limiting enabled
opts = Opts % Configuration
}

handle_call/3
Messages: {throttle, Peer, Path} - Check rate limit (delegates to handle_cast)
When off = true:
Returns ok immediately without checking limits.
When off = false:
Delegates to async handle_cast for non-blocking processing.
handle_cast/2
Messages: {throttle, Peer, Path, From}
Process rate limit check:
- Get limit for path from throttle_rpm_by_path
- Get current trace for {Peer, Type}
- Update trace (remove expired timestamps)
- Check if under 80% of half-limit (30s window)
- If under: Reply OK and update trace
- If over: Schedule retry after 1 second
turn_off
Set off = true to disable rate limiting.
turn_on
Set off = false to enable rate limiting.
handle_info/2
Logs unhandled messages as warnings.
terminate/2
Clean shutdown (no special cleanup needed).
Internal Functions
cut_trace/4
-spec cut_trace(N, Trace, Now, Opts) -> {NewN, NewTrace}
when
N :: non_neg_integer(),
Trace :: queue:queue(),
Now :: integer(),
Opts :: map(),
NewN :: non_neg_integer(),
NewTrace :: queue:queue().

Description: Remove expired timestamps from the trace queue. Recursively dequeues timestamps older than the throttle period (default 30 seconds).
Algorithm:1. Dequeue oldest timestamp
2. If timestamp < (Now - Period):
- Decrement count
- Recurse with remaining queue
3. Else:
- Return current count and queue

Configuration
Throttle Period
throttle_period => 30000 % 30 seconds in milliseconds

Default: 30,000 ms (30 seconds)
Description: Sliding window size for counting requests
RPM by Path
throttle_rpm_by_path => #{
<<"/api/v1/.*">> => {api_v1, 60}, % 60 requests per minute
<<"/api/v2/.*">> => {api_v2, 120}, % 120 requests per minute
<<"/tx/.*">> => {transactions, 30}, % 30 requests per minute
<<".*">> => {default, 100} % 100 requests per minute (catch-all)
}

Format: {Type, RequestsPerMinute}
- Type - Identifier for grouping rate limits
- RequestsPerMinute - Maximum requests allowed per 60 seconds
Note: Actual limit is half for 30-second window, and throttling starts at 80% of that.
Exempt Peers
throttle_exempt_peers => [
<<"127.0.0.1">>,
<<"trusted-node-1.arweave.net">>,
<<"admin-user">>
]

Description: List of peers that bypass rate limiting entirely.
Exempt Paths
throttle_exempt_paths => [
<<"/health">>,
<<"/metrics">>,
<<"/status">>
]

Description: List of path patterns that bypass rate limiting (regex supported).
Rate Calculation
Effective Limits
Given configuration:
- throttle_rpm_by_path: 60 requests per minute
- throttle_period: 30000 ms (30 seconds)
RPM = 60 % Requests per minute
HalfLimit = RPM div 2 % 30 (for 30s window)
Threshold = HalfLimit * 80 / 100 % 24 (80% of half limit)

- Window: 30 seconds
- Hard limit: 30 requests per 30 seconds
- Throttle starts: After 24 requests (80% of 30)
- Action: 1-second delays for additional requests
Example Timeline
RPM = 60, Window = 30s, Threshold = 24
Time (s) | Requests | Action
---------|----------|--------------------
0 | 1-24 | Allowed immediately
25 | 25 | Delayed 1s
26 | 26 | Delayed 1s
27 | 27 | Delayed 1s
...
30 | (window) | Old requests expire
31       | 31       | Allowed (if under threshold after expiry)

Common Patterns
%% Start rate limiter in supervision tree
{ar_rate_limiter,
{ar_rate_limiter, start_link, [#{
throttle_period => 30000,
throttle_rpm_by_path => #{
<<"/api/.*">> => {api, 100},
<<"/tx/.*">> => {tx, 50}
},
throttle_exempt_peers => [<<"127.0.0.1">>],
throttle_exempt_paths => [<<"/health">>]
}]},
permanent, 5000, worker, [ar_rate_limiter]
}
%% Throttle before making request
throttle_and_request(Peer, Path, Data, Opts) ->
ok = ar_rate_limiter:throttle(Peer, Path, Opts),
make_http_request(Peer, Path, Data).
%% Disable during maintenance
maintenance_mode() ->
ar_rate_limiter:off(),
% ... perform maintenance ...
ar_rate_limiter:on().
%% Path-specific limits
Opts = #{
throttle_rpm_by_path => #{
<<"/api/v1/.*">> => {api_v1, 60},
<<"/api/v2/.*">> => {api_v2, 120},
<<"/graphql">> => {graphql, 30},
<<".*">> => {default, 100}
}
},
ar_rate_limiter:throttle(Peer, <<"/api/v2/query">>, Opts).
%% Exempt trusted peers
Opts = #{
throttle_exempt_peers => [
<<"localhost">>,
<<"monitoring-service">>,
<<"backup-node">>
]
},
ar_rate_limiter:throttle(<<"localhost">>, Path, Opts). % Instant
%% Per-peer tracking
% Peer1 and Peer2 have independent rate limits
ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts),
ar_rate_limiter:throttle(<<"peer2">>, <<"/api/data">>, Opts),
% Each peer gets their own 60 RPM allowance

State Structure
#state{
traces = #{
{<<"peer1">>, api} => {5, queue:from_list([T1, T2, T3, T4, T5])},
{<<"peer2">>, api} => {3, queue:from_list([T6, T7, T8])},
{<<"peer1">>, tx} => {2, queue:from_list([T9, T10])}
},
off = false,
opts = #{...}
}

Fields:
- traces - Map of {Peer, Type} to {Count, TimestampQueue}
- off - Boolean flag to globally disable rate limiting
- opts - Configuration options

Traces map entries:
- Key: {Peer, Type} tuple
- Value: {Count, Queue} where:
  - Count - Number of recent requests (within window)
  - Queue - Queue of timestamps (in milliseconds)
Performance Characteristics
Time Complexity
- throttle/3 (under limit): O(1) amortized
- throttle/3 (over limit): O(1) + 1 second delay
- cut_trace/4: O(E) where E is number of expired entries
- State lookup: O(1) (hash map)
- Queue operations: O(1) for in/out
Space Complexity
- Per peer-type: O(W) where W is requests in window
- Total: O(P × T × W) where:
- P = number of unique peers
- T = number of unique types
- W = average requests per window
Memory Management
- Expired timestamps automatically removed
- No memory leak (sliding window)
- Worst case: ~1KB per active peer-type pair
Comparison with Alternatives
vs. Token Bucket
% Token Bucket: Fixed rate replenishment
% ar_rate_limiter: Sliding window with delayed execution
% Token Bucket: Immediate rejection when empty
% ar_rate_limiter: Delays request until safe

vs. Leaky Bucket
% Leaky Bucket: Constant rate output
% ar_rate_limiter: Bursty allowed up to threshold
% Leaky Bucket: Queue requests
% ar_rate_limiter: Block caller

vs. Fixed Window
% Fixed Window: Resets at interval boundaries
% ar_rate_limiter: True sliding window (no edge effects)

Edge Cases
Server Not Running
% throttle/3 catches noproc and returns ok
% Graceful degradation - no rate limiting

Clock Skew
% Uses os:system_time(millisecond)
% Wall-clock OS time — NOT monotonic (erlang:monotonic_time/1 would be)
% Issues if system time adjusted backwards

Delayed Message Processing
% erlang:send_after used for retries
% Delayed messages processed in order
% Potential queue buildup under heavy load

Testing Strategies
Unit Tests
% Test exemptions
test_exempt_peers() ->
% Verify exempt peers bypass limiting
% Test threshold calculation
test_threshold() ->
% Verify 80% of half-limit calculation

Integration Tests
% Test actual delays
test_delay_timing() ->
% Measure actual delay duration
% Test sliding window
test_window_expiry() ->
% Verify old requests expire correctly

Load Tests
% Test concurrent requests
test_concurrent_throttling() ->
% Multiple processes hitting same peer
% Test many peers
test_many_peers() ->
% Verify independent peer tracking

Monitoring
Events Logged
?event({approaching_peer_rpm_limit,
{peer, Peer},
{path, Path},
{minute_limit, Limit},
{caller, From}
})

Metrics to Track
- Requests delayed per peer
- Average delay time
- Traces map size (memory usage)
- Requests by path type
- Exempt request count
References
- Token Bucket Algorithm - Rate limiting strategy
- Sliding Window - Time-based request counting
- GenServer - OTP behavior for server processes
- Erlang Queues - Efficient FIFO data structure
Notes
- 80% Threshold: Throttling starts at 80% of half-limit to provide headroom
- 30-Second Window: Half of 60-second minute for finer-grained control
- 1-Second Delays: Fixed delay when over threshold (not adaptive)
- Per-Peer Tracking: Each peer has independent rate limits
- Type Grouping: Multiple paths can share same rate limit via type
- Blocking Calls:
throttle/3blocks caller until safe to proceed - Graceful Degradation: Returns OK if server not running
- No Backpressure: Delayed requests kept in message queue
- Monotonic Time: Uses
os:system_time(millisecond)for consistency - Regex Paths: Exempt paths support regex pattern matching