diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py
index 8711859..a080fd3 100644
--- a/surfsense_backend/app/routes/documents_routes.py
+++ b/surfsense_backend/app/routes/documents_routes.py
@@ -513,7 +513,7 @@ async def process_file_in_background(
 @router.get("/documents/", response_model=list[DocumentRead])
 async def read_documents(
     skip: int = 0,
-    limit: int = 300,
+    limit: int = 3000,
     search_space_id: int | None = None,
     session: AsyncSession = Depends(get_async_session),
     user: User = Depends(current_active_user),
diff --git a/surfsense_backend/app/tasks/connector_indexers/base.py b/surfsense_backend/app/tasks/connector_indexers/base.py
index 3ad2faf..28cd206 100644
--- a/surfsense_backend/app/tasks/connector_indexers/base.py
+++ b/surfsense_backend/app/tasks/connector_indexers/base.py
@@ -8,9 +8,7 @@ from datetime import datetime, timedelta
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.future import select
 
-from app.config import config
 from app.db import (
-    Chunk,
     Document,
     SearchSourceConnector,
     SearchSourceConnectorType,
@@ -39,25 +37,6 @@ async def check_duplicate_document_by_hash(
     return existing_doc_result.scalars().first()
 
-
-async def create_document_chunks(content: str) -> list[Chunk]:
-    """
-    Create chunks from document content.
-
-    Args:
-        content: Document content to chunk
-
-    Returns:
-        List of Chunk objects with embeddings
-    """
-    return [
-        Chunk(
-            content=chunk.text,
-            embedding=config.embedding_model_instance.embed(chunk.text),
-        )
-        for chunk in config.chunker_instance.chunk(content)
-    ]
-
 
 async def get_connector_by_id(
     session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType
 ) -> SearchSourceConnector | None:
diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py
index 54197ee..3120fcb 100644
--- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py
@@ -10,12 +10,16 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.config import config
 from app.connectors.clickup_connector import ClickUpConnector
 from app.db import Document, DocumentType, SearchSourceConnectorType
+from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     check_duplicate_document_by_hash,
-    create_document_chunks,
     get_connector_by_id,
     logger,
     update_connector_last_indexed,
@@ -217,10 +221,34 @@ async def index_clickup_tasks(
                     documents_skipped += 1
                     continue
 
-                # Embedding and chunks
-                summary_embedding = config.embedding_model_instance.embed(
-                    task_content
-                )
+                # Generate summary with metadata
+                user_llm = await get_user_long_context_llm(session, user_id)
+
+                if user_llm:
+                    document_metadata = {
+                        "task_id": task_id,
+                        "task_name": task_name,
+                        "task_status": task_status,
+                        "task_priority": task_priority,
+                        "task_list": task_list_name,
+                        "task_space": task_space_name,
+                        "assignees": len(task_assignees),
+                        "document_type": "ClickUp Task",
+                        "connector_type": "ClickUp",
+                    }
+                    (
+                        summary_content,
+                        summary_embedding,
+                    ) = await generate_document_summary(
+                        task_content, user_llm, document_metadata
+                    )
+                else:
+                    # Fallback to simple summary if no LLM configured
+                    summary_content = task_content
+                    summary_embedding = config.embedding_model_instance.embed(
+                        task_content
+                    )
+
                 chunks = await create_document_chunks(task_content)
 
                 document = Document(
@@ -238,7 +266,7 @@ async def index_clickup_tasks(
                         "task_updated": task_updated,
                         "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                     },
-                    content=task_content,
+                    content=summary_content,
                     content_hash=content_hash,
                     embedding=summary_embedding,
                     chunks=chunks,
diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py
index 57d19da..6259929 100644
--- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py
@@ -10,13 +10,17 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.config import config
 from app.connectors.confluence_connector import ConfluenceConnector
 from app.db import Document, DocumentType, SearchSourceConnectorType
+from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     calculate_date_range,
     check_duplicate_document_by_hash,
-    create_document_chunks,
     get_connector_by_id,
     logger,
     update_connector_last_indexed,
@@ -213,21 +217,6 @@ async def index_confluence_pages(
                     documents_skipped += 1
                     continue
 
-                # Create a simple summary
-                summary_content = (
-                    f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n"
-                )
-                if page_content:
-                    # Take first 500 characters of content for summary
-                    content_preview = page_content[:500]
-                    if len(page_content) > 500:
-                        content_preview += "..."
-                    summary_content += f"Content Preview: {content_preview}\n\n"
-
-                # Add comment count
-                comment_count = len(comments)
-                summary_content += f"Comments: {comment_count}"
-
                 # Generate content hash
                 content_hash = generate_content_hash(full_content, search_space_id)
 
@@ -243,10 +232,40 @@ async def index_confluence_pages(
                     documents_skipped += 1
                     continue
 
-                # Generate embedding for the summary
-                summary_embedding = config.embedding_model_instance.embed(
-                    summary_content
-                )
+                # Generate summary with metadata
+                user_llm = await get_user_long_context_llm(session, user_id)
+                comment_count = len(comments)
+
+                if user_llm:
+                    document_metadata = {
+                        "page_title": page_title,
+                        "page_id": page_id,
+                        "space_id": space_id,
+                        "comment_count": comment_count,
+                        "document_type": "Confluence Page",
+                        "connector_type": "Confluence",
+                    }
+                    (
+                        summary_content,
+                        summary_embedding,
+                    ) = await generate_document_summary(
+                        full_content, user_llm, document_metadata
+                    )
+                else:
+                    # Fallback to simple summary if no LLM configured
+                    summary_content = (
+                        f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n"
+                    )
+                    if page_content:
+                        # Take first 500 characters of content for summary
+                        content_preview = page_content[:500]
+                        if len(page_content) > 500:
+                            content_preview += "..."
+                        summary_content += f"Content Preview: {content_preview}\n\n"
+                    summary_content += f"Comments: {comment_count}"
+                    summary_embedding = config.embedding_model_instance.embed(
+                        summary_content
+                    )
 
                 # Process chunks - using the full page content with comments
                 chunks = await create_document_chunks(full_content)
diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py
index 602cce4..c538f12 100644
--- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py
@@ -8,18 +8,19 @@ from datetime import UTC, datetime, timedelta
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession
 
-from app.config import config
 from app.connectors.discord_connector import DiscordConnector
 from app.db import Document, DocumentType, SearchSourceConnectorType
-from app.prompts import SUMMARY_PROMPT_TEMPLATE
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     build_document_metadata_string,
     check_duplicate_document_by_hash,
-    create_document_chunks,
     get_connector_by_id,
     logger,
     update_connector_last_indexed,
@@ -335,14 +336,19 @@ async def index_discord_messages(
                     documents_skipped += 1
                     continue
 
-                # Generate summary using summary_chain
-                summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
-                summary_result = await summary_chain.ainvoke(
-                    {"document": combined_document_string}
-                )
-                summary_content = summary_result.content
-                summary_embedding = config.embedding_model_instance.embed(
-                    summary_content
+                # Generate summary with metadata
+                document_metadata = {
+                    "guild_name": guild_name,
+                    "channel_name": channel_name,
+                    "message_count": len(formatted_messages),
+                    "document_type": "Discord Channel Messages",
+                    "connector_type": "Discord",
+                }
+                (
+                    summary_content,
+                    summary_embedding,
+                ) = await generate_document_summary(
+                    combined_document_string, user_llm, document_metadata
                 )
 
                 # Chunks from channel content
diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py
index b9dd3ec..ba01e39 100644
--- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py
@@ -10,12 +10,16 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.config import config
 from app.connectors.github_connector import GitHubConnector
 from app.db import Document, DocumentType, SearchSourceConnectorType
+from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     check_duplicate_document_by_hash,
-    create_document_chunks,
     get_connector_by_id,
     logger,
 )
@@ -208,12 +212,34 @@ async def index_github_repos(
                 )
                 continue
 
-            # Use file_content directly for chunking, maybe summary for main content?
-            # For now, let's use the full content for both, might need refinement
-            summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..."  # Simple summary
-            summary_embedding = config.embedding_model_instance.embed(
-                summary_content
-            )
+            # Generate summary with metadata
+            user_llm = await get_user_long_context_llm(session, user_id)
+            if user_llm:
+                # Extract file extension from file path
+                file_extension = (
+                    file_path.split(".")[-1] if "." in file_path else None
+                )
+                document_metadata = {
+                    "file_path": full_path_key,
+                    "repository": repo_full_name,
+                    "file_type": file_extension or "unknown",
+                    "document_type": "GitHub Repository File",
+                    "connector_type": "GitHub",
+                }
+                (
+                    summary_content,
+                    summary_embedding,
+                ) = await generate_document_summary(
+                    file_content, user_llm, document_metadata
+                )
+            else:
+                # Fallback to simple summary if no LLM configured
+                summary_content = (
+                    f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..."
+                )
+                summary_embedding = config.embedding_model_instance.embed(
+                    summary_content
+                )
 
             # Chunk the content
             try:
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
index 37f6362..d3ac800 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
@@ -11,11 +11,15 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.config import config
 from app.connectors.google_calendar_connector import GoogleCalendarConnector
 from app.db import Document, DocumentType, SearchSourceConnectorType
+from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
-    create_document_chunks,
     get_connector_by_id,
     logger,
     update_connector_last_indexed,
@@ -237,18 +241,6 @@ async def index_google_calendar_events(
                 location = event.get("location", "")
                 description = event.get("description", "")
 
-                summary_content = f"Google Calendar Event: {event_summary}\n\n"
-                summary_content += f"Calendar: {calendar_id}\n"
-                summary_content += f"Start: {start_time}\n"
-                summary_content += f"End: {end_time}\n"
-                if location:
-                    summary_content += f"Location: {location}\n"
-                if description:
-                    desc_preview = description[:300]
-                    if len(description) > 300:
-                        desc_preview += "..."
-                    summary_content += f"Description: {desc_preview}\n"
-
                 content_hash = generate_content_hash(event_markdown, search_space_id)
 
                 # Duplicate check via simple query using helper in base
@@ -266,10 +258,42 @@ async def index_google_calendar_events(
                     documents_skipped += 1
                     continue
 
-                # Embeddings and chunks
-                summary_embedding = config.embedding_model_instance.embed(
-                    summary_content
-                )
+                # Generate summary with metadata
+                user_llm = await get_user_long_context_llm(session, user_id)
+
+                if user_llm:
+                    document_metadata = {
+                        "event_id": event_id,
+                        "event_summary": event_summary,
+                        "calendar_id": calendar_id,
+                        "start_time": start_time,
+                        "end_time": end_time,
+                        "location": location or "No location",
+                        "document_type": "Google Calendar Event",
+                        "connector_type": "Google Calendar",
+                    }
+                    (
+                        summary_content,
+                        summary_embedding,
+                    ) = await generate_document_summary(
+                        event_markdown, user_llm, document_metadata
+                    )
+                else:
+                    # Fallback to simple summary if no LLM configured
+                    summary_content = f"Google Calendar Event: {event_summary}\n\n"
+                    summary_content += f"Calendar: {calendar_id}\n"
+                    summary_content += f"Start: {start_time}\n"
+                    summary_content += f"End: {end_time}\n"
+                    if location:
+                        summary_content += f"Location: {location}\n"
+                    if description:
+                        desc_preview = description[:300]
+                        if len(description) > 300:
+                            desc_preview += "..."
+                        summary_content += f"Description: {desc_preview}\n"
+                    summary_embedding = config.embedding_model_instance.embed(
+                        summary_content
+                    )
                 chunks = await create_document_chunks(event_markdown)
 
                 document = Document(
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
index 68e29e3..ac85afd 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
@@ -15,12 +15,16 @@ from app.db import (
     DocumentType,
     SearchSourceConnectorType,
 )
+from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     check_duplicate_document_by_hash,
-    create_document_chunks,
     get_connector_by_id,
     logger,
     update_connector_last_indexed,
@@ -186,11 +190,6 @@ async def index_google_gmail_messages(
                     documents_skipped += 1
                     continue
 
-                # Create a simple summary
-                summary_content = f"Google Gmail Message: {subject}\n\n"
-                summary_content += f"Sender: {sender}\n"
-                summary_content += f"Date: {date_str}\n"
-
                 # Generate content hash
                 content_hash = generate_content_hash(markdown_content, search_space_id)
@@ -206,10 +205,33 @@ async def index_google_gmail_messages(
                     documents_skipped += 1
                     continue
 
-                # Generate embedding for the summary
-                summary_embedding = config.embedding_model_instance.embed(
-                    summary_content
-                )
+                # Generate summary with metadata
+                user_llm = await get_user_long_context_llm(session, user_id)
+
+                if user_llm:
+                    document_metadata = {
+                        "message_id": message_id,
+                        "thread_id": thread_id,
+                        "subject": subject,
+                        "sender": sender,
+                        "date": date_str,
+                        "document_type": "Gmail Message",
+                        "connector_type": "Google Gmail",
+                    }
+                    (
+                        summary_content,
+                        summary_embedding,
+                    ) = await generate_document_summary(
+                        markdown_content, user_llm, document_metadata
+                    )
+                else:
+                    # Fallback to simple summary if no LLM configured
+                    summary_content = f"Google Gmail Message: {subject}\n\n"
+                    summary_content += f"Sender: {sender}\n"
+                    summary_content += f"Date: {date_str}\n"
+                    summary_embedding = config.embedding_model_instance.embed(
+                        summary_content
+                    )
 
                 # Process chunks
                 chunks = await create_document_chunks(markdown_content)
@@ -228,7 +250,7 @@ async def index_google_gmail_messages(
                         "date": date_str,
                         "connector_id": connector_id,
                     },
-                    content=markdown_content,
+                    content=summary_content,
                     content_hash=content_hash,
                     embedding=summary_embedding,
                     chunks=chunks,
diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py
index 75b20f9..c199fae 100644
--- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py
@@ -10,13 +10,17 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.config import config
 from app.connectors.jira_connector import JiraConnector
 from app.db import Document, DocumentType, SearchSourceConnectorType
+from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     calculate_date_range,
     check_duplicate_document_by_hash,
-    create_document_chunks,
     get_connector_by_id,
     logger,
     update_connector_last_indexed,
@@ -196,17 +200,6 @@ async def index_jira_issues(
                     documents_skipped += 1
                     continue
 
-                # Create a simple summary
-                summary_content = f"Jira Issue {issue_identifier}: {issue_title}\n\nStatus: {formatted_issue.get('status', 'Unknown')}\n\n"
-                if formatted_issue.get("description"):
-                    summary_content += (
-                        f"Description: {formatted_issue.get('description')}\n\n"
-                    )
-
-                # Add comment count
-                comment_count = len(formatted_issue.get("comments", []))
-                summary_content += f"Comments: {comment_count}"
-
                 # Generate content hash
                 content_hash = generate_content_hash(issue_content, search_space_id)
@@ -222,10 +215,37 @@ async def index_jira_issues(
                     documents_skipped += 1
                     continue
 
-                # Generate embedding for the summary
-                summary_embedding = config.embedding_model_instance.embed(
-                    summary_content
-                )
+                # Generate summary with metadata
+                user_llm = await get_user_long_context_llm(session, user_id)
+                comment_count = len(formatted_issue.get("comments", []))
+
+                if user_llm:
+                    document_metadata = {
+                        "issue_key": issue_identifier,
+                        "issue_title": issue_title,
+                        "status": formatted_issue.get("status", "Unknown"),
+                        "priority": formatted_issue.get("priority", "Unknown"),
+                        "comment_count": comment_count,
+                        "document_type": "Jira Issue",
+                        "connector_type": "Jira",
+                    }
+                    (
+                        summary_content,
+                        summary_embedding,
+                    ) = await generate_document_summary(
+                        issue_content, user_llm, document_metadata
+                    )
+                else:
+                    # Fallback to simple summary if no LLM configured
+                    summary_content = f"Jira Issue {issue_identifier}: {issue_title}\n\nStatus: {formatted_issue.get('status', 'Unknown')}\n\n"
+                    if formatted_issue.get("description"):
+                        summary_content += (
+                            f"Description: {formatted_issue.get('description')}\n\n"
+                        )
+                    summary_content += f"Comments: {comment_count}"
+                    summary_embedding = config.embedding_model_instance.embed(
+                        summary_content
+                    )
 
                 # Process chunks - using the full issue content with comments
                 chunks = await create_document_chunks(issue_content)
diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py
index dfe0aff..6ca1453 100644
--- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py
@@ -10,13 +10,17 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.config import config
 from app.connectors.linear_connector import LinearConnector
 from app.db import Document, DocumentType, SearchSourceConnectorType
+from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     calculate_date_range,
     check_duplicate_document_by_hash,
-    create_document_chunks,
     get_connector_by_id,
     logger,
     update_connector_last_indexed,
@@ -209,22 +213,6 @@ async def index_linear_issues(
                     documents_skipped += 1
                     continue
 
-                # Create a short summary for the embedding
-                state = formatted_issue.get("state", "Unknown")
-                description = formatted_issue.get("description", "")
-                # Truncate description if it's too long for the summary
-                if description and len(description) > 500:
-                    description = description[:497] + "..."
-
-                # Create a simple summary from the issue data
-                summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n"
-                if description:
-                    summary_content += f"Description: {description}\n\n"
-
-                # Add comment count
-                comment_count = len(formatted_issue.get("comments", []))
-                summary_content += f"Comments: {comment_count}"
-
                 content_hash = generate_content_hash(issue_content, search_space_id)
 
                 # Check if document with this content hash already exists
@@ -239,10 +227,40 @@ async def index_linear_issues(
                     documents_skipped += 1
                     continue
 
-                # Generate embedding for the summary
-                summary_embedding = config.embedding_model_instance.embed(
-                    summary_content
-                )
+                # Generate summary with metadata
+                user_llm = await get_user_long_context_llm(session, user_id)
+                state = formatted_issue.get("state", "Unknown")
+                description = formatted_issue.get("description", "")
+                comment_count = len(formatted_issue.get("comments", []))
+
+                if user_llm:
+                    document_metadata = {
+                        "issue_id": issue_identifier,
+                        "issue_title": issue_title,
+                        "state": state,
+                        "priority": formatted_issue.get("priority", "Unknown"),
+                        "comment_count": comment_count,
+                        "document_type": "Linear Issue",
+                        "connector_type": "Linear",
+                    }
+                    (
+                        summary_content,
+                        summary_embedding,
+                    ) = await generate_document_summary(
+                        issue_content, user_llm, document_metadata
+                    )
+                else:
+                    # Fallback to simple summary if no LLM configured
+                    # Truncate description if it's too long for the summary
+                    if description and len(description) > 500:
+                        description = description[:497] + "..."
+                    summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n"
+                    if description:
+                        summary_content += f"Description: {description}\n\n"
+                    summary_content += f"Comments: {comment_count}"
+                    summary_embedding = config.embedding_model_instance.embed(
+                        summary_content
+                    )
 
                 # Process chunks - using the full issue content with comments
                 chunks = await create_document_chunks(issue_content)
diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
index fbd769e..6267ad9 100644
--- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
@@ -7,18 +7,19 @@ from datetime import datetime, timedelta
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession
 
-from app.config import config
 from app.connectors.notion_history import NotionHistoryConnector
 from app.db import Document, DocumentType, SearchSourceConnectorType
-from app.prompts import SUMMARY_PROMPT_TEMPLATE
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     build_document_metadata_string,
     check_duplicate_document_by_hash,
-    create_document_chunks,
     get_connector_by_id,
     logger,
     update_connector_last_indexed,
@@ -302,15 +303,16 @@ async def index_notion_pages(
                     documents_skipped += 1
                     continue
 
-                # Generate summary
+                # Generate summary with metadata
                 logger.debug(f"Generating summary for page {page_title}")
-                summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
-                summary_result = await summary_chain.ainvoke(
-                    {"document": combined_document_string}
-                )
-                summary_content = summary_result.content
-                summary_embedding = config.embedding_model_instance.embed(
-                    summary_content
+                document_metadata = {
+                    "page_title": page_title,
+                    "page_id": page_id,
+                    "document_type": "Notion Page",
+                    "connector_type": "Notion",
+                }
+                summary_content, summary_embedding = await generate_document_summary(
+                    markdown_content, user_llm, document_metadata
                 )
 
                 # Process chunks
diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py
index 78b1308..fcf3253 100644
--- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py
@@ -8,19 +8,20 @@ from slack_sdk.errors import SlackApiError
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession
 
-from app.config import config
 from app.connectors.slack_history import SlackHistory
 from app.db import Document, DocumentType, SearchSourceConnectorType
-from app.prompts import SUMMARY_PROMPT_TEMPLATE
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     build_document_metadata_string,
     calculate_date_range,
     check_duplicate_document_by_hash,
-    create_document_chunks,
     get_connector_by_id,
     logger,
     update_connector_last_indexed,
@@ -289,14 +290,16 @@ async def index_slack_messages(
                     documents_skipped += 1
                     continue
 
-                # Generate summary
-                summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
-                summary_result = await summary_chain.ainvoke(
-                    {"document": combined_document_string}
-                )
-                summary_content = summary_result.content
-                summary_embedding = config.embedding_model_instance.embed(
-                    summary_content
+                # Generate summary with metadata
+                document_metadata = {
+                    "channel_name": channel_name,
+                    "channel_id": channel_id,
+                    "message_count": len(formatted_messages),
+                    "document_type": "Slack Channel Messages",
+                    "connector_type": "Slack",
+                }
+                summary_content, summary_embedding = await generate_document_summary(
+                    combined_document_string, user_llm, document_metadata
                 )
 
                 # Process chunks
diff --git a/surfsense_backend/app/tasks/document_processors/base.py b/surfsense_backend/app/tasks/document_processors/base.py
index fa0a5ae..d5b1722 100644
--- a/surfsense_backend/app/tasks/document_processors/base.py
+++ b/surfsense_backend/app/tasks/document_processors/base.py
@@ -6,9 +6,7 @@ from langchain_community.document_transformers import MarkdownifyTransformer
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.future import select
 
-from app.config import config
-from app.db import Chunk, Document
-from app.prompts import SUMMARY_PROMPT_TEMPLATE
+from app.db import Document
 
 # Initialize markdown transformer
 md = MarkdownifyTransformer()
@@ -31,44 +29,3 @@ async def check_duplicate_document(
         select(Document).where(Document.content_hash == content_hash)
     )
     return existing_doc_result.scalars().first()
-
-
-async def create_document_chunks(content: str) -> list[Chunk]:
-    """
-    Create chunks from document content.
-
-    Args:
-        content: Document content to chunk
-
-    Returns:
-        List of Chunk objects with embeddings
-    """
-    return [
-        Chunk(
-            content=chunk.text,
-            embedding=config.embedding_model_instance.embed(chunk.text),
-        )
-        for chunk in config.chunker_instance.chunk(content)
-    ]
-
-
-async def generate_document_summary(
-    content: str, user_llm, document_title: str = ""
-) -> tuple[str, list[float]]:
-    """
-    Generate summary and embedding for document content.
-
-    Args:
-        content: Document content
-        user_llm: User's LLM instance
-        document_title: Optional document title for context
-
-    Returns:
-        Tuple of (summary_content, summary_embedding)
-    """
-    summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
-    summary_result = await summary_chain.ainvoke({"document": content})
-    summary_content = summary_result.content
-    summary_embedding = config.embedding_model_instance.embed(summary_content)
-
-    return summary_content, summary_embedding
diff --git a/surfsense_backend/app/tasks/document_processors/extension_processor.py b/surfsense_backend/app/tasks/document_processors/extension_processor.py
index 492b76d..8f84331 100644
--- a/surfsense_backend/app/tasks/document_processors/extension_processor.py
+++ b/surfsense_backend/app/tasks/document_processors/extension_processor.py
@@ -11,12 +11,14 @@ from app.db import Document, DocumentType
 from app.schemas import ExtensionDocumentContent
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     check_duplicate_document,
-    create_document_chunks,
-    generate_document_summary,
 )
@@ -106,9 +108,18 @@ async def add_extension_received_document(
         if not user_llm:
             raise RuntimeError(f"No long context LLM configured for user {user_id}")
 
-        # Generate summary
+        # Generate summary with metadata
+        document_metadata = {
+            "session_id": content.metadata.BrowsingSessionId,
+            "url": content.metadata.VisitedWebPageURL,
+            "title": content.metadata.VisitedWebPageTitle,
+            "referrer": content.metadata.VisitedWebPageReffererURL,
+            "timestamp": content.metadata.VisitedWebPageDateWithTimeInISOString,
+            "duration_ms": content.metadata.VisitedWebPageVisitDurationInMilliseconds,
+            "document_type": "Browser Extension Capture",
+        }
         summary_content, summary_embedding = await generate_document_summary(
-            combined_document_string, user_llm
+            combined_document_string, user_llm, document_metadata
         )
 
         # Process chunks
diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py
index 34403e5..3803f4b 100644
--- a/surfsense_backend/app/tasks/document_processors/file_processors.py
+++ b/surfsense_backend/app/tasks/document_processors/file_processors.py
@@ -12,13 +12,13 @@ from app.db import Document, DocumentType
 from app.services.llm_service import get_user_long_context_llm
 from app.utils.document_converters import (
     convert_document_to_markdown,
+    create_document_chunks,
     generate_content_hash,
+    generate_document_summary,
 )
 
 from .base import (
     check_duplicate_document,
-    create_document_chunks,
-    generate_document_summary,
 )
@@ -64,9 +64,14 @@ async def add_received_file_document_using_unstructured(
         if not user_llm:
             raise RuntimeError(f"No long context LLM configured for user {user_id}")
 
-        # Generate summary
+        # Generate summary with metadata
+        document_metadata = {
+            "file_name": file_name,
+            "etl_service": "UNSTRUCTURED",
+            "document_type": "File Document",
+        }
         summary_content, summary_embedding = await generate_document_summary(
-            file_in_markdown, user_llm
+            file_in_markdown, user_llm, document_metadata
         )
 
         # Process chunks
@@ -139,9 +144,14 @@ async def add_received_file_document_using_llamacloud(
         if not user_llm:
             raise RuntimeError(f"No long context LLM configured for user {user_id}")
 
-        # Generate summary
+        # Generate summary with metadata
+        document_metadata = {
+            "file_name": file_name,
+            "etl_service": "LLAMACLOUD",
+            "document_type": "File Document",
+        }
         summary_content, summary_embedding = await generate_document_summary(
-            file_in_markdown, user_llm
+            file_in_markdown, user_llm, document_metadata
         )
 
         # Process chunks
@@ -224,9 +234,30 @@ async def add_received_file_document_using_docling(
             content=file_in_markdown, llm=user_llm, document_title=file_name
         )
 
+        # Enhance summary with metadata
+        document_metadata = {
+            "file_name": file_name,
+            "etl_service": "DOCLING",
+            "document_type": "File Document",
+        }
+        metadata_parts = []
+        metadata_parts.append("# DOCUMENT METADATA")
+
+        for key, value in document_metadata.items():
+            if value:  # Only include non-empty values
+                formatted_key = key.replace("_", " ").title()
+                metadata_parts.append(f"**{formatted_key}:** {value}")
+
+        metadata_section = "\n".join(metadata_parts)
+        enhanced_summary_content = (
+            f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}"
+        )
+
         from app.config import config
 
-        summary_embedding = config.embedding_model_instance.embed(summary_content)
+        summary_embedding = config.embedding_model_instance.embed(
+            enhanced_summary_content
+        )
 
         # Process chunks
         chunks = await create_document_chunks(file_in_markdown)
@@ -240,7 +271,7 @@ async def add_received_file_document_using_docling(
                 "FILE_NAME": file_name,
                 "ETL_SERVICE": "DOCLING",
             },
-            content=summary_content,
+            content=enhanced_summary_content,
             embedding=summary_embedding,
             chunks=chunks,
             content_hash=content_hash,
diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py b/surfsense_backend/app/tasks/document_processors/markdown_processor.py
index c7c8c75..493b046 100644
--- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py
+++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py
@@ -10,12 +10,14 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.db import Document, DocumentType
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     check_duplicate_document,
-    create_document_chunks,
-    generate_document_summary,
 )
@@ -77,9 +79,13 @@ async def add_received_markdown_file_document(
        if not user_llm:
             raise RuntimeError(f"No long context LLM configured for user {user_id}")
 
-        # Generate summary
+        # Generate summary with metadata
+        document_metadata = {
+            "file_name": file_name,
+            "document_type": "Markdown File Document",
+        }
         summary_content, summary_embedding = await generate_document_summary(
-            file_in_markdown, user_llm
+            file_in_markdown, user_llm, document_metadata
         )
 
         # Process chunks
diff --git a/surfsense_backend/app/tasks/document_processors/url_crawler.py b/surfsense_backend/app/tasks/document_processors/url_crawler.py
index 85d1bc0..eddcda3 100644
--- a/surfsense_backend/app/tasks/document_processors/url_crawler.py
+++ b/surfsense_backend/app/tasks/document_processors/url_crawler.py
@@ -13,12 +13,14 @@ from app.config import config
 from app.db import Document, DocumentType
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     check_duplicate_document,
-    create_document_chunks,
-    generate_document_summary,
     md,
 )
@@ -170,8 +172,15 @@ async def add_crawled_url_document(
             {"stage": "summary_generation"},
         )
 
+        # Generate summary with metadata
+        document_metadata = {
+            "url": url,
+            "title": url_crawled[0].metadata.get("title", url),
+            "document_type": "Crawled URL Document",
+            "crawler_type": type(crawl_loader).__name__,
+        }
         summary_content, summary_embedding = await generate_document_summary(
-            combined_document_string, user_llm
+            combined_document_string, user_llm, document_metadata
         )
 
         # Process chunks
diff --git a/surfsense_backend/app/tasks/document_processors/youtube_processor.py b/surfsense_backend/app/tasks/document_processors/youtube_processor.py
index f75ce0a..e918204 100644
--- a/surfsense_backend/app/tasks/document_processors/youtube_processor.py
+++ b/surfsense_backend/app/tasks/document_processors/youtube_processor.py
@@ -13,12 +13,14 @@ from youtube_transcript_api import YouTubeTranscriptApi
 from app.db import Document, DocumentType
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import generate_content_hash
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_document_summary,
+)
 
 from .base import (
     check_duplicate_document,
-    create_document_chunks,
-    generate_document_summary,
 )
@@ -242,8 +244,18 @@ async def add_youtube_video_document(
             {"stage": "summary_generation"},
         )
 
+        # Generate summary with metadata
+        document_metadata = {
+            "url": url,
+            "video_id": video_id,
+            "title": video_data.get("title", "YouTube Video"),
+            "author": video_data.get("author_name", "Unknown"),
+            "thumbnail": video_data.get("thumbnail_url", ""),
+            "document_type": "YouTube Video Document",
+            "has_transcript": "No captions available" not in transcript_text,
+        }
         summary_content, summary_embedding = await generate_document_summary(
-            combined_document_string, user_llm
+            combined_document_string, user_llm, document_metadata
         )
 
         # Process chunks
diff --git a/surfsense_backend/app/utils/document_converters.py b/surfsense_backend/app/utils/document_converters.py
index 3b23f54..dc0f636 100644
--- a/surfsense_backend/app/utils/document_converters.py
+++ b/surfsense_backend/app/utils/document_converters.py
@@ -1,5 +1,73 @@
 import hashlib
 
+from app.config import config
+from app.db import Chunk
+from app.prompts import SUMMARY_PROMPT_TEMPLATE
+
+
+async def generate_document_summary(
+    content: str,
+    user_llm,
+    document_metadata: dict | None = None,
+    document_title: str = "",
+) -> tuple[str, list[float]]:
+    """
+    Generate summary and embedding for document content with metadata.
+
+    Args:
+        content: Document content
+        user_llm: User's LLM instance
+        document_metadata: Optional metadata dictionary to include in summary
+        document_title: Optional document title for context (deprecated, use metadata)
+
+    Returns:
+        Tuple of (enhanced_summary_content, summary_embedding)
+    """
+    summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
+    content_with_metadata = f"\n\n{document_metadata}\n\n\n\n\n\n{content}\n\n"
+    summary_result = await summary_chain.ainvoke({"document": content_with_metadata})
+    summary_content = summary_result.content
+
+    # Combine summary with metadata if provided
+    if document_metadata:
+        metadata_parts = []
+        metadata_parts.append("# DOCUMENT METADATA")
+
+        for key, value in document_metadata.items():
+            if value:  # Only include non-empty values
+                formatted_key = key.replace("_", " ").title()
+                metadata_parts.append(f"**{formatted_key}:** {value}")
+
+        metadata_section = "\n".join(metadata_parts)
+        enhanced_summary_content = (
+            f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}"
+        )
+    else:
+        enhanced_summary_content = summary_content
+
+    summary_embedding = config.embedding_model_instance.embed(enhanced_summary_content)
+
+    return enhanced_summary_content, summary_embedding
+
+
+async def create_document_chunks(content: str) -> list[Chunk]:
+    """
+    Create chunks from document content.
+
+    Args:
+        content: Document content to chunk
+
+    Returns:
+        List of Chunk objects with embeddings
+    """
+    return [
+        Chunk(
+            content=chunk.text,
+            embedding=config.embedding_model_instance.embed(chunk.text),
+        )
+        for chunk in config.chunker_instance.chunk(content)
+    ]
+
 
 async def convert_element_to_markdown(element) -> str:
     """