diff --git a/surfsense_backend/alembic/versions/1_add_github_connector_enum.py b/surfsense_backend/alembic/versions/1_add_github_connector_enum.py
index bb72838..1902777 100644
--- a/surfsense_backend/alembic/versions/1_add_github_connector_enum.py
+++ b/surfsense_backend/alembic/versions/1_add_github_connector_enum.py
@@ -2,7 +2,6 @@
 
 Revision ID: 1
 Revises: 
-Create Date: 2023-10-27 10:00:00.000000
 
 """
 from typing import Sequence, Union
diff --git a/surfsense_backend/alembic/versions/2_add_linear_connector_enum.py b/surfsense_backend/alembic/versions/2_add_linear_connector_enum.py
index d3527d3..526c7c3 100644
--- a/surfsense_backend/alembic/versions/2_add_linear_connector_enum.py
+++ b/surfsense_backend/alembic/versions/2_add_linear_connector_enum.py
@@ -2,7 +2,6 @@
 
 Revision ID: 2
 Revises: e55302644c51
-Create Date: 2025-04-16 10:00:00.000000
 
 """
 from typing import Sequence, Union
diff --git a/surfsense_backend/alembic/versions/3_add_linear_connector_to_documenttype_.py b/surfsense_backend/alembic/versions/3_add_linear_connector_to_documenttype_.py
index ab50d85..e71ee2e 100644
--- a/surfsense_backend/alembic/versions/3_add_linear_connector_to_documenttype_.py
+++ b/surfsense_backend/alembic/versions/3_add_linear_connector_to_documenttype_.py
@@ -2,7 +2,6 @@
 
 Revision ID: 3
 Revises: 2
-Create Date: 2025-04-16 10:05:00.059921
 
 """
 from typing import Sequence, Union
diff --git a/surfsense_backend/alembic/versions/4_add_linkup_api_enum.py b/surfsense_backend/alembic/versions/4_add_linkup_api_enum.py
index 8ccfac2..093bdf0 100644
--- a/surfsense_backend/alembic/versions/4_add_linkup_api_enum.py
+++ b/surfsense_backend/alembic/versions/4_add_linkup_api_enum.py
@@ -2,7 +2,6 @@
 
 Revision ID: 4
 Revises: 3
-Create Date: 2025-04-18 10:00:00.000000
 
 """
 from typing import Sequence, Union
diff --git a/surfsense_backend/alembic/versions/5_remove_title_char_limit.py b/surfsense_backend/alembic/versions/5_remove_title_char_limit.py
index 57ed108..62fe019 100644
--- a/surfsense_backend/alembic/versions/5_remove_title_char_limit.py
+++ b/surfsense_backend/alembic/versions/5_remove_title_char_limit.py
@@ -2,7 +2,6 @@
 
 Revision ID: 5
 Revises: 4
-Create Date: 2023-06-10 00:00:00.000000
 
 """
 from typing import Sequence, Union
diff --git a/surfsense_backend/alembic/versions/6_change_podcast_content_to_transcript.py b/surfsense_backend/alembic/versions/6_change_podcast_content_to_transcript.py
index 991948f..fa7a0f8 100644
--- a/surfsense_backend/alembic/versions/6_change_podcast_content_to_transcript.py
+++ b/surfsense_backend/alembic/versions/6_change_podcast_content_to_transcript.py
@@ -2,7 +2,6 @@
 
 Revision ID: 6
 Revises: 5
-Create Date: 2023-08-15 00:00:00.000000
 
 """
 from typing import Sequence, Union
diff --git a/surfsense_backend/alembic/versions/7_remove_is_generated_column.py b/surfsense_backend/alembic/versions/7_remove_is_generated_column.py
index c5d25ad..03048a1 100644
--- a/surfsense_backend/alembic/versions/7_remove_is_generated_column.py
+++ b/surfsense_backend/alembic/versions/7_remove_is_generated_column.py
@@ -2,7 +2,6 @@
 
 Revision ID: 7
 Revises: 6
-Create Date: 2023-08-15 01:00:00.000000
 
 """
 from typing import Sequence, Union
diff --git a/surfsense_backend/alembic/versions/8_add_content_hash_to_documents.py b/surfsense_backend/alembic/versions/8_add_content_hash_to_documents.py
new file mode 100644
index 0000000..64982fc
--- /dev/null
+++ b/surfsense_backend/alembic/versions/8_add_content_hash_to_documents.py
@@ -0,0 +1,56 @@
+"""Add content_hash column to documents table
+
+Revision ID: 8
+Revises: 7
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = '8'
+down_revision: Union[str, None] = '7'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # Add content_hash column as nullable first to handle existing data
+    op.add_column('documents', sa.Column('content_hash', sa.String(), nullable=True))
+
+    # Update existing documents to generate content hashes
+    # Using SHA-256 hash of the content column with proper UTF-8 encoding
+    op.execute("""
+        UPDATE documents
+        SET content_hash = encode(sha256(convert_to(content, 'UTF8')), 'hex')
+        WHERE content_hash IS NULL
+    """)
+
+    # Handle duplicate content hashes by keeping only the oldest document for each hash
+    # Delete newer documents with duplicate content hashes
+    op.execute("""
+        DELETE FROM documents
+        WHERE id NOT IN (
+            SELECT MIN(id)
+            FROM documents
+            GROUP BY content_hash
+        )
+    """)
+
+    # Now alter the column to match the model: nullable=False, index=True, unique=True
+    op.alter_column('documents', 'content_hash',
+                    existing_type=sa.String(),
+                    nullable=False)
+    op.create_index(op.f('ix_documents_content_hash'), 'documents', ['content_hash'], unique=False)
+    op.create_unique_constraint(op.f('uq_documents_content_hash'), 'documents', ['content_hash'])
+
+
+def downgrade() -> None:
+    # Remove constraints and index first
+    op.drop_constraint(op.f('uq_documents_content_hash'), 'documents', type_='unique')
+    op.drop_index(op.f('ix_documents_content_hash'), table_name='documents')
+
+    # Remove content_hash column from documents table
+    op.drop_column('documents', 'content_hash')
\ No newline at end of file
diff --git a/surfsense_backend/alembic/versions/e55302644c51_add_github_connector_to_documenttype_.py b/surfsense_backend/alembic/versions/e55302644c51_add_github_connector_to_documenttype_.py
index 1f15912..12d6537 100644
--- a/surfsense_backend/alembic/versions/e55302644c51_add_github_connector_to_documenttype_.py
+++ b/surfsense_backend/alembic/versions/e55302644c51_add_github_connector_to_documenttype_.py
@@ -2,7 +2,6 @@
 
 Revision ID: e55302644c51
 Revises: 1
-Create Date: 2025-04-13 19:56:00.059921
 
 """
 from typing import Sequence, Union
diff --git a/surfsense_backend/app/connectors/linear_connector.py b/surfsense_backend/app/connectors/linear_connector.py
index be9a1a4..52b7704 100644
--- a/surfsense_backend/app/connectors/linear_connector.py
+++ b/surfsense_backend/app/connectors/linear_connector.py
@@ -6,7 +6,7 @@ Allows fetching issue lists and their comments with date range filtering.
 """
 
 import requests
-from datetime import datetime, timedelta
+from datetime import datetime
 from typing import Dict, List, Optional, Tuple, Any, Union
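Note: the backfill in 8_add_content_hash_to_documents.py hashes existing rows in Postgres via encode(sha256(convert_to(content, 'UTF8')), 'hex'), while new documents are hashed in Python by the generate_content_hash helper added to app/utils/document_converters.py further down. Both hash the UTF-8 bytes of the content and emit lowercase hex, so they should produce identical values. A minimal sketch of that correspondence, assuming a plain Python string as input (the sample content is illustrative only):

import hashlib

# Python-side hash, mirroring the helper introduced in this PR.
def generate_content_hash(content: str) -> str:
    return hashlib.sha256(content.encode('utf-8')).hexdigest()

# SQL expression used by the migration for existing rows, shown for comparison only:
#   encode(sha256(convert_to(content, 'UTF8')), 'hex')
sample = "example document content"
digest = generate_content_hash(sample)
assert len(digest) == 64  # SHA-256 hex digests are always 64 characters
print(digest)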
diff --git a/surfsense_backend/app/connectors/slack_history.py b/surfsense_backend/app/connectors/slack_history.py
index 67e5403..effb9c8 100644
--- a/surfsense_backend/app/connectors/slack_history.py
+++ b/surfsense_backend/app/connectors/slack_history.py
@@ -8,7 +8,7 @@ Allows fetching channel lists and message history with date range filtering.
 import os
 from slack_sdk import WebClient
 from slack_sdk.errors import SlackApiError
-from datetime import datetime, timedelta
+from datetime import datetime
 from typing import Dict, List, Optional, Tuple, Any, Union
diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py
index 10f78a5..7ee5663 100644
--- a/surfsense_backend/app/db.py
+++ b/surfsense_backend/app/db.py
@@ -99,6 +99,7 @@ class Document(BaseModel, TimestampMixin):
     document_metadata = Column(JSON, nullable=True)
     content = Column(Text, nullable=False)
+    content_hash = Column(String, nullable=False, index=True, unique=True)
     embedding = Column(Vector(config.embedding_model_instance.dimension))
     search_space_id = Column(Integer, ForeignKey("searchspaces.id", ondelete='CASCADE'), nullable=False)
diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py
index ff5cce1..15c8150 100644
--- a/surfsense_backend/app/routes/search_source_connectors_routes.py
+++ b/surfsense_backend/app/routes/search_source_connectors_routes.py
@@ -21,7 +21,7 @@ from app.utils.check_ownership import check_ownership
 from pydantic import BaseModel, Field, ValidationError
 from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues
 from app.connectors.github_connector import GitHubConnector
-from datetime import datetime, timezone, timedelta
+from datetime import datetime, timedelta
 import logging
 
 # Set up logging
diff --git a/surfsense_backend/app/tasks/background_tasks.py b/surfsense_backend/app/tasks/background_tasks.py
index 2e5e361..1c6cd6d 100644
--- a/surfsense_backend/app/tasks/background_tasks.py
+++ b/surfsense_backend/app/tasks/background_tasks.py
@@ -1,12 +1,13 @@
 from typing import Optional, List
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.future import select
 from app.db import Document, DocumentType, Chunk
 from app.schemas import ExtensionDocumentContent
 from app.config import config
 from app.prompts import SUMMARY_PROMPT_TEMPLATE
 from datetime import datetime
-from app.utils.document_converters import convert_document_to_markdown
+from app.utils.document_converters import convert_document_to_markdown, generate_content_hash
 from langchain_core.documents import Document as LangChainDocument
 from langchain_community.document_loaders import FireCrawlLoader, AsyncChromiumLoader
 from langchain_community.document_transformers import MarkdownifyTransformer
@@ -14,7 +15,6 @@ import validators
 from youtube_transcript_api import YouTubeTranscriptApi
 from urllib.parse import urlparse, parse_qs
 import aiohttp
-from app.db import Document as DB_Document, DocumentType as DB_DocumentType
 import logging
 
 md = MarkdownifyTransformer()
@@ -73,6 +73,17 @@ async def add_crawled_url_document(
         document_parts.append("")
         combined_document_string = "\n".join(document_parts)
+        content_hash = generate_content_hash(combined_document_string)
+
+        # Check if document with this content hash already exists
+        existing_doc_result = await session.execute(
+            select(Document).where(Document.content_hash == content_hash)
+        )
+        existing_document = existing_doc_result.scalars().first()
+
+        if existing_document:
+            logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
+            return existing_document
 
         # Generate summary
         summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
@@ -102,6 +113,7 @@
             content=summary_content,
             embedding=summary_embedding,
             chunks=chunks,
+            content_hash=content_hash,
         )
 
         session.add(document)
@@ -163,6 +175,17 @@ async def add_extension_received_document(
         document_parts.append("")
         combined_document_string = "\n".join(document_parts)
+        content_hash = generate_content_hash(combined_document_string)
+
+        # Check if document with this content hash already exists
+        existing_doc_result = await session.execute(
+            select(Document).where(Document.content_hash == content_hash)
+        )
+        existing_document = existing_doc_result.scalars().first()
+
+        if existing_document:
+            logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
+            return existing_document
 
         # Generate summary
         summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
@@ -190,6 +213,7 @@
             content=summary_content,
             embedding=summary_embedding,
             chunks=chunks,
+            content_hash=content_hash,
         )
 
         session.add(document)
@@ -210,6 +234,18 @@ async def add_received_markdown_file_document(
     session: AsyncSession, file_name: str, file_in_markdown: str, search_space_id: int
 ) -> Optional[Document]:
     try:
+        content_hash = generate_content_hash(file_in_markdown)
+
+        # Check if document with this content hash already exists
+        existing_doc_result = await session.execute(
+            select(Document).where(Document.content_hash == content_hash)
+        )
+        existing_document = existing_doc_result.scalars().first()
+
+        if existing_document:
+            logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
+            return existing_document
+
         # Generate summary
         summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
         summary_result = await summary_chain.ainvoke({"document": file_in_markdown})
@@ -237,6 +273,7 @@
             content=summary_content,
             embedding=summary_embedding,
             chunks=chunks,
+            content_hash=content_hash,
         )
 
         session.add(document)
@@ -263,6 +300,18 @@ async def add_received_file_document(
             unstructured_processed_elements
         )
 
+        content_hash = generate_content_hash(file_in_markdown)
+
+        # Check if document with this content hash already exists
+        existing_doc_result = await session.execute(
+            select(Document).where(Document.content_hash == content_hash)
+        )
+        existing_document = existing_doc_result.scalars().first()
+
+        if existing_document:
+            logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
+            return existing_document
+
         # TODO: Check if file_markdown exceeds token limit of embedding model
 
         # Generate summary
@@ -292,6 +341,7 @@
             content=summary_content,
             embedding=summary_embedding,
             chunks=chunks,
+            content_hash=content_hash,
         )
 
         session.add(document)
@@ -404,6 +454,17 @@ async def add_youtube_video_document(
         document_parts.append("")
         combined_document_string = "\n".join(document_parts)
+        content_hash = generate_content_hash(combined_document_string)
+
+        # Check if document with this content hash already exists
+        existing_doc_result = await session.execute(
+            select(Document).where(Document.content_hash == content_hash)
+        )
+        existing_document = existing_doc_result.scalars().first()
+
+        if existing_document:
+            logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
+            return existing_document
 
         # Generate summary
         summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
@@ -424,9 +485,9 @@
 
         # Create document
-        document = DB_Document(
+        document = Document(
             title=video_data.get("title", "YouTube Video"),
-            document_type=DB_DocumentType.YOUTUBE_VIDEO,
+            document_type=DocumentType.YOUTUBE_VIDEO,
             document_metadata={
                 "url": url,
                 "video_id": video_id,
@@ -438,6 +499,7 @@
             embedding=summary_embedding,
             chunks=chunks,
             search_space_id=search_space_id,
+            content_hash=content_hash,
         )
 
         session.add(document)
diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
index 94643a4..a36e14d 100644
--- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py
+++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
@@ -14,6 +14,8 @@ from app.connectors.linear_connector import LinearConnector
 from slack_sdk.errors import SlackApiError
 import logging
 
+from app.utils.document_converters import generate_content_hash
+
 # Set up logging
 logger = logging.getLogger(__name__)
@@ -67,13 +69,13 @@ async def index_slack_messages(
 
             # Check if last_indexed_at is in the future or after end_date
             if last_indexed_naive > end_date:
-                logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 30 days ago instead.")
-                start_date = end_date - timedelta(days=30)
+                logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead.")
+                start_date = end_date - timedelta(days=365)
             else:
                 start_date = last_indexed_naive
                 logger.info(f"Using last_indexed_at ({start_date.strftime('%Y-%m-%d')}) as start date")
         else:
-            start_date = end_date - timedelta(days=30) # Use 30 days instead of 365 to catch recent issues
-            logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (30 days ago) as start date")
+            start_date = end_date - timedelta(days=365) # Use 365 days as default
+            logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (365 days ago) as start date")
 
         # Format dates for Slack API
@@ -89,27 +91,8 @@ async def index_slack_messages(
         if not channels:
             return 0, "No Slack channels found"
 
-        # Get existing documents for this search space and connector type to prevent duplicates
-        existing_docs_result = await session.execute(
-            select(Document)
-            .filter(
-                Document.search_space_id == search_space_id,
-                Document.document_type == DocumentType.SLACK_CONNECTOR
-            )
-        )
-        existing_docs = existing_docs_result.scalars().all()
-
-        # Create a lookup dictionary of existing documents by channel_id
-        existing_docs_by_channel_id = {}
-        for doc in existing_docs:
-            if "channel_id" in doc.document_metadata:
-                existing_docs_by_channel_id[doc.document_metadata["channel_id"]] = doc
-
-        logger.info(f"Found {len(existing_docs_by_channel_id)} existing Slack documents in database")
-
         # Track the number of documents indexed
         documents_indexed = 0
-        documents_updated = 0
         documents_skipped = 0
         skipped_channels = []
@@ -189,10 +172,9 @@
                     ("METADATA", [
                         f"CHANNEL_NAME: {channel_name}",
                         f"CHANNEL_ID: {channel_id}",
-                        f"START_DATE: {start_date_str}",
-                        f"END_DATE: {end_date_str}",
-                        f"MESSAGE_COUNT: {len(formatted_messages)}",
-                        f"INDEXED_AT: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+                        # f"START_DATE: {start_date_str}",
+                        # f"END_DATE: {end_date_str}",
+                        f"MESSAGE_COUNT: {len(formatted_messages)}"
                     ]),
                     ("CONTENT", [
                         "FORMAT: markdown",
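Note: the hunk below, and the matching hunks in the Notion, GitHub, and Linear tasks further down, all repeat the same check: hash the assembled content, look the hash up in documents.content_hash, and skip the item if a row already exists. A minimal sketch of a hypothetical helper that wraps that lookup (not part of this PR; the function name is illustrative):

from typing import Optional

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

from app.db import Document
from app.utils.document_converters import generate_content_hash


async def find_document_by_content(session: AsyncSession, content: str) -> Optional[Document]:
    """Return an existing Document with the same content hash, or None."""
    content_hash = generate_content_hash(content)
    result = await session.execute(
        select(Document).where(Document.content_hash == content_hash)
    )
    return result.scalars().first()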
@@ -213,6 +195,18 @@ async def index_slack_messages(
                 document_parts.append("")
                 combined_document_string = '\n'.join(document_parts)
+                content_hash = generate_content_hash(combined_document_string)
+
+                # Check if document with this content hash already exists
+                existing_doc_by_hash_result = await session.execute(
+                    select(Document).where(Document.content_hash == content_hash)
+                )
+                existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
+
+                if existing_document_by_hash:
+                    logger.info(f"Document with content hash {content_hash} already exists for channel {channel_name}. Skipping processing.")
+                    documents_skipped += 1
+                    continue
 
                 # Generate summary
                 summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
@@ -226,61 +220,28 @@
                     for chunk in config.chunker_instance.chunk(channel_content)
                 ]
 
-                # Check if this channel already exists in our database
-                existing_document = existing_docs_by_channel_id.get(channel_id)
-
-                if existing_document:
-                    # Update existing document instead of creating a new one
-                    logger.info(f"Updating existing document for channel {channel_name}")
-
-                    # Update document fields
-                    existing_document.title = f"Slack - {channel_name}"
-                    existing_document.document_metadata = {
+                # Create and store new document
+                document = Document(
+                    search_space_id=search_space_id,
+                    title=f"Slack - {channel_name}",
+                    document_type=DocumentType.SLACK_CONNECTOR,
+                    document_metadata={
                         "channel_name": channel_name,
                         "channel_id": channel_id,
                         "start_date": start_date_str,
                         "end_date": end_date_str,
                         "message_count": len(formatted_messages),
-                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
-                        "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                    }
-                    existing_document.content = summary_content
-                    existing_document.embedding = summary_embedding
-
-                    # Delete existing chunks and add new ones
-                    await session.execute(
-                        delete(Chunk)
-                        .where(Chunk.document_id == existing_document.id)
-                    )
-
-                    # Assign new chunks to existing document
-                    for chunk in chunks:
-                        chunk.document_id = existing_document.id
-                        session.add(chunk)
-
-                    documents_updated += 1
-                else:
-                    # Create and store new document
-                    document = Document(
-                        search_space_id=search_space_id,
-                        title=f"Slack - {channel_name}",
-                        document_type=DocumentType.SLACK_CONNECTOR,
-                        document_metadata={
-                            "channel_name": channel_name,
-                            "channel_id": channel_id,
-                            "start_date": start_date_str,
-                            "end_date": end_date_str,
-                            "message_count": len(formatted_messages),
-                            "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                        },
-                        content=summary_content,
-                        embedding=summary_embedding,
-                        chunks=chunks
-                    )
-
-                    session.add(document)
-                    documents_indexed += 1
-                    logger.info(f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages")
+                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                    },
+                    content=summary_content,
+                    embedding=summary_embedding,
+                    chunks=chunks,
+                    content_hash=content_hash,
+                )
+
+                session.add(document)
+                documents_indexed += 1
+                logger.info(f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages")
 
             except SlackApiError as slack_error:
                 logger.error(f"Slack API error for channel {channel_name}: {str(slack_error)}")
@@ -295,7 +256,7 @@
 
         # Update the last_indexed_at timestamp for the connector only if requested
         # and if we successfully indexed at least one channel
-        total_processed = documents_indexed + documents_updated
+        total_processed = documents_indexed
         if update_last_indexed and total_processed > 0:
             connector.last_indexed_at = datetime.now()
@@ -305,11 +266,11 @@
         # Prepare result message
         result_message = None
         if skipped_channels:
-            result_message = f"Processed {total_processed} channels ({documents_indexed} new, {documents_updated} updated). Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
+            result_message = f"Processed {total_processed} channels. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
         else:
-            result_message = f"Processed {total_processed} channels ({documents_indexed} new, {documents_updated} updated)."
+            result_message = f"Processed {total_processed} channels."
 
-        logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_updated} updated, {documents_skipped} skipped")
+        logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
         return total_processed, result_message
 
     except SQLAlchemyError as db_error:
@@ -386,27 +347,8 @@ async def index_notion_pages(
             logger.info("No Notion pages found to index")
             return 0, "No Notion pages found"
 
-        # Get existing documents for this search space and connector type to prevent duplicates
-        existing_docs_result = await session.execute(
-            select(Document)
-            .filter(
-                Document.search_space_id == search_space_id,
-                Document.document_type == DocumentType.NOTION_CONNECTOR
-            )
-        )
-        existing_docs = existing_docs_result.scalars().all()
-
-        # Create a lookup dictionary of existing documents by page_id
-        existing_docs_by_page_id = {}
-        for doc in existing_docs:
-            if "page_id" in doc.document_metadata:
-                existing_docs_by_page_id[doc.document_metadata["page_id"]] = doc
-
-        logger.info(f"Found {len(existing_docs_by_page_id)} existing Notion documents in database")
-
         # Track the number of documents indexed
         documents_indexed = 0
-        documents_updated = 0
         documents_skipped = 0
         skipped_pages = []
@@ -482,8 +424,7 @@
                 metadata_sections = [
                     ("METADATA", [
                         f"PAGE_TITLE: {page_title}",
-                        f"PAGE_ID: {page_id}",
-                        f"INDEXED_AT: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+                        f"PAGE_ID: {page_id}"
                     ]),
                     ("CONTENT", [
                         "FORMAT: markdown",
@@ -504,6 +445,18 @@
                 document_parts.append("")
                 combined_document_string = '\n'.join(document_parts)
+                content_hash = generate_content_hash(combined_document_string)
+
+                # Check if document with this content hash already exists
+                existing_doc_by_hash_result = await session.execute(
+                    select(Document).where(Document.content_hash == content_hash)
+                )
+                existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
+
+                if existing_document_by_hash:
+                    logger.info(f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing.")
+                    documents_skipped += 1
+                    continue
 
                 # Generate summary
                 logger.debug(f"Generating summary for page {page_title}")
@@ -519,55 +472,25 @@
                     for chunk in config.chunker_instance.chunk(markdown_content)
                 ]
 
-                # Check if this page already exists in our database
-                existing_document = existing_docs_by_page_id.get(page_id)
-
-                if existing_document:
-                    # Update existing document instead of creating a new one
-                    logger.info(f"Updating existing document for page {page_title}")
-
-                    # Update document fields
-                    existing_document.title = f"Notion - {page_title}"
-                    existing_document.document_metadata = {
+                # Create and store new document
+                document = Document(
+                    search_space_id=search_space_id,
+                    title=f"Notion - {page_title}",
+                    document_type=DocumentType.NOTION_CONNECTOR,
+                    document_metadata={
                         "page_title": page_title,
                         "page_id": page_id,
-                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
-                        "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                    }
-                    existing_document.content = summary_content
-                    existing_document.embedding = summary_embedding
-
-                    # Delete existing chunks and add new ones
-                    await session.execute(
-                        delete(Chunk)
-                        .where(Chunk.document_id == existing_document.id)
-                    )
-
-                    # Assign new chunks to existing document
-                    for chunk in chunks:
-                        chunk.document_id = existing_document.id
-                        session.add(chunk)
-
-                    documents_updated += 1
-                else:
-                    # Create and store new document
-                    document = Document(
-                        search_space_id=search_space_id,
-                        title=f"Notion - {page_title}",
-                        document_type=DocumentType.NOTION_CONNECTOR,
-                        document_metadata={
-                            "page_title": page_title,
-                            "page_id": page_id,
-                            "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                        },
-                        content=summary_content,
-                        embedding=summary_embedding,
-                        chunks=chunks
-                    )
-
-                    session.add(document)
-                    documents_indexed += 1
-                    logger.info(f"Successfully indexed new Notion page: {page_title}")
+                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                    },
+                    content=summary_content,
+                    content_hash=content_hash,
+                    embedding=summary_embedding,
+                    chunks=chunks
+                )
+
+                session.add(document)
+                documents_indexed += 1
+                logger.info(f"Successfully indexed new Notion page: {page_title}")
 
             except Exception as e:
                 logger.error(f"Error processing Notion page {page.get('title', 'Unknown')}: {str(e)}", exc_info=True)
@@ -577,7 +500,7 @@
 
         # Update the last_indexed_at timestamp for the connector only if requested
         # and if we successfully indexed at least one page
-        total_processed = documents_indexed + documents_updated
+        total_processed = documents_indexed
         if update_last_indexed and total_processed > 0:
             connector.last_indexed_at = datetime.now()
             logger.info(f"Updated last_indexed_at for connector {connector_id}")
@@ -588,11 +511,11 @@
         # Prepare result message
         result_message = None
         if skipped_pages:
-            result_message = f"Processed {total_processed} pages ({documents_indexed} new, {documents_updated} updated). Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}"
+            result_message = f"Processed {total_processed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}"
         else:
-            result_message = f"Processed {total_processed} pages ({documents_indexed} new, {documents_updated} updated)."
+            result_message = f"Processed {total_processed} pages."
 
-        logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_updated} updated, {documents_skipped} skipped")
+        logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped")
         return total_processed, result_message
 
     except SQLAlchemyError as db_error:
@@ -660,19 +583,6 @@ async def index_github_repos(
         # If a repo is inaccessible, get_repository_files will likely fail gracefully later.
         logger.info(f"Starting indexing for {len(repo_full_names_to_index)} selected repositories.")
 
-        # 5. Get existing documents for this search space and connector type to prevent duplicates
-        existing_docs_result = await session.execute(
-            select(Document)
-            .filter(
-                Document.search_space_id == search_space_id,
-                Document.document_type == DocumentType.GITHUB_CONNECTOR
-            )
-        )
-        existing_docs = existing_docs_result.scalars().all()
-        # Create a lookup dict: key=repo_fullname/file_path, value=Document object
-        existing_docs_lookup = {doc.document_metadata.get("full_path"): doc for doc in existing_docs if doc.document_metadata.get("full_path")}
-        logger.info(f"Found {len(existing_docs_lookup)} existing GitHub documents in database for search space {search_space_id}")
-
         # 6. Iterate through selected repositories and index files
         for repo_full_name in repo_full_names_to_index:
             if not repo_full_name or not isinstance(repo_full_name, str):
@@ -699,12 +609,6 @@
                     logger.warning(f"Skipping file with missing info in {repo_full_name}: {file_info}")
                     continue
 
-                # Check if document already exists and if content hash matches
-                existing_doc = existing_docs_lookup.get(full_path_key)
-                if existing_doc and existing_doc.document_metadata.get("sha") == file_sha:
-                    logger.debug(f"Skipping unchanged file: {full_path_key}")
-                    continue # Skip if SHA matches (content hasn't changed)
-
                 # Get file content
                 file_content = github_client.get_file_content(repo_full_name, file_path)
@@ -712,6 +616,18 @@
                     logger.warning(f"Could not retrieve content for {full_path_key}. Skipping.")
                    continue # Skip if content fetch failed
 
+                content_hash = generate_content_hash(file_content)
+
+                # Check if document with this content hash already exists
+                existing_doc_by_hash_result = await session.execute(
+                    select(Document).where(Document.content_hash == content_hash)
+                )
+                existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
+
+                if existing_document_by_hash:
+                    logger.info(f"Document with content hash {content_hash} already exists for file {full_path_key}. Skipping processing.")
+                    continue
+
                 # Use file_content directly for chunking, maybe summary for main content?
                 # For now, let's use the full content for both, might need refinement
                 summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..." # Simple summary
@@ -738,42 +654,20 @@
                     "indexed_at": datetime.now(timezone.utc).isoformat()
                 }
 
-                if existing_doc:
-                    # Update existing document
-                    logger.info(f"Updating document for file: {full_path_key}")
-                    existing_doc.title = f"GitHub - {file_path}"
-                    existing_doc.document_metadata = doc_metadata
-                    existing_doc.content = summary_content # Update summary
-                    existing_doc.embedding = summary_embedding # Update embedding
-
-                    # Delete old chunks
-                    await session.execute(
-                        delete(Chunk)
-                        .where(Chunk.document_id == existing_doc.id)
-                    )
-                    # Add new chunks
-                    for chunk_obj in chunks_data:
-                        chunk_obj.document_id = existing_doc.id
-                        session.add(chunk_obj)
-
-                    documents_processed += 1
-                else:
-                    # Create new document
-                    logger.info(f"Creating new document for file: {full_path_key}")
-                    document = Document(
-                        title=f"GitHub - {file_path}",
-                        document_type=DocumentType.GITHUB_CONNECTOR,
-                        document_metadata=doc_metadata,
-                        content=summary_content, # Store summary
-                        embedding=summary_embedding,
-                        search_space_id=search_space_id,
-                        chunks=chunks_data # Associate chunks directly
-                    )
-                    session.add(document)
-                    documents_processed += 1
-
-                # Commit periodically or at the end? For now, commit per repo
-                # await session.commit()
+                # Create new document
+                logger.info(f"Creating new document for file: {full_path_key}")
+                document = Document(
+                    title=f"GitHub - {file_path}",
+                    document_type=DocumentType.GITHUB_CONNECTOR,
+                    document_metadata=doc_metadata,
+                    content=summary_content, # Store summary
+                    content_hash=content_hash,
+                    embedding=summary_embedding,
+                    search_space_id=search_space_id,
+                    chunks=chunks_data # Associate chunks directly
+                )
+                session.add(document)
+                documents_processed += 1
 
             except Exception as repo_err:
                 logger.error(f"Failed to process repository {repo_full_name}: {repo_err}")
@@ -847,14 +741,14 @@
 
             # Check if last_indexed_at is in the future or after end_date
             if last_indexed_naive > end_date:
-                logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 30 days ago instead.")
-                start_date = end_date - timedelta(days=30)
+                logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead.")
+                start_date = end_date - timedelta(days=365)
             else:
                 start_date = last_indexed_naive
                 logger.info(f"Using last_indexed_at ({start_date.strftime('%Y-%m-%d')}) as start date")
         else:
-            start_date = end_date - timedelta(days=30) # Use 30 days instead of 365 to catch recent issues
-            logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (30 days ago) as start date")
+            start_date = end_date - timedelta(days=365) # Use 365 days as default
+            logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (365 days ago) as start date")
 
         # Format dates for Linear API
         start_date_str = start_date.strftime("%Y-%m-%d")
@@ -905,35 +799,8 @@ async def index_linear_issues(
             if len(issues) > 10:
                 logger.info(f"  ...and {len(issues) - 10} more issues")
 
-        # Get existing documents for this search space and connector type to prevent duplicates
-        existing_docs_result = await session.execute(
-            select(Document)
-            .filter(
-                Document.search_space_id == search_space_id,
-                Document.document_type == DocumentType.LINEAR_CONNECTOR
-            )
-        )
-        existing_docs = existing_docs_result.scalars().all()
-
-        # Create a lookup dictionary of existing documents by issue_id
-        existing_docs_by_issue_id = {}
-        for doc in existing_docs:
-            if "issue_id" in doc.document_metadata:
-                existing_docs_by_issue_id[doc.document_metadata["issue_id"]] = doc
-
-        logger.info(f"Found {len(existing_docs_by_issue_id)} existing Linear documents in database")
-
-        # Log existing document IDs for debugging
-        if existing_docs_by_issue_id:
-            logger.info("Existing Linear document issue IDs in database:")
-            for idx, (issue_id, doc) in enumerate(list(existing_docs_by_issue_id.items())[:10]): # Log first 10
-                logger.info(f"  {idx+1}. {issue_id} - {doc.document_metadata.get('issue_identifier', 'Unknown')} - {doc.document_metadata.get('issue_title', 'Unknown')}")
-            if len(existing_docs_by_issue_id) > 10:
-                logger.info(f"  ...and {len(existing_docs_by_issue_id) - 10} more existing documents")
-
         # Track the number of documents indexed
         documents_indexed = 0
-        documents_updated = 0
         documents_skipped = 0
         skipped_issues = []
@@ -979,6 +846,19 @@
                     comment_count = len(formatted_issue.get("comments", []))
                     summary_content += f"Comments: {comment_count}"
 
+                content_hash = generate_content_hash(issue_content)
+
+                # Check if document with this content hash already exists
+                existing_doc_by_hash_result = await session.execute(
+                    select(Document).where(Document.content_hash == content_hash)
+                )
+                existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
+
+                if existing_document_by_hash:
+                    logger.info(f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing.")
+                    documents_skipped += 1
+                    continue
+
                 # Generate embedding for the summary
                 summary_embedding = config.embedding_model_instance.embed(summary_content)
@@ -988,62 +868,29 @@
                     for chunk in config.chunker_instance.chunk(issue_content)
                 ]
 
-                # Check if this issue already exists in our database
-                existing_document = existing_docs_by_issue_id.get(issue_id)
-
-                if existing_document:
-                    # Update existing document instead of creating a new one
-                    logger.info(f"Updating existing document for issue {issue_identifier} - {issue_title}")
-
-                    # Update document fields
-                    existing_document.title = f"Linear - {issue_identifier}: {issue_title}"
-                    existing_document.document_metadata = {
+                # Create and store new document
+                logger.info(f"Creating new document for issue {issue_identifier} - {issue_title}")
+                document = Document(
+                    search_space_id=search_space_id,
+                    title=f"Linear - {issue_identifier}: {issue_title}",
+                    document_type=DocumentType.LINEAR_CONNECTOR,
+                    document_metadata={
                         "issue_id": issue_id,
                         "issue_identifier": issue_identifier,
                         "issue_title": issue_title,
                         "state": state,
                         "comment_count": comment_count,
-                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
-                        "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                    }
-                    existing_document.content = summary_content
-                    existing_document.embedding = summary_embedding
-
-                    # Delete existing chunks and add new ones
-                    await session.execute(
-                        delete(Chunk)
-                        .where(Chunk.document_id == existing_document.id)
-                    )
-
-                    # Assign new chunks to existing document
-                    for chunk in chunks:
-                        chunk.document_id = existing_document.id
-                        session.add(chunk)
-
-                    documents_updated += 1
-                else:
-                    # Create and store new document
-                    logger.info(f"Creating new document for issue {issue_identifier} - {issue_title}")
-                    document = Document(
-                        search_space_id=search_space_id,
-                        title=f"Linear - {issue_identifier}: {issue_title}",
-                        document_type=DocumentType.LINEAR_CONNECTOR,
-                        document_metadata={
-                            "issue_id": issue_id,
-                            "issue_identifier": issue_identifier,
-                            "issue_title": issue_title,
-                            "state": state,
-                            "comment_count": comment_count,
-                            "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                        },
-                        content=summary_content,
-                        embedding=summary_embedding,
-                        chunks=chunks
-                    )
-
-                    session.add(document)
-                    documents_indexed += 1
-                    logger.info(f"Successfully indexed new issue {issue_identifier} - {issue_title}")
+                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                    },
+                    content=summary_content,
+                    content_hash=content_hash,
+                    embedding=summary_embedding,
+                    chunks=chunks
+                )
+
+                session.add(document)
+                documents_indexed += 1
+                logger.info(f"Successfully indexed new issue {issue_identifier} - {issue_title}")
 
             except Exception as e:
                 logger.error(f"Error processing issue {issue.get('identifier', 'Unknown')}: {str(e)}", exc_info=True)
@@ -1052,7 +899,7 @@
                 continue # Skip this issue and continue with others
 
         # Update the last_indexed_at timestamp for the connector only if requested
-        total_processed = documents_indexed + documents_updated
+        total_processed = documents_indexed
         if update_last_indexed:
             connector.last_indexed_at = datetime.now()
             logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
@@ -1062,7 +909,7 @@
 
         logger.info(f"Successfully committed all Linear document changes to database")
 
-        logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_updated} updated, {documents_skipped} skipped")
+        logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped")
         return total_processed, None  # Return None as the error message to indicate success
 
     except SQLAlchemyError as db_error:
diff --git a/surfsense_backend/app/utils/document_converters.py b/surfsense_backend/app/utils/document_converters.py
index 0c9b8f7..a6f69e4 100644
--- a/surfsense_backend/app/utils/document_converters.py
+++ b/surfsense_backend/app/utils/document_converters.py
@@ -1,19 +1,22 @@
+import hashlib
+
+
 async def convert_element_to_markdown(element) -> str:
     """
     Convert an Unstructured element to markdown format based on its category.
-    
+
     Args:
         element: The Unstructured API element object
-        
+
     Returns:
         str: Markdown formatted string
     """
     element_category = element.metadata["category"]
     content = element.page_content
-    
+
     if not content:
         return ""
-    
+
     markdown_mapping = {
         "Formula": lambda x: f"```math\n{x}\n```",
         "FigureCaption": lambda x: f"*Figure: {x}*",
@@ -31,7 +34,7 @@ async def convert_element_to_markdown(element) -> str:
         "PageNumber": lambda x: f"*Page {x}*\n\n",
         "UncategorizedText": lambda x: f"{x}\n\n"
     }
-    
+
     converter = markdown_mapping.get(element_category, lambda x: x)
     return converter(content)
@@ -39,29 +42,30 @@ async def convert_element_to_markdown(element) -> str:
 async def convert_document_to_markdown(elements):
     """
     Convert all document elements to markdown.
-    
+
     Args:
         elements: List of Unstructured API elements
-        
+
     Returns:
         str: Complete markdown document
     """
     markdown_parts = []
-    
+
     for element in elements:
         markdown_text = await convert_element_to_markdown(element)
         if markdown_text:
             markdown_parts.append(markdown_text)
-    
+
     return "".join(markdown_parts)
 
+
 def convert_chunks_to_langchain_documents(chunks):
     """
     Convert chunks from hybrid search results to LangChain Document objects.
-    
+
     Args:
         chunks: List of chunk dictionaries from hybrid search results
-    
+
     Returns:
         List of LangChain Document objects
     """
@@ -71,20 +75,20 @@ def convert_chunks_to_langchain_documents(chunks):
         raise ImportError(
             "LangChain is not installed. Please install it with `pip install langchain langchain-core`"
         )
-    
+
     langchain_docs = []
-    
+
     for chunk in chunks:
         # Extract content from the chunk
         content = chunk.get("content", "")
-        
+
         # Create metadata dictionary
         metadata = {
             "chunk_id": chunk.get("chunk_id"),
             "score": chunk.get("score"),
            "rank": chunk.get("rank") if "rank" in chunk else None,
         }
-        
+
         # Add document information to metadata
         if "document" in chunk:
             doc = chunk["document"]
@@ -93,24 +97,25 @@ def convert_chunks_to_langchain_documents(chunks):
                 "document_title": doc.get("title"),
                 "document_type": doc.get("document_type"),
             })
-            
+
             # Add document metadata if available
             if "metadata" in doc:
                 # Prefix document metadata keys to avoid conflicts
-                doc_metadata = {f"doc_meta_{k}": v for k, v in doc.get("metadata", {}).items()}
+                doc_metadata = {f"doc_meta_{k}": v for k,
+                                v in doc.get("metadata", {}).items()}
                 metadata.update(doc_metadata)
-            
+
             # Add source URL if available in metadata
             if "url" in doc.get("metadata", {}):
                 metadata["source"] = doc["metadata"]["url"]
             elif "sourceURL" in doc.get("metadata", {}):
                 metadata["source"] = doc["metadata"]["sourceURL"]
-        
+
         # Ensure source_id is set for citation purposes
         # Use document_id as the source_id if available
         if "document_id" in metadata:
             metadata["source_id"] = metadata["document_id"]
-        
+
         # Update content for citation mode - format as XML with explicit source_id
         new_content = f"""
@@ -124,13 +129,18 @@ def convert_chunks_to_langchain_documents(chunks):
         """
-        
+
         # Create LangChain Document
         langchain_doc = LangChainDocument(
             page_content=new_content,
             metadata=metadata
         )
-        
+
         langchain_docs.append(langchain_doc)
-    
+
     return langchain_docs
+
+
+def generate_content_hash(content: str) -> str:
+    """Generate SHA-256 hash for the given content."""
+    return hashlib.sha256(content.encode('utf-8')).hexdigest()
diff --git a/surfsense_backend/draw.py b/surfsense_backend/draw.py
deleted file mode 100644
index ec55f79..0000000
--- a/surfsense_backend/draw.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from app.agents.researcher.graph import graph as researcher_graph
-from app.agents.researcher.sub_section_writer.graph import graph as sub_section_writer_graph
-
-print(researcher_graph.get_graph().draw_mermaid())
-print(sub_section_writer_graph.get_graph().draw_mermaid())
\ No newline at end of file
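Note: every duplicate check in this PR relies on generate_content_hash being deterministic for identical input strings, with the unique index on documents.content_hash as the database-level backstop: a duplicate that slips past the pre-insert check (for example, two tasks indexing the same content concurrently) should be rejected by the constraint when the session is flushed. A minimal usage sketch, assuming the helper is importable as added above (the sample strings are illustrative):

from app.utils.document_converters import generate_content_hash

a = generate_content_hash("# Title\n\nSame markdown body")
b = generate_content_hash("# Title\n\nSame markdown body")
c = generate_content_hash("# Title\n\nDifferent markdown body")

assert a == b  # identical content maps to the same hash, so it is treated as a duplicate
assert a != c  # any byte-level change yields a different hash
print(a)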