free-claude-code/core/rate_limit.py
Alishahryar1 f3a7528d49
Some checks are pending
CI / checks (push) Waiting to run
Major refactor: API, providers, messaging, and Anthropic protocol
Consolidates the incremental refactor work into a single change set: modular web tools (api/web_tools), native Anthropic request building and SSE block policy, OpenAI conversion and error handling, provider transports and rate limiting, messaging handler and tree queue, safe logging, smoke tests, and broad test coverage.
2026-04-26 03:01:14 -07:00

60 lines
1.7 KiB
Python

"""Shared strict sliding-window rate limiting primitives."""
from __future__ import annotations
import asyncio
import time
from collections import deque
class StrictSlidingWindowLimiter:
"""Strict sliding window limiter.
Guarantees: at most ``rate_limit`` acquisitions in any interval of length
``rate_window`` (seconds).
Implemented as an async context manager so call sites can do::
async with limiter:
...
"""
def __init__(self, rate_limit: int, rate_window: float) -> None:
if rate_limit <= 0:
raise ValueError("rate_limit must be > 0")
if rate_window <= 0:
raise ValueError("rate_window must be > 0")
self._rate_limit = int(rate_limit)
self._rate_window = float(rate_window)
self._times: deque[float] = deque()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
while True:
wait_time = 0.0
async with self._lock:
now = time.monotonic()
cutoff = now - self._rate_window
while self._times and self._times[0] <= cutoff:
self._times.popleft()
if len(self._times) < self._rate_limit:
self._times.append(now)
return
oldest = self._times[0]
wait_time = max(0.0, (oldest + self._rate_window) - now)
if wait_time > 0:
await asyncio.sleep(wait_time)
else:
await asyncio.sleep(0)
async def __aenter__(self) -> StrictSlidingWindowLimiter:
await self.acquire()
return self
async def __aexit__(self, exc_type, exc, tb) -> bool:
return False