Mirror of https://github.com/MODSetter/SurfSense.git (synced 2025-09-01 18:19:08 +00:00)
feat: added discord indexer

parent 0391c2290e
commit 4b3c662478

4 changed files with 361 additions and 1 deletion
@@ -0,0 +1,112 @@
"""Add DISCORD_CONNECTOR to SearchSourceConnectorType and DocumentType enums

Revision ID: 9
Revises: 8

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "9"
down_revision: Union[str, None] = "8"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# Define the ENUM type name and the new value
CONNECTOR_ENUM = "searchsourceconnectortype"
CONNECTOR_NEW_VALUE = "DISCORD_CONNECTOR"
DOCUMENT_ENUM = "documenttype"
DOCUMENT_NEW_VALUE = "DISCORD_CONNECTOR"


def upgrade() -> None:
    """Upgrade schema - add DISCORD_CONNECTOR to connector and document enum."""
    # Add DISCORD_CONNECTOR to searchsourceconnectortype
    op.execute(f"ALTER TYPE {CONNECTOR_ENUM} ADD VALUE '{CONNECTOR_NEW_VALUE}'")
    # Add DISCORD_CONNECTOR to documenttype
    op.execute(f"ALTER TYPE {DOCUMENT_ENUM} ADD VALUE '{DOCUMENT_NEW_VALUE}'")


def downgrade() -> None:
    """Downgrade schema - remove DISCORD_CONNECTOR from connector and document enum."""

    # Old enum name
    old_connector_enum_name = f"{CONNECTOR_ENUM}_old"
    old_document_enum_name = f"{DOCUMENT_ENUM}_old"

    old_connector_values = (
        "SERPER_API",
        "TAVILY_API",
        "LINKUP_API",
        "SLACK_CONNECTOR",
        "NOTION_CONNECTOR",
        "GITHUB_CONNECTOR",
        "LINEAR_CONNECTOR",
    )
    old_document_values = (
        "EXTENSION",
        "CRAWLED_URL",
        "FILE",
        "SLACK_CONNECTOR",
        "NOTION_CONNECTOR",
        "YOUTUBE_VIDEO",
        "GITHUB_CONNECTOR",
        "LINEAR_CONNECTOR",
    )

    old_connector_values_sql = ", ".join([f"'{v}'" for v in old_connector_values])
    old_document_values_sql = ", ".join([f"'{v}'" for v in old_document_values])

    # Table and column names (adjust if different)
    connector_table_name = "search_source_connectors"
    connector_column_name = "connector_type"
    document_table_name = "documents"
    document_column_name = "document_type"

    # Connector Enum Downgrade Steps
    # 1. Rename the current connector enum type
    op.execute(f"ALTER TYPE {CONNECTOR_ENUM} RENAME TO {old_connector_enum_name}")

    # 2. Create the new connector enum type with the old values
    op.execute(f"CREATE TYPE {CONNECTOR_ENUM} AS ENUM({old_connector_values_sql})")

    # 3. Update the connector table:
    op.execute(
        f"ALTER TABLE {connector_table_name} "
        f"ALTER COLUMN {connector_column_name} "
        f"TYPE {CONNECTOR_ENUM} "
        f"USING {connector_column_name}::text::{CONNECTOR_ENUM}"
    )

    # 4. Drop the old connector enum type
    op.execute(f"DROP TYPE {old_connector_enum_name}")

    # Document Enum Downgrade Steps
    # 1. Rename the current document enum type
    op.execute(f"ALTER TYPE {DOCUMENT_ENUM} RENAME TO {old_document_enum_name}")

    # 2. Create the new document enum type with the old values
    op.execute(f"CREATE TYPE {DOCUMENT_ENUM} AS ENUM({old_document_values_sql})")

    # 3. Delete rows with the new value from the documents table
    op.execute(
        f"DELETE FROM {document_table_name} WHERE {document_column_name}::text = '{DOCUMENT_NEW_VALUE}'"
    )

    # 4. Alter the document table to use the new enum type (casting old values)
    op.execute(
        f"ALTER TABLE {document_table_name} "
        f"ALTER COLUMN {document_column_name} "
        f"TYPE {DOCUMENT_ENUM} "
        f"USING {document_column_name}::text::{DOCUMENT_ENUM}"
    )

    # 5. Drop the old enum types
    op.execute(f"DROP TYPE {old_document_enum_name}")
    # ### end Alembic commands ###
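Applying or reverting this revision follows the usual Alembic flow. A minimal sketch, assuming the backend's standard alembic.ini is on hand (its location is not shown in this diff); equivalent to running `alembic upgrade 9` / `alembic downgrade 8` from the CLI:

    # Sketch only, not part of the commit: drive the migration programmatically.
    from alembic.config import main as alembic_main

    alembic_main(argv=["upgrade", "9"])      # adds DISCORD_CONNECTOR to both Postgres enums
    # alembic_main(argv=["downgrade", "8"])  # recreates both enums without the new value

Because PostgreSQL has no `ALTER TYPE ... DROP VALUE`, the downgrade cannot simply remove the member; it renames each enum type, recreates it with the old values, casts the columns over, and deletes any documents rows that already use DISCORD_CONNECTOR before the cast.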
@@ -50,6 +50,7 @@ class DocumentType(str, Enum):
    YOUTUBE_VIDEO = "YOUTUBE_VIDEO"
    GITHUB_CONNECTOR = "GITHUB_CONNECTOR"
    LINEAR_CONNECTOR = "LINEAR_CONNECTOR"
    DISCORD_CONNECTOR = "DISCORD_CONNECTOR"

class SearchSourceConnectorType(str, Enum):
    SERPER_API = "SERPER_API"  # NOT IMPLEMENTED YET : DON'T REMEMBER WHY : MOST PROBABLY BECAUSE WE NEED TO CRAWL THE RESULTS RETURNED BY IT

@@ -59,6 +60,7 @@ class SearchSourceConnectorType(str, Enum):
    NOTION_CONNECTOR = "NOTION_CONNECTOR"
    GITHUB_CONNECTOR = "GITHUB_CONNECTOR"
    LINEAR_CONNECTOR = "LINEAR_CONNECTOR"
    DISCORD_CONNECTOR = "DISCORD_CONNECTOR"

class ChatType(str, Enum):
    GENERAL = "GENERAL"
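Both enums subclass str, so the new member compares equal to the raw value stored in Postgres, which is what the connector router below relies on. A quick illustration (the import path is an assumption; use whichever module actually defines these enums):

    # Illustration only; module path assumed.
    from app.db import DocumentType, SearchSourceConnectorType

    assert DocumentType.DISCORD_CONNECTOR == "DISCORD_CONNECTOR"
    assert SearchSourceConnectorType("DISCORD_CONNECTOR") is SearchSourceConnectorType.DISCORD_CONNECTOR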
@@ -19,7 +19,7 @@ from app.schemas import SearchSourceConnectorCreate, SearchSourceConnectorUpdate
from app.users import current_active_user
from app.utils.check_ownership import check_ownership
from pydantic import BaseModel, Field, ValidationError
from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues
from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues, index_discord_messages
from app.connectors.github_connector import GitHubConnector
from datetime import datetime, timedelta
import logging

@@ -378,6 +378,30 @@ async def index_connector_content(
        background_tasks.add_task(run_linear_indexing_with_new_session, connector_id, search_space_id)
        response_message = "Linear indexing started in the background."

    elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR:
        # Determine the time range that will be indexed
        if not connector.last_indexed_at:
            start_date = "365 days ago"
        else:
            today = datetime.now().date()
            if connector.last_indexed_at.date() == today:
                # If last indexed today, go back 1 day to ensure we don't miss anything
                start_date = (today - timedelta(days=1)).strftime("%Y-%m-%d")
            else:
                start_date = connector.last_indexed_at.strftime("%Y-%m-%d")

        indexing_from = start_date
        indexing_to = today_str

        # Run indexing in background
        logger.info(
            f"Triggering Discord indexing for connector {connector_id} into search space {search_space_id}"
        )
        background_tasks.add_task(
            run_discord_indexing_with_new_session, connector_id, search_space_id
        )
        response_message = "Discord indexing started in the background."

    else:
        raise HTTPException(
            status_code=400,

@@ -577,3 +601,45 @@ async def run_linear_indexing(
        await session.rollback()
        logger.error(f"Critical error in run_linear_indexing for connector {connector_id}: {e}", exc_info=True)
        # Optionally update status in DB to indicate failure

# Add new helper functions for discord indexing
async def run_discord_indexing_with_new_session(
    connector_id: int,
    search_space_id: int
):
    """
    Create a new session and run the Discord indexing task.
    This prevents session leaks by creating a dedicated session for the background task.
    """
    async with async_session_maker() as session:
        await run_discord_indexing(session, connector_id, search_space_id)

async def run_discord_indexing(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int
):
    """
    Background task to run Discord indexing.

    Args:
        session: Database session
        connector_id: ID of the Discord connector
        search_space_id: ID of the search space
    """
    try:
        # Index Discord messages without updating last_indexed_at (we'll do it separately)
        documents_processed, error_or_warning = await index_discord_messages(
            session=session,
            connector_id=connector_id,
            search_space_id=search_space_id,
            update_last_indexed=False  # Don't update timestamp in the indexing function
        )

        # Only update last_indexed_at if indexing was successful (either new docs or updated docs)
        if documents_processed > 0:
            await update_connector_last_indexed(session, connector_id)
            logger.info(f"Discord indexing completed successfully: {documents_processed} documents processed")
        else:
            logger.error(f"Discord indexing failed or no documents processed: {error_or_warning}")
    except Exception as e:
        logger.error(f"Error in background Discord indexing task: {str(e)}")
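For manual testing, the new branch can be exercised through the existing index_connector_content endpoint. The route path, query-parameter placement, and auth scheme below are assumptions (this hunk shows only the handler body, not its decorator):

    # Hypothetical trigger call; adjust URL, route, and auth to the deployed API.
    import asyncio
    import httpx

    async def trigger_discord_indexing(base_url: str, token: str, connector_id: int, search_space_id: int) -> None:
        async with httpx.AsyncClient(base_url=base_url) as client:
            resp = await client.post(
                f"/search-source-connectors/{connector_id}/index",  # assumed route
                params={"search_space_id": search_space_id},
                headers={"Authorization": f"Bearer {token}"},
            )
            resp.raise_for_status()
            print(resp.json())  # expect "Discord indexing started in the background."

    # asyncio.run(trigger_discord_indexing("http://localhost:8000", "<token>", 1, 1))

The handler itself only schedules work: the request-scoped session is never handed to the task, because run_discord_indexing_with_new_session opens its own session via async_session_maker.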
@@ -11,6 +11,8 @@ from app.connectors.slack_history import SlackHistory
from app.connectors.notion_history import NotionHistoryConnector
from app.connectors.github_connector import GitHubConnector
from app.connectors.linear_connector import LinearConnector
from app.connectors.discord_connector import DiscordConnector
from discord import DiscordException
from slack_sdk.errors import SlackApiError
import logging

@@ -912,3 +914,181 @@ async def index_linear_issues(
        await session.rollback()
        logger.error(f"Failed to index Linear issues: {str(e)}", exc_info=True)
        return 0, f"Failed to index Linear issues: {str(e)}"

async def index_discord_messages(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    update_last_indexed: bool = True
) -> Tuple[int, Optional[str]]:
    """
    Index Discord messages from all accessible channels.

    Args:
        session: Database session
        connector_id: ID of the Discord connector
        search_space_id: ID of the search space to store documents in
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)

    Returns:
        Tuple containing (number of documents indexed, error message or None)
    """
    try:
        # Get the connector
        result = await session.execute(
            select(SearchSourceConnector)
            .filter(
                SearchSourceConnector.id == connector_id,
                SearchSourceConnector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR
            )
        )
        connector = result.scalars().first()

        if not connector:
            return 0, f"Connector with ID {connector_id} not found or is not a Discord connector"

        # Get the Discord token from the connector config
        discord_token = connector.config.get("DISCORD_BOT_TOKEN")
        if not discord_token:
            return 0, "Discord token not found in connector config"

        # Initialize Discord client
        discord_client = DiscordConnector(token=discord_token)

        # Calculate date range
        end_date = datetime.now(timezone.utc)

        # Use last_indexed_at as start date if available, otherwise use 365 days ago
        if connector.last_indexed_at:
            start_date = connector.last_indexed_at.replace(tzinfo=timezone.utc)
            logger.info(f"Using last_indexed_at ({start_date.strftime('%Y-%m-%d')}) as start date")
        else:
            start_date = end_date - timedelta(days=365)  # Use 365 days as default
            logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (365 days ago) as start date")

        # Format dates for Discord API
        start_date_str = start_date.isoformat()
        end_date_str = end_date.isoformat()

        documents_indexed = 0
        documents_skipped = 0
        skipped_guilds = []

        try:
            await discord_client.start_bot()
            guilds = await discord_client.get_guilds()
            logger.info(f"Found {len(guilds)} guilds")
        except Exception as e:
            await discord_client.close_bot()
            return 0, f"Failed to get Discord guilds: {str(e)}"
        if not guilds:
            await discord_client.close_bot()
            return 0, "No Discord guilds found"

        # Process each guild
        for guild in guilds:
            guild_id = guild["id"]
            guild_name = guild["name"]
            logger.info(f"Processing guild: {guild_name} ({guild_id})")
            try:
                channels = await discord_client.get_text_channels(guild_id)

                if not channels:
                    logger.info(f"No channels found in guild {guild_name}. Skipping.")
                    skipped_guilds.append(f"{guild_name} (no channels)")
                    documents_skipped += 1
                    continue

                for channel in channels:
                    channel_id = channel["id"]
                    channel_name = channel["name"]

                    try:
                        messages = await discord_client.get_channel_history(
                            channel_id=channel_id,
                            start_date=start_date_str,
                            end_date=end_date_str,
                            limit=1000
                        )
                    except Exception as e:
                        logger.error(f"Failed to get messages for channel {channel_name}: {str(e)}")
                        documents_skipped += 1
                        continue

                    if not messages:
                        continue

                    for message in messages:
                        try:
                            content = message.get("content", "")
                            if not content:
                                continue

                            content_hash = generate_content_hash(content)
                            existing_doc_by_hash_result = await session.execute(
                                select(Document).where(Document.content_hash == content_hash)
                            )
                            existing_document_by_hash = existing_doc_by_hash_result.scalars().first()

                            if existing_document_by_hash:
                                documents_skipped += 1
                                continue

                            summary_content = f"Discord message by {message.get('author_name', 'Unknown')} in {channel_name} ({guild_name})\n\n{content}"
                            summary_embedding = config.embedding_model_instance.embed(summary_content)
                            chunks = [
                                Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
                                for chunk in config.chunker_instance.chunk(content)
                            ]
                            document = Document(
                                search_space_id=search_space_id,
                                title=f"Discord - {guild_name}#{channel_name}",
                                document_type=DocumentType.DISCORD_CONNECTOR,
                                document_metadata={
                                    "guild_id": guild_id,
                                    "guild_name": guild_name,
                                    "channel_id": channel_id,
                                    "channel_name": channel_name,
                                    "message_id": message.get("id"),
                                    "author_id": message.get("author_id"),
                                    "author_name": message.get("author_name"),
                                    "created_at": message.get("created_at"),
                                    "indexed_at": datetime.now(timezone.utc).isoformat()
                                },
                                content=summary_content,
                                content_hash=content_hash,
                                embedding=summary_embedding,
                                chunks=chunks
                            )

                            session.add(document)
                            documents_indexed += 1

                        except Exception as e:
                            logger.error(f"Error processing Discord message: {str(e)}", exc_info=True)
                            documents_skipped += 1
                            continue

            except Exception as e:
                logger.error(f"Error processing guild {guild_name}: {str(e)}", exc_info=True)
                skipped_guilds.append(f"{guild_name} (processing error)")
                documents_skipped += 1
                continue

        if update_last_indexed and documents_indexed > 0:
            connector.last_indexed_at = datetime.now(timezone.utc)
            logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")

        await session.commit()
        await discord_client.close_bot()
        logger.info(f"Discord indexing completed: {documents_indexed} new messages, {documents_skipped} skipped")
        return documents_indexed, None

    except SQLAlchemyError as db_error:
        await session.rollback()
        logger.error(f"Database error during Discord indexing: {str(db_error)}", exc_info=True)
        return 0, f"Database error: {str(db_error)}"
    except Exception as e:
        await session.rollback()
        logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True)
        return 0, f"Failed to index Discord messages: {str(e)}"
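index_discord_messages touches only a narrow surface of app.connectors.discord_connector.DiscordConnector, which is not part of this diff. Reconstructed from the call sites above, the interface it assumes looks roughly like the following sketch (return shapes and id types are inferred, not confirmed):

    # Interface assumed by index_discord_messages, inferred from its call sites only.
    from typing import Any, Protocol

    class DiscordConnectorLike(Protocol):
        async def start_bot(self) -> None: ...
        async def close_bot(self) -> None: ...
        async def get_guilds(self) -> list[dict[str, Any]]:
            """Each guild dict carries at least "id" and "name"."""
        async def get_text_channels(self, guild_id: Any) -> list[dict[str, Any]]:
            """Each channel dict carries at least "id" and "name"."""
        async def get_channel_history(
            self, channel_id: Any, start_date: str, end_date: str, limit: int
        ) -> list[dict[str, Any]]:
            """Message dicts carry "content", "id", "author_id", "author_name" and "created_at"."""

The client is constructed as DiscordConnector(token=...), and each channel fetch is bounded by the ISO-formatted start/end dates plus a 1000-message limit.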