from typing import Optional, Tuple
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.future import select
from datetime import datetime, timedelta, timezone

from app.db import Document, DocumentType, Chunk, SearchSourceConnector, SearchSourceConnectorType, SearchSpace
from app.config import config
from app.prompts import SUMMARY_PROMPT_TEMPLATE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.connectors.slack_history import SlackHistory
from app.connectors.notion_history import NotionHistoryConnector
from app.connectors.github_connector import GitHubConnector
from app.connectors.linear_connector import LinearConnector
from app.connectors.discord_connector import DiscordConnector
from app.utils.document_converters import generate_content_hash

from slack_sdk.errors import SlackApiError

import logging
import asyncio

# Set up logging
logger = logging.getLogger(__name__)

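# Shared flow: most indexers below follow the same broad pipeline. They load the connector
# row for their connector type, pull content from the external service (usually for a
# computed date window), render it to a markdown-style document wrapped in
# <DOCUMENT>/<METADATA>/<CONTENT> markers, skip anything whose generate_content_hash()
# already exists as a Document, then summarize, embed, chunk, and store a Document plus its
# Chunk rows, logging progress through TaskLoggingService. The GitHub indexer differs in
# that it ignores the date window and indexes file contents directly.
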
async def index_slack_messages(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str = None,
    end_date: str = None,
    update_last_indexed: bool = True
) -> Tuple[int, Optional[str]]:
    """
    Index Slack messages from all accessible channels.

    Args:
        session: Database session
        connector_id: ID of the Slack connector
        search_space_id: ID of the search space to store documents in
        user_id: ID of the user who owns the connector
        start_date: Start date (YYYY-MM-DD); derived from last_indexed_at if omitted
        end_date: End date (YYYY-MM-DD); defaults to today if omitted
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)

    Returns:
        Tuple containing (number of documents indexed, error message or None)
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="slack_messages_indexing",
        source="connector_indexing_task",
        message=f"Starting Slack messages indexing for connector {connector_id}",
        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
    )

    try:
        # Get the connector
        await task_logger.log_task_progress(
            log_entry,
            f"Retrieving Slack connector {connector_id} from database",
            {"stage": "connector_retrieval"}
        )

        result = await session.execute(
            select(SearchSourceConnector)
            .filter(
                SearchSourceConnector.id == connector_id,
                SearchSourceConnector.connector_type == SearchSourceConnectorType.SLACK_CONNECTOR
            )
        )
        connector = result.scalars().first()

        if not connector:
            await task_logger.log_task_failure(
                log_entry,
                f"Connector with ID {connector_id} not found or is not a Slack connector",
                "Connector not found",
                {"error_type": "ConnectorNotFound"}
            )
            return 0, f"Connector with ID {connector_id} not found or is not a Slack connector"

        # Get the Slack token from the connector config
        slack_token = connector.config.get("SLACK_BOT_TOKEN")
        if not slack_token:
            await task_logger.log_task_failure(
                log_entry,
                f"Slack token not found in connector config for connector {connector_id}",
                "Missing Slack token",
                {"error_type": "MissingToken"}
            )
            return 0, "Slack token not found in connector config"

        # Initialize Slack client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing Slack client for connector {connector_id}",
            {"stage": "client_initialization"}
        )

        slack_client = SlackHistory(token=slack_token)

        # Calculate date range
        await task_logger.log_task_progress(
            log_entry,
            "Calculating date range for Slack indexing",
            {"stage": "date_calculation", "provided_start_date": start_date, "provided_end_date": end_date}
        )

        if start_date is None or end_date is None:
            # Fall back to calculating dates based on last_indexed_at
            calculated_end_date = datetime.now()

            # Use last_indexed_at as start date if available, otherwise use 365 days ago
            if connector.last_indexed_at:
                # Convert dates to be comparable (both timezone-naive)
                last_indexed_naive = connector.last_indexed_at.replace(tzinfo=None) if connector.last_indexed_at.tzinfo else connector.last_indexed_at

                # Check if last_indexed_at is in the future or after end_date
                if last_indexed_naive > calculated_end_date:
                    logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead.")
                    calculated_start_date = calculated_end_date - timedelta(days=365)
                else:
                    calculated_start_date = last_indexed_naive
                    logger.info(f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date")
            else:
                calculated_start_date = calculated_end_date - timedelta(days=365)  # Use 365 days as default
                logger.info(f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date")

            # Use calculated dates if not provided
            start_date_str = start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
            end_date_str = end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
        else:
            # Use provided dates
            start_date_str = start_date
            end_date_str = end_date

        logger.info(f"Indexing Slack messages from {start_date_str} to {end_date_str}")

        await task_logger.log_task_progress(
            log_entry,
            f"Fetching Slack channels from {start_date_str} to {end_date_str}",
            {"stage": "fetch_channels", "start_date": start_date_str, "end_date": end_date_str}
        )

        # Get all channels
        try:
            channels = slack_client.get_all_channels()
        except Exception as e:
            await task_logger.log_task_failure(
                log_entry,
                f"Failed to get Slack channels for connector {connector_id}",
                str(e),
                {"error_type": "ChannelFetchError"}
            )
            return 0, f"Failed to get Slack channels: {str(e)}"

        if not channels:
            await task_logger.log_task_success(
                log_entry,
                f"No Slack channels found for connector {connector_id}",
                {"channels_found": 0}
            )
            return 0, "No Slack channels found"

        # Track the number of documents indexed
        documents_indexed = 0
        documents_skipped = 0
        skipped_channels = []

        await task_logger.log_task_progress(
            log_entry,
            f"Starting to process {len(channels)} Slack channels",
            {"stage": "process_channels", "total_channels": len(channels)}
        )

        # Process each channel
        for channel_obj in channels:  # Iterate over the list of channel objects
            channel_id = channel_obj["id"]
            channel_name = channel_obj["name"]
            is_private = channel_obj["is_private"]
            is_member = channel_obj["is_member"]  # This might be False for public channels too

            try:
                # If it's a private channel and the bot is not a member, skip.
                # For public channels, if they are listed by conversations.list, the bot can typically read history.
                # The `not_in_channel` error in get_conversation_history will be the ultimate gatekeeper if history is inaccessible.
                if is_private and not is_member:
                    logger.warning(f"Bot is not a member of private channel {channel_name} ({channel_id}). Skipping.")
                    skipped_channels.append(f"{channel_name} (private, bot not a member)")
                    documents_skipped += 1
                    continue

                # Get messages for this channel.
                # get_history_by_date_range uses get_conversation_history,
                # which handles 'not_in_channel' by returning [] and logging.
                messages, error = slack_client.get_history_by_date_range(
                    channel_id=channel_id,
                    start_date=start_date_str,
                    end_date=end_date_str,
                    limit=1000  # Limit to 1000 messages per channel
                )

                if error:
                    logger.warning(f"Error getting messages from channel {channel_name}: {error}")
                    skipped_channels.append(f"{channel_name} (error: {error})")
                    documents_skipped += 1
                    continue  # Skip this channel if there's an error

                if not messages:
                    logger.info(f"No messages found in channel {channel_name} for the specified date range.")
                    documents_skipped += 1
                    continue  # Skip if no messages

                # Format messages with user info
                formatted_messages = []
                for msg in messages:
                    # Skip bot messages and system messages
                    if msg.get("subtype") in ["bot_message", "channel_join", "channel_leave"]:
                        continue

                    formatted_msg = slack_client.format_message(msg, include_user_info=True)
                    formatted_messages.append(formatted_msg)

                if not formatted_messages:
                    logger.info(f"No valid messages found in channel {channel_name} after filtering.")
                    documents_skipped += 1
                    continue  # Skip if no valid messages after filtering

                # Convert messages to markdown format
                channel_content = f"# Slack Channel: {channel_name}\n\n"

                for msg in formatted_messages:
                    user_name = msg.get("user_name", "Unknown User")
                    timestamp = msg.get("datetime", "Unknown Time")
                    text = msg.get("text", "")

                    channel_content += f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n"

                # Format document metadata
                metadata_sections = [
                    ("METADATA", [
                        f"CHANNEL_NAME: {channel_name}",
                        f"CHANNEL_ID: {channel_id}",
                        # f"START_DATE: {start_date_str}",
                        # f"END_DATE: {end_date_str}",
                        f"MESSAGE_COUNT: {len(formatted_messages)}"
                    ]),
                    ("CONTENT", [
                        "FORMAT: markdown",
                        "TEXT_START",
                        channel_content,
                        "TEXT_END"
                    ])
                ]

                # Build the document string
                document_parts = []
                document_parts.append("<DOCUMENT>")

                for section_title, section_content in metadata_sections:
                    document_parts.append(f"<{section_title}>")
                    document_parts.extend(section_content)
                    document_parts.append(f"</{section_title}>")

                document_parts.append("</DOCUMENT>")
                combined_document_string = '\n'.join(document_parts)
                content_hash = generate_content_hash(combined_document_string, search_space_id)

                # Check if a document with this content hash already exists
                existing_doc_by_hash_result = await session.execute(
                    select(Document).where(Document.content_hash == content_hash)
                )
                existing_document_by_hash = existing_doc_by_hash_result.scalars().first()

                if existing_document_by_hash:
                    logger.info(f"Document with content hash {content_hash} already exists for channel {channel_name}. Skipping processing.")
                    documents_skipped += 1
                    continue

                # Get user's long context LLM
                user_llm = await get_user_long_context_llm(session, user_id)
                if not user_llm:
                    logger.error(f"No long context LLM configured for user {user_id}")
                    skipped_channels.append(f"{channel_name} (no LLM configured)")
                    documents_skipped += 1
                    continue

                # Generate summary
                summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
                summary_result = await summary_chain.ainvoke({"document": combined_document_string})
                summary_content = summary_result.content
                summary_embedding = config.embedding_model_instance.embed(summary_content)

                # Process chunks
                chunks = [
                    Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
                    for chunk in config.chunker_instance.chunk(channel_content)
                ]

                # Create and store new document
                document = Document(
                    search_space_id=search_space_id,
                    title=f"Slack - {channel_name}",
                    document_type=DocumentType.SLACK_CONNECTOR,
                    document_metadata={
                        "channel_name": channel_name,
                        "channel_id": channel_id,
                        "start_date": start_date_str,
                        "end_date": end_date_str,
                        "message_count": len(formatted_messages),
                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    },
                    content=summary_content,
                    embedding=summary_embedding,
                    chunks=chunks,
                    content_hash=content_hash,
                )

                session.add(document)
                documents_indexed += 1
                logger.info(f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages")

            except SlackApiError as slack_error:
                logger.error(f"Slack API error for channel {channel_name}: {str(slack_error)}")
                skipped_channels.append(f"{channel_name} (Slack API error)")
                documents_skipped += 1
                continue  # Skip this channel and continue with others
            except Exception as e:
                logger.error(f"Error processing channel {channel_name}: {str(e)}")
                skipped_channels.append(f"{channel_name} (processing error)")
                documents_skipped += 1
                continue  # Skip this channel and continue with others

        # Update the last_indexed_at timestamp for the connector only if requested
        # and if we successfully indexed at least one channel
        total_processed = documents_indexed
        if update_last_indexed and total_processed > 0:
            connector.last_indexed_at = datetime.now()

        # Commit all changes
        await session.commit()

        # Prepare result message
        result_message = None
        if skipped_channels:
            result_message = f"Processed {total_processed} channels. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
        else:
            result_message = f"Processed {total_processed} channels."

        # Log success
        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Slack indexing for connector {connector_id}",
            {
                "channels_processed": total_processed,
                "documents_indexed": documents_indexed,
                "documents_skipped": documents_skipped,
                "skipped_channels_count": len(skipped_channels),
                "result_message": result_message
            }
        )

        logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
        return total_processed, result_message

    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Slack indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"}
        )
        logger.error(f"Database error: {str(db_error)}")
        return 0, f"Database error: {str(db_error)}"
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Slack messages for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__}
        )
        logger.error(f"Failed to index Slack messages: {str(e)}")
        return 0, f"Failed to index Slack messages: {str(e)}"

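# Illustrative sketch (not part of the original module): one way an indexing coroutine
# above could be driven from an async entry point that already owns a database session.
# The wrapper only forwards arguments and logs the result message; it is not referenced
# anywhere in the application code.
async def _example_run_slack_indexing(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
) -> int:
    """Run the Slack indexer once and log its outcome (illustrative only)."""
    indexed_count, message = await index_slack_messages(
        session=session,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
    )
    if message:
        logger.info("Slack indexing finished: %s", message)
    return indexed_count
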
async def index_notion_pages(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str = None,
    end_date: str = None,
    update_last_indexed: bool = True
) -> Tuple[int, Optional[str]]:
    """
    Index all accessible Notion pages.

    Args:
        session: Database session
        connector_id: ID of the Notion connector
        search_space_id: ID of the search space to store documents in
        user_id: ID of the user who owns the connector
        start_date: Start date (YYYY-MM-DD); defaults to 365 days ago if omitted
        end_date: End date (YYYY-MM-DD); defaults to today if omitted
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)

    Returns:
        Tuple containing (number of documents indexed, error message or None)
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="notion_pages_indexing",
        source="connector_indexing_task",
        message=f"Starting Notion pages indexing for connector {connector_id}",
        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
    )

    try:
        # Get the connector
        await task_logger.log_task_progress(
            log_entry,
            f"Retrieving Notion connector {connector_id} from database",
            {"stage": "connector_retrieval"}
        )

        result = await session.execute(
            select(SearchSourceConnector)
            .filter(
                SearchSourceConnector.id == connector_id,
                SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR
            )
        )
        connector = result.scalars().first()

        if not connector:
            await task_logger.log_task_failure(
                log_entry,
                f"Connector with ID {connector_id} not found or is not a Notion connector",
                "Connector not found",
                {"error_type": "ConnectorNotFound"}
            )
            return 0, f"Connector with ID {connector_id} not found or is not a Notion connector"

        # Get the Notion token from the connector config
        notion_token = connector.config.get("NOTION_INTEGRATION_TOKEN")
        if not notion_token:
            await task_logger.log_task_failure(
                log_entry,
                f"Notion integration token not found in connector config for connector {connector_id}",
                "Missing Notion token",
                {"error_type": "MissingToken"}
            )
            return 0, "Notion integration token not found in connector config"

        # Initialize Notion client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing Notion client for connector {connector_id}",
            {"stage": "client_initialization"}
        )

        logger.info(f"Initializing Notion client for connector {connector_id}")
        notion_client = NotionHistoryConnector(token=notion_token)

        # Calculate date range
        if start_date is None or end_date is None:
            # Fall back to calculating dates
            calculated_end_date = datetime.now()
            calculated_start_date = calculated_end_date - timedelta(days=365)  # Check for last 1 year of pages

            # Use calculated dates if not provided
            if start_date is None:
                start_date_iso = calculated_start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
            else:
                # Convert YYYY-MM-DD to ISO format
                start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ")

            if end_date is None:
                end_date_iso = calculated_end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
            else:
                # Convert YYYY-MM-DD to ISO format
                end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ")
        else:
            # Convert provided dates to ISO format for Notion API
            start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ")
            end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ")

        logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}")

        await task_logger.log_task_progress(
            log_entry,
            f"Fetching Notion pages from {start_date_iso} to {end_date_iso}",
            {"stage": "fetch_pages", "start_date": start_date_iso, "end_date": end_date_iso}
        )

        # Get all pages
        try:
            pages = notion_client.get_all_pages(start_date=start_date_iso, end_date=end_date_iso)
            logger.info(f"Found {len(pages)} Notion pages")
        except Exception as e:
            await task_logger.log_task_failure(
                log_entry,
                f"Failed to get Notion pages for connector {connector_id}",
                str(e),
                {"error_type": "PageFetchError"}
            )
            logger.error(f"Error fetching Notion pages: {str(e)}", exc_info=True)
            return 0, f"Failed to get Notion pages: {str(e)}"

        if not pages:
            await task_logger.log_task_success(
                log_entry,
                f"No Notion pages found for connector {connector_id}",
                {"pages_found": 0}
            )
            logger.info("No Notion pages found to index")
            return 0, "No Notion pages found"

        # Track the number of documents indexed
        documents_indexed = 0
        documents_skipped = 0
        skipped_pages = []

        await task_logger.log_task_progress(
            log_entry,
            f"Starting to process {len(pages)} Notion pages",
            {"stage": "process_pages", "total_pages": len(pages)}
        )

        # Process each page
        for page in pages:
            try:
                page_id = page.get("page_id")
                page_title = page.get("title", f"Untitled page ({page_id})")
                page_content = page.get("content", [])

                logger.info(f"Processing Notion page: {page_title} ({page_id})")

                if not page_content:
                    logger.info(f"No content found in page {page_title}. Skipping.")
                    skipped_pages.append(f"{page_title} (no content)")
                    documents_skipped += 1
                    continue

                # Convert page content to markdown format
                markdown_content = f"# Notion Page: {page_title}\n\n"

                # Process blocks recursively
                def process_blocks(blocks, level=0):
                    result = ""
                    for block in blocks:
                        block_type = block.get("type")
                        block_content = block.get("content", "")
                        children = block.get("children", [])

                        # Add indentation based on level
                        indent = " " * level

                        # Format based on block type
                        if block_type in ["paragraph", "text"]:
                            result += f"{indent}{block_content}\n\n"
                        elif block_type in ["heading_1", "header"]:
                            result += f"{indent}# {block_content}\n\n"
                        elif block_type == "heading_2":
                            result += f"{indent}## {block_content}\n\n"
                        elif block_type == "heading_3":
                            result += f"{indent}### {block_content}\n\n"
                        elif block_type == "bulleted_list_item":
                            result += f"{indent}* {block_content}\n"
                        elif block_type == "numbered_list_item":
                            result += f"{indent}1. {block_content}\n"
                        elif block_type == "to_do":
                            result += f"{indent}- [ ] {block_content}\n"
                        elif block_type == "toggle":
                            result += f"{indent}> {block_content}\n"
                        elif block_type == "code":
                            result += f"{indent}```\n{block_content}\n```\n\n"
                        elif block_type == "quote":
                            result += f"{indent}> {block_content}\n\n"
                        elif block_type == "callout":
                            result += f"{indent}> **Note:** {block_content}\n\n"
                        elif block_type == "image":
                            result += f"{indent}![Image]({block_content})\n\n"
                        else:
                            # Default for other block types
                            if block_content:
                                result += f"{indent}{block_content}\n\n"

                        # Process children recursively
                        if children:
                            result += process_blocks(children, level + 1)

                    return result

                logger.debug(f"Converting {len(page_content)} blocks to markdown for page {page_title}")
                markdown_content += process_blocks(page_content)

                # Format document metadata
                metadata_sections = [
                    ("METADATA", [
                        f"PAGE_TITLE: {page_title}",
                        f"PAGE_ID: {page_id}"
                    ]),
                    ("CONTENT", [
                        "FORMAT: markdown",
                        "TEXT_START",
                        markdown_content,
                        "TEXT_END"
                    ])
                ]

                # Build the document string
                document_parts = []
                document_parts.append("<DOCUMENT>")

                for section_title, section_content in metadata_sections:
                    document_parts.append(f"<{section_title}>")
                    document_parts.extend(section_content)
                    document_parts.append(f"</{section_title}>")

                document_parts.append("</DOCUMENT>")
                combined_document_string = '\n'.join(document_parts)
                content_hash = generate_content_hash(combined_document_string, search_space_id)

                # Check if a document with this content hash already exists
                existing_doc_by_hash_result = await session.execute(
                    select(Document).where(Document.content_hash == content_hash)
                )
                existing_document_by_hash = existing_doc_by_hash_result.scalars().first()

                if existing_document_by_hash:
                    logger.info(f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing.")
                    documents_skipped += 1
                    continue

                # Get user's long context LLM
                user_llm = await get_user_long_context_llm(session, user_id)
                if not user_llm:
                    logger.error(f"No long context LLM configured for user {user_id}")
                    skipped_pages.append(f"{page_title} (no LLM configured)")
                    documents_skipped += 1
                    continue

                # Generate summary
                logger.debug(f"Generating summary for page {page_title}")
                summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
                summary_result = await summary_chain.ainvoke({"document": combined_document_string})
                summary_content = summary_result.content
                summary_embedding = config.embedding_model_instance.embed(summary_content)

                # Process chunks
                logger.debug(f"Chunking content for page {page_title}")
                chunks = [
                    Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
                    for chunk in config.chunker_instance.chunk(markdown_content)
                ]

                # Create and store new document
                document = Document(
                    search_space_id=search_space_id,
                    title=f"Notion - {page_title}",
                    document_type=DocumentType.NOTION_CONNECTOR,
                    document_metadata={
                        "page_title": page_title,
                        "page_id": page_id,
                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    },
                    content=summary_content,
                    content_hash=content_hash,
                    embedding=summary_embedding,
                    chunks=chunks
                )

                session.add(document)
                documents_indexed += 1
                logger.info(f"Successfully indexed new Notion page: {page_title}")

            except Exception as e:
                logger.error(f"Error processing Notion page {page.get('title', 'Unknown')}: {str(e)}", exc_info=True)
                skipped_pages.append(f"{page.get('title', 'Unknown')} (processing error)")
                documents_skipped += 1
                continue  # Skip this page and continue with others

        # Update the last_indexed_at timestamp for the connector only if requested
        # and if we successfully indexed at least one page
        total_processed = documents_indexed
        if update_last_indexed and total_processed > 0:
            connector.last_indexed_at = datetime.now()
            logger.info(f"Updated last_indexed_at for connector {connector_id}")

        # Commit all changes
        await session.commit()

        # Prepare result message
        result_message = None
        if skipped_pages:
            result_message = f"Processed {total_processed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}"
        else:
            result_message = f"Processed {total_processed} pages."

        # Log success
        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Notion indexing for connector {connector_id}",
            {
                "pages_processed": total_processed,
                "documents_indexed": documents_indexed,
                "documents_skipped": documents_skipped,
                "skipped_pages_count": len(skipped_pages),
                "result_message": result_message
            }
        )

        logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped")
        return total_processed, result_message

    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Notion indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"}
        )
        logger.error(f"Database error during Notion indexing: {str(db_error)}", exc_info=True)
        return 0, f"Database error: {str(db_error)}"
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Notion pages for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__}
        )
        logger.error(f"Failed to index Notion pages: {str(e)}", exc_info=True)
        return 0, f"Failed to index Notion pages: {str(e)}"

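# Illustrative sketch (not part of the original module): the Notion indexer above
# normalizes plain YYYY-MM-DD inputs to the ISO-8601 "Z" form it passes to the Notion
# API query window. Factored out on its own, that conversion looks roughly like this.
def _example_to_notion_iso(date_str: Optional[str], fallback: datetime) -> str:
    """Convert YYYY-MM-DD to %Y-%m-%dT%H:%M:%SZ, falling back to a datetime (illustrative only)."""
    if date_str is None:
        return fallback.strftime("%Y-%m-%dT%H:%M:%SZ")
    return datetime.strptime(date_str, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ")
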
async def index_github_repos(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str = None,
    end_date: str = None,
    update_last_indexed: bool = True
) -> Tuple[int, Optional[str]]:
    """
    Index code and documentation files from accessible GitHub repositories.

    Args:
        session: Database session
        connector_id: ID of the GitHub connector
        search_space_id: ID of the search space to store documents in
        user_id: ID of the user who owns the connector
        start_date: Start date (YYYY-MM-DD); accepted for API consistency but not used to filter files
        end_date: End date (YYYY-MM-DD); accepted for API consistency but not used to filter files
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)

    Returns:
        Tuple containing (number of documents indexed, error message or None)
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="github_repos_indexing",
        source="connector_indexing_task",
        message=f"Starting GitHub repositories indexing for connector {connector_id}",
        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
    )

    documents_processed = 0
    errors = []

    try:
        # 1. Get the GitHub connector from the database
        await task_logger.log_task_progress(
            log_entry,
            f"Retrieving GitHub connector {connector_id} from database",
            {"stage": "connector_retrieval"}
        )

        result = await session.execute(
            select(SearchSourceConnector)
            .filter(
                SearchSourceConnector.id == connector_id,
                SearchSourceConnector.connector_type == SearchSourceConnectorType.GITHUB_CONNECTOR
            )
        )
        connector = result.scalars().first()

        if not connector:
            await task_logger.log_task_failure(
                log_entry,
                f"Connector with ID {connector_id} not found or is not a GitHub connector",
                "Connector not found",
                {"error_type": "ConnectorNotFound"}
            )
            return 0, f"Connector with ID {connector_id} not found or is not a GitHub connector"

        # 2. Get the GitHub PAT and selected repositories from the connector config
        github_pat = connector.config.get("GITHUB_PAT")
        repo_full_names_to_index = connector.config.get("repo_full_names")

        if not github_pat:
            await task_logger.log_task_failure(
                log_entry,
                f"GitHub Personal Access Token (PAT) not found in connector config for connector {connector_id}",
                "Missing GitHub PAT",
                {"error_type": "MissingToken"}
            )
            return 0, "GitHub Personal Access Token (PAT) not found in connector config"

        if not repo_full_names_to_index or not isinstance(repo_full_names_to_index, list):
            await task_logger.log_task_failure(
                log_entry,
                f"'repo_full_names' not found or is not a list in connector config for connector {connector_id}",
                "Invalid repo configuration",
                {"error_type": "InvalidConfiguration"}
            )
            return 0, "'repo_full_names' not found or is not a list in connector config"

        # 3. Initialize GitHub connector client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing GitHub client for connector {connector_id}",
            {"stage": "client_initialization", "repo_count": len(repo_full_names_to_index)}
        )

        try:
            github_client = GitHubConnector(token=github_pat)
        except ValueError as e:
            await task_logger.log_task_failure(
                log_entry,
                f"Failed to initialize GitHub client for connector {connector_id}",
                str(e),
                {"error_type": "ClientInitializationError"}
            )
            return 0, f"Failed to initialize GitHub client: {str(e)}"

        # 4. Validate selected repositories
        # For simplicity, we'll proceed with the list provided.
        # If a repo is inaccessible, get_repository_files will likely fail gracefully later.
        await task_logger.log_task_progress(
            log_entry,
            f"Starting indexing for {len(repo_full_names_to_index)} selected repositories",
            {"stage": "repo_processing", "repo_count": len(repo_full_names_to_index), "start_date": start_date, "end_date": end_date}
        )

        logger.info(f"Starting indexing for {len(repo_full_names_to_index)} selected repositories.")
        if start_date and end_date:
            logger.info(f"Date range requested: {start_date} to {end_date} (Note: GitHub indexing processes all files regardless of dates)")

        # 5. Iterate through selected repositories and index files
        for repo_full_name in repo_full_names_to_index:
            if not repo_full_name or not isinstance(repo_full_name, str):
                logger.warning(f"Skipping invalid repository entry: {repo_full_name}")
                continue

            logger.info(f"Processing repository: {repo_full_name}")
            try:
                files_to_index = github_client.get_repository_files(repo_full_name)
                if not files_to_index:
                    logger.info(f"No indexable files found in repository: {repo_full_name}")
                    continue

                logger.info(f"Found {len(files_to_index)} files to process in {repo_full_name}")

                for file_info in files_to_index:
                    file_path = file_info.get("path")
                    file_url = file_info.get("url")
                    file_sha = file_info.get("sha")
                    file_type = file_info.get("type")  # 'code' or 'doc'
                    full_path_key = f"{repo_full_name}/{file_path}"

                    if not file_path or not file_url or not file_sha:
                        logger.warning(f"Skipping file with missing info in {repo_full_name}: {file_info}")
                        continue

                    # Get file content
                    file_content = github_client.get_file_content(repo_full_name, file_path)

                    if file_content is None:
                        logger.warning(f"Could not retrieve content for {full_path_key}. Skipping.")
                        continue  # Skip if content fetch failed

                    content_hash = generate_content_hash(file_content, search_space_id)

                    # Check if a document with this content hash already exists
                    existing_doc_by_hash_result = await session.execute(
                        select(Document).where(Document.content_hash == content_hash)
                    )
                    existing_document_by_hash = existing_doc_by_hash_result.scalars().first()

                    if existing_document_by_hash:
                        logger.info(f"Document with content hash {content_hash} already exists for file {full_path_key}. Skipping processing.")
                        continue

                    # Use file_content directly for chunking, maybe summary for main content?
                    # For now, use the full content for both; this may need refinement.
                    summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..."  # Simple summary
                    summary_embedding = config.embedding_model_instance.embed(summary_content)

                    # Chunk the content
                    try:
                        chunks_data = [
                            Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
                            for chunk in config.code_chunker_instance.chunk(file_content)
                        ]
                    except Exception as chunk_err:
                        logger.error(f"Failed to chunk file {full_path_key}: {chunk_err}")
                        errors.append(f"Chunking failed for {full_path_key}: {chunk_err}")
                        continue  # Skip this file if chunking fails

                    doc_metadata = {
                        "repository_full_name": repo_full_name,
                        "file_path": file_path,
                        "full_path": full_path_key,  # For easier lookup
                        "url": file_url,
                        "sha": file_sha,
                        "type": file_type,
                        "indexed_at": datetime.now(timezone.utc).isoformat()
                    }

                    # Create new document
                    logger.info(f"Creating new document for file: {full_path_key}")
                    document = Document(
                        title=f"GitHub - {file_path}",
                        document_type=DocumentType.GITHUB_CONNECTOR,
                        document_metadata=doc_metadata,
                        content=summary_content,  # Store summary
                        content_hash=content_hash,
                        embedding=summary_embedding,
                        search_space_id=search_space_id,
                        chunks=chunks_data  # Associate chunks directly
                    )
                    session.add(document)
                    documents_processed += 1

            except Exception as repo_err:
                logger.error(f"Failed to process repository {repo_full_name}: {repo_err}")
                errors.append(f"Failed processing {repo_full_name}: {repo_err}")

        # Commit all changes at the end
        await session.commit()
        logger.info(f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files.")

        # Log success
        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed GitHub indexing for connector {connector_id}",
            {
                "documents_processed": documents_processed,
                "errors_count": len(errors),
                "repo_count": len(repo_full_names_to_index)
            }
        )

    except SQLAlchemyError as db_err:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during GitHub indexing for connector {connector_id}",
            str(db_err),
            {"error_type": "SQLAlchemyError"}
        )
        logger.error(f"Database error during GitHub indexing for connector {connector_id}: {db_err}")
        errors.append(f"Database error: {db_err}")
        return documents_processed, "; ".join(errors) if errors else str(db_err)
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Unexpected error during GitHub indexing for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__}
        )
        logger.error(f"Unexpected error during GitHub indexing for connector {connector_id}: {e}", exc_info=True)
        errors.append(f"Unexpected error: {e}")
        return documents_processed, "; ".join(errors) if errors else str(e)

    error_message = "; ".join(errors) if errors else None
    return documents_processed, error_message

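# Illustrative sketch (not part of the original module): every indexer above guards against
# re-inserting identical content by hashing the built document (or raw file) with
# generate_content_hash(..., search_space_id) and checking for an existing Document row.
# Factored out on its own, that guard looks roughly like this.
async def _example_document_exists(session: AsyncSession, content_hash: str) -> bool:
    """Return True if a Document with this content hash is already stored (illustrative only)."""
    result = await session.execute(
        select(Document).where(Document.content_hash == content_hash)
    )
    return result.scalars().first() is not None
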
async def index_linear_issues(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str = None,
    end_date: str = None,
    update_last_indexed: bool = True
) -> Tuple[int, Optional[str]]:
    """
    Index Linear issues and comments.

    Args:
        session: Database session
        connector_id: ID of the Linear connector
        search_space_id: ID of the search space to store documents in
        user_id: ID of the user who owns the connector
        start_date: Start date (YYYY-MM-DD); derived from last_indexed_at if omitted
        end_date: End date (YYYY-MM-DD); defaults to today if omitted
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)

    Returns:
        Tuple containing (number of documents indexed, error message or None)
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="linear_issues_indexing",
        source="connector_indexing_task",
        message=f"Starting Linear issues indexing for connector {connector_id}",
        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
    )

    try:
        # Get the connector
        await task_logger.log_task_progress(
            log_entry,
            f"Retrieving Linear connector {connector_id} from database",
            {"stage": "connector_retrieval"}
        )

        result = await session.execute(
            select(SearchSourceConnector)
            .filter(
                SearchSourceConnector.id == connector_id,
                SearchSourceConnector.connector_type == SearchSourceConnectorType.LINEAR_CONNECTOR
            )
        )
        connector = result.scalars().first()

        if not connector:
            await task_logger.log_task_failure(
                log_entry,
                f"Connector with ID {connector_id} not found or is not a Linear connector",
                "Connector not found",
                {"error_type": "ConnectorNotFound"}
            )
            return 0, f"Connector with ID {connector_id} not found or is not a Linear connector"

        # Get the Linear token from the connector config
        linear_token = connector.config.get("LINEAR_API_KEY")
        if not linear_token:
            await task_logger.log_task_failure(
                log_entry,
                f"Linear API token not found in connector config for connector {connector_id}",
                "Missing Linear token",
                {"error_type": "MissingToken"}
            )
            return 0, "Linear API token not found in connector config"

        # Initialize Linear client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing Linear client for connector {connector_id}",
            {"stage": "client_initialization"}
        )

        linear_client = LinearConnector(token=linear_token)

        # Calculate date range
        if start_date is None or end_date is None:
            # Fall back to calculating dates based on last_indexed_at
            calculated_end_date = datetime.now()

            # Use last_indexed_at as start date if available, otherwise use 365 days ago
            if connector.last_indexed_at:
                # Convert dates to be comparable (both timezone-naive)
                last_indexed_naive = connector.last_indexed_at.replace(tzinfo=None) if connector.last_indexed_at.tzinfo else connector.last_indexed_at

                # Check if last_indexed_at is in the future or after end_date
                if last_indexed_naive > calculated_end_date:
                    logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead.")
                    calculated_start_date = calculated_end_date - timedelta(days=365)
                else:
                    calculated_start_date = last_indexed_naive
                    logger.info(f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date")
            else:
                calculated_start_date = calculated_end_date - timedelta(days=365)  # Use 365 days as default
                logger.info(f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date")

            # Use calculated dates if not provided
            start_date_str = start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
            end_date_str = end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
        else:
            # Use provided dates
            start_date_str = start_date
            end_date_str = end_date

        logger.info(f"Fetching Linear issues from {start_date_str} to {end_date_str}")

        await task_logger.log_task_progress(
            log_entry,
            f"Fetching Linear issues from {start_date_str} to {end_date_str}",
            {"stage": "fetch_issues", "start_date": start_date_str, "end_date": end_date_str}
        )

        # Get issues within date range
        try:
            issues, error = linear_client.get_issues_by_date_range(
                start_date=start_date_str,
                end_date=end_date_str,
                include_comments=True
            )

            if error:
                logger.error(f"Failed to get Linear issues: {error}")

                # Don't treat "No issues found" as an error that should stop indexing
                if "No issues found" in error:
                    logger.info("'No issues found' is not a critical error; continuing with the timestamp update")
                    if update_last_indexed:
                        connector.last_indexed_at = datetime.now()
                        await session.commit()
                        logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found")
                    return 0, None
                else:
                    return 0, f"Failed to get Linear issues: {error}"

            logger.info(f"Retrieved {len(issues)} issues from Linear API")

        except Exception as e:
            logger.error(f"Exception when calling Linear API: {str(e)}", exc_info=True)
            return 0, f"Failed to get Linear issues: {str(e)}"

        if not issues:
            logger.info("No Linear issues found for the specified date range")
            if update_last_indexed:
                connector.last_indexed_at = datetime.now()
                await session.commit()
                logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found")
            return 0, None  # Return None instead of an error message when no issues are found

        # Log issue IDs and titles for debugging
        logger.info("Issues retrieved from Linear API:")
        for idx, issue in enumerate(issues[:10]):  # Log first 10 issues
            logger.info(f"  {idx+1}. {issue.get('identifier', 'Unknown')} - {issue.get('title', 'Unknown')} - Created: {issue.get('createdAt', 'Unknown')} - Updated: {issue.get('updatedAt', 'Unknown')}")
        if len(issues) > 10:
            logger.info(f"  ...and {len(issues) - 10} more issues")

        # Track the number of documents indexed
        documents_indexed = 0
        documents_skipped = 0
        skipped_issues = []

        await task_logger.log_task_progress(
            log_entry,
            f"Starting to process {len(issues)} Linear issues",
            {"stage": "process_issues", "total_issues": len(issues)}
        )

        # Process each issue
        for issue in issues:
            try:
                issue_id = issue.get("id")
                issue_identifier = issue.get("identifier", "")
                issue_title = issue.get("title", "")

                if not issue_id or not issue_title:
                    logger.warning(f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}")
                    skipped_issues.append(f"{issue_identifier or 'Unknown'} (missing data)")
                    documents_skipped += 1
                    continue

                # Format the issue first to get well-structured data
                formatted_issue = linear_client.format_issue(issue)

                # Convert issue to markdown format
                issue_content = linear_client.format_issue_to_markdown(formatted_issue)

                if not issue_content:
                    logger.warning(f"Skipping issue with no content: {issue_identifier} - {issue_title}")
                    skipped_issues.append(f"{issue_identifier} (no content)")
                    documents_skipped += 1
                    continue

                # Create a short summary for the embedding.
                # This avoids using the LLM and just uses the issue data directly.
                state = formatted_issue.get("state", "Unknown")
                description = formatted_issue.get("description", "")
                # Truncate the description if it's too long for the summary
                if description and len(description) > 500:
                    description = description[:497] + "..."

                # Create a simple summary from the issue data
                summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n"
                if description:
                    summary_content += f"Description: {description}\n\n"

                # Add comment count
                comment_count = len(formatted_issue.get("comments", []))
                summary_content += f"Comments: {comment_count}"

                content_hash = generate_content_hash(issue_content, search_space_id)

                # Check if a document with this content hash already exists
                existing_doc_by_hash_result = await session.execute(
                    select(Document).where(Document.content_hash == content_hash)
                )
                existing_document_by_hash = existing_doc_by_hash_result.scalars().first()

                if existing_document_by_hash:
                    logger.info(f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing.")
                    documents_skipped += 1
                    continue

                # Generate embedding for the summary
                summary_embedding = config.embedding_model_instance.embed(summary_content)

                # Process chunks - using the full issue content with comments
                chunks = [
                    Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
                    for chunk in config.chunker_instance.chunk(issue_content)
                ]

                # Create and store new document
                logger.info(f"Creating new document for issue {issue_identifier} - {issue_title}")
                document = Document(
                    search_space_id=search_space_id,
                    title=f"Linear - {issue_identifier}: {issue_title}",
                    document_type=DocumentType.LINEAR_CONNECTOR,
                    document_metadata={
                        "issue_id": issue_id,
                        "issue_identifier": issue_identifier,
                        "issue_title": issue_title,
                        "state": state,
                        "comment_count": comment_count,
                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    },
                    content=summary_content,
                    content_hash=content_hash,
                    embedding=summary_embedding,
                    chunks=chunks
                )

                session.add(document)
                documents_indexed += 1
                logger.info(f"Successfully indexed new issue {issue_identifier} - {issue_title}")

            except Exception as e:
                logger.error(f"Error processing issue {issue.get('identifier', 'Unknown')}: {str(e)}", exc_info=True)
                skipped_issues.append(f"{issue.get('identifier', 'Unknown')} (processing error)")
                documents_skipped += 1
                continue  # Skip this issue and continue with others

        # Update the last_indexed_at timestamp for the connector only if requested
        total_processed = documents_indexed
        if update_last_indexed:
            connector.last_indexed_at = datetime.now()
            logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")

        # Commit all changes
        await session.commit()
        logger.info("Successfully committed all Linear document changes to database")

        # Log success
        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Linear indexing for connector {connector_id}",
            {
                "issues_processed": total_processed,
                "documents_indexed": documents_indexed,
                "documents_skipped": documents_skipped,
                "skipped_issues_count": len(skipped_issues)
            }
        )

        logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped")
        return total_processed, None  # Return None as the error message to indicate success

    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Linear indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"}
        )
        logger.error(f"Database error: {str(db_error)}", exc_info=True)
        return 0, f"Database error: {str(db_error)}"
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Linear issues for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__}
        )
        logger.error(f"Failed to index Linear issues: {str(e)}", exc_info=True)
        return 0, f"Failed to index Linear issues: {str(e)}"

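# Illustrative sketch (not part of the original module): the Linear indexer above builds
# its embedding summary directly from issue fields instead of calling the LLM. As a
# standalone helper, that construction is roughly:
def _example_linear_summary(identifier: str, title: str, state: str, description: str, comment_count: int) -> str:
    """Build the short, non-LLM summary used for embedding a Linear issue (illustrative only)."""
    if description and len(description) > 500:
        description = description[:497] + "..."
    summary = f"Linear Issue {identifier}: {title}\n\nStatus: {state}\n\n"
    if description:
        summary += f"Description: {description}\n\n"
    summary += f"Comments: {comment_count}"
    return summary
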
async def index_discord_messages(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str = None,
    end_date: str = None,
    update_last_indexed: bool = True
) -> Tuple[int, Optional[str]]:
    """
    Index Discord messages from all accessible channels.

    Args:
        session: Database session
        connector_id: ID of the Discord connector
        search_space_id: ID of the search space to store documents in
        user_id: ID of the user who owns the connector
        start_date: Start date (YYYY-MM-DD); derived from last_indexed_at if omitted
        end_date: End date (YYYY-MM-DD); defaults to now if omitted
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)

    Returns:
        Tuple containing (number of documents indexed, error message or None)
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="discord_messages_indexing",
        source="connector_indexing_task",
        message=f"Starting Discord messages indexing for connector {connector_id}",
        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
    )

    try:
        # Get the connector
        await task_logger.log_task_progress(
            log_entry,
            f"Retrieving Discord connector {connector_id} from database",
            {"stage": "connector_retrieval"}
        )

        result = await session.execute(
            select(SearchSourceConnector)
            .filter(
                SearchSourceConnector.id == connector_id,
                SearchSourceConnector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR
            )
        )
        connector = result.scalars().first()

        if not connector:
            await task_logger.log_task_failure(
                log_entry,
                f"Connector with ID {connector_id} not found or is not a Discord connector",
                "Connector not found",
                {"error_type": "ConnectorNotFound"}
            )
            return 0, f"Connector with ID {connector_id} not found or is not a Discord connector"

        # Get the Discord token from the connector config
        discord_token = connector.config.get("DISCORD_BOT_TOKEN")
        if not discord_token:
            await task_logger.log_task_failure(
                log_entry,
                f"Discord token not found in connector config for connector {connector_id}",
                "Missing Discord token",
                {"error_type": "MissingToken"}
            )
            return 0, "Discord token not found in connector config"

        logger.info(f"Starting Discord indexing for connector {connector_id}")

        # Initialize Discord client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing Discord client for connector {connector_id}",
            {"stage": "client_initialization"}
        )

        discord_client = DiscordConnector(token=discord_token)

        # Calculate date range
        if start_date is None or end_date is None:
            # Fall back to calculating dates based on last_indexed_at
            calculated_end_date = datetime.now(timezone.utc)

            # Use last_indexed_at as start date if available, otherwise use 365 days ago
            if connector.last_indexed_at:
                calculated_start_date = connector.last_indexed_at.replace(tzinfo=timezone.utc)
                logger.info(f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date")
            else:
                calculated_start_date = calculated_end_date - timedelta(days=365)
                logger.info(f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date")

            # Use calculated dates if not provided, converted to ISO format for the Discord API
            if start_date is None:
                start_date_iso = calculated_start_date.isoformat()
            else:
                # Convert YYYY-MM-DD to ISO format
                start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat()

            if end_date is None:
                end_date_iso = calculated_end_date.isoformat()
            else:
                # Convert YYYY-MM-DD to ISO format
                end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat()
        else:
            # Convert provided dates to ISO format for the Discord API
            start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat()
            end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat()

        logger.info(f"Indexing Discord messages from {start_date_iso} to {end_date_iso}")

        documents_indexed = 0
        documents_skipped = 0
        skipped_channels = []

        try:
            await task_logger.log_task_progress(
                log_entry,
                f"Starting Discord bot and fetching guilds for connector {connector_id}",
                {"stage": "fetch_guilds"}
            )

            logger.info("Starting Discord bot to fetch guilds")
            discord_client._bot_task = asyncio.create_task(discord_client.start_bot())
            await discord_client._wait_until_ready()

            logger.info("Fetching Discord guilds")
            guilds = await discord_client.get_guilds()
            logger.info(f"Found {len(guilds)} guilds")
        except Exception as e:
            await task_logger.log_task_failure(
                log_entry,
                f"Failed to get Discord guilds for connector {connector_id}",
                str(e),
                {"error_type": "GuildFetchError"}
            )
            logger.error(f"Failed to get Discord guilds: {str(e)}", exc_info=True)
            await discord_client.close_bot()
            return 0, f"Failed to get Discord guilds: {str(e)}"

        if not guilds:
            await task_logger.log_task_success(
                log_entry,
                f"No Discord guilds found for connector {connector_id}",
                {"guilds_found": 0}
            )
            logger.info("No Discord guilds found to index")
            await discord_client.close_bot()
            return 0, "No Discord guilds found"

        # Process each guild and channel
        await task_logger.log_task_progress(
            log_entry,
            f"Starting to process {len(guilds)} Discord guilds",
            {"stage": "process_guilds", "total_guilds": len(guilds)}
        )

        for guild in guilds:
            guild_id = guild["id"]
            guild_name = guild["name"]
            logger.info(f"Processing guild: {guild_name} ({guild_id})")
            try:
                channels = await discord_client.get_text_channels(guild_id)
                if not channels:
                    logger.info(f"No channels found in guild {guild_name}. Skipping.")
                    skipped_channels.append(f"{guild_name} (no channels)")
                    documents_skipped += 1
                    continue

                for channel in channels:
                    channel_id = channel["id"]
                    channel_name = channel["name"]

                    try:
                        messages = await discord_client.get_channel_history(
                            channel_id=channel_id,
                            start_date=start_date_iso,
                            end_date=end_date_iso,
                        )
                    except Exception as e:
                        logger.error(f"Failed to get messages for channel {channel_name}: {str(e)}")
                        skipped_channels.append(f"{guild_name}#{channel_name} (fetch error)")
                        documents_skipped += 1
                        continue

                    if not messages:
                        logger.info(f"No messages found in channel {channel_name} for the specified date range.")
                        documents_skipped += 1
                        continue

                    # Format messages
                    formatted_messages = []
                    for msg in messages:
                        # Skip system messages if needed (Discord has some types)
                        if msg.get("type") in ["system"]:
                            continue
                        formatted_messages.append(msg)

                    if not formatted_messages:
                        logger.info(f"No valid messages found in channel {channel_name} after filtering.")
                        documents_skipped += 1
                        continue

                    # Convert messages to markdown format
                    channel_content = f"# Discord Channel: {guild_name} / {channel_name}\n\n"
                    for msg in formatted_messages:
                        user_name = msg.get("author_name", "Unknown User")
                        timestamp = msg.get("created_at", "Unknown Time")
                        text = msg.get("content", "")
                        channel_content += f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n"

                    # Format document metadata
                    metadata_sections = [
                        ("METADATA", [
                            f"GUILD_NAME: {guild_name}",
                            f"GUILD_ID: {guild_id}",
                            f"CHANNEL_NAME: {channel_name}",
                            f"CHANNEL_ID: {channel_id}",
                            f"MESSAGE_COUNT: {len(formatted_messages)}"
                        ]),
                        ("CONTENT", [
                            "FORMAT: markdown",
                            "TEXT_START",
                            channel_content,
                            "TEXT_END"
                        ])
                    ]

                    # Build the document string
                    document_parts = []
                    document_parts.append("<DOCUMENT>")
                    for section_title, section_content in metadata_sections:
                        document_parts.append(f"<{section_title}>")
                        document_parts.extend(section_content)
                        document_parts.append(f"</{section_title}>")
                    document_parts.append("</DOCUMENT>")
                    combined_document_string = '\n'.join(document_parts)
                    content_hash = generate_content_hash(combined_document_string, search_space_id)

                    # Check if a document with this content hash already exists
                    existing_doc_by_hash_result = await session.execute(
                        select(Document).where(Document.content_hash == content_hash)
                    )
                    existing_document_by_hash = existing_doc_by_hash_result.scalars().first()

                    if existing_document_by_hash:
                        logger.info(f"Document with content hash {content_hash} already exists for channel {guild_name}#{channel_name}. Skipping processing.")
                        documents_skipped += 1
                        continue

                    # Get user's long context LLM
                    user_llm = await get_user_long_context_llm(session, user_id)
                    if not user_llm:
                        logger.error(f"No long context LLM configured for user {user_id}")
                        skipped_channels.append(f"{guild_name}#{channel_name} (no LLM configured)")
                        documents_skipped += 1
                        continue

                    # Generate summary using summary_chain
                    summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
                    summary_result = await summary_chain.ainvoke({"document": combined_document_string})
                    summary_content = summary_result.content
                    summary_embedding = await asyncio.to_thread(
                        config.embedding_model_instance.embed, summary_content
                    )

                    # Process chunks
                    raw_chunks = await asyncio.to_thread(
                        config.chunker_instance.chunk,
                        channel_content
                    )

                    chunk_texts = [chunk.text for chunk in raw_chunks if chunk.text.strip()]
                    chunk_embeddings = await asyncio.to_thread(
                        lambda texts: [config.embedding_model_instance.embed(t) for t in texts],
                        chunk_texts
                    )

                    chunks = [
                        Chunk(content=raw_chunk.text, embedding=embedding)
                        for raw_chunk, embedding in zip(raw_chunks, chunk_embeddings)
                    ]

                    # Create and store new document
                    document = Document(
                        search_space_id=search_space_id,
                        title=f"Discord - {guild_name}#{channel_name}",
                        document_type=DocumentType.DISCORD_CONNECTOR,
                        document_metadata={
                            "guild_name": guild_name,
                            "guild_id": guild_id,
                            "channel_name": channel_name,
                            "channel_id": channel_id,
                            "message_count": len(formatted_messages),
                            "start_date": start_date_iso,
                            "end_date": end_date_iso,
                            "indexed_at": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
                        },
                        content=summary_content,
                        content_hash=content_hash,
                        embedding=summary_embedding,
                        chunks=chunks
                    )

                    session.add(document)
                    documents_indexed += 1
                    logger.info(f"Successfully indexed new channel {guild_name}#{channel_name} with {len(formatted_messages)} messages")

            except Exception as e:
                logger.error(f"Error processing guild {guild_name}: {str(e)}", exc_info=True)
                skipped_channels.append(f"{guild_name} (processing error)")
                documents_skipped += 1
                continue

        if update_last_indexed and documents_indexed > 0:
            connector.last_indexed_at = datetime.now(timezone.utc)
            logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")

        await session.commit()
        await discord_client.close_bot()

        # Prepare result message
        result_message = None
        if skipped_channels:
            result_message = f"Processed {documents_indexed} channels. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
        else:
            result_message = f"Processed {documents_indexed} channels."

        # Log success
        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Discord indexing for connector {connector_id}",
            {
                "channels_processed": documents_indexed,
                "documents_indexed": documents_indexed,
                "documents_skipped": documents_skipped,
                "skipped_channels_count": len(skipped_channels),
                "guilds_processed": len(guilds),
                "result_message": result_message
            }
        )

        logger.info(f"Discord indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
        return documents_indexed, result_message

    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Discord indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"}
        )
        logger.error(f"Database error during Discord indexing: {str(db_error)}", exc_info=True)
        return 0, f"Database error: {str(db_error)}"
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Discord messages for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__}
        )
        logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True)
        return 0, f"Failed to index Discord messages: {str(e)}"
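
# Illustrative sketch (not part of the original module): unlike the other indexers, the
# Discord path above offloads the blocking embedding and chunking calls to a worker thread
# with asyncio.to_thread, presumably so the bot's event loop stays responsive. A reusable
# form of that pattern, under the same assumption that config.embedding_model_instance.embed
# is synchronous, could look like this.
async def _example_embed_texts_off_loop(texts: list[str]) -> list:
    """Embed a batch of texts in a worker thread (illustrative only)."""
    return await asyncio.to_thread(
        lambda batch: [config.embedding_model_instance.embed(t) for t in batch],
        texts,
    )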