diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py index c25a7f4..6a972be 100644 --- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py +++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py @@ -7,6 +7,7 @@ from app.db import Document, DocumentType, Chunk, SearchSourceConnector, SearchS from app.config import config from app.prompts import SUMMARY_PROMPT_TEMPLATE from app.services.llm_service import get_user_long_context_llm +from app.services.task_logging_service import TaskLoggingService from app.connectors.slack_history import SlackHistory from app.connectors.notion_history import NotionHistoryConnector from app.connectors.github_connector import GitHubConnector @@ -42,8 +43,24 @@ async def index_slack_messages( Returns: Tuple containing (number of documents indexed, error message or None) """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="slack_messages_indexing", + source="connector_indexing_task", + message=f"Starting Slack messages indexing for connector {connector_id}", + metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date} + ) + try: # Get the connector + await task_logger.log_task_progress( + log_entry, + f"Retrieving Slack connector {connector_id} from database", + {"stage": "connector_retrieval"} + ) + result = await session.execute( select(SearchSourceConnector) .filter( @@ -54,17 +71,41 @@ async def index_slack_messages( connector = result.scalars().first() if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found or is not a Slack connector", + "Connector not found", + {"error_type": "ConnectorNotFound"} + ) return 0, f"Connector with ID {connector_id} not found or is not a Slack connector" # Get the Slack token from the connector config slack_token = connector.config.get("SLACK_BOT_TOKEN") if not slack_token: + await task_logger.log_task_failure( + log_entry, + f"Slack token not found in connector config for connector {connector_id}", + "Missing Slack token", + {"error_type": "MissingToken"} + ) return 0, "Slack token not found in connector config" # Initialize Slack client + await task_logger.log_task_progress( + log_entry, + f"Initializing Slack client for connector {connector_id}", + {"stage": "client_initialization"} + ) + slack_client = SlackHistory(token=slack_token) # Calculate date range + await task_logger.log_task_progress( + log_entry, + f"Calculating date range for Slack indexing", + {"stage": "date_calculation", "provided_start_date": start_date, "provided_end_date": end_date} + ) + if start_date is None or end_date is None: # Fall back to calculating dates based on last_indexed_at calculated_end_date = datetime.now() @@ -95,13 +136,30 @@ async def index_slack_messages( logger.info(f"Indexing Slack messages from {start_date_str} to {end_date_str}") + await task_logger.log_task_progress( + log_entry, + f"Fetching Slack channels from {start_date_str} to {end_date_str}", + {"stage": "fetch_channels", "start_date": start_date_str, "end_date": end_date_str} + ) + # Get all channels try: channels = slack_client.get_all_channels() except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to get Slack channels for connector {connector_id}", + str(e), + {"error_type": "ChannelFetchError"} + ) return 0, f"Failed to get Slack channels: {str(e)}" if 
not channels: + await task_logger.log_task_success( + log_entry, + f"No Slack channels found for connector {connector_id}", + {"channels_found": 0} + ) return 0, "No Slack channels found" # Track the number of documents indexed @@ -109,6 +167,12 @@ async def index_slack_messages( documents_skipped = 0 skipped_channels = [] + await task_logger.log_task_progress( + log_entry, + f"Starting to process {len(channels)} Slack channels", + {"stage": "process_channels", "total_channels": len(channels)} + ) + # Process each channel for channel_obj in channels: # Modified loop to iterate over list of channel objects channel_id = channel_obj["id"] @@ -283,15 +347,40 @@ async def index_slack_messages( else: result_message = f"Processed {total_processed} channels." + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully completed Slack indexing for connector {connector_id}", + { + "channels_processed": total_processed, + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "skipped_channels_count": len(skipped_channels), + "result_message": result_message + } + ) + logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped") return total_processed, result_message except SQLAlchemyError as db_error: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Slack indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"} + ) logger.error(f"Database error: {str(db_error)}") return 0, f"Database error: {str(db_error)}" except Exception as e: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Slack messages for connector {connector_id}", + str(e), + {"error_type": type(e).__name__} + ) logger.error(f"Failed to index Slack messages: {str(e)}") return 0, f"Failed to index Slack messages: {str(e)}" @@ -316,8 +405,24 @@ async def index_notion_pages( Returns: Tuple containing (number of documents indexed, error message or None) """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="notion_pages_indexing", + source="connector_indexing_task", + message=f"Starting Notion pages indexing for connector {connector_id}", + metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date} + ) + try: # Get the connector + await task_logger.log_task_progress( + log_entry, + f"Retrieving Notion connector {connector_id} from database", + {"stage": "connector_retrieval"} + ) + result = await session.execute( select(SearchSourceConnector) .filter( @@ -328,14 +433,32 @@ async def index_notion_pages( connector = result.scalars().first() if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found or is not a Notion connector", + "Connector not found", + {"error_type": "ConnectorNotFound"} + ) return 0, f"Connector with ID {connector_id} not found or is not a Notion connector" # Get the Notion token from the connector config notion_token = connector.config.get("NOTION_INTEGRATION_TOKEN") if not notion_token: + await task_logger.log_task_failure( + log_entry, + f"Notion integration token not found in connector config for connector {connector_id}", + "Missing Notion token", + {"error_type": "MissingToken"} + ) return 0, "Notion integration token not found in connector config" # Initialize Notion client + await 
task_logger.log_task_progress( + log_entry, + f"Initializing Notion client for connector {connector_id}", + {"stage": "client_initialization"} + ) + logger.info(f"Initializing Notion client for connector {connector_id}") notion_client = NotionHistoryConnector(token=notion_token) @@ -364,15 +487,32 @@ async def index_notion_pages( logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}") + await task_logger.log_task_progress( + log_entry, + f"Fetching Notion pages from {start_date_iso} to {end_date_iso}", + {"stage": "fetch_pages", "start_date": start_date_iso, "end_date": end_date_iso} + ) + # Get all pages try: pages = notion_client.get_all_pages(start_date=start_date_iso, end_date=end_date_iso) logger.info(f"Found {len(pages)} Notion pages") except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to get Notion pages for connector {connector_id}", + str(e), + {"error_type": "PageFetchError"} + ) logger.error(f"Error fetching Notion pages: {str(e)}", exc_info=True) return 0, f"Failed to get Notion pages: {str(e)}" if not pages: + await task_logger.log_task_success( + log_entry, + f"No Notion pages found for connector {connector_id}", + {"pages_found": 0} + ) logger.info("No Notion pages found to index") return 0, "No Notion pages found" @@ -381,6 +521,12 @@ async def index_notion_pages( documents_skipped = 0 skipped_pages = [] + await task_logger.log_task_progress( + log_entry, + f"Starting to process {len(pages)} Notion pages", + {"stage": "process_pages", "total_pages": len(pages)} + ) + # Process each page for page in pages: try: @@ -552,15 +698,40 @@ async def index_notion_pages( else: result_message = f"Processed {total_processed} pages." + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully completed Notion indexing for connector {connector_id}", + { + "pages_processed": total_processed, + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "skipped_pages_count": len(skipped_pages), + "result_message": result_message + } + ) + logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped") return total_processed, result_message except SQLAlchemyError as db_error: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Notion indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"} + ) logger.error(f"Database error during Notion indexing: {str(db_error)}", exc_info=True) return 0, f"Database error: {str(db_error)}" except Exception as e: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Notion pages for connector {connector_id}", + str(e), + {"error_type": type(e).__name__} + ) logger.error(f"Failed to index Notion pages: {str(e)}", exc_info=True) return 0, f"Failed to index Notion pages: {str(e)}" @@ -585,11 +756,27 @@ async def index_github_repos( Returns: Tuple containing (number of documents indexed, error message or None) """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="github_repos_indexing", + source="connector_indexing_task", + message=f"Starting GitHub repositories indexing for connector {connector_id}", + metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date} + ) + documents_processed = 0 errors = [] try: # 1. 
Get the GitHub connector from the database + await task_logger.log_task_progress( + log_entry, + f"Retrieving GitHub connector {connector_id} from database", + {"stage": "connector_retrieval"} + ) + result = await session.execute( select(SearchSourceConnector) .filter( @@ -600,6 +787,12 @@ async def index_github_repos( connector = result.scalars().first() if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found or is not a GitHub connector", + "Connector not found", + {"error_type": "ConnectorNotFound"} + ) return 0, f"Connector with ID {connector_id} not found or is not a GitHub connector" # 2. Get the GitHub PAT and selected repositories from the connector config @@ -607,20 +800,50 @@ async def index_github_repos( repo_full_names_to_index = connector.config.get("repo_full_names") if not github_pat: + await task_logger.log_task_failure( + log_entry, + f"GitHub Personal Access Token (PAT) not found in connector config for connector {connector_id}", + "Missing GitHub PAT", + {"error_type": "MissingToken"} + ) return 0, "GitHub Personal Access Token (PAT) not found in connector config" if not repo_full_names_to_index or not isinstance(repo_full_names_to_index, list): - return 0, "'repo_full_names' not found or is not a list in connector config" + await task_logger.log_task_failure( + log_entry, + f"'repo_full_names' not found or is not a list in connector config for connector {connector_id}", + "Invalid repo configuration", + {"error_type": "InvalidConfiguration"} + ) + return 0, "'repo_full_names' not found or is not a list in connector config" # 3. Initialize GitHub connector client + await task_logger.log_task_progress( + log_entry, + f"Initializing GitHub client for connector {connector_id}", + {"stage": "client_initialization", "repo_count": len(repo_full_names_to_index)} + ) + try: github_client = GitHubConnector(token=github_pat) except ValueError as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to initialize GitHub client for connector {connector_id}", + str(e), + {"error_type": "ClientInitializationError"} + ) return 0, f"Failed to initialize GitHub client: {str(e)}" # 4. Validate selected repositories # For simplicity, we'll proceed with the list provided. # If a repo is inaccessible, get_repository_files will likely fail gracefully later. + await task_logger.log_task_progress( + log_entry, + f"Starting indexing for {len(repo_full_names_to_index)} selected repositories", + {"stage": "repo_processing", "repo_count": len(repo_full_names_to_index), "start_date": start_date, "end_date": end_date} + ) + logger.info(f"Starting indexing for {len(repo_full_names_to_index)} selected repositories.") if start_date and end_date: logger.info(f"Date range requested: {start_date} to {end_date} (Note: GitHub indexing processes all files regardless of dates)") @@ -719,13 +942,36 @@ async def index_github_repos( await session.commit() logger.info(f"Finished GitHub indexing for connector {connector_id}. 
Processed {documents_processed} files.") + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully completed GitHub indexing for connector {connector_id}", + { + "documents_processed": documents_processed, + "errors_count": len(errors), + "repo_count": len(repo_full_names_to_index) + } + ) + except SQLAlchemyError as db_err: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during GitHub indexing for connector {connector_id}", + str(db_err), + {"error_type": "SQLAlchemyError"} + ) logger.error(f"Database error during GitHub indexing for connector {connector_id}: {db_err}") errors.append(f"Database error: {db_err}") return documents_processed, "; ".join(errors) if errors else str(db_err) except Exception as e: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Unexpected error during GitHub indexing for connector {connector_id}", + str(e), + {"error_type": type(e).__name__} + ) logger.error(f"Unexpected error during GitHub indexing for connector {connector_id}: {e}", exc_info=True) errors.append(f"Unexpected error: {e}") return documents_processed, "; ".join(errors) if errors else str(e) @@ -754,8 +1000,24 @@ async def index_linear_issues( Returns: Tuple containing (number of documents indexed, error message or None) """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="linear_issues_indexing", + source="connector_indexing_task", + message=f"Starting Linear issues indexing for connector {connector_id}", + metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date} + ) + try: # Get the connector + await task_logger.log_task_progress( + log_entry, + f"Retrieving Linear connector {connector_id} from database", + {"stage": "connector_retrieval"} + ) + result = await session.execute( select(SearchSourceConnector) .filter( @@ -766,14 +1028,32 @@ async def index_linear_issues( connector = result.scalars().first() if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found or is not a Linear connector", + "Connector not found", + {"error_type": "ConnectorNotFound"} + ) return 0, f"Connector with ID {connector_id} not found or is not a Linear connector" # Get the Linear token from the connector config linear_token = connector.config.get("LINEAR_API_KEY") if not linear_token: + await task_logger.log_task_failure( + log_entry, + f"Linear API token not found in connector config for connector {connector_id}", + "Missing Linear token", + {"error_type": "MissingToken"} + ) return 0, "Linear API token not found in connector config" # Initialize Linear client + await task_logger.log_task_progress( + log_entry, + f"Initializing Linear client for connector {connector_id}", + {"stage": "client_initialization"} + ) + linear_client = LinearConnector(token=linear_token) # Calculate date range @@ -807,6 +1087,12 @@ async def index_linear_issues( logger.info(f"Fetching Linear issues from {start_date_str} to {end_date_str}") + await task_logger.log_task_progress( + log_entry, + f"Fetching Linear issues from {start_date_str} to {end_date_str}", + {"stage": "fetch_issues", "start_date": start_date_str, "end_date": end_date_str} + ) + # Get issues within date range try: issues, error = linear_client.get_issues_by_date_range( @@ -855,6 +1141,12 @@ async def index_linear_issues( documents_skipped = 0 skipped_issues = [] + await 
task_logger.log_task_progress( + log_entry, + f"Starting to process {len(issues)} Linear issues", + {"stage": "process_issues", "total_issues": len(issues)} + ) + # Process each issue for issue in issues: try: @@ -959,16 +1251,39 @@ async def index_linear_issues( await session.commit() logger.info(f"Successfully committed all Linear document changes to database") + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully completed Linear indexing for connector {connector_id}", + { + "issues_processed": total_processed, + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "skipped_issues_count": len(skipped_issues) + } + ) logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped") return total_processed, None # Return None as the error message to indicate success except SQLAlchemyError as db_error: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Linear indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"} + ) logger.error(f"Database error: {str(db_error)}", exc_info=True) return 0, f"Database error: {str(db_error)}" except Exception as e: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Linear issues for connector {connector_id}", + str(e), + {"error_type": type(e).__name__} + ) logger.error(f"Failed to index Linear issues: {str(e)}", exc_info=True) return 0, f"Failed to index Linear issues: {str(e)}" @@ -993,8 +1308,24 @@ async def index_discord_messages( Returns: Tuple containing (number of documents indexed, error message or None) """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="discord_messages_indexing", + source="connector_indexing_task", + message=f"Starting Discord messages indexing for connector {connector_id}", + metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date} + ) + try: # Get the connector + await task_logger.log_task_progress( + log_entry, + f"Retrieving Discord connector {connector_id} from database", + {"stage": "connector_retrieval"} + ) + result = await session.execute( select(SearchSourceConnector) .filter( @@ -1005,16 +1336,34 @@ async def index_discord_messages( connector = result.scalars().first() if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found or is not a Discord connector", + "Connector not found", + {"error_type": "ConnectorNotFound"} + ) return 0, f"Connector with ID {connector_id} not found or is not a Discord connector" # Get the Discord token from the connector config discord_token = connector.config.get("DISCORD_BOT_TOKEN") if not discord_token: + await task_logger.log_task_failure( + log_entry, + f"Discord token not found in connector config for connector {connector_id}", + "Missing Discord token", + {"error_type": "MissingToken"} + ) return 0, "Discord token not found in connector config" logger.info(f"Starting Discord indexing for connector {connector_id}") # Initialize Discord client + await task_logger.log_task_progress( + log_entry, + f"Initializing Discord client for connector {connector_id}", + {"stage": "client_initialization"} + ) + discord_client = DiscordConnector(token=discord_token) # Calculate date range @@ -1054,6 +1403,12 @@ async def index_discord_messages( skipped_channels = [] try: + 
await task_logger.log_task_progress( + log_entry, + f"Starting Discord bot and fetching guilds for connector {connector_id}", + {"stage": "fetch_guilds"} + ) + logger.info("Starting Discord bot to fetch guilds") discord_client._bot_task = asyncio.create_task(discord_client.start_bot()) await discord_client._wait_until_ready() @@ -1062,15 +1417,32 @@ async def index_discord_messages( guilds = await discord_client.get_guilds() logger.info(f"Found {len(guilds)} guilds") except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to get Discord guilds for connector {connector_id}", + str(e), + {"error_type": "GuildFetchError"} + ) logger.error(f"Failed to get Discord guilds: {str(e)}", exc_info=True) await discord_client.close_bot() return 0, f"Failed to get Discord guilds: {str(e)}" if not guilds: + await task_logger.log_task_success( + log_entry, + f"No Discord guilds found for connector {connector_id}", + {"guilds_found": 0} + ) logger.info("No Discord guilds found to index") await discord_client.close_bot() return 0, "No Discord guilds found" # Process each guild and channel + await task_logger.log_task_progress( + log_entry, + f"Starting to process {len(guilds)} Discord guilds", + {"stage": "process_guilds", "total_guilds": len(guilds)} + ) + for guild in guilds: guild_id = guild["id"] guild_name = guild["name"] @@ -1242,14 +1614,40 @@ async def index_discord_messages( else: result_message = f"Processed {documents_indexed} channels." + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully completed Discord indexing for connector {connector_id}", + { + "channels_processed": documents_indexed, + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "skipped_channels_count": len(skipped_channels), + "guilds_processed": len(guilds), + "result_message": result_message + } + ) + logger.info(f"Discord indexing completed: {documents_indexed} new channels, {documents_skipped} skipped") return documents_indexed, result_message except SQLAlchemyError as db_error: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Discord indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"} + ) logger.error(f"Database error during Discord indexing: {str(db_error)}", exc_info=True) return 0, f"Database error: {str(db_error)}" except Exception as e: await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Discord messages for connector {connector_id}", + str(e), + {"error_type": type(e).__name__} + ) logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True) return 0, f"Failed to index Discord messages: {str(e)}" diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx index 1093621..d01d54a 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx @@ -134,7 +134,7 @@ export default function ConnectorsPage() { const endDateStr = endDate ? 
format(endDate, "yyyy-MM-dd") : undefined;
       await indexConnector(selectedConnectorForIndexing, searchSpaceId, startDateStr, endDateStr);
-      toast.success("Connector content indexed successfully");
+      toast.success("Connector content indexing started");
     } catch (error) {
       console.error("Error indexing connector content:", error);
       toast.error(
@@ -155,7 +155,7 @@ export default function ConnectorsPage() {
     setIndexingConnectorId(connectorId);
     try {
       await indexConnector(connectorId, searchSpaceId);
-      toast.success("Connector content indexed successfully");
+      toast.success("Connector content indexing started");
     } catch (error) {
       console.error("Error indexing connector content:", error);
       toast.error(
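
For context on the backend changes above: every indexing task now follows the same pattern of `log_task_start` → `log_task_progress` (per stage) → `log_task_success` or `log_task_failure`. The diff does not include `app/services/task_logging_service.py` itself, so the sketch below is a minimal, hypothetical reconstruction of the interface inferred from the call sites (constructor taking the async session and `search_space_id`; `log_task_start` returning an entry that the later methods update). The real service presumably persists entries to a logs table rather than keeping them in memory as shown here.

```python
# Hypothetical sketch of the TaskLoggingService interface assumed by this diff.
# The concrete implementation in app/services/task_logging_service.py is not shown
# in the patch; field names and the in-memory "entry" dict below are illustrative only.
from datetime import datetime
from typing import Any, Optional

from sqlalchemy.ext.asyncio import AsyncSession


class TaskLoggingService:
    def __init__(self, session: AsyncSession, search_space_id: int):
        self.session = session
        self.search_space_id = search_space_id

    async def log_task_start(
        self,
        task_name: str,
        source: str,
        message: str,
        metadata: Optional[dict] = None,
    ) -> dict[str, Any]:
        """Create and return a log entry marking the task as in progress."""
        entry = {
            "task_name": task_name,
            "source": source,
            "status": "IN_PROGRESS",
            "message": message,
            "metadata": metadata or {},
            "started_at": datetime.now(),
            "search_space_id": self.search_space_id,
        }
        # The real service would likely add an ORM row via self.session here.
        return entry

    async def log_task_progress(
        self, log_entry: dict, message: str, metadata: Optional[dict] = None
    ) -> None:
        """Record an intermediate status message for a running task."""
        log_entry["message"] = message
        log_entry["metadata"] = {**log_entry.get("metadata", {}), **(metadata or {})}

    async def log_task_success(
        self, log_entry: dict, message: str, metadata: Optional[dict] = None
    ) -> None:
        """Mark the entry as successful and attach final metrics."""
        log_entry.update(status="SUCCESS", message=message, finished_at=datetime.now())
        log_entry["metadata"] = {**log_entry.get("metadata", {}), **(metadata or {})}

    async def log_task_failure(
        self, log_entry: dict, message: str, error: str, metadata: Optional[dict] = None
    ) -> None:
        """Mark the entry as failed, recording the error string."""
        log_entry.update(status="FAILED", message=message, error=error, finished_at=datetime.now())
        log_entry["metadata"] = {**log_entry.get("metadata", {}), **(metadata or {})}
```

The frontend toast change ("Connector content indexing started" instead of "indexed successfully") reads as consistent with this: the indexing request presumably kicks off a background task whose outcome is surfaced through these task log entries rather than through the immediate API response.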