Rate Limiters

Rate-limiting algorithms that count requests in Redis and reject traffic once a threshold is reached. All algorithms live in core_redis.rate_limits and share a common pattern:

  • Each limiter is instantiated once (e.g. at module level or in a DI container) with a key_prefix and optional redis_kwargs.

  • is_allowed(identifier, ...) is called on every incoming request.

  • The identifier can be any string: a user ID, IP address, API key, etc.

from core_redis.rate_limits import FixedWindow  # or any other algorithm

limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})
allowed = limiter.is_allowed("user_123", limit=100, window=60)

Algorithm comparison

Algorithm         Memory / user  Burst handling           Accuracy   Best for
----------------  -------------  -----------------------  ---------  --------------------
FixedWindow       1 counter      Poor (2x at boundary)    Low        Simple use cases
SlidingWindowLog  N timestamps   Excellent (no burst)     Very high  Precise control
TokenBucket       2 values       Controlled (up to cap)   High       Production APIs
LeakyBucket       1 queue        Smoothed (queue + drop)  High       Constant output rate

FixedWindow

Divides time into fixed-size buckets and tracks a request counter per bucket. A request is allowed while the counter is within limit; once the bucket rolls over the counter resets.

class core_redis.rate_limits.FixedWindow(key_prefix: str = 'rate_limit:fixed:', redis_kwargs: Dict[str, Any] | None = None)

Fixed-window rate limiter backed by Redis. The time axis is divided into fixed-size buckets (windows). Each bucket has its own counter stored in Redis. A request is allowed when the counter for the current bucket has not yet reached limit; otherwise it is rejected until the bucket rolls over.

Burst problem

Because the window boundary is a hard reset, a client can issue up to 2 × limit requests in a short period by timing requests around the boundary:

  • In the last second of window N send limit requests → all allowed.

  • In the first second of window N + 1 send limit requests → all allowed.

Both bursts together happen within ~2 seconds even though the nominal rate is limit requests per window seconds. If smooth traffic is required, prefer a sliding-window or token-bucket algorithm instead.
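The boundary burst can be reproduced without Redis. The sketch below is an illustrative in-memory stand-in, not the library's implementation: the class name InMemoryFixedWindow and the explicit now argument are assumptions added so the clock can be controlled.

```python
from collections import defaultdict


class InMemoryFixedWindow:
    """Hypothetical in-memory stand-in for a fixed-window counter."""

    def __init__(self):
        # (identifier, window index) -> number of requests seen in that window
        self._counters = defaultdict(int)

    def is_allowed(self, identifier, limit, window, now):
        bucket = int(now // window)  # index of the window that contains `now`
        self._counters[(identifier, bucket)] += 1
        return self._counters[(identifier, bucket)] <= limit


limiter = InMemoryFixedWindow()
limit, window = 100, 60

# 100 requests at t = 59.5 s (end of window 0),
# then 100 more at t = 60.5 s (start of window 1).
late = sum(limiter.is_allowed("user_123", limit, window, now=59.5) for _ in range(limit))
early = sum(limiter.is_allowed("user_123", limit, window, now=60.5) for _ in range(limit))

print(late + early)  # 200
```

All 200 requests pass although they arrive about one second apart, which is twice the nominal limit of 100 per 60 seconds.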

Parameters:
  • key_prefix – String prepended to every Redis key. Default: "rate_limit:fixed:".

  • redis_kwargs – Keyword arguments forwarded verbatim to RedisClient (e.g. {"host": "localhost", "port": 6379, "db": 0}).

Example

from core_redis.rate_limits import FixedWindow

limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})
# Returns True while the counter is within the limit, False once exceeded.
allowed = limiter.is_allowed("user_123", limit=100, window=60)

__init__(key_prefix: str = 'rate_limit:fixed:', redis_kwargs: Dict[str, Any] | None = None) -> None

is_allowed(identifier: str, limit: int = 100, window: int = 60) -> bool

Check whether a request from identifier is within the rate limit. The counter for the current fixed window is atomically incremented via a Redis pipeline. On the first increment the key is given a TTL equal to window so Redis cleans it up automatically once the window closes.

Parameters:
  • identifier – Unique key for the subject being rate-limited (e.g. a user ID, IP address, or API key).

  • limit – Maximum number of requests allowed within a single window. Default: 100.

  • window – Window duration in seconds. Default: 60.

Returns:

True if the request is within the limit and should be allowed; False if the limit has been exceeded.

Note

INCR and EXPIRE are dispatched in a single pipeline batch to minimize round-trips. The operations are not wrapped in a Lua script, so in the unlikely event of a crash between the two commands the key may persist without a TTL. In production deployments with strict correctness requirements, consider replacing the pipeline with a Lua script evaluated via EVAL.
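A minimal sketch of the Lua variant mentioned in this note follows. It assumes a redis-py style client whose eval(script, numkeys, *keys_and_args) method sends EVAL. The function name fixed_window_allowed and the key passed to it are hypothetical, because the key layout used by FixedWindow is not documented here.

```python
# Lua runs atomically on the Redis server, so INCR and EXPIRE cannot be separated.
FIXED_WINDOW_LUA = """
local current = redis.call('INCR', KEYS[1])
if current == 1 then
    redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return current
"""


def fixed_window_allowed(client, key, limit, window):
    """Increment the window counter and set its TTL in one server-side step.

    `client` is assumed to expose redis-py's eval(script, numkeys, *keys_and_args).
    """
    count = client.eval(FIXED_WINDOW_LUA, 1, key, window)
    return int(count) <= limit
```

With redis-py installed, usage would look like fixed_window_allowed(redis.Redis(host="localhost", port=6379), "rate_limit:fixed:user_123", limit=100, window=60), where the key string is only an example.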

from core_redis.rate_limits import FixedWindow

limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})
allowed = limiter.is_allowed("user_123", limit=100, window=60)

The counter is incremented and the TTL is set in a single Redis pipeline call (INCR + EXPIRE), keeping round-trips to one per request.

A common pattern is to guard outbound HTTP calls so a client never exceeds an upstream API’s rate limit:

import requests
from core_redis.rate_limits import FixedWindow

limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})

def call_api(user_id: str) -> None:
    if not limiter.is_allowed(user_id, limit=100, window=60):
        print(f"[{user_id}] BLOCKED —> rate limit exceeded")
        return

    response = requests.get("https://api.example.com/data", timeout=5)
    print(f"[{user_id}] {response.status_code}")

Warning

Burst problem: because the window boundary is a hard reset, a client can send up to 2 × limit requests in rapid succession by timing them around the window edge (limit at the end of window N, then limit at the start of window N+1). If smooth traffic enforcement is required, use a sliding-window or token-bucket algorithm instead.

SlidingWindowLog

Stores a timestamp for every request in a Redis sorted set. On each call, entries older than now - window are pruned before counting, so the window always reflects exactly the last window seconds and the burst problem does not occur.

Returns an (allowed, remaining) tuple so callers know how many slots are left without a second round-trip.

class core_redis.rate_limits.SlidingWindowLog(key_prefix: str = 'rate_limit:sliding:', redis_kwargs: Dict[str, Any] | None = None)

Sliding-window log rate limiter backed by Redis. Each request’s timestamp is stored in a Redis sorted set (score = timestamp, member = timestamp as string).

On every call, entries older than current_time - window are pruned and the remaining count is compared against limit. Because the window moves with each request, there is no fixed boundary to exploit, so the burst problem present in FixedWindow does not occur.

Trade-offs vs. Fixed Window

  • Accuracy: every decision is based on the exact request history for the last window seconds; there are no boundary artifacts.

  • Memory: up to limit entries are stored per identifier instead of a single counter, so memory usage scales with limit.

  • Writes on every request: allowed requests add a new sorted-set member; blocked requests still issue ZREMRANGEBYSCORE (prune) + ZCARD (count).

Parameters:
  • key_prefix – String prepended to every Redis key. Default: "rate_limit:sliding:".

  • redis_kwargs – Keyword arguments forwarded verbatim to RedisClient (e.g. {"host": "localhost", "port": 6379, "db": 0}).

Example

from core_redis.rate_limits import SlidingWindowLog

limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})
allowed, remaining = limiter.is_allowed("user_123", limit=100, window=60)

__init__(key_prefix: str = 'rate_limit:sliding:', redis_kwargs: Dict[str, Any] | None = None) -> None

is_allowed(identifier: str, limit: int = 100, window: int = 60) -> Tuple[bool, int]

Check whether a request from identifier is within the rate limit. Stale entries (older than current_time - window) are pruned first, then the remaining count is evaluated against limit.

If the request is allowed, the current timestamp is appended to the sorted set and the key TTL is refreshed.
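The prune, count, append sequence can be sketched in memory as follows. This is an illustration under assumptions, not the Redis implementation: a deque stands in for the sorted set, and the class name InMemorySlidingWindowLog and the explicit now argument are hypothetical.

```python
from collections import defaultdict, deque


class InMemorySlidingWindowLog:
    """Hypothetical in-memory stand-in for the sorted-set log."""

    def __init__(self):
        self._logs = defaultdict(deque)  # identifier -> timestamps, oldest first

    def is_allowed(self, identifier, limit, window, now):
        log = self._logs[identifier]
        cutoff = now - window
        while log and log[0] < cutoff:  # prune entries older than now - window
            log.popleft()
        if len(log) >= limit:           # window is full: reject, nothing is written
            return False, 0
        log.append(now)                 # record this request
        return True, limit - len(log)


limiter = InMemorySlidingWindowLog()
limit, window = 100, 60

# Same traffic pattern that doubles throughput under FixedWindow.
late = sum(limiter.is_allowed("user_123", limit, window, now=59.5)[0] for _ in range(limit))
early = sum(limiter.is_allowed("user_123", limit, window, now=60.5)[0] for _ in range(limit))

print(late, early)  # 100 0
```

The second batch is rejected in full because the 100 timestamps recorded at t = 59.5 s are still inside the rolling 60-second window at t = 60.5 s.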

Parameters:
  • identifier – Unique key for the subject being rate-limited (e.g. a user ID, IP address, or API key).

  • limit – Maximum number of requests allowed within the rolling window. Default: 100.

  • window – Rolling window duration in seconds. Default: 60.

Returns:

An (allowed, remaining) tuple:

  • allowed: True if the request is within the limit, False if it should be rejected.

  • remaining: number of requests still available in the current window (0 when the request is blocked).

Note

ZREMRANGEBYSCORE and ZCARD are issued in one pipeline batch to prune and count in a single round-trip. The conditional ZADD + EXPIRE is a second pipeline batch executed only when the request is allowed. For strict atomicity under very high concurrency, replace both pipelines with a single Lua script evaluated via EVAL.

from core_redis.rate_limits import SlidingWindowLog

limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})
allowed, remaining = limiter.is_allowed("user_123", limit=100, window=60)

if not allowed:
    print("Rate limit exceeded")
else:
    print(f"{remaining} requests remaining in this window")

HTTP-guard pattern:

import requests
from core_redis.rate_limits import SlidingWindowLog

limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})

def call_api(user_id: str) -> None:
    allowed, remaining = limiter.is_allowed(user_id, limit=100, window=60)
    if not allowed:
        print(f"[{user_id}] BLOCKED —> rate limit exceeded")
        return

    response = requests.get("https://api.example.com/data", timeout=5)
    print(f"[{user_id}] {response.status_code}  ({remaining} remaining)")

Note

Trade-offs vs. FixedWindow

  • Accuracy: no boundary artefacts; any window-second period contains at most limit requests.

  • Memory: stores up to limit timestamps per identifier instead of a single counter.

  • Round-trips: two pipeline batches per allowed request (ZREMRANGEBYSCORE + ZCARD, then ZADD + EXPIRE); one batch for blocked requests.

TokenBucket

Maintains a virtual token bucket per identifier in a Redis hash. Tokens refill continuously at refill_rate per second up to capacity. Each request consumes tokens_per_request tokens. A request is allowed when the bucket has enough tokens; otherwise it is rejected.

Returns an (allowed, available_tokens) tuple.

class core_redis.rate_limits.TokenBucket(key_prefix: str = 'rate_limit:token_bucket:', redis_kwargs: Dict[str, Any] | None = None)

Token-bucket rate limiter backed by Redis. A virtual bucket holds up to capacity tokens. Each request consumes tokens_per_request tokens. Tokens are replenished continuously at refill_rate tokens per second up to capacity. A request is allowed when the bucket contains enough tokens; otherwise it is rejected.

Characteristics

  • Burst-friendly: the bucket can absorb a short burst of up to capacity requests before throttling begins, unlike fixed or sliding windows that spread the budget evenly.

  • Smooth long-term rate: over time, throughput converges to refill_rate requests/second regardless of burst patterns.

  • Variable cost: tokens_per_request lets different operations consume different amounts (e.g. a bulk export costs 10 tokens, a lightweight read costs 1).

Redis storage

Each identifier maps to a Redis hash with two fields:

  • tokens: current token count (float string).

  • last_refill: Unix timestamp of the last write (float string).

The hash TTL is set to ceil(capacity / refill_rate), the time it takes to fully refill an empty bucket. If the key expires (the identifier has been idle that long) the next request re-initializes the bucket as full, which is the correct behavior.
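As a worked example of the TTL rule, the helper below evaluates ceil(capacity / refill_rate) for two configurations. The function name bucket_ttl is hypothetical and only restates the formula above.

```python
import math


def bucket_ttl(capacity, refill_rate):
    """Seconds needed to refill an empty bucket, rounded up to a whole second."""
    return math.ceil(capacity / refill_rate)


print(bucket_ttl(100, 10.0))  # 10
print(bucket_ttl(100, 3.0))   # 34
```

With the defaults (capacity=100, refill_rate=10.0) the key expires after 10 idle seconds, by which time the bucket would have refilled completely anyway.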

Parameters:
  • key_prefix – String prepended to every Redis key. Default: "rate_limit:token_bucket:".

  • redis_kwargs – Keyword arguments forwarded verbatim to RedisClient (e.g. {"host": "localhost", "port": 6379, "db": 0}).

Example

from core_redis.rate_limits import TokenBucket

limiter = TokenBucket(redis_kwargs={"host": "localhost", "port": 6379})

allowed, tokens = limiter.is_allowed(
    "user_123", capacity=100, refill_rate=10.0
)

__init__(key_prefix: str = 'rate_limit:token_bucket:', redis_kwargs: Dict[str, Any] | None = None) -> None

is_allowed(identifier: str, capacity: int = 100, refill_rate: float = 10.0, tokens_per_request: int = 1) -> Tuple[bool, int]

Check whether a request from identifier can be served.

The bucket state is read with HGETALL and tokens are refilled in proportion to the time elapsed since the last write. If enough tokens are available, the updated state is persisted with an HSET + EXPIRE pipeline.
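The refill arithmetic can be sketched in memory as shown below. This is an illustration under assumptions rather than the library's code: a dict stands in for the Redis hash, the class name InMemoryTokenBucket and the explicit now argument are hypothetical, and a blocked request leaves the stored state untouched, matching the "zero writes when blocked" behavior described for this limiter.

```python
class InMemoryTokenBucket:
    """Hypothetical in-memory stand-in for the per-identifier Redis hash."""

    def __init__(self):
        self._state = {}  # identifier -> (tokens, last_refill)

    def is_allowed(self, identifier, capacity, refill_rate, now, tokens_per_request=1):
        if tokens_per_request > capacity:
            raise ValueError("tokens_per_request exceeds capacity")
        # A missing key means a fresh, full bucket.
        tokens, last_refill = self._state.get(identifier, (float(capacity), now))
        # Refill in proportion to elapsed time, never above capacity.
        tokens = min(float(capacity), tokens + (now - last_refill) * refill_rate)
        if tokens < tokens_per_request:
            return False, tokens  # blocked: stored state is not modified
        tokens -= tokens_per_request
        self._state[identifier] = (tokens, now)
        return True, tokens


bucket = InMemoryTokenBucket()
burst = [bucket.is_allowed("user_123", 5, 1.0, now=0.0)[0] for _ in range(6)]
print(burst)  # [True, True, True, True, True, False]

# Two seconds later, two tokens have been refilled and one is consumed.
print(bucket.is_allowed("user_123", 5, 1.0, now=2.0))  # (True, 1.0)
```

The first five requests drain the bucket at once, the sixth is rejected, and after two seconds at one token per second a request passes again.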

Parameters:
  • identifier – Unique key for the subject being rate-limited (e.g. a user ID, IP address, or API key).

  • capacity – Maximum number of tokens the bucket can hold (burst ceiling). Default: 100.

  • refill_rate – Tokens added per second. Long-term throughput converges to this value. Default: 10.0.

  • tokens_per_request – Tokens consumed by this request. Use values greater than 1 for expensive operations. Default: 1.

Returns:

An (allowed, available_tokens) tuple:

  • allowed: True if the bucket had enough tokens and the request is permitted; False if it should be rejected.

  • available_tokens: tokens remaining after this request (when allowed) or tokens currently in the bucket (when blocked).

Raises:

ValueError – If tokens_per_request exceeds capacity (a request that can never be satisfied).

Note

The read (HGETALL) and write (HSET + EXPIRE) are two separate operations. A concurrent request between them could produce a marginal over-count under very high contention. For strict correctness, replace the read-modify-write with a Lua script evaluated via EVAL.

from core_redis.rate_limits import TokenBucket

limiter = TokenBucket(redis_kwargs={"host": "localhost", "port": 6379})

allowed, tokens = limiter.is_allowed(
    "user_123",
    capacity=100,       # max burst size
    refill_rate=10.0,   # tokens added per second
)
if not allowed:
    print(f"Rate limited —> {tokens} tokens available")
else:
    print(f"Allowed —> {tokens} tokens remaining")

Variable-cost operations are supported via tokens_per_request:

# A bulk export costs 10 tokens; a lightweight read costs 1
allowed, tokens = limiter.is_allowed(
    "user_123", capacity=100, refill_rate=10.0, tokens_per_request=10
)

Note

Trade-offs vs. SlidingWindowLog

  • Burst-friendly: up to capacity requests can fire instantly before throttling begins; SlidingWindowLog spreads the budget evenly across the window.

  • Memory: one hash with two fields per identifier regardless of request volume; SlidingWindowLog stores one entry per request.

  • Round-trips: one HGETALL read + one HSET + EXPIRE pipeline write per allowed request; zero writes when blocked.

LeakyBucket

Maintains a virtual queue per identifier in a Redis hash. Incoming requests fill the queue; the queue drains at a fixed leak_rate requests per second regardless of arrival rate. A request is accepted when the queue has room; otherwise it is rejected immediately. Unlike TokenBucket, the output rate is strictly constant: bursts are absorbed into the queue and processed at the leak rate, never served faster.

Returns an (allowed, available) tuple where available is the number of free queue slots after this request (0 when blocked).

class core_redis.rate_limits.LeakyBucket(key_prefix: str = 'rate_limit:leaky_bucket:', redis_kwargs: Dict[str, Any] | None = None)

Leaky-bucket rate limiter backed by Redis.

Incoming requests fill a virtual queue (the “bucket”). The queue drains at a fixed leak_rate requests per second regardless of how fast requests arrive. A request is accepted when the queue has room; otherwise it is rejected immediately. The output rate is strictly constant — unlike TokenBucket, bursts are not served immediately but are absorbed into the queue and processed at the leak rate.

Characteristics

  • Strictly constant output rate — downstream systems receive requests at exactly leak_rate per second, making it ideal for protecting third-party APIs with hard per-second quotas.

  • Burst absorption — short bursts are accepted up to capacity and queued rather than dropped; excess beyond capacity is rejected.

  • No burst acceleration — unlike Token Bucket, a full bucket does not let a burst through faster than leak_rate.

Redis storage

Each identifier maps to a Redis hash with two fields:

  • queue_size: current number of requests in the queue (float string).

  • last_leak: Unix timestamp of the last write (float string).

The hash TTL is set to ceil(capacity / leak_rate) — the time it takes to fully drain a filled queue. If the key expires (the identifier has been idle that long) the next request initialises a fresh empty queue.

Parameters:
  • key_prefix – String prepended to every Redis key. Default: "rate_limit:leaky_bucket:".

  • redis_kwargs – Keyword arguments forwarded verbatim to RedisClient (e.g. {"host": "localhost", "port": 6379, "db": 0}).

Example

from core_redis.rate_limits import LeakyBucket

limiter = LeakyBucket(redis_kwargs={"host": "localhost", "port": 6379})

allowed, available = limiter.is_allowed(
    "user_123", capacity=100, leak_rate=10.0
)

__init__(key_prefix: str = 'rate_limit:leaky_bucket:', redis_kwargs: Dict[str, Any] | None = None) -> None

is_allowed(identifier: str, capacity: int = 100, leak_rate: float = 10.0) -> Tuple[bool, int]

Check whether a request from identifier can be queued.

The queue state is read with HGETALL, leaked requests are drained proportionally to the elapsed time, and — if the queue has room — the new request is enqueued and the state is persisted with an HSET + EXPIRE pipeline.
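The drain-then-enqueue bookkeeping can be sketched in memory as follows. This is an illustration under assumptions, not the library's code: a dict stands in for the Redis hash, and the class name InMemoryLeakyBucket and the explicit now argument are hypothetical. The sketch models only the admission decision; it does not itself buffer or delay requests.

```python
class InMemoryLeakyBucket:
    """Hypothetical in-memory stand-in for the per-identifier Redis hash."""

    def __init__(self):
        self._state = {}  # identifier -> (queue_size, last_leak)

    def is_allowed(self, identifier, capacity, leak_rate, now):
        # A missing key means a fresh, empty queue.
        queue_size, last_leak = self._state.get(identifier, (0.0, now))
        # Drain in proportion to elapsed time, never below empty.
        queue_size = max(0.0, queue_size - (now - last_leak) * leak_rate)
        if queue_size + 1 > capacity:
            return False, 0  # queue full: stored state is not modified
        queue_size += 1      # enqueue this request
        self._state[identifier] = (queue_size, now)
        return True, int(capacity - queue_size)


bucket = InMemoryLeakyBucket()
results = [bucket.is_allowed("user_123", 3, 1.0, now=0.0) for _ in range(4)]
print(results)  # [(True, 2), (True, 1), (True, 0), (False, 0)]

# Two seconds later two slots have drained; this request takes one of them.
print(bucket.is_allowed("user_123", 3, 1.0, now=2.0))  # (True, 1)
```

Three requests fill the queue, the fourth is rejected, and room reappears only as fast as leak_rate drains the queue.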

Parameters:
  • identifier – Unique key for the subject being rate-limited (e.g. a user ID, IP address, or API key).

  • capacity – Maximum queue depth (number of requests that can be buffered). Default: 100.

  • leak_rate – Requests drained per second (output rate). Default: 10.0.

Returns:

An (allowed, available) tuple:

  • allowed: True if the request was accepted into the queue; False if the queue was full and it was dropped.

  • available: remaining free slots in the queue after this request (when allowed), or 0 (when blocked).

Note

The read (HGETALL) and write (HSET + EXPIRE) are two separate operations. A concurrent request between them could produce a marginal over-count under very high contention. For strict correctness, replace the read-modify-write with a Lua script evaluated via EVAL.

from core_redis.rate_limits import LeakyBucket

limiter = LeakyBucket(redis_kwargs={"host": "localhost", "port": 6379})

allowed, available = limiter.is_allowed(
    "user_123",
    capacity=100,     # max queue depth
    leak_rate=10.0,   # requests drained per second
)
if not allowed:
    print("Queue full —> retry later")
else:
    print(f"Queued —> {available} slots remaining")

HTTP-guard pattern:

import requests
from core_redis.rate_limits import LeakyBucket

limiter = LeakyBucket(redis_kwargs={"host": "localhost", "port": 6379})

def call_api(user_id: str) -> None:
    allowed, available = limiter.is_allowed(user_id, capacity=100, leak_rate=10.0)
    if not allowed:
        print(f"[{user_id}] BLOCKED —> queue full")
        return

    response = requests.get("https://api.example.com/data", timeout=5)
    print(f"[{user_id}] {response.status_code}  ({available} slots remaining)")

Note

Trade-offs vs. TokenBucket

  • Constant output rate: downstream systems receive requests at exactly leak_rate per second; TokenBucket can burst all tokens instantly.

  • No burst acceleration: a full queue is accepted up to capacity and processed steadily; TokenBucket serves stored tokens without delay.

  • Memory: one hash with two fields per identifier, same as TokenBucket.

  • Round-trips: one HGETALL read + one HSET + EXPIRE pipeline write per allowed request; zero writes when blocked.