Skip to content

ar_rate_limiter.erl - Request Rate Limiting Server

Overview

Purpose: Peer request rate limiting with configurable thresholds
Module: ar_rate_limiter
Behavior: gen_server
Window: 30 seconds (30,000 ms default)
Strategy: Sliding window with deferred execution

This module implements a GenServer-based rate limiter that throttles requests to peers based on path-specific limits. It uses a sliding window approach with queued timestamps to track request rates and automatically delays requests that would exceed configured limits.

Dependencies

  • Erlang/OTP: gen_server, queue
  • Arweave: hb_opts, hb_maps, hb_path
  • Includes: include/hb.hrl

Public Functions Overview

%% Server Management
-spec start_link(Opts) -> {ok, PID} | {error, Reason}
    when
        Opts :: map(),
        PID :: pid(),
        Reason :: term().
 
%% Rate Limiting
-spec throttle(Peer, Path, Opts) -> ok
    when
        Peer :: binary() | string(),
        Path :: binary() | string(),
        Opts :: map().
 
%% Control
-spec off() -> ok.
-spec on() -> ok.

Architecture

Rate Limiting Model

+-----------------------------------------------------+
|         ar_rate_limiter (GenServer)                 |
|                                                     |
|  State: #{                                          |
|    traces => #{                                     |
|      {Peer1, Type1} => {Count, Queue[Timestamps]},  |
|      {Peer2, Type2} => {Count, Queue[Timestamps]},  |
|      ...                                            |
|    },                                               |
|    off => boolean(),                                |
|    opts => map()                                    |
|  }                                                  |
|                                                     |
|  Sliding Window (30s default):                      |
|  [--------------------|---------]                   |
|   Old (discarded)    | Recent (counted)             |
+-----------------------------------------------------+

Request Flow

Client Request
      |
      v
   throttle(Peer, Path, Opts)
      |
      +-> Exempt Peer? --Yes--> Allow immediately
      |        |
      |        No
      |        v
      +-> Exempt Path? --Yes--> Allow immediately
      |        |
      |        No
      |        v
      +-> gen_server:call({throttle, Peer, Path})
              |
              v
          Check Rate
              |
              +-> Under 80% of half-limit? --Yes--> Reply OK
              |                                       |
              |                                       v
              |                                   Update traces
              |
              +-> Over threshold? --Yes--> Delay 1 second
                                            |
                                            v
                                        Retry throttle

Timestamp Queue

Queue Structure (per Peer+Type):
{Count, Queue} where Queue = [T1, T2, T3, ..., Tn]
 
Example (30s window, Now = 109000, so cutoff = Now - 30000 = 79000):
  Timestamps: [70001, 75000, 80000, 90000, 95000]
                 |                    |
                 +-- Expired (>30s)   +-- Recent (<30s)
                 
After cut_trace:
  Timestamps: [80000, 90000, 95000]
  Count: 3

Public Functions

1. start_link/1

-spec start_link(Opts) -> {ok, PID} | {error, Reason}
    when
        Opts :: map(),
        PID :: pid(),
        Reason :: term().

Description: Start the rate limiter GenServer. Registers the server as ar_rate_limiter.

Options:
  • throttle_period - Sliding window period in ms (default: 30000)
  • throttle_rpm_by_path - Map of path patterns to {Type, RequestsPerMinute}
  • throttle_exempt_peers - List of peers exempt from rate limiting
  • throttle_exempt_paths - List of path patterns exempt from rate limiting
Test Code:
-module(ar_rate_limiter_start_test).
-include_lib("eunit/include/eunit.hrl").
 
start_link_basic_test() ->
    Opts = #{
        throttle_period => 30000,
        throttle_rpm_by_path => {api, 60}
    },
    {ok, PID} = ar_rate_limiter:start_link(Opts),
    ?assert(is_pid(PID)),
    ?assert(is_process_alive(PID)),
    ?assertEqual(PID, whereis(ar_rate_limiter)),
    gen_server:stop(PID).
 
start_link_with_exemptions_test() ->
    Opts = #{
        throttle_exempt_peers => [<<"trusted-peer">>],
        throttle_exempt_paths => [<<"/health">>, <<"/metrics">>]
    },
    {ok, PID} = ar_rate_limiter:start_link(Opts),
    ?assert(is_process_alive(PID)),
    gen_server:stop(PID).

2. throttle/3

-spec throttle(Peer, Path, Opts) -> ok
    when
        Peer :: binary() | string(),
        Path :: binary() | string(),
        Opts :: map().

Description: Throttle requests to a peer for a specific path. Blocks until it's safe to proceed based on rate limits. Automatically exempts configured peers and paths.

Behavior:
  1. Check if peer is in throttle_exempt_peers → Allow immediately
  2. Check if path matches throttle_exempt_paths → Allow immediately
  3. Call GenServer to check rate limit
  4. If under threshold (80% of the half-limit for the 30-second window) → Allow immediately
  5. If over threshold → Delay 1 second and retry

Blocking: This function blocks the caller until rate limit allows

Test Code:
-module(ar_rate_limiter_throttle_test).
-include_lib("eunit/include/eunit.hrl").
 
throttle_exempt_peer_test() ->
    Opts = #{
        throttle_exempt_peers => [<<"trusted-peer">>],
        throttle_rpm_by_path => {api, 60}
    },
    {ok, PID} = ar_rate_limiter:start_link(Opts),
    
    % Exempt peer should not be throttled
    Start = os:system_time(millisecond),
    ok = ar_rate_limiter:throttle(<<"trusted-peer">>, <<"/api/data">>, Opts),
    End = os:system_time(millisecond),
    
    ?assert((End - Start) < 100),  % Should be instant
    gen_server:stop(PID).
 
throttle_exempt_path_test() ->
    Opts = #{
        throttle_exempt_paths => [<<"/health">>, <<"/metrics">>],
        throttle_rpm_by_path => {default, 60}
    },
    {ok, PID} = ar_rate_limiter:start_link(Opts),
    
    % Exempt path should not be throttled
    Start = os:system_time(millisecond),
    ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/health">>, Opts),
    End = os:system_time(millisecond),
    
    ?assert((End - Start) < 100),
    gen_server:stop(PID).
 
throttle_under_limit_test() ->
    Opts = #{
        throttle_period => 30000,
        throttle_rpm_by_path => {api, 60}  % 60 requests per minute
    },
    {ok, PID} = ar_rate_limiter:start_link(Opts),
    
    % First few requests should be instant
    lists:foreach(
        fun(_) ->
            Start = os:system_time(millisecond),
            ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts),
            End = os:system_time(millisecond),
            ?assert((End - Start) < 100)
        end,
        lists:seq(1, 10)
    ),
    gen_server:stop(PID).
 
throttle_over_limit_test() ->
    Opts = #{
        throttle_period => 30000,
        throttle_rpm_by_path => {api, 10}  % 10 requests per minute = 5 per 30s
    },
    {ok, PID} = ar_rate_limiter:start_link(Opts),
    
    % Make requests up to 80% of half-limit (80% of 5 = 4 requests)
    lists:foreach(
        fun(_) ->
            ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts)
        end,
        lists:seq(1, 4)
    ),
    
    % Next request should be delayed (over 80% threshold)
    Start = os:system_time(millisecond),
    ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts),
    End = os:system_time(millisecond),
    
    ?assert((End - Start) >= 1000),  % Should wait at least 1 second
    gen_server:stop(PID).
 
throttle_different_peers_test() ->
    Opts = #{
        throttle_rpm_by_path => {api, 60}
    },
    {ok, PID} = ar_rate_limiter:start_link(Opts),
    
    % Different peers should have independent limits
    ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts),
    ok = ar_rate_limiter:throttle(<<"peer2">>, <<"/api/data">>, Opts),
    ok = ar_rate_limiter:throttle(<<"peer3">>, <<"/api/data">>, Opts),
    
    gen_server:stop(PID).
 
throttle_no_server_test() ->
    % If server not running, should not crash
    Opts = #{},
    ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts).

3. off/0

-spec off() -> ok.

Description: Turn rate limiting off globally. All subsequent throttle/3 calls will return immediately without checking limits.

Test Code:
-module(ar_rate_limiter_off_test).
-include_lib("eunit/include/eunit.hrl").
 
off_disables_limiting_test() ->
    Opts = #{
        throttle_rpm_by_path => {api, 60}
    },
    {ok, PID} = ar_rate_limiter:start_link(Opts),
    
    % Turn off rate limiting
    ok = ar_rate_limiter:off(),
    
    % Make many requests quickly - should all succeed instantly
    Start = os:system_time(millisecond),
    lists:foreach(
        fun(_) ->
            ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts)
        end,
        lists:seq(1, 100)
    ),
    End = os:system_time(millisecond),
    
    ?assert((End - Start) < 1000),  % Should be very fast
    gen_server:stop(PID).

4. on/0

-spec on() -> ok.

Description: Turn rate limiting back on globally after it was turned off with off/0.

Test Code:
-module(ar_rate_limiter_on_test).
-include_lib("eunit/include/eunit.hrl").
 
on_enables_limiting_test() ->
    Opts = #{
        throttle_rpm_by_path => {api, 60}
    },
    {ok, PID} = ar_rate_limiter:start_link(Opts),
    
    % Turn off, then back on
    ok = ar_rate_limiter:off(),
    ok = ar_rate_limiter:on(),
    
    % Rate limiting should work again
    lists:foreach(
        fun(_) ->
            ok = ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts)
        end,
        lists:seq(1, 4)
    ),
    
    gen_server:stop(PID).
 
on_off_toggle_test() ->
    Opts = #{
        throttle_rpm_by_path => {api, 60}
    },
    {ok, PID} = ar_rate_limiter:start_link(Opts),
    
    ok = ar_rate_limiter:off(),
    ok = ar_rate_limiter:on(),
    ok = ar_rate_limiter:off(),
    ok = ar_rate_limiter:on(),
    
    gen_server:stop(PID).

GenServer Callbacks

init/1

-spec init(Opts) -> {ok, State}
    when
        Opts :: map(),
        State :: #state{}.

Description: Initialize rate limiter state with empty traces map.

Initial State:
#state{
    traces = #{},      % No tracked peers yet
    off = false,       % Rate limiting enabled
    opts = Opts        % Configuration
}

handle_call/3

Messages:
  • {throttle, Peer, Path} - Check rate limit (delegates to handle_cast)

When off = true: Returns ok immediately without checking limits.

When off = false: Delegates to async handle_cast for non-blocking processing.


handle_cast/2

Messages:

{throttle, Peer, Path, From}

Process rate limit check:

  1. Get limit for path from throttle_rpm_by_path
  2. Get current trace for {Peer, Type}
  3. Update trace (remove expired timestamps)
  4. Check if under 80% of half-limit (30s window)
  5. If under: Reply OK and update trace
  6. If over: Schedule retry after 1 second

turn_off

Set off = true to disable rate limiting.

turn_on

Set off = false to enable rate limiting.


handle_info/2

Logs unhandled messages as warnings.


terminate/2

Clean shutdown (no special cleanup needed).


Internal Functions

cut_trace/4

-spec cut_trace(N, Trace, Now, Opts) -> {NewN, NewTrace}
    when
        N :: non_neg_integer(),
        Trace :: queue:queue(),
        Now :: integer(),
        Opts :: map(),
        NewN :: non_neg_integer(),
        NewTrace :: queue:queue().

Description: Remove expired timestamps from the trace queue. Recursively dequeues timestamps older than the throttle period (default 30 seconds).

Algorithm:
1. Dequeue oldest timestamp
2. If timestamp < (Now - Period):
   - Decrement count
   - Recurse with remaining queue
3. Else:
   - Return current count and queue

Configuration

Throttle Period

throttle_period => 30000  % 30 seconds in milliseconds

Default: 30,000 ms (30 seconds)
Description: Sliding window size for counting requests

RPM by Path

throttle_rpm_by_path => #{
    <<"/api/v1/.*">> => {api_v1, 60},      % 60 requests per minute
    <<"/api/v2/.*">> => {api_v2, 120},     % 120 requests per minute
    <<"/tx/.*">> => {transactions, 30},    % 30 requests per minute
    <<".*">> => {default, 100}             % 100 requests per minute (catch-all)
}

Format: {Type, RequestsPerMinute}

  • Type - Identifier for grouping rate limits
  • RequestsPerMinute - Maximum requests allowed per 60 seconds

Note: Actual limit is half for 30-second window, and throttling starts at 80% of that.

Exempt Peers

throttle_exempt_peers => [
    <<"127.0.0.1">>,
    <<"trusted-node-1.arweave.net">>,
    <<"admin-user">>
]

Description: List of peers that bypass rate limiting entirely.

Exempt Paths

throttle_exempt_paths => [
    <<"/health">>,
    <<"/metrics">>,
    <<"/status">>
]

Description: List of path patterns that bypass rate limiting (regex supported).


Rate Calculation

Effective Limits

Given configuration:

  • throttle_rpm_by_path: 60 requests per minute
  • throttle_period: 30000 ms (30 seconds)
Calculation:
RPM = 60                           % Requests per minute
HalfLimit = RPM div 2              % 30 (for 30s window)
Threshold = HalfLimit * 80 / 100   % 24 (80% of half limit)
Effective Behavior:
  • Window: 30 seconds
  • Hard limit: 30 requests per 30 seconds
  • Throttle starts: After 24 requests (80% of 30)
  • Action: 1-second delays for additional requests

Example Timeline

RPM = 60, Window = 30s, Threshold = 24
 
Time (s) | Requests | Action
---------|----------|--------------------
0        | 1-24     | Allowed immediately
25       | 25       | Delayed 1s
26       | 26       | Delayed 1s
27       | 27       | Delayed 1s
...
30       | (window) | Old requests expire
31       | 31       | Allowed (if under threshold after expiry)

Common Patterns

%% Start rate limiter in supervision tree
{ar_rate_limiter, 
    {ar_rate_limiter, start_link, [#{
        throttle_period => 30000,
        throttle_rpm_by_path => #{
            <<"/api/.*">> => {api, 100},
            <<"/tx/.*">> => {tx, 50}
        },
        throttle_exempt_peers => [<<"127.0.0.1">>],
        throttle_exempt_paths => [<<"/health">>]
    }]},
    permanent, 5000, worker, [ar_rate_limiter]
}
 
%% Throttle before making request
throttle_and_request(Peer, Path, Data, Opts) ->
    ok = ar_rate_limiter:throttle(Peer, Path, Opts),
    make_http_request(Peer, Path, Data).
 
%% Disable during maintenance
maintenance_mode() ->
    ar_rate_limiter:off(),
    % ... perform maintenance ...
    ar_rate_limiter:on().
 
%% Path-specific limits
Opts = #{
    throttle_rpm_by_path => #{
        <<"/api/v1/.*">> => {api_v1, 60},
        <<"/api/v2/.*">> => {api_v2, 120},
        <<"/graphql">> => {graphql, 30},
        <<".*">> => {default, 100}
    }
},
ar_rate_limiter:throttle(Peer, <<"/api/v2/query">>, Opts).
 
%% Exempt trusted peers
Opts = #{
    throttle_exempt_peers => [
        <<"localhost">>,
        <<"monitoring-service">>,
        <<"backup-node">>
    ]
},
ar_rate_limiter:throttle(<<"localhost">>, Path, Opts).  % Instant
 
%% Per-peer tracking
% Peer1 and Peer2 have independent rate limits
ar_rate_limiter:throttle(<<"peer1">>, <<"/api/data">>, Opts),
ar_rate_limiter:throttle(<<"peer2">>, <<"/api/data">>, Opts),
% Each peer gets their own 60 RPM allowance

State Structure

#state{
    traces = #{
        {<<"peer1">>, api} => {5, queue:from_list([T1, T2, T3, T4, T5])},
        {<<"peer2">>, api} => {3, queue:from_list([T6, T7, T8])},
        {<<"peer1">>, tx} => {2, queue:from_list([T9, T10])}
    },
    off = false,
    opts = #{...}
}
Fields:
  • traces - Map of {Peer, Type} to {Count, TimestampQueue}
  • off - Boolean flag to globally disable rate limiting
  • opts - Configuration options
Trace Entry:
  • Key: {Peer, Type} tuple
  • Value: {Count, Queue} where:
    • Count - Number of recent requests (within window)
    • Queue - Queue of timestamps (in milliseconds)

Performance Characteristics

Time Complexity

  • throttle/3 (under limit): O(1) amortized
  • throttle/3 (over limit): O(1) + 1 second delay
  • cut_trace/4: O(E) where E is number of expired entries
  • State lookup: O(1) (hash map)
  • Queue operations: O(1) for in/out

Space Complexity

  • Per peer-type: O(W) where W is requests in window
  • Total: O(P × T × W) where:
    • P = number of unique peers
    • T = number of unique types
    • W = average requests per window

Memory Management

  • Expired timestamps automatically removed
  • No memory leak (sliding window)
  • Worst case: ~1KB per active peer-type pair

Comparison with Alternatives

vs. Token Bucket

% Token Bucket: Fixed rate replenishment
% ar_rate_limiter: Sliding window with delayed execution
 
% Token Bucket: Immediate rejection when empty
% ar_rate_limiter: Delays request until safe

vs. Leaky Bucket

% Leaky Bucket: Constant rate output
% ar_rate_limiter: Bursty allowed up to threshold
 
% Leaky Bucket: Queue requests
% ar_rate_limiter: Block caller

vs. Fixed Window

% Fixed Window: Resets at interval boundaries
% ar_rate_limiter: True sliding window (no edge effects)

Edge Cases

Server Not Running

% throttle/3 catches noproc and returns ok
% Graceful degradation - no rate limiting

Clock Skew

% Uses os:system_time(millisecond)
% This is OS wall-clock time and is NOT guaranteed to be monotonic
% Issues if system time adjusted backwards

Delayed Message Processing

% erlang:send_after used for retries
% Delayed messages processed in order
% Potential queue buildup under heavy load

Testing Strategies

Unit Tests

% Test exemptions
test_exempt_peers() ->
    % Verify exempt peers bypass limiting
    
% Test threshold calculation
test_threshold() ->
    % Verify 80% of half-limit calculation

Integration Tests

% Test actual delays
test_delay_timing() ->
    % Measure actual delay duration
    
% Test sliding window
test_window_expiry() ->
    % Verify old requests expire correctly

Load Tests

% Test concurrent requests
test_concurrent_throttling() ->
    % Multiple processes hitting same peer
    
% Test many peers
test_many_peers() ->
    % Verify independent peer tracking

Monitoring

Events Logged

?event({approaching_peer_rpm_limit, 
    {peer, Peer},
    {path, Path},
    {minute_limit, Limit},
    {caller, From}
})

Metrics to Track

  • Requests delayed per peer
  • Average delay time
  • Traces map size (memory usage)
  • Requests by path type
  • Exempt request count

References

  • Token Bucket Algorithm - Related rate limiting strategy, listed for comparison (this module uses a sliding window)
  • Sliding Window - Time-based request counting
  • GenServer - OTP behavior for server processes
  • Erlang Queues - Efficient FIFO data structure

Notes

  1. 80% Threshold: Throttling starts at 80% of half-limit to provide headroom
  2. 30-Second Window: Half of 60-second minute for finer-grained control
  3. 1-Second Delays: Fixed delay when over threshold (not adaptive)
  4. Per-Peer Tracking: Each peer has independent rate limits
  5. Type Grouping: Multiple paths can share same rate limit via type
  6. Blocking Calls: throttle/3 blocks caller until safe to proceed
  7. Graceful Degradation: Returns OK if server not running
  8. No Backpressure: Delayed requests kept in message queue
  9. Wall-Clock Time: Uses os:system_time(millisecond), which is not monotonic and can jump if the system clock is adjusted
  10. Regex Paths: Exempt paths support regex pattern matching