mirror of
https://github.com/Alishahryar1/free-claude-code.git
synced 2026-04-26 10:31:07 +00:00
Some checks are pending
CI / checks (push) Waiting to run
Consolidates the incremental refactor work into a single change set: modular web tools (api/web_tools), native Anthropic request building and SSE block policy, OpenAI conversion and error handling, provider transports and rate limiting, messaging handler and tree queue, safe logging, smoke tests, and broad test coverage.
60 lines
1.7 KiB
Python
60 lines
1.7 KiB
Python
"""Shared strict sliding-window rate limiting primitives."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
from collections import deque
|
|
|
|
|
|
class StrictSlidingWindowLimiter:
|
|
"""Strict sliding window limiter.
|
|
|
|
Guarantees: at most ``rate_limit`` acquisitions in any interval of length
|
|
``rate_window`` (seconds).
|
|
|
|
Implemented as an async context manager so call sites can do::
|
|
|
|
async with limiter:
|
|
...
|
|
"""
|
|
|
|
def __init__(self, rate_limit: int, rate_window: float) -> None:
|
|
if rate_limit <= 0:
|
|
raise ValueError("rate_limit must be > 0")
|
|
if rate_window <= 0:
|
|
raise ValueError("rate_window must be > 0")
|
|
|
|
self._rate_limit = int(rate_limit)
|
|
self._rate_window = float(rate_window)
|
|
self._times: deque[float] = deque()
|
|
self._lock = asyncio.Lock()
|
|
|
|
async def acquire(self) -> None:
|
|
while True:
|
|
wait_time = 0.0
|
|
async with self._lock:
|
|
now = time.monotonic()
|
|
cutoff = now - self._rate_window
|
|
|
|
while self._times and self._times[0] <= cutoff:
|
|
self._times.popleft()
|
|
|
|
if len(self._times) < self._rate_limit:
|
|
self._times.append(now)
|
|
return
|
|
|
|
oldest = self._times[0]
|
|
wait_time = max(0.0, (oldest + self._rate_window) - now)
|
|
|
|
if wait_time > 0:
|
|
await asyncio.sleep(wait_time)
|
|
else:
|
|
await asyncio.sleep(0)
|
|
|
|
async def __aenter__(self) -> StrictSlidingWindowLimiter:
|
|
await self.acquire()
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc, tb) -> bool:
|
|
return False
|