From 23da404177657bff79cc45dd3f3a9c1ee65250af Mon Sep 17 00:00:00 2001
From: "DESKTOP-RTLN3BA\\$punk"
Date: Wed, 26 Mar 2025 17:19:10 -0700
Subject: [PATCH] fix: Notion reindexing and document updates

---
 .gitignore                                    |   3 +-
 .../routes/search_source_connectors_routes.py |  10 +-
 .../app/tasks/connectors_indexing_tasks.py    | 112 +++++++++++++-----
 3 files changed, 87 insertions(+), 38 deletions(-)

diff --git a/.gitignore b/.gitignore
index 90a8dbb..78b66ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,4 +11,5 @@ data/
 __pycache__
 __pycache__/
 .__pycache__
-surfsense_backend/.env
\ No newline at end of file
+surfsense_backend/.env
+.flashrank_cache
\ No newline at end of file
diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py
index 42669d3..d0b63d9 100644
--- a/surfsense_backend/app/routes/search_source_connectors_routes.py
+++ b/surfsense_backend/app/routes/search_source_connectors_routes.py
@@ -401,18 +401,18 @@ async def run_notion_indexing(
     """
     try:
         # Index Notion pages without updating last_indexed_at (we'll do it separately)
-        documents_indexed, error_or_warning = await index_notion_pages(
+        documents_processed, error_or_warning = await index_notion_pages(
             session=session,
             connector_id=connector_id,
             search_space_id=search_space_id,
             update_last_indexed=False  # Don't update timestamp in the indexing function
         )
 
-        # Only update last_indexed_at if indexing was successful
-        if documents_indexed > 0 and (error_or_warning is None or "Indexed" in error_or_warning):
+        # Only update last_indexed_at if indexing was successful (either new docs or updated docs)
+        if documents_processed > 0:
             await update_connector_last_indexed(session, connector_id)
-            logger.info(f"Notion indexing completed successfully: {documents_indexed} documents indexed")
+            logger.info(f"Notion indexing completed successfully: {documents_processed} documents processed")
         else:
-            logger.error(f"Notion indexing failed or no documents indexed: {error_or_warning}")
+            logger.error(f"Notion indexing failed or no documents processed: {error_or_warning}")
     except Exception as e:
         logger.error(f"Error in background Notion indexing task: {str(e)}")
\ No newline at end of file
diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
index 4e6d9b2..2fb8e8d 100644
--- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py
+++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
@@ -2,6 +2,7 @@ from typing import Optional, List, Dict, Any, Tuple
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.future import select
+from sqlalchemy import delete
 from datetime import datetime, timedelta
 from app.db import Document, DocumentType, Chunk, SearchSourceConnector, SearchSourceConnectorType
 from app.config import config
@@ -62,7 +63,7 @@
             # Check if last_indexed_at is today
             today = datetime.now().date()
             if connector.last_indexed_at.date() == today:
-                # If last indexed today, go back 1 day to ensure we don't miss anything
+                # If last indexed today, go back 7 days to ensure we don't miss anything
                 start_date = end_date - timedelta(days=7)
             else:
                 start_date = connector.last_indexed_at
@@ -293,17 +294,8 @@
         # Calculate date range
         end_date = datetime.now()
 
-        # Use last_indexed_at as start date if available, otherwise use 365 days ago
-        if connector.last_indexed_at:
-            # Check if last_indexed_at is today
-            today = datetime.now().date()
-            if connector.last_indexed_at.date() == today:
-                # If last indexed today, go back 1 day to ensure we don't miss anything
-                start_date = end_date - timedelta(days=1)
-            else:
-                start_date = connector.last_indexed_at
-        else:
-            start_date = end_date - timedelta(days=365)
+        # Always index the last 1 year of pages
+        start_date = end_date - timedelta(days=365)
 
         # Format dates for Notion API (ISO format)
         start_date_str = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
@@ -323,8 +315,28 @@
             logger.info("No Notion pages found to index")
             return 0, "No Notion pages found"
 
+        # Get existing documents for this search space and connector type to prevent duplicates
+        existing_docs_result = await session.execute(
+            select(Document)
+            .filter(
+                Document.search_space_id == search_space_id,
+                Document.document_type == DocumentType.NOTION_CONNECTOR
+            )
+        )
+        existing_docs = existing_docs_result.scalars().all()
+
+        # Create a lookup dictionary of existing documents by page_id
+        existing_docs_by_page_id = {}
+        for doc in existing_docs:
+            if "page_id" in doc.document_metadata:
+                existing_docs_by_page_id[doc.document_metadata["page_id"]] = doc
+
+        logger.info(f"Found {len(existing_docs_by_page_id)} existing Notion documents in database")
+
         # Track the number of documents indexed
         documents_indexed = 0
+        documents_updated = 0
+        documents_skipped = 0
         skipped_pages = []
 
         # Process each page
@@ -339,6 +351,7 @@
                 if not page_content:
                     logger.info(f"No content found in page {page_title}. Skipping.")
                     skipped_pages.append(f"{page_title} (no content)")
+                    documents_skipped += 1
                     continue
 
                 # Convert page content to markdown format
@@ -435,33 +448,66 @@
                     for chunk in config.chunker_instance.chunk(markdown_content)
                 ]
 
-                # Create and store document
-                document = Document(
-                    search_space_id=search_space_id,
-                    title=f"Notion - {page_title}",
-                    document_type=DocumentType.NOTION_CONNECTOR,
-                    document_metadata={
+                # Check if this page already exists in our database
+                existing_document = existing_docs_by_page_id.get(page_id)
+
+                if existing_document:
+                    # Update existing document instead of creating a new one
+                    logger.info(f"Updating existing document for page {page_title}")
+
+                    # Update document fields
+                    existing_document.title = f"Notion - {page_title}"
+                    existing_document.document_metadata = {
                         "page_title": page_title,
                         "page_id": page_id,
-                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                    },
-                    content=summary_content,
-                    embedding=summary_embedding,
-                    chunks=chunks
-                )
-
-                session.add(document)
-                documents_indexed += 1
-                logger.info(f"Successfully indexed Notion page: {page_title}")
+                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+                        "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                    }
+                    existing_document.content = summary_content
+                    existing_document.embedding = summary_embedding
+
+                    # Delete existing chunks and add new ones
+                    await session.execute(
+                        delete(Chunk)
+                        .where(Chunk.document_id == existing_document.id)
+                    )
+
+                    # Assign new chunks to existing document
+                    for chunk in chunks:
+                        chunk.document_id = existing_document.id
+                        session.add(chunk)
+
+                    documents_updated += 1
+                else:
+                    # Create and store new document
+                    document = Document(
+                        search_space_id=search_space_id,
+                        title=f"Notion - {page_title}",
+                        document_type=DocumentType.NOTION_CONNECTOR,
+                        document_metadata={
+                            "page_title": page_title,
+                            "page_id": page_id,
+                            "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                        },
+                        content=summary_content,
+                        embedding=summary_embedding,
+                        chunks=chunks
+                    )
+
+                    session.add(document)
+                    documents_indexed += 1
+                    logger.info(f"Successfully indexed new Notion page: {page_title}")
 
             except Exception as e:
                 logger.error(f"Error processing Notion page {page.get('title', 'Unknown')}: {str(e)}", exc_info=True)
                 skipped_pages.append(f"{page.get('title', 'Unknown')} (processing error)")
+                documents_skipped += 1
                 continue  # Skip this page and continue with others
 
         # Update the last_indexed_at timestamp for the connector only if requested
         # and if we successfully indexed at least one page
-        if update_last_indexed and documents_indexed > 0:
+        total_processed = documents_indexed + documents_updated
+        if update_last_indexed and total_processed > 0:
             connector.last_indexed_at = datetime.now()
             logger.info(f"Updated last_indexed_at for connector {connector_id}")
 
@@ -471,10 +517,12 @@
         # Prepare result message
         result_message = None
         if skipped_pages:
-            result_message = f"Indexed {documents_indexed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}"
+            result_message = f"Processed {total_processed} pages ({documents_indexed} new, {documents_updated} updated). Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}"
+        else:
+            result_message = f"Processed {total_processed} pages ({documents_indexed} new, {documents_updated} updated)."
 
-        logger.info(f"Notion indexing completed: {documents_indexed} pages indexed, {len(skipped_pages)} pages skipped")
-        return documents_indexed, result_message
+        logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_updated} updated, {documents_skipped} skipped")
+        return total_processed, result_message
 
     except SQLAlchemyError as db_error:
         await session.rollback()
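Note on the core pattern in this patch: Notion reindexing is now an upsert keyed on the page_id stored in each document's document_metadata. Existing documents are loaded once into a dictionary; pages whose page_id is already known are updated in place, with their chunks deleted and rebuilt, and everything else is inserted as a new row. The sketch below is a minimal, self-contained illustration of that pattern, not SurfSense code: the Document and Chunk models are simplified stand-ins (no embeddings, connector types, or summaries), and the assumed input is a list of dicts with "id", "title", "content", and "chunk_texts" keys.

from sqlalchemy import JSON, ForeignKey, Text, delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    pass


# Simplified stand-ins for the app.db models; the real ones also carry
# embeddings, document types, and search-space relationships.
class Document(Base):
    __tablename__ = "documents"
    id: Mapped[int] = mapped_column(primary_key=True)
    search_space_id: Mapped[int]
    title: Mapped[str]
    content: Mapped[str] = mapped_column(Text)
    document_metadata: Mapped[dict] = mapped_column(JSON)
    chunks: Mapped[list["Chunk"]] = relationship(back_populates="document")


class Chunk(Base):
    __tablename__ = "chunks"
    id: Mapped[int] = mapped_column(primary_key=True)
    document_id: Mapped[int] = mapped_column(ForeignKey("documents.id"))
    content: Mapped[str] = mapped_column(Text)
    document: Mapped[Document] = relationship(back_populates="chunks")


async def upsert_pages(
    session: AsyncSession, pages: list[dict], search_space_id: int
) -> tuple[int, int]:
    """Insert unseen pages, refresh known ones. Returns (new, updated)."""
    # One query up front builds a page_id -> Document map, so each incoming
    # page costs a dictionary lookup instead of its own SELECT.
    result = await session.execute(
        select(Document).filter(Document.search_space_id == search_space_id)
    )
    by_page_id = {
        doc.document_metadata["page_id"]: doc
        for doc in result.scalars().all()
        if "page_id" in doc.document_metadata
    }

    new_count = updated_count = 0
    for page in pages:
        chunks = [Chunk(content=text) for text in page["chunk_texts"]]
        existing = by_page_id.get(page["id"])
        if existing:
            existing.title = page["title"]
            existing.content = page["content"]
            # Replace chunks wholesale so they always mirror the new content.
            await session.execute(
                delete(Chunk).where(Chunk.document_id == existing.id)
            )
            for chunk in chunks:
                chunk.document_id = existing.id
                session.add(chunk)
            updated_count += 1
        else:
            session.add(Document(
                search_space_id=search_space_id,
                title=page["title"],
                content=page["content"],
                document_metadata={"page_id": page["id"]},
                chunks=chunks,
            ))
            new_count += 1

    await session.commit()
    return new_count, updated_count

Deleting and recreating chunks, rather than diffing them, keeps the stored chunks exactly in sync with the freshly generated content; the single up-front SELECT trades one larger query for an O(1) lookup per page, a reasonable exchange at Notion-workspace scale.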