From a3c91484ac29d2e96b64200549113b937c9630bf Mon Sep 17 00:00:00 2001 From: Muhamad Aji Wibisono Date: Mon, 2 Jun 2025 22:02:12 +0700 Subject: [PATCH] feat: thread yielding for bot responsivity --- .../app/connectors/discord_connector.py | 12 +++++------ .../app/tasks/connectors_indexing_tasks.py | 20 ++++++++++++++++--- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/surfsense_backend/app/connectors/discord_connector.py b/surfsense_backend/app/connectors/discord_connector.py index da1ed84..1d5c1fb 100644 --- a/surfsense_backend/app/connectors/discord_connector.py +++ b/surfsense_backend/app/connectors/discord_connector.py @@ -39,6 +39,7 @@ class DiscordConnector(commands.Bot): @self.event async def on_ready(): logger.info(f"Logged in as {self.user} (ID: {self.user.id})") + self._is_running = True @self.event async def on_connect(): @@ -61,17 +62,16 @@ class DiscordConnector(commands.Bot): raise ValueError("Discord bot token not set. Call set_token(token) first.") try: - await asyncio.wait_for(self.start(self.token), timeout=60.0) + if self._is_running: + logger.warning("Bot is already running. Use close_bot() to stop it before starting again.") + return + + await self.start(self.token) logger.info("Discord bot started successfully.") except discord.LoginFailure: logger.error("Failed to log in: Invalid token was provided. Please check your bot token.") self._is_running = False raise - except asyncio.TimeoutError: - logger.error("Timed out while trying to connect to Discord. " - "This might indicate network issues or an invalid token.") - self._is_running = False - raise except discord.PrivilegedIntentsRequired as e: logger.error(f"Privileged Intents Required: {e}. Make sure all required intents are enabled in your bot's application page.") self._is_running = False diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py index 0331725..3904287 100644 --- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py +++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py @@ -15,6 +15,7 @@ from app.connectors.discord_connector import DiscordConnector from slack_sdk.errors import SlackApiError import logging import asyncio +from concurrent.futures import ThreadPoolExecutor from app.utils.document_converters import generate_content_hash @@ -1091,12 +1092,25 @@ async def index_discord_messages( summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance summary_result = await summary_chain.ainvoke({"document": combined_document_string}) summary_content = summary_result.content - summary_embedding = config.embedding_model_instance.embed(summary_content) + summary_embedding = await asyncio.to_thread( + config.embedding_model_instance.embed, summary_content + ) # Process chunks + raw_chunks = await asyncio.to_thread( + config.chunker_instance.chunk, + channel_content + ) + + chunk_texts = [chunk.text for chunk in raw_chunks if chunk.text.strip()] + chunk_embeddings = await asyncio.to_thread( + lambda texts: [config.embedding_model_instance.embed(t) for t in texts], + chunk_texts + ) + chunks = [ - Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text)) - for chunk in config.chunker_instance.chunk(channel_content) + Chunk(content=raw_chunk.text, embedding=embedding) + for raw_chunk, embedding in zip(raw_chunks, chunk_embeddings) ] # Create and store new document