From 69f6a0a2781442a33da014b5ad5db4f0592bd601 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Mon, 4 Aug 2025 20:26:04 +0200 Subject: [PATCH] fix scopes issues for google services --- .../routes/search_source_connectors_routes.py | 76 +++++++ .../app/tasks/connectors_indexing_tasks.py | 213 ++++++++++++------ 2 files changed, 225 insertions(+), 64 deletions(-) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 49f3128..d1f6108 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -41,6 +41,7 @@ from app.tasks.connector_indexers import ( index_discord_messages, index_github_repos, index_google_calendar_events, + index_google_gmail_messages, index_jira_issues, index_linear_issues, index_notion_pages, @@ -507,6 +508,22 @@ async def index_connector_content( indexing_to, ) response_message = "Google Calendar indexing started in the background." + elif ( + connector.connector_type == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR + ): + # Run indexing in background + logger.info( + f"Triggering Google Gmail indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + background_tasks.add_task( + run_google_gmail_indexing_with_new_session, + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, + ) + response_message = "Google Gmail indexing started in the background." 
elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR: # Run indexing in background @@ -1113,3 +1130,62 @@ async def run_google_calendar_indexing( exc_info=True, ) # Optionally update status in DB to indicate failure + + +async def run_google_gmail_indexing_with_new_session( + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Wrapper to run Google Gmail indexing with its own database session.""" + logger.info( + f"Background task started: Indexing Google Gmail connector {connector_id} into space {search_space_id} from {start_date} to {end_date}" + ) + async with async_session_maker() as session: + await run_google_gmail_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) + logger.info( + f"Background task finished: Indexing Google Gmail connector {connector_id}" + ) + + +async def run_google_gmail_indexing( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Runs the Google Gmail indexing task and updates the timestamp.""" + try: + indexed_count, error_message = await index_google_gmail_messages( + session, + connector_id, + search_space_id, + user_id, + start_date, + end_date, + update_last_indexed=False, + ) + if error_message: + logger.error( + f"Google Gmail indexing failed for connector {connector_id}: {error_message}" + ) + # Optionally update status in DB to indicate failure + else: + logger.info( + f"Google Gmail indexing successful for connector {connector_id}. Indexed {indexed_count} documents." 
+ ) + # Update the last indexed timestamp only on success + await update_connector_last_indexed(session, connector_id) + await session.commit() # Commit timestamp update + except Exception as e: + logger.error( + f"Critical error in run_google_gmail_indexing for connector {connector_id}: {e}", + exc_info=True, + ) + # Optionally update status in DB to indicate failure diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py index 0c678b3..85007fc 100644 --- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py +++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py @@ -3381,8 +3381,10 @@ async def index_google_gmail_messages( connector_id: int, search_space_id: int, user_id: str, + start_date: str | None = None, + end_date: str | None = None, + update_last_indexed: bool = True, max_messages: int = 100, - days_back: int = 30, ) -> tuple[int, str]: """ Index Gmail messages for a specific connector. @@ -3392,14 +3394,24 @@ connector_id: ID of the Gmail connector search_space_id: ID of the search space user_id: ID of the user + start_date: Start date for filtering messages (YYYY-MM-DD format) + end_date: End date for filtering messages (YYYY-MM-DD format) + update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) max_messages: Maximum number of messages to fetch (default: 100) - days_back: Number of days to look back (default: 30) Returns: Tuple of (number_of_indexed_messages, status_message) """ task_logger = TaskLoggingService(session, search_space_id) + days_back = 30 # Default lookback when start_date is missing or invalid + if start_date: + try: + start_date_obj = datetime.strptime(start_date, "%Y-%m-%d") + days_back = (datetime.now() - start_date_obj).days + except ValueError: + days_back = 30 # Default to 30 days if start_date is invalid + # Log task start log_entry = await task_logger.log_task_start(
task_name="google_gmail_messages_indexing", @@ -3426,8 +3438,8 @@ async def index_google_gmail_messages( if not connector: error_msg = f"Gmail connector with ID {connector_id} not found" - await task_logger.log_task_completion( - log_entry.id, "FAILED", error_msg, {"error_type": "ConnectorNotFound"} + await task_logger.log_task_failure( + log_entry, error_msg, {"error_type": "ConnectorNotFound"} ) return 0, error_msg @@ -3442,31 +3454,53 @@ async def index_google_gmail_messages( scopes=config_data.get("scopes", []), ) - # Initialize Gmail connector + if ( + not credentials.client_id + or not credentials.client_secret + or not credentials.refresh_token + ): + await task_logger.log_task_failure( + log_entry, + f"Google gmail credentials not found in connector config for connector {connector_id}", + "Missing Google gmail credentials", + {"error_type": "MissingCredentials"}, + ) + return 0, "Google gmail credentials not found in connector config" + + # Initialize Google gmail client + await task_logger.log_task_progress( + log_entry, + f"Initializing Google gmail client for connector {connector_id}", + {"stage": "client_initialization"}, + ) + + # Initialize Google gmail connector gmail_connector = GoogleGmailConnector(credentials) - # Fetch recent messages - logger.info(f"Fetching recent Gmail messages for connector {connector_id}") + # Fetch recent Google gmail messages + logger.info(f"Fetching recent emails for connector {connector_id}") messages, error = gmail_connector.get_recent_messages( max_results=max_messages, days_back=days_back ) if error: - await task_logger.log_task_completion( - log_entry.id, "FAILED", f"Failed to fetch messages: {error}", {} + await task_logger.log_task_failure( + log_entry, f"Failed to fetch messages: {error}", {} ) return 0, f"Failed to fetch Gmail messages: {error}" if not messages: - success_msg = "No Gmail messages found in the specified date range" - await task_logger.log_task_completion( - log_entry.id, "SUCCESS", success_msg, 
{"messages_count": 0} + success_msg = "No Google gmail messages found in the specified date range" + await task_logger.log_task_success( + log_entry, success_msg, {"messages_count": 0} ) return 0, success_msg - logger.info(f"Found {len(messages)} Gmail messages to index") + logger.info(f"Found {len(messages)} Google gmail messages to index") - indexed_count = 0 + documents_indexed = 0 + skipped_messages = [] + documents_skipped = 0 for message in messages: try: # Extract message information @@ -3491,23 +3525,58 @@ async def index_google_gmail_messages( elif name == "date": date_str = value - # Check if document already exists - existing_doc_result = await session.execute( - select(Document).filter( - Document.search_space_id == search_space_id, - Document.document_type == DocumentType.GOOGLE_GMAIL_CONNECTOR, - Document.document_metadata["message_id"].astext == message_id, - ) - ) - existing_doc = existing_doc_result.scalars().first() - - if existing_doc: - logger.info(f"Gmail message {message_id} already indexed, skipping") + if not message_id: + logger.warning(f"Skipping message with missing ID: {subject}") + skipped_messages.append(f"{subject} (missing ID)") + documents_skipped += 1 continue # Format message to markdown markdown_content = gmail_connector.format_message_to_markdown(message) + if not markdown_content.strip(): + logger.warning(f"Skipping message with no content: {subject}") + skipped_messages.append(f"{subject} (no content)") + documents_skipped += 1 + continue + + # Create a simple summary + summary_content = f"Google Gmail Message: {subject}\n\n" + summary_content += f"Sender: {sender}\n" + summary_content += f"Date: {date_str}\n" + + # Generate content hash + content_hash = generate_content_hash(markdown_content, search_space_id) + + # Check if document already exists + existing_doc_by_hash_result = await session.execute( + select(Document).where(Document.content_hash == content_hash) + ) + existing_document_by_hash = ( + 
existing_doc_by_hash_result.scalars().first() + ) + + if existing_document_by_hash: + logger.info( + f"Document with content hash {content_hash} already exists for message {message_id}. Skipping processing." + ) + documents_skipped += 1 + continue + + # Generate embedding for the summary + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = [ + Chunk( + content=chunk.text, + embedding=config.embedding_model_instance.embed(chunk.text), + ) + for chunk in config.chunker_instance.chunk(markdown_content) + ] + # Create and store new document logger.info(f"Creating new document for Gmail message: {subject}") document = Document( @@ -3523,56 +3592,72 @@ async def index_google_gmail_messages( "connector_id": connector_id, }, content=markdown_content, + content_hash=content_hash, + embedding=summary_embedding, + chunks=chunks, ) session.add(document) - await session.flush() - - # Create chunks for the document - chunks = config.chunker_instance.chunk(markdown_content) - for i, chunk_text in enumerate(chunks): - chunk = Chunk( - document_id=document.id, - content=chunk_text, - chunk_index=i, - embedding=config.embedding_model_instance.embed_query( - chunk_text - ), - ) - session.add(chunk) - - indexed_count += 1 - logger.info(f"Successfully indexed Gmail message: {subject}") + documents_indexed += 1 + logger.info(f"Successfully indexed new email {summary_content}") except Exception as e: logger.error( - f"Error indexing Gmail message {message_id}: {e!s}", exc_info=True + f"Error processing the email {message_id}: {e!s}", + exc_info=True, ) - continue + skipped_messages.append(f"{subject} (processing error)") + documents_skipped += 1 + continue # Skip this message and continue with others + + # Update the last_indexed_at timestamp for the connector only if requested + total_processed = documents_indexed + if update_last_indexed: + connector.last_indexed_at = datetime.now() + logger.info(f"Updated last_indexed_at to 
{connector.last_indexed_at}") # Commit all changes await session.commit() - - # Update connector's last_indexed_at timestamp - connector.last_indexed_at = datetime.now(UTC) - await session.commit() - - success_msg = f"Successfully indexed {indexed_count} Gmail messages" - await task_logger.log_task_completion( - log_entry.id, - "SUCCESS", - success_msg, - {"indexed_count": indexed_count, "total_messages": len(messages)}, + logger.info( + "Successfully committed all Google gmail document changes to database" ) - logger.info(success_msg) - return indexed_count, success_msg + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully completed Google gmail indexing for connector {connector_id}", + { + "events_processed": total_processed, + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "skipped_messages_count": len(skipped_messages), + }, + ) + + logger.info( + f"Google gmail indexing completed: {documents_indexed} new emails, {documents_skipped} skipped" + ) + return ( + total_processed, + None, + ) # Return None as the error message to indicate success + + except SQLAlchemyError as db_error: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Google gmail indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"}, + ) + logger.error(f"Database error: {db_error!s}", exc_info=True) + return 0, f"Database error: {db_error!s}" except Exception as e: - await task_logger.log_task_completion( - log_entry.id, - "FAILED", - f"Failed to index Gmail messages for connector {connector_id}", + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Google gmail emails for connector {connector_id}", str(e), {"error_type": type(e).__name__}, ) - logger.error(f"Failed to index Gmail messages: {e!s}", exc_info=True) - return 0, f"Failed to index Gmail messages: {e!s}" + logger.error(f"Failed to index 
Google gmail emails: {e!s}", exc_info=True) + return 0, f"Failed to index Google gmail emails: {e!s}"