Revert "Added rate limiter queue for telegram"

This reverts commit 6a4409d625.
This commit is contained in:
Alishahryar1 2026-01-29 23:16:01 -08:00
parent bd0caee25f
commit 6eca7377ce
6 changed files with 1 additions and 163 deletions

View file

@ -41,7 +41,6 @@ async def lifespan(app: FastAPI):
from messaging.telegram import TelegramPlatform
from messaging.handler import ClaudeMessageHandler
from messaging.session import SessionStore
from messaging.rate_limited_platform import RateLimitedPlatform
from cli.manager import CLISessionManager
@ -71,15 +70,10 @@ async def lifespan(app: FastAPI):
)
# Create Telegram platform
raw_platform = TelegramPlatform(
messaging_platform = TelegramPlatform(
session_path=os.path.join(data_path, "claude_bot.session")
)
# Wrap with rate limiter
messaging_platform = RateLimitedPlatform(
raw_platform, rps=settings.telegram_rps
)
# Create and register message handler
message_handler = ClaudeMessageHandler(
platform=messaging_platform,

View file

@ -64,7 +64,6 @@ class Settings(BaseSettings):
telegram_api_id: Optional[str] = None
telegram_api_hash: Optional[str] = None
allowed_telegram_user_id: Optional[str] = None
telegram_rps: float = 1.0
claude_workspace: str = "./agent_workspace"
allowed_dir: str = ""
max_cli_sessions: int = 10

View file

@ -1,80 +0,0 @@
import asyncio
import logging
from typing import Optional, Callable, Awaitable
from .base import MessagingPlatform
from .models import IncomingMessage
from .rate_limiter import GlobalRateLimiter
logger = logging.getLogger(__name__)
class RateLimitedPlatform(MessagingPlatform):
"""
A wrapper around MessagingPlatform that ensures outgoing messages
are sent according to the global rate limit.
"""
def __init__(self, platform: MessagingPlatform, rps: float = 1.0):
self._platform = platform
# RateLimitQueue expects calls and period (per).
# For simplicity, we use 1 call per (1/rps) window if rps < 1,
# or int(rps) calls per 1.0s window.
if rps >= 1.0:
self._limiter = GlobalRateLimiter(calls=int(rps), period=1.0)
else:
self._limiter = GlobalRateLimiter(calls=1, period=1.0 / rps)
logger.info(f"RateLimitedPlatform initialized with RPS={rps}")
@property
def name(self) -> str:
return f"rate_limited_{self._platform.name}"
async def start(self) -> None:
await self._platform.start()
async def stop(self) -> None:
self._limiter.stop()
await self._platform.stop()
async def send_message(
self,
chat_id: str,
text: str,
reply_to: Optional[str] = None,
parse_mode: Optional[str] = None,
) -> str:
"""Queues the message for sending."""
async def _send():
return await self._platform.send_message(
chat_id, text, reply_to, parse_mode
)
return await self._limiter.execute(_send)
async def edit_message(
self,
chat_id: str,
message_id: str,
text: str,
parse_mode: Optional[str] = None,
) -> None:
"""Queues the message for editing."""
async def _edit():
return await self._platform.edit_message(
chat_id, message_id, text, parse_mode
)
await self._limiter.execute(_edit)
def on_message(
self,
handler: Callable[[IncomingMessage], Awaitable[None]],
) -> None:
self._platform.on_message(handler)
@property
def is_connected(self) -> bool:
return self._platform.is_connected

View file

@ -1,66 +0,0 @@
import asyncio
import logging
import threading
from typing import Callable, Any, TypeVar, Coroutine
from ratelimitqueue import RateLimitQueue
logger = logging.getLogger(__name__)
T = TypeVar("T")
class GlobalRateLimiter:
"""
A global rate limiter that ensures operations are executed at a specific rate.
Uses ratelimitqueue.RateLimitQueue for the heavy lifting.
Since RateLimitQueue is thread-based/blocking, we run a worker in a separate thread
to avoid blocking the asyncio event loop.
"""
def __init__(self, calls: int = 1, period: float = 1.0):
self.queue = RateLimitQueue(calls=calls, per=period)
self._stop_event = threading.Event()
self._worker_thread = threading.Thread(target=self._worker, daemon=True)
self._worker_thread.start()
logger.info(f"GlobalRateLimiter started with {calls} calls per {period}s")
async def execute(self, func: Callable[[], Coroutine[Any, Any, T]]) -> T:
"""
Schedule a function to be executed within the rate limit.
Returns the result of the function.
"""
loop = asyncio.get_running_loop()
future = loop.create_future()
self.queue.put((func, future, loop))
return await future
def _worker(self):
"""Worker thread that pulls from the RateLimitQueue and executes tasks."""
while not self._stop_event.is_set():
try:
# get() blocks based on rate limit
# We don't use timeout here to avoid the skip issue
# instead we depend on the queue blocking
item = self.queue.get(block=True)
task_fn, future, loop = item
# Execute the task
async def run_task(f=task_fn, fut=future, l=loop):
try:
result = await f()
l.call_soon_threadsafe(fut.set_result, result)
except Exception as e:
l.call_soon_threadsafe(fut.set_exception, e)
asyncio.run_coroutine_threadsafe(run_task(), loop)
self.queue.task_done()
except Exception as e:
logger.error(f"Error in GlobalRateLimiter worker: {e}")
time.sleep(0.1)
def stop(self):
"""Stop the background worker."""
self._stop_event.set()
if self._worker_thread.is_alive():
self._worker_thread.join(timeout=1.0)

View file

@ -14,7 +14,6 @@ dependencies = [
"websockets>=13.0",
"telethon>=1.35.0",
"pydantic-settings>=2.12.0",
"ratelimitqueue>=0.2.2",
]
[dependency-groups]

8
uv.lock generated
View file

@ -51,7 +51,6 @@ dependencies = [
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "python-dotenv" },
{ name = "ratelimitqueue" },
{ name = "telethon" },
{ name = "tiktoken" },
{ name = "uvicorn" },
@ -72,7 +71,6 @@ requires-dist = [
{ name = "pydantic", specifier = ">=2.0.0" },
{ name = "pydantic-settings", specifier = ">=2.12.0" },
{ name = "python-dotenv", specifier = ">=1.0.0" },
{ name = "ratelimitqueue", specifier = ">=0.2.2" },
{ name = "telethon", specifier = ">=1.35.0" },
{ name = "tiktoken", specifier = ">=0.7.0" },
{ name = "uvicorn", specifier = ">=0.34.0" },
@ -857,12 +855,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/fa/de/02b54f42487e3d3c6efb3f89428677074ca7bf43aae402517bc7cca949f3/PyYAML-6.0.2-cp313-cp313-win_amd64.whl", hash = "sha256:8388ee1976c416731879ac16da0aff3f63b286ffdd57cdeb95f3f2e085687563", size = 156446, upload-time = "2024-08-06T20:33:04.33Z" },
]
[[package]]
name = "ratelimitqueue"
version = "0.2.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e6/63/c55beb712d49c08024e437d6881dcb69763026cb29236fe966e495ae1b7d/ratelimitqueue-0.2.2.tar.gz", hash = "sha256:ae4123ef5033e2028a52e9857b213cd3e86576161d6a3fccc5e7a75cce229998", size = 4720, upload-time = "2018-08-21T12:22:56.159Z" }
[[package]]
name = "regex"
version = "2026.1.15"