Source code for core_redis.rate_limits.sliding_window

# -*- coding: utf-8 -*-

"""Sliding-window log rate limiting."""

import os
import time
from typing import Any
from typing import Dict
from typing import Optional
from typing import Tuple

from core_redis.client import RedisClient


[docs] class SlidingWindowLog: # pylint: disable=too-few-public-methods """ Sliding-window log rate limiter backed by Redis. Each request's timestamp is stored in a Redis sorted set (score = timestamp, member = timestamp as string). On every call, entries older than ``current_time − window`` are pruned and the remaining count is compared against *limit*. Because the window moves with each request, there is no fixed boundary to exploit, the burst problem present in :class:`~core_redis.rate_limits.fixed_window.FixedWindow` does not occur. **Trade-offs vs. Fixed Window** * **Accuracy**: every decision is based on the exact request history for the last *window* seconds; there are no boundary artifacts. * **Memory**: up to *limit* entries are stored per identifier instead of a single counter, so memory usage scales with *limit*. * **Writes on every request**: allowed requests write a new sorted-set member; blocked requests still perform a ``ZREMRANGEBYSCORE + ZCARD`` read. :param key_prefix: String prepended to every Redis key. Default: ``"rate_limit:sliding:"``. :param redis_kwargs: Keyword arguments forwarded verbatim to :class:`~core_redis.client.RedisClient` (e.g. ``{"host": "localhost", "port": 6379, "db": 0}``). Example: .. code-block:: python from core_redis.rate_limits import SlidingWindowLog limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379}) allowed, remaining = limiter.is_allowed("user_123", limit=100, window=60) """
[docs] def __init__( self, key_prefix: str = "rate_limit:sliding:", redis_kwargs: Optional[Dict[str, Any]] = None, ) -> None: self._key_prefix = key_prefix self._client = RedisClient(**(redis_kwargs or {}))
[docs] def is_allowed( self, identifier: str, limit: int = 100, window: int = 60, ) -> Tuple[bool, int]: """ Check whether a request from *identifier* is within the rate limit. Stale entries (older than ``current_time − window``) are pruned first, then the remaining count is evaluated against *limit*. If the request is allowed, the current timestamp is appended to the sorted set and the key TTL is refreshed. :param identifier: Unique key for the subject being rate-limited (e.g. a user ID, IP address, or API key). :param limit: Maximum number of requests allowed within the rolling window. Default: ``100``. :param window: Rolling window duration in seconds. Default: ``60``. :returns: A ``(allowed, remaining)`` tuple: * **allowed**: ``True`` if the request is within the limit, ``False`` if it should be rejected. * **remaining**: number of requests still available in the current window (``0`` when the request is blocked). .. note:: ``ZREMRANGEBYSCORE`` and ``ZCARD`` are issued in one pipeline batch to prune and count in a single round-trip. The conditional ``ZADD + EXPIRE`` is a second pipeline batch executed only when the request is allowed. For strict atomicity under very high concurrency, replace both pipelines with a single Lua script evaluated via ``EVAL``. """ current_time = time.time() window_start = current_time - window key = f"{self._key_prefix}{identifier}" pipe = self._client.client.pipeline() pipe.zremrangebyscore(key, 0, window_start) pipe.zcard(key) _, count = pipe.execute() if count < limit: pipe = self._client.client.pipeline() member = f"{current_time}:{os.urandom(4).hex()}" pipe.zadd(key, {member: current_time}) pipe.expire(key, window) pipe.execute() return True, limit - count - 1 return False, 0