Rate Limiters
Rate-limiting algorithms that count requests in Redis and reject traffic once
a threshold is reached. All algorithms live in core_redis.rate_limits and
share a common pattern:
Each limiter is instantiated once (e.g. at module level or in a DI container) with a key_prefix and optional redis_kwargs.
is_allowed(identifier, ...) is called on every incoming request.
The identifier can be any string: a user ID, IP address, API key, etc.
from core_redis.rate_limits import FixedWindow # or any other algorithm
limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})
allowed = limiter.is_allowed("user_123", limit=100, window=60)
Algorithm comparison
| Algorithm | Memory / user | Burst handling | Accuracy | Best for |
|---|---|---|---|---|
| FixedWindow | 1 counter | Poor (2x at boundary) | Low | Simple use cases |
| SlidingWindowLog | N timestamps | Excellent (no burst) | Very high | Precise control |
| TokenBucket | 2 values | Controlled (up to cap) | High | Production APIs |
| LeakyBucket | 1 queue | Smoothed (queue + drop) | High | Constant output rate |
FixedWindow
Divides time into fixed-size buckets and tracks a request counter per bucket. A request is allowed while the counter is within limit; once the bucket rolls over, the counter resets.
- class core_redis.rate_limits.FixedWindow(key_prefix: str = 'rate_limit:fixed:', redis_kwargs: Dict[str, Any] | None = None)
Fixed-window rate limiter backed by Redis. The time axis is divided into fixed-size buckets (windows). Each bucket has its own counter stored in Redis. A request is allowed when the counter for the current bucket has not yet reached limit; otherwise it is rejected until the bucket rolls over.
Burst problem
Because the window boundary is a hard reset, a client can issue up to 2 × limit requests in a short period by timing requests around the boundary:
In the last second of window N send limit requests → all allowed.
In the first second of window N + 1 send limit requests → all allowed.
Both bursts together happen within ~2 seconds even though the nominal rate is limit requests per window seconds. If smooth traffic is required, prefer a sliding-window or token-bucket algorithm instead.
- Parameters:
key_prefix – String prepended to every Redis key. Default: "rate_limit:fixed:".
redis_kwargs – Keyword arguments forwarded verbatim to RedisClient (e.g. {"host": "localhost", "port": 6379, "db": 0}).
Example
from core_redis.rate_limits import FixedWindow

limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})
# Returns True while the counter is within the limit, False once exceeded.
allowed = limiter.is_allowed("user_123", limit=100, window=60)
- __init__(key_prefix: str = 'rate_limit:fixed:', redis_kwargs: Dict[str, Any] | None = None) → None
- is_allowed(identifier: str, limit: int = 100, window: int = 60) → bool
Check whether a request from identifier is within the rate limit. The counter for the current fixed window is atomically incremented via a Redis pipeline. On the first increment the key is given a TTL equal to window so Redis cleans it up automatically once the window closes.
- Parameters:
identifier – Unique key for the subject being rate-limited (e.g. a user ID, IP address, or API key).
limit – Maximum number of requests allowed within a single window. Default: 100.
window – Window duration in seconds. Default: 60.
- Returns:
True if the request is within the limit and should be allowed; False if the limit has been exceeded.
Note
INCR and EXPIRE are dispatched in a single pipeline batch to minimize round-trips. The operations are not wrapped in a Lua script, so in the unlikely event of a crash between the two commands the key may persist without a TTL. In production deployments with strict correctness requirements, consider replacing the pipeline with a Lua script evaluated via EVAL.
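The Lua-script alternative mentioned in the note could look roughly like the sketch below. It is not the library's implementation: FIXED_WINDOW_LUA and make_fixed_window_check are hypothetical names, the caller is assumed to pass the complete Redis key (prefix, identifier and window bucket already combined), and client is assumed to be a redis-py client, whose register_script() returns a callable that accepts keys= and args=.

```python
from typing import Any, Callable

# Lua scripts run atomically inside Redis, so INCR and EXPIRE cannot be
# separated by a crash or by another client's command.
FIXED_WINDOW_LUA = """
local count = redis.call('INCR', KEYS[1])
if count == 1 then
    redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return count
"""


def make_fixed_window_check(client: Any) -> Callable[..., bool]:
    """Build an is_allowed-style function on top of the Lua script.

    `client` is assumed to be a redis-py client (for example redis.Redis).
    """
    script = client.register_script(FIXED_WINDOW_LUA)

    def is_allowed(key: str, limit: int = 100, window: int = 60) -> bool:
        # KEYS[1] is the counter key, ARGV[1] the TTL in seconds.
        count = script(keys=[key], args=[window])
        return int(count) <= limit

    return is_allowed
```

With a real client this would be wired up as `check = make_fixed_window_check(redis.Redis(host="localhost", port=6379))` and then called once per request.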
from core_redis.rate_limits import FixedWindow
limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})
allowed = limiter.is_allowed("user_123", limit=100, window=60)
The counter is incremented and the TTL is set in a single Redis pipeline call
(INCR + EXPIRE), keeping round-trips to one per request.
A common pattern is to guard outbound HTTP calls so a client never exceeds an upstream API’s rate limit:
import requests
from core_redis.rate_limits import FixedWindow

limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})

def call_api(user_id: str) -> None:
    # Skip the outbound call entirely once the window's budget is spent.
    if not limiter.is_allowed(user_id, limit=100, window=60):
        print(f"[{user_id}] BLOCKED -> rate limit exceeded")
        return
    response = requests.get("https://api.example.com/data", timeout=5)
    print(f"[{user_id}] {response.status_code}")
Warning
Burst problem: because the window boundary is a hard reset, a client
can send up to 2 × limit requests in rapid succession by timing them
around the window edge (limit at the end of window N, then limit at the
start of window N+1). If smooth traffic enforcement is required, use a
sliding-window or token-bucket algorithm instead.
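The boundary burst can be reproduced without Redis. The sketch below is an in-memory stand-in, not the library's code: InMemoryFixedWindow is a hypothetical class, and the explicit now argument is an assumption added so the timing is deterministic.

```python
from collections import defaultdict
from typing import DefaultDict, Tuple


class InMemoryFixedWindow:
    """Fixed-window counter with an injectable clock (no Redis involved)."""

    def __init__(self) -> None:
        self.counters: DefaultDict[Tuple[str, int], int] = defaultdict(int)

    def is_allowed(self, identifier: str, now: float,
                   limit: int = 100, window: int = 60) -> bool:
        bucket = int(now // window)  # index of the window containing `now`
        key = (identifier, bucket)
        self.counters[key] += 1
        return self.counters[key] <= limit


limiter = InMemoryFixedWindow()
limit, window = 100, 60

# `limit` requests in the last second of window 0 (t = 59.00 .. 59.99) ...
late = sum(limiter.is_allowed("user_123", 59 + i / 100, limit, window)
           for i in range(limit))
# ... then `limit` requests in the first second of window 1 (t = 60.00 .. 60.99).
early = sum(limiter.is_allowed("user_123", 60 + i / 100, limit, window)
            for i in range(limit))

print(late + early)  # 200 requests accepted within roughly 2 seconds
```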
SlidingWindowLog
Stores a timestamp for every request in a Redis sorted set. On each call,
entries older than now - window are pruned before counting, so the window
always reflects exactly the last window seconds and the burst problem does
not occur.
Returns a (allowed, remaining) tuple so callers know how many slots are
left without a second round-trip.
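The prune-then-count logic described above can be sketched in memory as follows. This is an illustration only: InMemorySlidingWindowLog is a hypothetical class, a deque stands in for the Redis sorted set, the now argument is an assumption added for determinism, and entries exactly window seconds old are assumed to still count.

```python
from collections import defaultdict, deque
from typing import DefaultDict, Deque, Tuple


class InMemorySlidingWindowLog:
    """Sliding-window log kept in a per-identifier deque of timestamps."""

    def __init__(self) -> None:
        self.logs: DefaultDict[str, Deque[float]] = defaultdict(deque)

    def is_allowed(self, identifier: str, now: float,
                   limit: int = 100, window: int = 60) -> Tuple[bool, int]:
        log = self.logs[identifier]
        # Prune timestamps older than now - window (oldest entries sit on the left).
        while log and log[0] < now - window:
            log.popleft()
        if len(log) >= limit:
            return False, 0          # blocked: nothing is recorded
        log.append(now)              # allowed: record this request
        return True, limit - len(log)


limiter = InMemorySlidingWindowLog()
for t in (0.0, 1.0, 2.0, 30.0, 60.5):
    print(t, limiter.is_allowed("user_123", now=t, limit=3, window=60))
```

The fourth call (t = 30.0) is rejected because three requests already fall inside the last 60 seconds; the call at t = 60.5 is accepted again because the entry from t = 0.0 has aged out.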
- class core_redis.rate_limits.SlidingWindowLog(key_prefix: str = 'rate_limit:sliding:', redis_kwargs: Dict[str, Any] | None = None)
Sliding-window log rate limiter backed by Redis. Each request’s timestamp is stored in a Redis sorted set (score = timestamp, member = timestamp as string).
On every call, entries older than current_time − window are pruned and the remaining count is compared against limit. Because the window moves with each request, there is no fixed boundary to exploit, so the burst problem present in FixedWindow does not occur.
Trade-offs vs. FixedWindow
Accuracy: every decision is based on the exact request history for the last window seconds; there are no boundary artifacts.
Memory: up to limit entries are stored per identifier instead of a single counter, so memory usage scales with limit.
Redis work on every request: allowed requests write a new sorted-set member; blocked requests still issue the ZREMRANGEBYSCORE + ZCARD prune-and-count batch.
- Parameters:
key_prefix – String prepended to every Redis key. Default: "rate_limit:sliding:".
redis_kwargs – Keyword arguments forwarded verbatim to RedisClient (e.g. {"host": "localhost", "port": 6379, "db": 0}).
Example
from core_redis.rate_limits import SlidingWindowLog

limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})
allowed, remaining = limiter.is_allowed("user_123", limit=100, window=60)
- __init__(key_prefix: str = 'rate_limit:sliding:', redis_kwargs: Dict[str, Any] | None = None) → None
- is_allowed(identifier: str, limit: int = 100, window: int = 60) → Tuple[bool, int]
Check whether a request from identifier is within the rate limit. Stale entries (older than current_time − window) are pruned first, then the remaining count is evaluated against limit.
If the request is allowed, the current timestamp is appended to the sorted set and the key TTL is refreshed.
- Parameters:
identifier – Unique key for the subject being rate-limited (e.g. a user ID, IP address, or API key).
limit – Maximum number of requests allowed within the rolling window. Default: 100.
window – Rolling window duration in seconds. Default: 60.
- Returns:
A (allowed, remaining) tuple:
allowed: True if the request is within the limit, False if it should be rejected.
remaining: number of requests still available in the current window (0 when the request is blocked).
Note
ZREMRANGEBYSCORE and ZCARD are issued in one pipeline batch to prune and count in a single round-trip. The conditional ZADD + EXPIRE is a second pipeline batch executed only when the request is allowed. For strict atomicity under very high concurrency, replace both pipelines with a single Lua script evaluated via EVAL.
from core_redis.rate_limits import SlidingWindowLog
limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})
allowed, remaining = limiter.is_allowed("user_123", limit=100, window=60)
if not allowed:
    print("Rate limit exceeded")
else:
    print(f"{remaining} requests remaining in this window")
HTTP-guard pattern:
import requests
from core_redis.rate_limits import SlidingWindowLog

limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})

def call_api(user_id: str) -> None:
    allowed, remaining = limiter.is_allowed(user_id, limit=100, window=60)
    if not allowed:
        print(f"[{user_id}] BLOCKED -> rate limit exceeded")
        return
    response = requests.get("https://api.example.com/data", timeout=5)
    print(f"[{user_id}] {response.status_code} ({remaining} remaining)")
Note
Trade-offs vs. FixedWindow
Accuracy: no boundary artifacts; any window-second period contains at most limit requests.
Memory: stores up to limit timestamps per identifier instead of a single counter.
Round-trips: two pipeline batches per allowed request (ZREMRANGEBYSCORE + ZCARD, then ZADD + EXPIRE); one batch for blocked requests.
TokenBucket
Maintains a virtual token bucket per identifier in a Redis hash. Tokens refill continuously at refill_rate per second up to capacity. Each request consumes tokens_per_request tokens. A request is allowed when the bucket has enough tokens; otherwise it is rejected.
Returns a (allowed, available_tokens) tuple.
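The refill arithmetic can be illustrated with an in-memory sketch. It is not the library's implementation: InMemoryTokenBucket is a hypothetical class, a dict stands in for the Redis hash, the now argument is an assumption added for determinism, and state is only written when a request is allowed.

```python
from typing import Dict, Tuple


class InMemoryTokenBucket:
    """Token bucket holding (tokens, last_refill) per identifier."""

    def __init__(self) -> None:
        self.state: Dict[str, Tuple[float, float]] = {}

    def is_allowed(self, identifier: str, now: float, capacity: int = 100,
                   refill_rate: float = 10.0,
                   tokens_per_request: int = 1) -> Tuple[bool, float]:
        if tokens_per_request > capacity:
            raise ValueError("tokens_per_request exceeds capacity")
        # An unknown identifier starts with a full bucket.
        tokens, last_refill = self.state.get(identifier, (float(capacity), now))
        # Refill in proportion to elapsed time, capped at capacity.
        tokens = min(float(capacity), tokens + (now - last_refill) * refill_rate)
        if tokens < tokens_per_request:
            return False, tokens     # blocked: state is left untouched
        tokens -= tokens_per_request
        self.state[identifier] = (tokens, now)
        return True, tokens


bucket = InMemoryTokenBucket()
burst = [bucket.is_allowed("user_123", now=0.0, capacity=5, refill_rate=1.0)[0]
         for _ in range(6)]
print(burst)  # [True, True, True, True, True, False]
print(bucket.is_allowed("user_123", now=2.0, capacity=5, refill_rate=1.0))  # (True, 1.0)
```

The first five calls drain the full bucket, the sixth is rejected, and two seconds later two tokens have been refilled, one of which is consumed.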
- class core_redis.rate_limits.TokenBucket(key_prefix: str = 'rate_limit:token_bucket:', redis_kwargs: Dict[str, Any] | None = None)
Token-bucket rate limiter backed by Redis. A virtual bucket holds up to capacity tokens. Each request consumes tokens_per_request tokens. Tokens are replenished continuously at refill_rate tokens per second up to capacity. A request is allowed when the bucket contains enough tokens; otherwise it is rejected.
Characteristics
Burst-friendly: the bucket can absorb a short burst of up to capacity requests before throttling begins, unlike fixed or sliding windows that spread the budget evenly.
Smooth long-term rate: over time, throughput converges to refill_rate requests/second regardless of burst patterns.
Variable cost: tokens_per_request lets different operations consume different amounts (e.g. a bulk export costs 10 tokens, a lightweight read costs 1).
Redis storage
Each identifier maps to a Redis hash with two fields:
tokens: current token count (float string).
last_refill: Unix timestamp of the last write (float string).
The hash TTL is set to ceil(capacity / refill_rate), the time it takes to fully refill an empty bucket. If the key expires (the identifier has been idle that long) the next request re-initializes the bucket as full, which is the correct behavior.
- Parameters:
key_prefix – String prepended to every Redis key. Default: "rate_limit:token_bucket:".
redis_kwargs – Keyword arguments forwarded verbatim to RedisClient (e.g. {"host": "localhost", "port": 6379, "db": 0}).
Example
from core_redis.rate_limits import TokenBucket

limiter = TokenBucket(redis_kwargs={"host": "localhost", "port": 6379})
allowed, tokens = limiter.is_allowed("user_123", capacity=100, refill_rate=10.0)
- __init__(key_prefix: str = 'rate_limit:token_bucket:', redis_kwargs: Dict[str, Any] | None = None) → None
- is_allowed(identifier: str, capacity: int = 100, refill_rate: float = 10.0, tokens_per_request: int = 1) → Tuple[bool, int]
Check whether a request from identifier can be served.
The bucket state is read with HGETALL, tokens are refilled proportionally to the elapsed time since the last write, and if enough tokens are available, the updated state is persisted with an HSET + EXPIRE pipeline.
- Parameters:
identifier – Unique key for the subject being rate-limited (e.g. a user ID, IP address, or API key).
capacity – Maximum number of tokens the bucket can hold (burst ceiling). Default: 100.
refill_rate – Tokens added per second. Long-term throughput converges to this value. Default: 10.0.
tokens_per_request – Tokens consumed by this request. Use values greater than 1 for expensive operations. Default: 1.
- Returns:
A (allowed, available_tokens) tuple:
allowed: True if the bucket had enough tokens and the request is permitted; False if it should be rejected.
available_tokens: tokens remaining after this request (when allowed) or tokens currently in the bucket (when blocked).
- Raises:
ValueError – If tokens_per_request exceeds capacity (a request that can never be satisfied).
Note
The read (HGETALL) and write (HSET + EXPIRE) are two separate operations. A concurrent request between them could produce a marginal over-count under very high contention. For strict correctness, replace the read-modify-write with a Lua script evaluated via EVAL.
from core_redis.rate_limits import TokenBucket
limiter = TokenBucket(redis_kwargs={"host": "localhost", "port": 6379})
allowed, tokens = limiter.is_allowed(
    "user_123",
    capacity=100,      # max burst size
    refill_rate=10.0,  # tokens added per second
)
if not allowed:
    print(f"Rate limited -> {tokens} tokens available")
else:
    print(f"Allowed -> {tokens} tokens remaining")
Variable-cost operations are supported via tokens_per_request:
# A bulk export costs 10 tokens; a lightweight read costs 1
allowed, tokens = limiter.is_allowed(
    "user_123", capacity=100, refill_rate=10.0, tokens_per_request=10
)
Note
Trade-offs vs. SlidingWindowLog
Burst-friendly: up to capacity requests can fire instantly before throttling begins; SlidingWindowLog spreads the budget evenly across the window.
Memory: one hash with two fields per identifier regardless of request volume; SlidingWindowLog stores one entry per request.
Round-trips: one HGETALL read + one HSET + EXPIRE pipeline write per allowed request; zero writes when blocked.
LeakyBucket
Maintains a virtual queue per identifier in a Redis hash. Incoming requests
fill the queue; the queue drains at a fixed leak_rate requests per second
regardless of arrival rate. A request is accepted when the queue has room;
otherwise it is rejected immediately. Unlike TokenBucket, the output rate
is strictly constant: bursts are absorbed into the queue and processed at the
leak rate, never served faster.
Returns a (allowed, available) tuple where available is the number of
free queue slots after this request (0 when blocked).
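The drain-then-enqueue bookkeeping can be sketched in memory as below. This is a hedged illustration rather than the library's code: InMemoryLeakyBucket is a hypothetical class, a dict stands in for the Redis hash, and the now argument is an assumption added for determinism. The sketch only models the admission decision; it does not itself delay or schedule the accepted requests.

```python
from typing import Dict, Tuple


class InMemoryLeakyBucket:
    """Leaky bucket holding (queue_size, last_leak) per identifier."""

    def __init__(self) -> None:
        self.state: Dict[str, Tuple[float, float]] = {}

    def is_allowed(self, identifier: str, now: float, capacity: int = 100,
                   leak_rate: float = 10.0) -> Tuple[bool, int]:
        # An unknown identifier starts with an empty queue.
        queue_size, last_leak = self.state.get(identifier, (0.0, now))
        # Drain in proportion to elapsed time, never below zero.
        queue_size = max(0.0, queue_size - (now - last_leak) * leak_rate)
        if queue_size + 1 > capacity:
            return False, 0          # queue full: state is left untouched
        queue_size += 1              # enqueue this request
        self.state[identifier] = (queue_size, now)
        return True, int(capacity - queue_size)


bucket = InMemoryLeakyBucket()
print([bucket.is_allowed("user_123", now=0.0, capacity=3, leak_rate=1.0)
       for _ in range(4)])
# [(True, 2), (True, 1), (True, 0), (False, 0)]
print(bucket.is_allowed("user_123", now=1.0, capacity=3, leak_rate=1.0))  # (True, 0)
```

After one second at leak_rate=1.0 exactly one slot has drained, so one more request fits and the queue is full again.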
- class core_redis.rate_limits.LeakyBucket(key_prefix: str = 'rate_limit:leaky_bucket:', redis_kwargs: Dict[str, Any] | None = None)
Leaky-bucket rate limiter backed by Redis.
Incoming requests fill a virtual queue (the “bucket”). The queue drains at a fixed leak_rate requests per second regardless of how fast requests arrive. A request is accepted when the queue has room; otherwise it is rejected immediately. The output rate is strictly constant: unlike TokenBucket, bursts are not served immediately but are absorbed into the queue and processed at the leak rate.
Characteristics
Strictly constant output rate: downstream systems receive requests at exactly leak_rate per second, making it ideal for protecting third-party APIs with hard per-second quotas.
Burst absorption: short bursts are accepted up to capacity and queued rather than dropped; excess beyond capacity is rejected.
No burst acceleration: unlike TokenBucket, a full bucket does not let a burst through faster than leak_rate.
Redis storage
Each identifier maps to a Redis hash with two fields:
queue_size: current number of requests in the queue (float string).
last_leak: Unix timestamp of the last write (float string).
The hash TTL is set to ceil(capacity / leak_rate), the time it takes to fully drain a filled queue. If the key expires (the identifier has been idle that long) the next request initializes a fresh empty queue.
- Parameters:
key_prefix – String prepended to every Redis key. Default: "rate_limit:leaky_bucket:".
redis_kwargs – Keyword arguments forwarded verbatim to RedisClient (e.g. {"host": "localhost", "port": 6379, "db": 0}).
Example
from core_redis.rate_limits import LeakyBucket

limiter = LeakyBucket(redis_kwargs={"host": "localhost", "port": 6379})
allowed, available = limiter.is_allowed("user_123", capacity=100, leak_rate=10.0)
- __init__(key_prefix: str = 'rate_limit:leaky_bucket:', redis_kwargs: Dict[str, Any] | None = None) → None
- is_allowed(identifier: str, capacity: int = 100, leak_rate: float = 10.0) → Tuple[bool, int]
Check whether a request from identifier can be queued.
The queue state is read with HGETALL, leaked requests are drained proportionally to the elapsed time, and, if the queue has room, the new request is enqueued and the state is persisted with an HSET + EXPIRE pipeline.
- Parameters:
identifier – Unique key for the subject being rate-limited (e.g. a user ID, IP address, or API key).
capacity – Maximum queue depth (number of requests that can be buffered). Default: 100.
leak_rate – Requests drained per second (output rate). Default: 10.0.
- Returns:
A (allowed, available) tuple:
allowed: True if the request was accepted into the queue; False if the queue was full and it was dropped.
available: remaining free slots in the queue after this request (when allowed), or 0 (when blocked).
Note
The read (HGETALL) and write (HSET + EXPIRE) are two separate operations. A concurrent request between them could produce a marginal over-count under very high contention. For strict correctness, replace the read-modify-write with a Lua script evaluated via EVAL.
from core_redis.rate_limits import LeakyBucket
limiter = LeakyBucket(redis_kwargs={"host": "localhost", "port": 6379})
allowed, available = limiter.is_allowed(
    "user_123",
    capacity=100,    # max queue depth
    leak_rate=10.0,  # requests drained per second
)
if not allowed:
    print("Queue full -> retry later")
else:
    print(f"Queued -> {available} slots remaining")
HTTP-guard pattern:
import requests
from core_redis.rate_limits import LeakyBucket

limiter = LeakyBucket(redis_kwargs={"host": "localhost", "port": 6379})

def call_api(user_id: str) -> None:
    allowed, available = limiter.is_allowed(user_id, capacity=100, leak_rate=10.0)
    if not allowed:
        print(f"[{user_id}] BLOCKED -> queue full")
        return
    response = requests.get("https://api.example.com/data", timeout=5)
    print(f"[{user_id}] {response.status_code} ({available} slots remaining)")
Note
Trade-offs vs. TokenBucket
Constant output rate: downstream systems receive requests at exactly leak_rate per second; TokenBucket can burst all tokens instantly.
No burst acceleration: a burst is accepted up to capacity and then processed steadily at the leak rate; TokenBucket serves stored tokens without delay.
Memory: one hash with two fields per identifier, same as TokenBucket.
Round-trips: one HGETALL read + one HSET + EXPIRE pipeline write per allowed request; zero writes when blocked.