Source code for core_redis.rate_limits.sliding_window
# -*- coding: utf-8 -*-
"""Sliding-window log rate limiting."""
import os
import time
from typing import Any
from typing import Dict
from typing import Optional
from typing import Tuple
from core_redis.client import RedisClient
[docs]
class SlidingWindowLog:  # pylint: disable=too-few-public-methods
    """
    Sliding-window log rate limiter backed by Redis.

    Every allowed request is recorded in a Redis sorted set stored under
    ``key_prefix + identifier``. The score is the request timestamp and the
    member is ``"<timestamp>:<8 random hex characters>"``; the random suffix
    keeps two requests that share a timestamp from collapsing into a single
    sorted-set member.

    On every call, entries with a score at or below
    ``current_time - window`` are pruned and the remaining count is compared
    against *limit*. Because the window moves with each request, there is no
    fixed boundary to exploit, so the burst problem present in
    :class:`~core_redis.rate_limits.fixed_window.FixedWindow` does not occur.

    **Trade-offs vs. Fixed Window**

    * **Accuracy**: every decision is based on the exact request history for
      the last *window* seconds; there are no boundary artifacts.
    * **Memory**: up to *limit* entries are stored per identifier instead of a
      single counter, so memory usage scales with *limit*.
    * **Writes on every request**: allowed requests write a new sorted-set
      member; blocked requests still perform a ``ZREMRANGEBYSCORE + ZCARD``
      batch.

    :param key_prefix:
        String prepended to every Redis key.
        Default: ``"rate_limit:sliding:"``.
    :param redis_kwargs:
        Keyword arguments forwarded verbatim to
        :class:`~core_redis.client.RedisClient`
        (e.g. ``{"host": "localhost", "port": 6379, "db": 0}``).
        ``None`` is treated as an empty mapping.

    Example:

    .. code-block:: python

        from core_redis.rate_limits import SlidingWindowLog

        limiter = SlidingWindowLog(redis_kwargs={"host": "localhost", "port": 6379})
        allowed, remaining = limiter.is_allowed("user_123", limit=100, window=60)
    """

    def __init__(
        self,
        key_prefix: str = "rate_limit:sliding:",
        redis_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Store the key prefix and build the Redis client wrapper."""
        self._key_prefix = key_prefix
        self._client = RedisClient(**(redis_kwargs or {}))

    def is_allowed(
        self,
        identifier: str,
        limit: int = 100,
        window: int = 60,
    ) -> Tuple[bool, int]:
        """
        Check whether a request from *identifier* is within the rate limit.

        Stale entries (score at or below ``current_time - window``) are
        pruned first, then the remaining count is evaluated against *limit*.
        If the request is allowed, a member for the current timestamp is
        added to the sorted set and the key TTL is reset to *window* seconds.

        :param identifier:
            Unique key for the subject being rate-limited (e.g. a user ID,
            IP address, or API key).
        :param limit:
            Maximum number of requests allowed within the rolling window.
            Default: ``100``.
        :param window:
            Rolling window duration in seconds; must be greater than zero.
            Default: ``60``.
        :returns:
            A ``(allowed, remaining)`` tuple:

            * **allowed**: ``True`` if the request is within the limit,
              ``False`` if it should be rejected.
            * **remaining**: number of requests still available in the current
              window (``0`` when the request is blocked).
        :raises ValueError:
            If *window* is zero or negative. Such a window would prune every
            recorded request and expire the key immediately, silently
            disabling the limiter.

        .. note::
            ``ZREMRANGEBYSCORE`` and ``ZCARD`` are issued in one pipeline
            batch to prune and count together. The conditional
            ``ZADD + EXPIRE`` is a second pipeline batch executed only when
            the request is allowed. The check and the write are therefore not
            atomic with respect to each other; for strict atomicity under
            very high concurrency, replace both pipelines with a single Lua
            script evaluated via ``EVAL``.
        """
        if window <= 0:
            # Fail loudly before touching Redis: a non-positive window makes
            # every request look like the first one.
            raise ValueError(
                f"window must be a positive number of seconds, got {window!r}"
            )

        current_time = time.time()
        window_start = current_time - window
        key = f"{self._key_prefix}{identifier}"

        # Batch 1: drop everything that fell out of the window, then count.
        pipe = self._client.client.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        _, count = pipe.execute()

        if count >= limit:
            return False, 0

        # Batch 2: record this request and refresh the TTL so idle keys
        # disappear once their last entry can no longer matter.
        pipe = self._client.client.pipeline()
        member = f"{current_time}:{os.urandom(4).hex()}"
        pipe.zadd(key, {member: current_time})
        pipe.expire(key, window)
        pipe.execute()
        return True, limit - count - 1