Rate Limiters
===============================================================================

Rate-limiting algorithms that count requests in Redis and reject traffic once a
threshold is reached. All algorithms live in ``core_redis.rate_limits`` and
share a common pattern:

- Each limiter is instantiated once (e.g. at module level or in a DI container)
  with a ``key_prefix`` and optional ``redis_kwargs``.
- ``is_allowed(identifier, ...)`` is called on every incoming request.
- The identifier can be any string: a user ID, IP address, API key, etc.

.. code-block:: python

    from core_redis.rate_limits import FixedWindow  # or any other algorithm

    limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})
    allowed = limiter.is_allowed("user_123", limit=100, window=60)

Algorithm comparison
-------------------------------------------------------------------------------

+------------------+--------------------+-------------------------+-----------+---------------------------+
| Algorithm        | Memory / user      | Burst handling          | Accuracy  | Best for                  |
+==================+====================+=========================+===========+===========================+
| FixedWindow      | 1 counter          | Poor (2x at boundary)   | Low       | Simple use cases          |
+------------------+--------------------+-------------------------+-----------+---------------------------+
| SlidingWindowLog | N timestamps       | Excellent (no burst)    | Very high | Precise control           |
+------------------+--------------------+-------------------------+-----------+---------------------------+
| TokenBucket      | 2 values           | Controlled (up to cap)  | High      | Production APIs           |
+------------------+--------------------+-------------------------+-----------+---------------------------+
| LeakyBucket      | 1 queue            | Smoothed (queue + drop) | High      | Constant output rate      |
+------------------+--------------------+-------------------------+-----------+---------------------------+

FixedWindow
-------------------------------------------------------------------------------

Divides time into fixed-size buckets and tracks a request counter per bucket.
A request is allowed while the counter is within *limit*; once the bucket rolls
over the counter resets.

.. autoclass:: core_redis.rate_limits.FixedWindow
    :members:
    :special-members: __init__

.. code-block:: python

    from core_redis.rate_limits import FixedWindow

    limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})
    allowed = limiter.is_allowed("user_123", limit=100, window=60)

The counter is incremented and the TTL is set in a single Redis pipeline call
(``INCR`` + ``EXPIRE``), keeping round-trips to one per request.

A common pattern is to guard outbound HTTP calls so a client never exceeds an
upstream API's rate limit:

.. code-block:: python

    import requests
    from core_redis.rate_limits import FixedWindow

    limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})

    def call_api(user_id: str) -> None:
        if not limiter.is_allowed(user_id, limit=100, window=60):
            print(f"[{user_id}] BLOCKED -> rate limit exceeded")
            return
        response = requests.get("https://api.example.com/data", timeout=5)
        print(f"[{user_id}] {response.status_code}")

.. warning::

    **Burst problem**: because the window boundary is a hard reset, a client
    can send up to ``2 × limit`` requests in rapid succession by timing them
    around the window edge (*limit* at the end of window N, then *limit* at the
    start of window N+1). If smooth traffic enforcement is required, use a
    sliding-window or token-bucket algorithm instead.

SlidingWindowLog
-------------------------------------------------------------------------------

Stores a timestamp for every request in a Redis sorted set. On each call,
entries older than ``now - window`` are pruned before counting, so the window
always reflects exactly the last *window* seconds and the burst problem does
not occur.
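
The prune-then-count step can be illustrated without Redis. The sketch below is
a minimal in-memory stand-in, not the ``core_redis`` implementation: the class
``InMemorySlidingWindowLog`` and its explicit ``now`` argument are hypothetical
and exist only to make the behaviour deterministic. A ``deque`` per identifier
plays the role of the sorted set.

.. code-block:: python

    from collections import defaultdict, deque


    class InMemorySlidingWindowLog:
        """Hypothetical in-memory illustration of a sliding-window log."""

        def __init__(self) -> None:
            # identifier -> timestamps of accepted requests, oldest first
            self._log = defaultdict(deque)

        def is_allowed(self, identifier: str, limit: int, window: float, now: float):
            log = self._log[identifier]
            # Prune entries older than ``now - window`` before counting.
            while log and log[0] < now - window:
                log.popleft()
            if len(log) >= limit:
                return False, 0
            log.append(now)
            return True, limit - len(log)


    limiter = InMemorySlidingWindowLog()
    results = [
        limiter.is_allowed("user_123", limit=2, window=60, now=t)
        for t in (0.0, 1.0, 59.0, 61.0)
    ]

With ``limit=2`` and ``window=60``, the call at ``t=59`` is rejected because
both earlier entries are still inside the window. By ``t=61`` the entry from
``t=0`` has been pruned, which frees one slot.
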

``SlidingWindowLog.is_allowed`` returns an ``(allowed, remaining)`` tuple so
callers know how many slots are left without a second round-trip.

.. autoclass:: core_redis.rate_limits.SlidingWindowLog
    :members:
    :special-members: __init__

.. code-block:: python

    from core_redis.rate_limits import SlidingWindowLog

    limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})
    allowed, remaining = limiter.is_allowed("user_123", limit=100, window=60)

    if not allowed:
        print("Rate limit exceeded")
    else:
        print(f"{remaining} requests remaining in this window")

HTTP-guard pattern:

.. code-block:: python

    import requests
    from core_redis.rate_limits import SlidingWindowLog

    limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})

    def call_api(user_id: str) -> None:
        allowed, remaining = limiter.is_allowed(user_id, limit=100, window=60)
        if not allowed:
            print(f"[{user_id}] BLOCKED -> rate limit exceeded")
            return
        response = requests.get("https://api.example.com/data", timeout=5)
        print(f"[{user_id}] {response.status_code} ({remaining} remaining)")

.. note::

    **Trade-offs vs. FixedWindow**

    * **Accuracy**: no boundary artefacts; any *window*-second period contains
      at most *limit* requests.
    * **Memory**: stores up to *limit* timestamps per identifier instead of a
      single counter.
    * **Round-trips**: two pipeline batches per allowed request
      (``ZREMRANGEBYSCORE + ZCARD``, then ``ZADD + EXPIRE``); one batch for
      blocked requests.

TokenBucket
-------------------------------------------------------------------------------

Maintains a virtual token bucket per identifier in a Redis hash. Tokens refill
continuously at *refill_rate* per second up to *capacity*. Each request
consumes *tokens_per_request* tokens. A request is allowed when the bucket has
enough tokens; otherwise it is rejected.

``is_allowed`` returns an ``(allowed, available_tokens)`` tuple.

.. autoclass:: core_redis.rate_limits.TokenBucket
    :members:
    :special-members: __init__

.. code-block:: python

    from core_redis.rate_limits import TokenBucket

    limiter = TokenBucket(redis_kwargs={"host": "localhost", "port": 6379})
    allowed, tokens = limiter.is_allowed(
        "user_123",
        capacity=100,      # max burst size
        refill_rate=10.0,  # tokens added per second
    )

    if not allowed:
        print(f"Rate limited -> {tokens} tokens available")
    else:
        print(f"Allowed -> {tokens} tokens remaining")

Variable-cost operations are supported via *tokens_per_request*:

.. code-block:: python

    # A bulk export costs 10 tokens; a lightweight read costs 1
    allowed, tokens = limiter.is_allowed(
        "user_123", capacity=100, refill_rate=10.0, tokens_per_request=10
    )

.. note::

    **Trade-offs vs. SlidingWindowLog**

    * **Burst-friendly**: up to *capacity* requests can fire instantly before
      throttling begins, and the budget then refills gradually;
      ``SlidingWindowLog`` caps the count over any *window*-second span and
      frees slots only as old entries age out.
    * **Memory**: one hash with two fields per identifier regardless of request
      volume; ``SlidingWindowLog`` stores one entry per request.
    * **Round-trips**: one ``HGETALL`` read + one ``HSET + EXPIRE`` pipeline
      write per allowed request; zero writes when blocked.

LeakyBucket
-------------------------------------------------------------------------------

Maintains a virtual queue per identifier in a Redis hash. Incoming requests
fill the queue; the queue drains at a fixed *leak_rate* requests per second
regardless of arrival rate. A request is accepted when the queue has room;
otherwise it is rejected immediately.

Unlike ``TokenBucket``, the output rate is strictly constant: bursts are
absorbed into the queue and processed at the leak rate, never served faster.

``is_allowed`` returns an ``(allowed, available)`` tuple where *available* is
the number of free queue slots after this request (``0`` when blocked).

.. autoclass:: core_redis.rate_limits.LeakyBucket
    :members:
    :special-members: __init__

.. code-block:: python

    from core_redis.rate_limits import LeakyBucket

    limiter = LeakyBucket(redis_kwargs={"host": "localhost", "port": 6379})
    allowed, available = limiter.is_allowed(
        "user_123",
        capacity=100,    # max queue depth
        leak_rate=10.0,  # requests drained per second
    )

    if not allowed:
        print("Queue full -> retry later")
    else:
        print(f"Queued -> {available} slots remaining")

HTTP-guard pattern:

.. code-block:: python

    import requests
    from core_redis.rate_limits import LeakyBucket

    limiter = LeakyBucket(redis_kwargs={"host": "localhost", "port": 6379})

    def call_api(user_id: str) -> None:
        allowed, available = limiter.is_allowed(user_id, capacity=100, leak_rate=10.0)
        if not allowed:
            print(f"[{user_id}] BLOCKED -> queue full")
            return
        response = requests.get("https://api.example.com/data", timeout=5)
        print(f"[{user_id}] {response.status_code} ({available} slots remaining)")

.. note::

    **Trade-offs vs. TokenBucket**

    * **Constant output rate**: downstream systems receive requests at exactly
      *leak_rate* per second; ``TokenBucket`` can burst all tokens instantly.
    * **No burst acceleration**: a burst is accepted up to *capacity* and
      processed steadily; ``TokenBucket`` serves stored tokens without delay.
    * **Memory**: one hash with two fields per identifier, same as
      ``TokenBucket``.
    * **Round-trips**: one ``HGETALL`` read + one ``HSET + EXPIRE`` pipeline
      write per allowed request; zero writes when blocked.
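
The drain-then-fill arithmetic described for ``LeakyBucket`` can be sketched in
memory. This is an illustration under stated assumptions, not the library's
code: the class ``InMemoryLeakyBucket``, the two stored values (queue level and
last update time), and the explicit ``now`` argument are all hypothetical. As in
the note above, nothing is written when a request is blocked.

.. code-block:: python

    class InMemoryLeakyBucket:
        """Hypothetical in-memory illustration of a leaky-bucket meter."""

        def __init__(self) -> None:
            # identifier -> (queue level, time of last accepted request)
            self._state = {}

        def is_allowed(self, identifier: str, capacity: int, leak_rate: float, now: float):
            level, last = self._state.get(identifier, (0.0, now))
            # Drain the queue for the time elapsed since the last write.
            level = max(0.0, level - (now - last) * leak_rate)
            if level + 1 > capacity:
                return False, 0  # queue full: reject without updating state
            level += 1
            self._state[identifier] = (level, now)
            return True, int(capacity - level)


    limiter = InMemoryLeakyBucket()
    results = [
        limiter.is_allowed("user_123", capacity=2, leak_rate=1.0, now=t)
        for t in (0.0, 0.0, 0.5, 1.0)
    ]

With ``capacity=2`` and ``leak_rate=1.0``, the call at ``t=0.5`` is rejected
because only half a slot has drained. At ``t=1.0`` a full slot has drained since
the last accepted request, so the call is accepted and the queue is full again.
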