Source code for core_redis.rate_limits.fixed_window

# -*- coding: utf-8 -*-

"""Fixed-window rate limiting."""

import time
from typing import Any
from typing import Dict
from typing import Optional

from core_redis.client import RedisClient


class FixedWindow:  # pylint: disable=too-few-public-methods
    """
    Fixed-window rate limiter backed by Redis.

    The time axis is divided into fixed-size buckets (windows). Each bucket
    has its own counter stored in Redis. A request is allowed when the
    counter for the current bucket has not yet exceeded *limit*; otherwise
    it is rejected until the bucket rolls over.

    **Burst problem**

    Because the window boundary is a hard reset, a client can issue up to
    *2 × limit* requests in a short period by timing requests around the
    boundary:

    * In the last second of window *N* send *limit* requests → all allowed.
    * In the first second of window *N + 1* send *limit* requests → all
      allowed.

    Both bursts together happen within ``~2`` seconds even though the
    nominal rate is *limit* requests per *window* seconds. If smooth
    traffic is required, prefer a sliding-window or token-bucket algorithm
    instead.

    :param key_prefix: String prepended to every Redis key.
        Default: ``"rate_limit:fixed:"``.
    :param redis_kwargs: Keyword arguments forwarded verbatim to
        :class:`~core_redis.client.RedisClient`
        (e.g. ``{"host": "localhost", "port": 6379, "db": 0}``).
        ``None`` is treated as "no keyword arguments".

    Example:

    .. code-block:: python

        from core_redis.rate_limits import FixedWindow

        limiter = FixedWindow(redis_kwargs={"host": "localhost", "port": 6379})

        # Returns True while the counter is within the limit, False once exceeded.
        allowed = limiter.is_allowed("user_123", limit=100, window=60)
    """

    def __init__(
        self,
        key_prefix: str = "rate_limit:fixed:",
        redis_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Store the key prefix and build the Redis client wrapper.

        :param key_prefix: String prepended to every Redis key.
        :param redis_kwargs: Keyword arguments forwarded verbatim to
            :class:`~core_redis.client.RedisClient`; ``None`` means none.
        """
        self._key_prefix = key_prefix
        self._client = RedisClient(**(redis_kwargs or {}))

    def is_allowed(
        self,
        identifier: str,
        limit: int = 100,
        window: int = 60,
    ) -> bool:
        """
        Check whether a request from *identifier* is within the rate limit.

        The counter for the current fixed window is incremented with
        ``INCR`` and the key's TTL is (re)set to *window* seconds with
        ``EXPIRE`` on every call. Because the key name embeds the window's
        start timestamp, each window uses its own key, and a key outlives
        the last request counted in it by at most *window* seconds.

        Every call increments the counter, including calls that end up
        being rejected.

        :param identifier: Unique key for the subject being rate-limited
            (e.g. a user ID, IP address, or API key).
        :param limit: Maximum number of requests allowed within a single
            window. Default: ``100``.
        :param window: Window duration in seconds; must be positive.
            Default: ``60``.
        :returns: ``True`` if the request is within the limit and should be
            allowed; ``False`` if the limit has been exceeded.
        :raises ValueError: If *window* is not a positive number.

        .. note::

            ``INCR`` and ``EXPIRE`` are dispatched in a single pipeline
            batch to minimize round-trips. Whether the batch executes as
            one transaction depends on how the underlying client's
            ``pipeline()`` is configured, which is not visible here. If it
            is not transactional, a crash between the two commands may
            leave the key without a TTL. In production deployments with
            strict correctness requirements, consider replacing the
            pipeline with a Lua script evaluated via ``EVAL``.
        """
        # A zero window would divide by zero below, and a negative one would
        # hand EXPIRE a non-positive TTL (Redis then deletes the key at once,
        # so the counter never accumulates and nothing is ever limited).
        if window <= 0:
            raise ValueError(f"window must be a positive number of seconds, got {window!r}")

        # Snap the current time down to the start of its bucket.
        window_start = (int(time.time()) // window) * window
        key = f"{self._key_prefix}{identifier}:{window_start}"

        pipe = self._client.client.pipeline()
        pipe.incr(key)
        pipe.expire(key, window)
        # Only the INCR reply (the post-increment count) is needed.
        count, _ = pipe.execute()

        return count <= limit