From cd05a06a9152f790d1ccbe8f3fe24442e7c9c396 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Thu, 24 Jul 2025 11:52:21 +0200 Subject: [PATCH] update connector indexing / update connector service --- .../app/agents/researcher/nodes.py | 54 +- .../agents/researcher/qna_agent/prompts.py | 2 + .../app/agents/researcher/utils.py | 4 + .../app/connectors/test_jira_connector.py | 218 +++ .../routes/search_source_connectors_routes.py | 52 +- .../app/schemas/search_source_connector.py | 13 + .../app/services/connector_service.py | 636 ++++---- .../app/tasks/connectors_indexing_tasks.py | 1297 +++++++++++------ 8 files changed, 1544 insertions(+), 732 deletions(-) create mode 100644 surfsense_backend/app/connectors/test_jira_connector.py diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py index 30d572a..0fb9dc3 100644 --- a/surfsense_backend/app/agents/researcher/nodes.py +++ b/surfsense_backend/app/agents/researcher/nodes.py @@ -172,20 +172,41 @@ async def fetch_documents_by_ids( channel_id = metadata.get('channel_id', '') guild_id = metadata.get('guild_id', '') message_date = metadata.get('start_date', '') - + title = f"Discord: {channel_name}" if message_date: title += f" ({message_date})" - + description = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content - + if guild_id and channel_id: url = f"https://discord.com/channels/{guild_id}/{channel_id}" elif channel_id: url = f"https://discord.com/channels/@me/{channel_id}" else: url = "" - + + elif doc_type == "JIRA_CONNECTOR": + # Extract Jira-specific metadata + issue_key = metadata.get('issue_key', 'Unknown Issue') + issue_title = metadata.get('issue_title', 'Untitled Issue') + status = metadata.get('status', '') + priority = metadata.get('priority', '') + issue_type = metadata.get('issue_type', '') + + title = f"Jira: {issue_key} - {issue_title}" + if status: + title += f" ({status})" + + description = doc.content[:100] + "..." 
if len(doc.content) > 100 else doc.content + + # Construct Jira URL if we have the base URL + base_url = metadata.get('base_url', '') + if base_url and issue_key: + url = f"{base_url}/browse/{issue_key}" + else: + url = "" + elif doc_type == "EXTENSION": # Extract Extension-specific metadata webpage_title = metadata.get('VisitedWebPageTitle', doc.title) @@ -227,6 +248,7 @@ async def fetch_documents_by_ids( "GITHUB_CONNECTOR": "GitHub (Selected)", "YOUTUBE_VIDEO": "YouTube Videos (Selected)", "DISCORD_CONNECTOR": "Discord (Selected)", + "JIRA_CONNECTOR": "Jira Issues (Selected)", "EXTENSION": "Browser Extension (Selected)", "CRAWLED_URL": "Web Pages (Selected)", "FILE": "Files (Selected)" @@ -741,6 +763,30 @@ async def fetch_relevant_documents( } ) + elif connector == "JIRA_CONNECTOR": + source_object, jira_chunks = await connector_service.search_jira( + user_query=reformulated_query, + user_id=user_id, + search_space_id=search_space_id, + top_k=top_k, + search_mode=search_mode + ) + + # Add to sources and raw documents + if source_object: + all_sources.append(source_object) + all_raw_documents.extend(jira_chunks) + + # Stream found document count + if streaming_service and writer: + writer( + { + "yield_value": streaming_service.format_terminal_info_delta( + f"🎫 Found {len(jira_chunks)} Jira issues related to your query" + ) + } + ) + except Exception as e: error_message = f"Error searching connector {connector}: {str(e)}" print(error_message) diff --git a/surfsense_backend/app/agents/researcher/qna_agent/prompts.py b/surfsense_backend/app/agents/researcher/qna_agent/prompts.py index eed0722..d726dfd 100644 --- a/surfsense_backend/app/agents/researcher/qna_agent/prompts.py +++ b/surfsense_backend/app/agents/researcher/qna_agent/prompts.py @@ -15,6 +15,8 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel - YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos) - GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions) - LINEAR_CONNECTOR: "Linear project issues and discussions" (personal project management) +- JIRA_CONNECTOR: "Jira project issues, tickets, and comments" (personal project tracking) +- DISCORD_CONNECTOR: "Discord server conversations and shared content" (personal community communications) - DISCORD_CONNECTOR: "Discord server messages and channels" (personal community interactions) - TAVILY_API: "Tavily search API results" (personalized search results) - LINKUP_API: "Linkup search API results" (personalized search results) diff --git a/surfsense_backend/app/agents/researcher/utils.py b/surfsense_backend/app/agents/researcher/utils.py index c4991cc..647e000 100644 --- a/surfsense_backend/app/agents/researcher/utils.py +++ b/surfsense_backend/app/agents/researcher/utils.py @@ -33,6 +33,8 @@ def get_connector_emoji(connector_name: str) -> str: "NOTION_CONNECTOR": "📘", "GITHUB_CONNECTOR": "🐙", "LINEAR_CONNECTOR": "📊", + "JIRA_CONNECTOR": "🎫", + "DISCORD_CONNECTOR": "🗨️", "TAVILY_API": "🔍", "LINKUP_API": "🔗" } @@ -50,6 +52,8 @@ def get_connector_friendly_name(connector_name: str) -> str: "NOTION_CONNECTOR": "Notion", "GITHUB_CONNECTOR": "GitHub", "LINEAR_CONNECTOR": "Linear", + "JIRA_CONNECTOR": "Jira", + "DISCORD_CONNECTOR": "Discord", "TAVILY_API": "Tavily Search", "LINKUP_API": "Linkup Search" } diff --git a/surfsense_backend/app/connectors/test_jira_connector.py b/surfsense_backend/app/connectors/test_jira_connector.py new file mode 100644 index 0000000..c9b7551 --- /dev/null +++ 
b/surfsense_backend/app/connectors/test_jira_connector.py @@ -0,0 +1,218 @@ +import unittest +from unittest.mock import patch, Mock +from datetime import datetime + +# Import the JiraConnector +from .jira_connector import JiraConnector + + +class TestJiraConnector(unittest.TestCase): + + def setUp(self): + """Set up test fixtures.""" + self.base_url = "https://test.atlassian.net" + self.token = "test_token" + self.connector = JiraConnector(base_url=self.base_url, personal_access_token=self.token) + + def test_init(self): + """Test JiraConnector initialization.""" + self.assertEqual(self.connector.base_url, self.base_url) + self.assertEqual(self.connector.personal_access_token, self.token) + self.assertEqual(self.connector.api_version, "3") + + def test_init_with_trailing_slash(self): + """Test JiraConnector initialization with trailing slash in URL.""" + connector = JiraConnector(base_url="https://test.atlassian.net/", personal_access_token=self.token) + self.assertEqual(connector.base_url, "https://test.atlassian.net") + + def test_set_credentials(self): + """Test setting credentials.""" + new_url = "https://newtest.atlassian.net/" + new_token = "new_token" + + self.connector.set_credentials(new_url, new_token) + + self.assertEqual(self.connector.base_url, "https://newtest.atlassian.net") + self.assertEqual(self.connector.personal_access_token, new_token) + + def test_get_headers(self): + """Test header generation.""" + headers = self.connector.get_headers() + + self.assertIn('Content-Type', headers) + self.assertIn('Authorization', headers) + self.assertIn('Accept', headers) + self.assertEqual(headers['Content-Type'], 'application/json') + self.assertEqual(headers['Accept'], 'application/json') + self.assertTrue(headers['Authorization'].startswith('Bearer ')) + + def test_get_headers_no_credentials(self): + """Test header generation without credentials.""" + connector = JiraConnector() + + with self.assertRaises(ValueError) as context: + connector.get_headers() + + self.assertIn("Jira credentials not initialized", str(context.exception)) + + @patch('requests.get') + def test_make_api_request_success(self, mock_get): + """Test successful API request.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"test": "data"} + mock_get.return_value = mock_response + + result = self.connector.make_api_request("test/endpoint") + + self.assertEqual(result, {"test": "data"}) + mock_get.assert_called_once() + + @patch('requests.get') + def test_make_api_request_failure(self, mock_get): + """Test failed API request.""" + mock_response = Mock() + mock_response.status_code = 401 + mock_response.text = "Unauthorized" + mock_get.return_value = mock_response + + with self.assertRaises(Exception) as context: + self.connector.make_api_request("test/endpoint") + + self.assertIn("API request failed with status code 401", str(context.exception)) + + @patch.object(JiraConnector, 'make_api_request') + def test_get_all_projects(self, mock_api_request): + """Test getting all projects.""" + mock_api_request.return_value = { + "values": [ + {"id": "1", "key": "TEST", "name": "Test Project"}, + {"id": "2", "key": "DEMO", "name": "Demo Project"} + ] + } + + projects = self.connector.get_all_projects() + + self.assertEqual(len(projects), 2) + self.assertEqual(projects[0]["key"], "TEST") + self.assertEqual(projects[1]["key"], "DEMO") + mock_api_request.assert_called_once_with("project") + + @patch.object(JiraConnector, 'make_api_request') + def test_get_all_issues(self, 
mock_api_request): + """Test getting all issues.""" + mock_api_request.return_value = { + "issues": [ + { + "id": "1", + "key": "TEST-1", + "fields": { + "summary": "Test Issue", + "description": "Test Description", + "status": {"name": "Open"}, + "priority": {"name": "High"}, + "issuetype": {"name": "Bug"}, + "project": {"key": "TEST"}, + "created": "2023-01-01T10:00:00.000+0000", + "updated": "2023-01-01T12:00:00.000+0000" + } + } + ], + "total": 1 + } + + issues = self.connector.get_all_issues() + + self.assertEqual(len(issues), 1) + self.assertEqual(issues[0]["key"], "TEST-1") + self.assertEqual(issues[0]["fields"]["summary"], "Test Issue") + + def test_format_issue(self): + """Test issue formatting.""" + raw_issue = { + "id": "1", + "key": "TEST-1", + "fields": { + "summary": "Test Issue", + "description": "Test Description", + "status": {"name": "Open", "statusCategory": {"name": "To Do"}}, + "priority": {"name": "High"}, + "issuetype": {"name": "Bug"}, + "project": {"key": "TEST"}, + "created": "2023-01-01T10:00:00.000+0000", + "updated": "2023-01-01T12:00:00.000+0000", + "reporter": { + "accountId": "123", + "displayName": "John Doe", + "emailAddress": "john@example.com" + }, + "assignee": { + "accountId": "456", + "displayName": "Jane Smith", + "emailAddress": "jane@example.com" + } + } + } + + formatted = self.connector.format_issue(raw_issue) + + self.assertEqual(formatted["id"], "1") + self.assertEqual(formatted["key"], "TEST-1") + self.assertEqual(formatted["title"], "Test Issue") + self.assertEqual(formatted["status"], "Open") + self.assertEqual(formatted["priority"], "High") + self.assertEqual(formatted["issue_type"], "Bug") + self.assertEqual(formatted["project"], "TEST") + self.assertEqual(formatted["reporter"]["display_name"], "John Doe") + self.assertEqual(formatted["assignee"]["display_name"], "Jane Smith") + + def test_format_date(self): + """Test date formatting.""" + iso_date = "2023-01-01T10:30:00.000+0000" + formatted_date = JiraConnector.format_date(iso_date) + + self.assertEqual(formatted_date, "2023-01-01 10:30:00") + + def test_format_date_invalid(self): + """Test date formatting with invalid input.""" + formatted_date = JiraConnector.format_date("invalid-date") + self.assertEqual(formatted_date, "invalid-date") + + formatted_date = JiraConnector.format_date("") + self.assertEqual(formatted_date, "Unknown date") + + formatted_date = JiraConnector.format_date(None) + self.assertEqual(formatted_date, "Unknown date") + + def test_format_issue_to_markdown(self): + """Test issue to markdown conversion.""" + formatted_issue = { + "key": "TEST-1", + "title": "Test Issue", + "status": "Open", + "priority": "High", + "issue_type": "Bug", + "project": "TEST", + "assignee": {"display_name": "Jane Smith"}, + "reporter": {"display_name": "John Doe"}, + "created_at": "2023-01-01T10:00:00.000+0000", + "updated_at": "2023-01-01T12:00:00.000+0000", + "description": "Test Description", + "comments": [] + } + + markdown = self.connector.format_issue_to_markdown(formatted_issue) + + self.assertIn("# TEST-1: Test Issue", markdown) + self.assertIn("**Status:** Open", markdown) + self.assertIn("**Priority:** High", markdown) + self.assertIn("**Type:** Bug", markdown) + self.assertIn("**Project:** TEST", markdown) + self.assertIn("**Assignee:** Jane Smith", markdown) + self.assertIn("**Reporter:** John Doe", markdown) + self.assertIn("## Description", markdown) + self.assertIn("Test Description", markdown) + + +if __name__ == '__main__': + unittest.main() diff --git 
a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 54f97d6..33366ff 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -19,7 +19,7 @@ from app.schemas import SearchSourceConnectorCreate, SearchSourceConnectorUpdate from app.users import current_active_user from app.utils.check_ownership import check_ownership from pydantic import BaseModel, Field, ValidationError -from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues, index_discord_messages +from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues, index_discord_messages, index_jira_issues from app.connectors.github_connector import GitHubConnector from datetime import datetime, timedelta import logging @@ -284,6 +284,7 @@ async def index_connector_content( - NOTION_CONNECTOR: Indexes pages from all accessible Notion pages - GITHUB_CONNECTOR: Indexes code and documentation from GitHub repositories - LINEAR_CONNECTOR: Indexes issues and comments from Linear + - JIRA_CONNECTOR: Indexes issues and comments from Jira - DISCORD_CONNECTOR: Indexes messages from all accessible Discord channels Args: @@ -349,6 +350,12 @@ async def index_connector_content( background_tasks.add_task(run_linear_indexing_with_new_session, connector_id, search_space_id, str(user.id), indexing_from, indexing_to) response_message = "Linear indexing started in the background." + elif connector.connector_type == SearchSourceConnectorType.JIRA_CONNECTOR: + # Run indexing in background + logger.info(f"Triggering Jira indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}") + background_tasks.add_task(run_jira_indexing_with_new_session, connector_id, search_space_id, str(user.id), indexing_from, indexing_to) + response_message = "Jira indexing started in the background." 
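# For reference, a minimal sketch of the connector config that the JIRA_CONNECTOR
# validation added later in this patch accepts (only these two keys are allowed,
# and both must be non-empty). The URL and token values below are placeholders:
example_jira_config = {
    "JIRA_BASE_URL": "https://your-domain.atlassian.net",        # placeholder
    "JIRA_PERSONAL_ACCESS_TOKEN": "<personal-access-token>",     # placeholder
}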
+ elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR: # Run indexing in background logger.info( @@ -647,4 +654,45 @@ async def run_discord_indexing( else: logger.error(f"Discord indexing failed or no documents processed: {error_or_warning}") except Exception as e: - logger.error(f"Error in background Discord indexing task: {str(e)}") \ No newline at end of file + logger.error(f"Error in background Discord indexing task: {str(e)}") + + +# Add new helper functions for Jira indexing +async def run_jira_indexing_with_new_session( + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str +): + """Wrapper to run Jira indexing with its own database session.""" + logger.info(f"Background task started: Indexing Jira connector {connector_id} into space {search_space_id} from {start_date} to {end_date}") + async with async_session_maker() as session: + await run_jira_indexing(session, connector_id, search_space_id, user_id, start_date, end_date) + logger.info(f"Background task finished: Indexing Jira connector {connector_id}") + +async def run_jira_indexing( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str +): + """Runs the Jira indexing task and updates the timestamp.""" + try: + indexed_count, error_message = await index_jira_issues( + session, connector_id, search_space_id, user_id, start_date, end_date, update_last_indexed=False + ) + if error_message: + logger.error(f"Jira indexing failed for connector {connector_id}: {error_message}") + # Optionally update status in DB to indicate failure + else: + logger.info(f"Jira indexing successful for connector {connector_id}. Indexed {indexed_count} documents.") + # Update the last indexed timestamp only on success + await update_connector_last_indexed(session, connector_id) + await session.commit() # Commit timestamp update + except Exception as e: + await session.rollback() + logger.error(f"Critical error in run_jira_indexing for connector {connector_id}: {e}", exc_info=True) + # Optionally update status in DB to indicate failure \ No newline at end of file diff --git a/surfsense_backend/app/schemas/search_source_connector.py b/surfsense_backend/app/schemas/search_source_connector.py index 1225d54..17f1867 100644 --- a/surfsense_backend/app/schemas/search_source_connector.py +++ b/surfsense_backend/app/schemas/search_source_connector.py @@ -101,6 +101,19 @@ class SearchSourceConnectorBase(BaseModel): # Ensure the bot token is not empty if not config.get("DISCORD_BOT_TOKEN"): raise ValueError("DISCORD_BOT_TOKEN cannot be empty") + elif connector_type == SearchSourceConnectorType.JIRA_CONNECTOR: + # For JIRA_CONNECTOR, allow JIRA_PERSONAL_ACCESS_TOKEN and JIRA_BASE_URL + allowed_keys = ["JIRA_PERSONAL_ACCESS_TOKEN", "JIRA_BASE_URL"] + if set(config.keys()) != set(allowed_keys): + raise ValueError(f"For JIRA_CONNECTOR connector type, config must only contain these keys: {allowed_keys}") + + # Ensure the token is not empty + if not config.get("JIRA_PERSONAL_ACCESS_TOKEN"): + raise ValueError("JIRA_PERSONAL_ACCESS_TOKEN cannot be empty") + + # Ensure the base URL is not empty + if not config.get("JIRA_BASE_URL"): + raise ValueError("JIRA_BASE_URL cannot be empty") return config diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 8c6f99c..4529366 100644 --- a/surfsense_backend/app/services/connector_service.py +++ 
b/surfsense_backend/app/services/connector_service.py @@ -1,15 +1,21 @@ -from typing import List, Dict, Optional import asyncio -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select -from app.retriver.chunks_hybrid_search import ChucksHybridSearchRetriever -from app.retriver.documents_hybrid_search import DocumentHybridSearchRetriever -from app.db import SearchSourceConnector, SearchSourceConnectorType, Chunk, Document, SearchSpace -from tavily import TavilyClient -from linkup import LinkupClient -from sqlalchemy import func +from typing import Dict, List, Optional from app.agents.researcher.configuration import SearchMode +from app.db import ( + Chunk, + Document, + SearchSourceConnector, + SearchSourceConnectorType, + SearchSpace, +) +from app.retriver.chunks_hybrid_search import ChucksHybridSearchRetriever +from app.retriver.documents_hybrid_search import DocumentHybridSearchRetriever +from linkup import LinkupClient +from sqlalchemy import func +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from tavily import TavilyClient class ConnectorService: @@ -18,9 +24,13 @@ class ConnectorService: self.chunk_retriever = ChucksHybridSearchRetriever(session) self.document_retriever = DocumentHybridSearchRetriever(session) self.user_id = user_id - self.source_id_counter = 100000 # High starting value to avoid collisions with existing IDs - self.counter_lock = asyncio.Lock() # Lock to protect counter in multithreaded environments - + self.source_id_counter = ( + 100000 # High starting value to avoid collisions with existing IDs + ) + self.counter_lock = ( + asyncio.Lock() + ) # Lock to protect counter in multithreaded environments + async def initialize_counter(self): """ Initialize the source_id_counter based on the total number of chunks for the user. 
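# A minimal sketch of how this service is typically driven. Assumptions: the
# constructor takes the async session and user id (as the attribute assignments
# above suggest), initialize_counter() is awaited once so source ids continue
# from the user's existing chunk count, and search_jira mirrors the call made
# from researcher/nodes.py earlier in this patch.
from app.agents.researcher.configuration import SearchMode
from app.services.connector_service import ConnectorService

async def example_jira_search(session, user_id, search_space_id, query):
    connector_service = ConnectorService(session, user_id=user_id)
    await connector_service.initialize_counter()
    # Returns a (source_object, chunks) tuple, like the other search_* helpers.
    source_object, jira_chunks = await connector_service.search_jira(
        user_query=query,
        user_id=user_id,
        search_space_id=search_space_id,
        top_k=20,
        search_mode=SearchMode.CHUNKS,
    )
    return source_object, jira_chunks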
@@ -38,16 +48,25 @@ class ConnectorService: ) chunk_count = result.scalar() or 0 self.source_id_counter = chunk_count + 1 - print(f"Initialized source_id_counter to {self.source_id_counter} for user {self.user_id}") + print( + f"Initialized source_id_counter to {self.source_id_counter} for user {self.user_id}" + ) except Exception as e: print(f"Error initializing source_id_counter: {str(e)}") # Fallback to default value self.source_id_counter = 1 - - async def search_crawled_urls(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple: + + async def search_crawled_urls( + self, + user_query: str, + user_id: str, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = SearchMode.CHUNKS, + ) -> tuple: """ Search for crawled URLs and return both the source information and langchain documents - + Returns: tuple: (sources_info, langchain_documents) """ @@ -57,7 +76,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="CRAWLED_URL" + document_type="CRAWLED_URL", ) elif search_mode == SearchMode.DOCUMENTS: crawled_urls_chunks = await self.document_retriever.hybrid_search( @@ -65,7 +84,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="CRAWLED_URL" + document_type="CRAWLED_URL", ) # Transform document retriever results to match expected format crawled_urls_chunks = self._transform_document_results(crawled_urls_chunks) @@ -84,20 +103,23 @@ class ConnectorService: async with self.counter_lock: for _i, chunk in enumerate(crawled_urls_chunks): # Extract document metadata - document = chunk.get('document', {}) - metadata = document.get('metadata', {}) + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) # Create a source entry source = { - "id": document.get('id', self.source_id_counter), - "title": document.get('title', 'Untitled Document'), - "description": metadata.get('og:description', metadata.get('ogDescription', chunk.get('content', '')[:100])), - "url": metadata.get('url', '') + "id": document.get("id", self.source_id_counter), + "title": document.get("title", "Untitled Document"), + "description": metadata.get( + "og:description", + metadata.get("ogDescription", chunk.get("content", "")[:100]), + ), + "url": metadata.get("url", ""), } self.source_id_counter += 1 sources_list.append(source) - + # Create result object result_object = { "id": 1, @@ -105,13 +127,20 @@ class ConnectorService: "type": "CRAWLED_URL", "sources": sources_list, } - + return result_object, crawled_urls_chunks - - async def search_files(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple: + + async def search_files( + self, + user_query: str, + user_id: str, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = SearchMode.CHUNKS, + ) -> tuple: """ Search for files and return both the source information and langchain documents - + Returns: tuple: (sources_info, langchain_documents) """ @@ -121,7 +150,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="FILE" + document_type="FILE", ) elif search_mode == SearchMode.DOCUMENTS: files_chunks = await self.document_retriever.hybrid_search( @@ -129,11 +158,11 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="FILE" + document_type="FILE", ) # 
Transform document retriever results to match expected format files_chunks = self._transform_document_results(files_chunks) - + # Early return if no results if not files_chunks: return { @@ -148,20 +177,23 @@ class ConnectorService: async with self.counter_lock: for _i, chunk in enumerate(files_chunks): # Extract document metadata - document = chunk.get('document', {}) - metadata = document.get('metadata', {}) + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) # Create a source entry source = { - "id": document.get('id', self.source_id_counter), - "title": document.get('title', 'Untitled Document'), - "description": metadata.get('og:description', metadata.get('ogDescription', chunk.get('content', '')[:100])), - "url": metadata.get('url', '') + "id": document.get("id", self.source_id_counter), + "title": document.get("title", "Untitled Document"), + "description": metadata.get( + "og:description", + metadata.get("ogDescription", chunk.get("content", "")[:100]), + ), + "url": metadata.get("url", ""), } self.source_id_counter += 1 sources_list.append(source) - + # Create result object result_object = { "id": 2, @@ -169,69 +201,76 @@ class ConnectorService: "type": "FILE", "sources": sources_list, } - + return result_object, files_chunks - + def _transform_document_results(self, document_results: List[Dict]) -> List[Dict]: """ Transform results from document_retriever.hybrid_search() to match the format expected by the processing code. - + Args: document_results: Results from document_retriever.hybrid_search() - + Returns: List of transformed results in the format expected by the processing code """ transformed_results = [] for doc in document_results: - transformed_results.append({ - 'document': { - 'id': doc.get('document_id'), - 'title': doc.get('title', 'Untitled Document'), - 'document_type': doc.get('document_type'), - 'metadata': doc.get('metadata', {}), - }, - 'content': doc.get('chunks_content', doc.get('content', '')), - 'score': doc.get('score', 0.0) - }) + transformed_results.append( + { + "document": { + "id": doc.get("document_id"), + "title": doc.get("title", "Untitled Document"), + "document_type": doc.get("document_type"), + "metadata": doc.get("metadata", {}), + }, + "content": doc.get("chunks_content", doc.get("content", "")), + "score": doc.get("score", 0.0), + } + ) return transformed_results - - async def get_connector_by_type(self, user_id: str, connector_type: SearchSourceConnectorType) -> Optional[SearchSourceConnector]: + + async def get_connector_by_type( + self, user_id: str, connector_type: SearchSourceConnectorType + ) -> Optional[SearchSourceConnector]: """ Get a connector by type for a specific user - + Args: user_id: The user's ID connector_type: The connector type to retrieve - + Returns: Optional[SearchSourceConnector]: The connector if found, None otherwise """ result = await self.session.execute( - select(SearchSourceConnector) - .filter( + select(SearchSourceConnector).filter( SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type == connector_type + SearchSourceConnector.connector_type == connector_type, ) ) return result.scalars().first() - - async def search_tavily(self, user_query: str, user_id: str, top_k: int = 20) -> tuple: + + async def search_tavily( + self, user_query: str, user_id: str, top_k: int = 20 + ) -> tuple: """ Search using Tavily API and return both the source information and documents - + Args: user_query: The user's query user_id: The user's ID top_k: Maximum number of results 
to return - + Returns: tuple: (sources_info, documents) """ # Get Tavily connector configuration - tavily_connector = await self.get_connector_by_type(user_id, SearchSourceConnectorType.TAVILY_API) - + tavily_connector = await self.get_connector_by_type( + user_id, SearchSourceConnectorType.TAVILY_API + ) + if not tavily_connector: # Return empty results if no Tavily connector is configured return { @@ -240,22 +279,22 @@ class ConnectorService: "type": "TAVILY_API", "sources": [], }, [] - + # Initialize Tavily client with API key from connector config tavily_api_key = tavily_connector.config.get("TAVILY_API_KEY") tavily_client = TavilyClient(api_key=tavily_api_key) - + # Perform search with Tavily try: response = tavily_client.search( query=user_query, max_results=top_k, - search_depth="advanced" # Use advanced search for better results + search_depth="advanced", # Use advanced search for better results ) - + # Extract results from Tavily response tavily_results = response.get("results", []) - + # Early return if no results if not tavily_results: return { @@ -264,23 +303,22 @@ class ConnectorService: "type": "TAVILY_API", "sources": [], }, [] - + # Process each result and create sources directly without deduplication sources_list = [] documents = [] - + async with self.counter_lock: for i, result in enumerate(tavily_results): - # Create a source entry source = { "id": self.source_id_counter, "title": result.get("title", "Tavily Result"), "description": result.get("content", "")[:100], - "url": result.get("url", "") + "url": result.get("url", ""), } sources_list.append(source) - + # Create a document entry document = { "chunk_id": f"tavily_chunk_{i}", @@ -293,9 +331,9 @@ class ConnectorService: "metadata": { "url": result.get("url", ""), "published_date": result.get("published_date", ""), - "source": "TAVILY_API" - } - } + "source": "TAVILY_API", + }, + }, } documents.append(document) self.source_id_counter += 1 @@ -307,9 +345,9 @@ class ConnectorService: "type": "TAVILY_API", "sources": sources_list, } - + return result_object, documents - + except Exception as e: # Log the error and return empty results print(f"Error searching with Tavily: {str(e)}") @@ -319,11 +357,18 @@ class ConnectorService: "type": "TAVILY_API", "sources": [], }, [] - - async def search_slack(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple: + + async def search_slack( + self, + user_query: str, + user_id: str, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = SearchMode.CHUNKS, + ) -> tuple: """ Search for slack and return both the source information and langchain documents - + Returns: tuple: (sources_info, langchain_documents) """ @@ -333,7 +378,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="SLACK_CONNECTOR" + document_type="SLACK_CONNECTOR", ) elif search_mode == SearchMode.DOCUMENTS: slack_chunks = await self.document_retriever.hybrid_search( @@ -341,11 +386,11 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="SLACK_CONNECTOR" + document_type="SLACK_CONNECTOR", ) # Transform document retriever results to match expected format slack_chunks = self._transform_document_results(slack_chunks) - + # Early return if no results if not slack_chunks: return { @@ -360,31 +405,31 @@ class ConnectorService: async with self.counter_lock: for _i, chunk in enumerate(slack_chunks): # Extract document metadata - 
document = chunk.get('document', {}) - metadata = document.get('metadata', {}) + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) # Create a mapped source entry with Slack-specific metadata - channel_name = metadata.get('channel_name', 'Unknown Channel') - channel_id = metadata.get('channel_id', '') - message_date = metadata.get('start_date', '') - + channel_name = metadata.get("channel_name", "Unknown Channel") + channel_id = metadata.get("channel_id", "") + message_date = metadata.get("start_date", "") + # Create a more descriptive title for Slack messages title = f"Slack: {channel_name}" if message_date: title += f" ({message_date})" - + # Create a more descriptive description for Slack messages - description = chunk.get('content', '')[:100] + description = chunk.get("content", "")[:100] if len(description) == 100: description += "..." - + # For URL, we can use a placeholder or construct a URL to the Slack channel if available url = "" if channel_id: url = f"https://slack.com/app_redirect?channel={channel_id}" source = { - "id": document.get('id', self.source_id_counter), + "id": document.get("id", self.source_id_counter), "title": title, "description": description, "url": url, @@ -392,7 +437,7 @@ class ConnectorService: self.source_id_counter += 1 sources_list.append(source) - + # Create result object result_object = { "id": 4, @@ -400,19 +445,26 @@ class ConnectorService: "type": "SLACK_CONNECTOR", "sources": sources_list, } - + return result_object, slack_chunks - - async def search_notion(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple: + + async def search_notion( + self, + user_query: str, + user_id: str, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = SearchMode.CHUNKS, + ) -> tuple: """ Search for Notion pages and return both the source information and langchain documents - + Args: user_query: The user's query user_id: The user's ID search_space_id: The search space ID to search in top_k: Maximum number of results to return - + Returns: tuple: (sources_info, langchain_documents) """ @@ -422,7 +474,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="NOTION_CONNECTOR" + document_type="NOTION_CONNECTOR", ) elif search_mode == SearchMode.DOCUMENTS: notion_chunks = await self.document_retriever.hybrid_search( @@ -430,11 +482,11 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="NOTION_CONNECTOR" + document_type="NOTION_CONNECTOR", ) # Transform document retriever results to match expected format notion_chunks = self._transform_document_results(notion_chunks) - + # Early return if no results if not notion_chunks: return { @@ -449,24 +501,24 @@ class ConnectorService: async with self.counter_lock: for _i, chunk in enumerate(notion_chunks): # Extract document metadata - document = chunk.get('document', {}) - metadata = document.get('metadata', {}) + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) # Create a mapped source entry with Notion-specific metadata - page_title = metadata.get('page_title', 'Untitled Page') - page_id = metadata.get('page_id', '') - indexed_at = metadata.get('indexed_at', '') - + page_title = metadata.get("page_title", "Untitled Page") + page_id = metadata.get("page_id", "") + indexed_at = metadata.get("indexed_at", "") + # Create a more descriptive title for Notion pages title = f"Notion: 
{page_title}" if indexed_at: title += f" (indexed: {indexed_at})" - + # Create a more descriptive description for Notion pages - description = chunk.get('content', '')[:100] + description = chunk.get("content", "")[:100] if len(description) == 100: description += "..." - + # For URL, we can use a placeholder or construct a URL to the Notion page if available url = "" if page_id: @@ -474,7 +526,7 @@ class ConnectorService: url = f"https://notion.so/{page_id.replace('-', '')}" source = { - "id": document.get('id', self.source_id_counter), + "id": document.get("id", self.source_id_counter), "title": title, "description": description, "url": url, @@ -482,7 +534,7 @@ class ConnectorService: self.source_id_counter += 1 sources_list.append(source) - + # Create result object result_object = { "id": 5, @@ -490,19 +542,26 @@ class ConnectorService: "type": "NOTION_CONNECTOR", "sources": sources_list, } - + return result_object, notion_chunks - - async def search_extension(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple: + + async def search_extension( + self, + user_query: str, + user_id: str, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = SearchMode.CHUNKS, + ) -> tuple: """ Search for extension data and return both the source information and langchain documents - + Args: user_query: The user's query user_id: The user's ID search_space_id: The search space ID to search in top_k: Maximum number of results to return - + Returns: tuple: (sources_info, langchain_documents) """ @@ -512,7 +571,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="EXTENSION" + document_type="EXTENSION", ) elif search_mode == SearchMode.DOCUMENTS: extension_chunks = await self.document_retriever.hybrid_search( @@ -520,7 +579,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="EXTENSION" + document_type="EXTENSION", ) # Transform document retriever results to match expected format extension_chunks = self._transform_document_results(extension_chunks) @@ -539,33 +598,39 @@ class ConnectorService: async with self.counter_lock: for i, chunk in enumerate(extension_chunks): # Extract document metadata - document = chunk.get('document', {}) - metadata = document.get('metadata', {}) + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) # Extract extension-specific metadata - webpage_title = metadata.get('VisitedWebPageTitle', 'Untitled Page') - webpage_url = metadata.get('VisitedWebPageURL', '') - visit_date = metadata.get('VisitedWebPageDateWithTimeInISOString', '') - visit_duration = metadata.get('VisitedWebPageVisitDurationInMilliseconds', '') - browsing_session_id = metadata.get('BrowsingSessionId', '') - + webpage_title = metadata.get("VisitedWebPageTitle", "Untitled Page") + webpage_url = metadata.get("VisitedWebPageURL", "") + visit_date = metadata.get("VisitedWebPageDateWithTimeInISOString", "") + visit_duration = metadata.get( + "VisitedWebPageVisitDurationInMilliseconds", "" + ) + browsing_session_id = metadata.get("BrowsingSessionId", "") + # Create a more descriptive title for extension data title = webpage_title if visit_date: # Format the date for display (simplified) try: # Just extract the date part for display - formatted_date = visit_date.split('T')[0] if 'T' in visit_date else visit_date + formatted_date = ( + visit_date.split("T")[0] + if "T" in visit_date + else 
visit_date + ) title += f" (visited: {formatted_date})" except: # Fallback if date parsing fails title += f" (visited: {visit_date})" - + # Create a more descriptive description for extension data - description = chunk.get('content', '')[:100] + description = chunk.get("content", "")[:100] if len(description) == 100: description += "..." - + # Add visit duration if available if visit_duration: try: @@ -573,8 +638,8 @@ class ConnectorService: if duration_seconds < 60: duration_text = f"{duration_seconds:.1f} seconds" else: - duration_text = f"{duration_seconds/60:.1f} minutes" - + duration_text = f"{duration_seconds / 60:.1f} minutes" + if description: description += f" | Duration: {duration_text}" except: @@ -582,15 +647,15 @@ class ConnectorService: pass source = { - "id": document.get('id', self.source_id_counter), + "id": document.get("id", self.source_id_counter), "title": title, "description": description, - "url": webpage_url + "url": webpage_url, } self.source_id_counter += 1 sources_list.append(source) - + # Create result object result_object = { "id": 6, @@ -598,19 +663,26 @@ class ConnectorService: "type": "EXTENSION", "sources": sources_list, } - + return result_object, extension_chunks - - async def search_youtube(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple: + + async def search_youtube( + self, + user_query: str, + user_id: str, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = SearchMode.CHUNKS, + ) -> tuple: """ Search for YouTube videos and return both the source information and langchain documents - + Args: user_query: The user's query user_id: The user's ID search_space_id: The search space ID to search in top_k: Maximum number of results to return - + Returns: tuple: (sources_info, langchain_documents) """ @@ -620,7 +692,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="YOUTUBE_VIDEO" + document_type="YOUTUBE_VIDEO", ) elif search_mode == SearchMode.DOCUMENTS: youtube_chunks = await self.document_retriever.hybrid_search( @@ -628,11 +700,11 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="YOUTUBE_VIDEO" + document_type="YOUTUBE_VIDEO", ) # Transform document retriever results to match expected format youtube_chunks = self._transform_document_results(youtube_chunks) - + # Early return if no results if not youtube_chunks: return { @@ -647,40 +719,42 @@ class ConnectorService: async with self.counter_lock: for _i, chunk in enumerate(youtube_chunks): # Extract document metadata - document = chunk.get('document', {}) - metadata = document.get('metadata', {}) + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) # Extract YouTube-specific metadata - video_title = metadata.get('video_title', 'Untitled Video') - video_id = metadata.get('video_id', '') - channel_name = metadata.get('channel_name', '') + video_title = metadata.get("video_title", "Untitled Video") + video_id = metadata.get("video_id", "") + channel_name = metadata.get("channel_name", "") # published_date = metadata.get('published_date', '') - + # Create a more descriptive title for YouTube videos title = video_title if channel_name: title += f" - {channel_name}" - + # Create a more descriptive description for YouTube videos - description = metadata.get('description', chunk.get('content', '')[:100]) + description = metadata.get( + "description", chunk.get("content", 
"")[:100] + ) if len(description) == 100: description += "..." - + # For URL, construct a URL to the YouTube video url = f"https://www.youtube.com/watch?v={video_id}" if video_id else "" source = { - "id": document.get('id', self.source_id_counter), + "id": document.get("id", self.source_id_counter), "title": title, "description": description, "url": url, "video_id": video_id, # Additional field for YouTube videos - "channel_name": channel_name # Additional field for YouTube videos + "channel_name": channel_name, # Additional field for YouTube videos } self.source_id_counter += 1 sources_list.append(source) - + # Create result object result_object = { "id": 7, # Assign a unique ID for the YouTube connector @@ -688,13 +762,20 @@ class ConnectorService: "type": "YOUTUBE_VIDEO", "sources": sources_list, } - + return result_object, youtube_chunks - async def search_github(self, user_query: str, user_id: int, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple: + async def search_github( + self, + user_query: str, + user_id: int, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = SearchMode.CHUNKS, + ) -> tuple: """ Search for GitHub documents and return both the source information and langchain documents - + Returns: tuple: (sources_info, langchain_documents) """ @@ -704,7 +785,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="GITHUB_CONNECTOR" + document_type="GITHUB_CONNECTOR", ) elif search_mode == SearchMode.DOCUMENTS: github_chunks = await self.document_retriever.hybrid_search( @@ -712,11 +793,11 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="GITHUB_CONNECTOR" + document_type="GITHUB_CONNECTOR", ) # Transform document retriever results to match expected format github_chunks = self._transform_document_results(github_chunks) - + # Early return if no results if not github_chunks: return { @@ -731,20 +812,24 @@ class ConnectorService: async with self.counter_lock: for _i, chunk in enumerate(github_chunks): # Extract document metadata - document = chunk.get('document', {}) - metadata = document.get('metadata', {}) + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) # Create a source entry source = { - "id": document.get('id', self.source_id_counter), - "title": document.get('title', 'GitHub Document'), # Use specific title if available - "description": metadata.get('description', chunk.get('content', '')[:100]), # Use description or content preview - "url": metadata.get('url', '') # Use URL if available in metadata + "id": document.get("id", self.source_id_counter), + "title": document.get( + "title", "GitHub Document" + ), # Use specific title if available + "description": metadata.get( + "description", chunk.get("content", "")[:100] + ), # Use description or content preview + "url": metadata.get("url", ""), # Use URL if available in metadata } self.source_id_counter += 1 sources_list.append(source) - + # Create result object result_object = { "id": 8, @@ -752,19 +837,26 @@ class ConnectorService: "type": "GITHUB_CONNECTOR", "sources": sources_list, } - + return result_object, github_chunks - async def search_linear(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple: + async def search_linear( + self, + user_query: str, + user_id: str, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = 
SearchMode.CHUNKS, + ) -> tuple: """ Search for Linear issues and comments and return both the source information and langchain documents - + Args: user_query: The user's query user_id: The user's ID search_space_id: The search space ID to search in top_k: Maximum number of results to return - + Returns: tuple: (sources_info, langchain_documents) """ @@ -774,7 +866,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="LINEAR_CONNECTOR" + document_type="LINEAR_CONNECTOR", ) elif search_mode == SearchMode.DOCUMENTS: linear_chunks = await self.document_retriever.hybrid_search( @@ -782,7 +874,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="LINEAR_CONNECTOR" + document_type="LINEAR_CONNECTOR", ) # Transform document retriever results to match expected format linear_chunks = self._transform_document_results(linear_chunks) @@ -801,32 +893,32 @@ class ConnectorService: async with self.counter_lock: for _i, chunk in enumerate(linear_chunks): # Extract document metadata - document = chunk.get('document', {}) - metadata = document.get('metadata', {}) + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) # Extract Linear-specific metadata - issue_identifier = metadata.get('issue_identifier', '') - issue_title = metadata.get('issue_title', 'Untitled Issue') - issue_state = metadata.get('state', '') - comment_count = metadata.get('comment_count', 0) - + issue_identifier = metadata.get("issue_identifier", "") + issue_title = metadata.get("issue_title", "Untitled Issue") + issue_state = metadata.get("state", "") + comment_count = metadata.get("comment_count", 0) + # Create a more descriptive title for Linear issues title = f"Linear: {issue_identifier} - {issue_title}" if issue_state: title += f" ({issue_state})" - + # Create a more descriptive description for Linear issues - description = chunk.get('content', '')[:100] + description = chunk.get("content", "")[:100] if len(description) == 100: description += "..." 
- + # Add comment count info to description if comment_count: if description: description += f" | Comments: {comment_count}" else: description = f"Comments: {comment_count}" - + # For URL, we could construct a URL to the Linear issue if we have the workspace info # For now, use a generic placeholder url = "" @@ -835,18 +927,18 @@ class ConnectorService: url = f"https://linear.app/issue/{issue_identifier}" source = { - "id": document.get('id', self.source_id_counter), + "id": document.get("id", self.source_id_counter), "title": title, "description": description, "url": url, "issue_identifier": issue_identifier, "state": issue_state, - "comment_count": comment_count + "comment_count": comment_count, } self.source_id_counter += 1 sources_list.append(source) - + # Create result object result_object = { "id": 9, # Assign a unique ID for the Linear connector @@ -854,10 +946,17 @@ class ConnectorService: "type": "LINEAR_CONNECTOR", "sources": sources_list, } - + return result_object, linear_chunks - async def search_jira(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple: + async def search_jira( + self, + user_query: str, + user_id: str, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = SearchMode.CHUNKS, + ) -> tuple: """ Search for Jira issues and comments and return both the source information and langchain documents @@ -877,7 +976,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="JIRA_CONNECTOR" + document_type="JIRA_CONNECTOR", ) elif search_mode == SearchMode.DOCUMENTS: jira_chunks = await self.document_retriever.hybrid_search( @@ -885,7 +984,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="JIRA_CONNECTOR" + document_type="JIRA_CONNECTOR", ) # Transform document retriever results to match expected format jira_chunks = self._transform_document_results(jira_chunks) @@ -904,16 +1003,16 @@ class ConnectorService: async with self.counter_lock: for _i, chunk in enumerate(jira_chunks): # Extract document metadata - document = chunk.get('document', {}) - metadata = document.get('metadata', {}) + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) # Extract Jira-specific metadata - issue_key = metadata.get('issue_key', '') - issue_title = metadata.get('issue_title', 'Untitled Issue') - status = metadata.get('status', '') - priority = metadata.get('priority', '') - issue_type = metadata.get('issue_type', '') - comment_count = metadata.get('comment_count', 0) + issue_key = metadata.get("issue_key", "") + issue_title = metadata.get("issue_title", "Untitled Issue") + status = metadata.get("status", "") + priority = metadata.get("priority", "") + issue_type = metadata.get("issue_type", "") + comment_count = metadata.get("comment_count", 0) # Create a more descriptive title for Jira issues title = f"Jira: {issue_key} - {issue_title}" @@ -921,7 +1020,7 @@ class ConnectorService: title += f" ({status})" # Create a more descriptive description for Jira issues - description = chunk.get('content', '')[:100] + description = chunk.get("content", "")[:100] if len(description) == 100: description += "..." 
@@ -938,16 +1037,16 @@ class ConnectorService: if description: description += f" | {' | '.join(info_parts)}" else: - description = ' | '.join(info_parts) + description = " | ".join(info_parts) # For URL, we could construct a URL to the Jira issue if we have the base URL # For now, use a generic placeholder url = "" - if issue_key and metadata.get('base_url'): + if issue_key and metadata.get("base_url"): url = f"{metadata.get('base_url')}/browse/{issue_key}" source = { - "id": document.get('id', self.source_id_counter), + "id": document.get("id", self.source_id_counter), "title": title, "description": description, "url": url, @@ -955,7 +1054,7 @@ class ConnectorService: "status": status, "priority": priority, "issue_type": issue_type, - "comment_count": comment_count + "comment_count": comment_count, } self.source_id_counter += 1 @@ -971,21 +1070,25 @@ class ConnectorService: return result_object, jira_chunks - async def search_linkup(self, user_query: str, user_id: str, mode: str = "standard") -> tuple: + async def search_linkup( + self, user_query: str, user_id: str, mode: str = "standard" + ) -> tuple: """ Search using Linkup API and return both the source information and documents - + Args: user_query: The user's query user_id: The user's ID mode: Search depth mode, can be "standard" or "deep" - + Returns: tuple: (sources_info, documents) """ # Get Linkup connector configuration - linkup_connector = await self.get_connector_by_type(user_id, SearchSourceConnectorType.LINKUP_API) - + linkup_connector = await self.get_connector_by_type( + user_id, SearchSourceConnectorType.LINKUP_API + ) + if not linkup_connector: # Return empty results if no Linkup connector is configured return { @@ -994,11 +1097,11 @@ class ConnectorService: "type": "LINKUP_API", "sources": [], }, [] - + # Initialize Linkup client with API key from connector config linkup_api_key = linkup_connector.config.get("LINKUP_API_KEY") linkup_client = LinkupClient(api_key=linkup_api_key) - + # Perform search with Linkup try: response = linkup_client.search( @@ -1006,10 +1109,10 @@ class ConnectorService: depth=mode, # Use the provided mode ("standard" or "deep") output_type="searchResults", # Default to search results ) - + # Extract results from Linkup response - access as attribute instead of using .get() - linkup_results = response.results if hasattr(response, 'results') else [] - + linkup_results = response.results if hasattr(response, "results") else [] + # Only proceed if we have results if not linkup_results: return { @@ -1018,41 +1121,49 @@ class ConnectorService: "type": "LINKUP_API", "sources": [], }, [] - + # Process each result and create sources directly without deduplication sources_list = [] documents = [] - + async with self.counter_lock: for i, result in enumerate(linkup_results): # Only process results that have content - if not hasattr(result, 'content') or not result.content: + if not hasattr(result, "content") or not result.content: continue - + # Create a source entry source = { "id": self.source_id_counter, - "title": result.name if hasattr(result, 'name') else "Linkup Result", - "description": result.content[:100] if hasattr(result, 'content') else "", - "url": result.url if hasattr(result, 'url') else "" + "title": ( + result.name if hasattr(result, "name") else "Linkup Result" + ), + "description": ( + result.content[:100] if hasattr(result, "content") else "" + ), + "url": result.url if hasattr(result, "url") else "", } sources_list.append(source) - + # Create a document entry document = { "chunk_id": 
f"linkup_chunk_{i}", - "content": result.content if hasattr(result, 'content') else "", + "content": result.content if hasattr(result, "content") else "", "score": 1.0, # Default score since not provided by Linkup "document": { "id": self.source_id_counter, - "title": result.name if hasattr(result, 'name') else "Linkup Result", + "title": ( + result.name + if hasattr(result, "name") + else "Linkup Result" + ), "document_type": "LINKUP_API", "metadata": { - "url": result.url if hasattr(result, 'url') else "", - "type": result.type if hasattr(result, 'type') else "", - "source": "LINKUP_API" - } - } + "url": result.url if hasattr(result, "url") else "", + "type": result.type if hasattr(result, "type") else "", + "source": "LINKUP_API", + }, + }, } documents.append(document) self.source_id_counter += 1 @@ -1064,9 +1175,9 @@ class ConnectorService: "type": "LINKUP_API", "sources": sources_list, } - + return result_object, documents - + except Exception as e: # Log the error and return empty results print(f"Error searching with Linkup: {str(e)}") @@ -1076,17 +1187,24 @@ class ConnectorService: "type": "LINKUP_API", "sources": [], }, [] - - async def search_discord(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple: + + async def search_discord( + self, + user_query: str, + user_id: str, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = SearchMode.CHUNKS, + ) -> tuple: """ Search for Discord messages and return both the source information and langchain documents - + Args: user_query: The user's query user_id: The user's ID search_space_id: The search space ID to search in top_k: Maximum number of results to return - + Returns: tuple: (sources_info, langchain_documents) """ @@ -1096,7 +1214,7 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="DISCORD_CONNECTOR" + document_type="DISCORD_CONNECTOR", ) elif search_mode == SearchMode.DOCUMENTS: discord_chunks = await self.document_retriever.hybrid_search( @@ -1104,11 +1222,11 @@ class ConnectorService: top_k=top_k, user_id=user_id, search_space_id=search_space_id, - document_type="DISCORD_CONNECTOR" + document_type="DISCORD_CONNECTOR", ) # Transform document retriever results to match expected format discord_chunks = self._transform_document_results(discord_chunks) - + # Early return if no results if not discord_chunks: return { @@ -1123,26 +1241,26 @@ class ConnectorService: async with self.counter_lock: for i, chunk in enumerate(discord_chunks): # Extract document metadata - document = chunk.get('document', {}) - metadata = document.get('metadata', {}) + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) # Create a mapped source entry with Discord-specific metadata - channel_name = metadata.get('channel_name', 'Unknown Channel') - channel_id = metadata.get('channel_id', '') - message_date = metadata.get('start_date', '') - + channel_name = metadata.get("channel_name", "Unknown Channel") + channel_id = metadata.get("channel_id", "") + message_date = metadata.get("start_date", "") + # Create a more descriptive title for Discord messages title = f"Discord: {channel_name}" if message_date: title += f" ({message_date})" - + # Create a more descriptive description for Discord messages - description = chunk.get('content', '')[:100] + description = chunk.get("content", "")[:100] if len(description) == 100: description += "..." 
- + url = "" - guild_id = metadata.get('guild_id', '') + guild_id = metadata.get("guild_id", "") if guild_id and channel_id: url = f"https://discord.com/channels/{guild_id}/{channel_id}" elif channel_id: @@ -1150,7 +1268,7 @@ class ConnectorService: url = f"https://discord.com/channels/@me/{channel_id}" source = { - "id": document.get('id', self.source_id_counter), + "id": document.get("id", self.source_id_counter), "title": title, "description": description, "url": url, @@ -1158,7 +1276,7 @@ class ConnectorService: self.source_id_counter += 1 sources_list.append(source) - + # Create result object result_object = { "id": 11, @@ -1166,7 +1284,5 @@ class ConnectorService: "type": "DISCORD_CONNECTOR", "sources": sources_list, } - + return result_object, discord_chunks - - diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py index ab3bc85..f4ae139 100644 --- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py +++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py @@ -1,28 +1,35 @@ -from typing import Optional, Tuple -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.future import select +import asyncio +import logging from datetime import datetime, timedelta, timezone -from app.db import Document, DocumentType, Chunk, SearchSourceConnector, SearchSourceConnectorType, SearchSpace +from typing import Optional, Tuple + from app.config import config +from app.connectors.discord_connector import DiscordConnector +from app.connectors.github_connector import GitHubConnector +from app.connectors.jira_connector import JiraConnector +from app.connectors.linear_connector import LinearConnector +from app.connectors.notion_history import NotionHistoryConnector +from app.connectors.slack_history import SlackHistory +from app.db import ( + Chunk, + Document, + DocumentType, + SearchSourceConnector, + SearchSourceConnectorType, +) from app.prompts import SUMMARY_PROMPT_TEMPLATE from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService -from app.connectors.slack_history import SlackHistory -from app.connectors.notion_history import NotionHistoryConnector -from app.connectors.github_connector import GitHubConnector -from app.connectors.linear_connector import LinearConnector -from app.connectors.discord_connector import DiscordConnector -from app.connectors.jira_connector import JiraConnector -from slack_sdk.errors import SlackApiError -import logging -import asyncio - from app.utils.document_converters import generate_content_hash +from slack_sdk.errors import SlackApiError +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select # Set up logging logger = logging.getLogger(__name__) + async def index_slack_messages( session: AsyncSession, connector_id: int, @@ -30,56 +37,64 @@ async def index_slack_messages( user_id: str, start_date: str = None, end_date: str = None, - update_last_indexed: bool = True + update_last_indexed: bool = True, ) -> Tuple[int, Optional[str]]: """ Index Slack messages from all accessible channels. 
- + Args: session: Database session connector_id: ID of the Slack connector search_space_id: ID of the search space to store documents in update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) - + Returns: Tuple containing (number of documents indexed, error message or None) """ task_logger = TaskLoggingService(session, search_space_id) - + # Log task start log_entry = await task_logger.log_task_start( task_name="slack_messages_indexing", source="connector_indexing_task", message=f"Starting Slack messages indexing for connector {connector_id}", - metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date} + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "start_date": start_date, + "end_date": end_date, + }, ) - + try: # Get the connector await task_logger.log_task_progress( log_entry, f"Retrieving Slack connector {connector_id} from database", - {"stage": "connector_retrieval"} + {"stage": "connector_retrieval"}, ) - + result = await session.execute( - select(SearchSourceConnector) - .filter( + select(SearchSourceConnector).filter( SearchSourceConnector.id == connector_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.SLACK_CONNECTOR + SearchSourceConnector.connector_type + == SearchSourceConnectorType.SLACK_CONNECTOR, ) ) connector = result.scalars().first() - + if not connector: await task_logger.log_task_failure( log_entry, f"Connector with ID {connector_id} not found or is not a Slack connector", "Connector not found", - {"error_type": "ConnectorNotFound"} + {"error_type": "ConnectorNotFound"}, ) - return 0, f"Connector with ID {connector_id} not found or is not a Slack connector" - + return ( + 0, + f"Connector with ID {connector_id} not found or is not a Slack connector", + ) + # Get the Slack token from the connector config slack_token = connector.config.get("SLACK_BOT_TOKEN") if not slack_token: @@ -87,62 +102,86 @@ async def index_slack_messages( log_entry, f"Slack token not found in connector config for connector {connector_id}", "Missing Slack token", - {"error_type": "MissingToken"} + {"error_type": "MissingToken"}, ) return 0, "Slack token not found in connector config" - + # Initialize Slack client await task_logger.log_task_progress( log_entry, f"Initializing Slack client for connector {connector_id}", - {"stage": "client_initialization"} + {"stage": "client_initialization"}, ) - + slack_client = SlackHistory(token=slack_token) - + # Calculate date range await task_logger.log_task_progress( log_entry, - f"Calculating date range for Slack indexing", - {"stage": "date_calculation", "provided_start_date": start_date, "provided_end_date": end_date} + "Calculating date range for Slack indexing", + { + "stage": "date_calculation", + "provided_start_date": start_date, + "provided_end_date": end_date, + }, ) - + if start_date is None or end_date is None: # Fall back to calculating dates based on last_indexed_at calculated_end_date = datetime.now() - + # Use last_indexed_at as start date if available, otherwise use 365 days ago if connector.last_indexed_at: # Convert dates to be comparable (both timezone-naive) - last_indexed_naive = connector.last_indexed_at.replace(tzinfo=None) if connector.last_indexed_at.tzinfo else connector.last_indexed_at - + last_indexed_naive = ( + connector.last_indexed_at.replace(tzinfo=None) + if connector.last_indexed_at.tzinfo + else connector.last_indexed_at + ) + # Check if last_indexed_at is in the future or after end_date 
if last_indexed_naive > calculated_end_date: - logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead.") + logger.warning( + f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead." + ) calculated_start_date = calculated_end_date - timedelta(days=365) else: calculated_start_date = last_indexed_naive - logger.info(f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date") + logger.info( + f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date" + ) else: - calculated_start_date = calculated_end_date - timedelta(days=365) # Use 365 days as default - logger.info(f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date") - + calculated_start_date = calculated_end_date - timedelta( + days=365 + ) # Use 365 days as default + logger.info( + f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date" + ) + # Use calculated dates if not provided - start_date_str = start_date if start_date else calculated_start_date.strftime("%Y-%m-%d") - end_date_str = end_date if end_date else calculated_end_date.strftime("%Y-%m-%d") + start_date_str = ( + start_date if start_date else calculated_start_date.strftime("%Y-%m-%d") + ) + end_date_str = ( + end_date if end_date else calculated_end_date.strftime("%Y-%m-%d") + ) else: # Use provided dates start_date_str = start_date end_date_str = end_date - + logger.info(f"Indexing Slack messages from {start_date_str} to {end_date_str}") - + await task_logger.log_task_progress( log_entry, f"Fetching Slack channels from {start_date_str} to {end_date_str}", - {"stage": "fetch_channels", "start_date": start_date_str, "end_date": end_date_str} + { + "stage": "fetch_channels", + "start_date": start_date_str, + "end_date": end_date_str, + }, ) - + # Get all channels try: channels = slack_client.get_all_channels() @@ -151,133 +190,162 @@ async def index_slack_messages( log_entry, f"Failed to get Slack channels for connector {connector_id}", str(e), - {"error_type": "ChannelFetchError"} + {"error_type": "ChannelFetchError"}, ) return 0, f"Failed to get Slack channels: {str(e)}" - + if not channels: await task_logger.log_task_success( log_entry, f"No Slack channels found for connector {connector_id}", - {"channels_found": 0} + {"channels_found": 0}, ) return 0, "No Slack channels found" - + # Track the number of documents indexed documents_indexed = 0 documents_skipped = 0 skipped_channels = [] - + await task_logger.log_task_progress( log_entry, f"Starting to process {len(channels)} Slack channels", - {"stage": "process_channels", "total_channels": len(channels)} + {"stage": "process_channels", "total_channels": len(channels)}, ) - + # Process each channel - for channel_obj in channels: # Modified loop to iterate over list of channel objects + for ( + channel_obj + ) in channels: # Modified loop to iterate over list of channel objects channel_id = channel_obj["id"] channel_name = channel_obj["name"] is_private = channel_obj["is_private"] - is_member = channel_obj["is_member"] # This might be False for public channels too + is_member = channel_obj[ + "is_member" + ] # This might be False for public channels too try: # If it's a private channel and the bot is not a member, skip. # For public channels, if they are listed by conversations.list, the bot can typically read history. 
# The `not_in_channel` error in get_conversation_history will be the ultimate gatekeeper if history is inaccessible. if is_private and not is_member: - logger.warning(f"Bot is not a member of private channel {channel_name} ({channel_id}). Skipping.") - skipped_channels.append(f"{channel_name} (private, bot not a member)") + logger.warning( + f"Bot is not a member of private channel {channel_name} ({channel_id}). Skipping." + ) + skipped_channels.append( + f"{channel_name} (private, bot not a member)" + ) documents_skipped += 1 continue - + # Get messages for this channel - # The get_history_by_date_range now uses get_conversation_history, + # The get_history_by_date_range now uses get_conversation_history, # which handles 'not_in_channel' by returning [] and logging. messages, error = slack_client.get_history_by_date_range( channel_id=channel_id, start_date=start_date_str, end_date=end_date_str, - limit=1000 # Limit to 1000 messages per channel + limit=1000, # Limit to 1000 messages per channel ) - + if error: - logger.warning(f"Error getting messages from channel {channel_name}: {error}") + logger.warning( + f"Error getting messages from channel {channel_name}: {error}" + ) skipped_channels.append(f"{channel_name} (error: {error})") documents_skipped += 1 continue # Skip this channel if there's an error - + if not messages: - logger.info(f"No messages found in channel {channel_name} for the specified date range.") + logger.info( + f"No messages found in channel {channel_name} for the specified date range." + ) documents_skipped += 1 continue # Skip if no messages - + # Format messages with user info formatted_messages = [] for msg in messages: # Skip bot messages and system messages - if msg.get("subtype") in ["bot_message", "channel_join", "channel_leave"]: + if msg.get("subtype") in [ + "bot_message", + "channel_join", + "channel_leave", + ]: continue - - formatted_msg = slack_client.format_message(msg, include_user_info=True) + + formatted_msg = slack_client.format_message( + msg, include_user_info=True + ) formatted_messages.append(formatted_msg) - + if not formatted_messages: - logger.info(f"No valid messages found in channel {channel_name} after filtering.") + logger.info( + f"No valid messages found in channel {channel_name} after filtering." 
+ ) documents_skipped += 1 continue # Skip if no valid messages after filtering - + # Convert messages to markdown format channel_content = f"# Slack Channel: {channel_name}\n\n" - + for msg in formatted_messages: user_name = msg.get("user_name", "Unknown User") timestamp = msg.get("datetime", "Unknown Time") text = msg.get("text", "") - - channel_content += f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n" - + + channel_content += ( + f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n" + ) + # Format document metadata metadata_sections = [ - ("METADATA", [ - f"CHANNEL_NAME: {channel_name}", - f"CHANNEL_ID: {channel_id}", - # f"START_DATE: {start_date_str}", - # f"END_DATE: {end_date_str}", - f"MESSAGE_COUNT: {len(formatted_messages)}" - ]), - ("CONTENT", [ - "FORMAT: markdown", - "TEXT_START", - channel_content, - "TEXT_END" - ]) + ( + "METADATA", + [ + f"CHANNEL_NAME: {channel_name}", + f"CHANNEL_ID: {channel_id}", + # f"START_DATE: {start_date_str}", + # f"END_DATE: {end_date_str}", + f"MESSAGE_COUNT: {len(formatted_messages)}", + ], + ), + ( + "CONTENT", + ["FORMAT: markdown", "TEXT_START", channel_content, "TEXT_END"], + ), ] - + # Build the document string document_parts = [] document_parts.append("") - + for section_title, section_content in metadata_sections: document_parts.append(f"<{section_title}>") document_parts.extend(section_content) document_parts.append(f"") - + document_parts.append("") - combined_document_string = '\n'.join(document_parts) - content_hash = generate_content_hash(combined_document_string, search_space_id) + combined_document_string = "\n".join(document_parts) + content_hash = generate_content_hash( + combined_document_string, search_space_id + ) # Check if document with this content hash already exists existing_doc_by_hash_result = await session.execute( select(Document).where(Document.content_hash == content_hash) ) - existing_document_by_hash = existing_doc_by_hash_result.scalars().first() - + existing_document_by_hash = ( + existing_doc_by_hash_result.scalars().first() + ) + if existing_document_by_hash: - logger.info(f"Document with content hash {content_hash} already exists for channel {channel_name}. Skipping processing.") + logger.info( + f"Document with content hash {content_hash} already exists for channel {channel_name}. Skipping processing." 
+ ) documents_skipped += 1 continue - + # Get user's long context LLM user_llm = await get_user_long_context_llm(session, user_id) if not user_llm: @@ -285,19 +353,26 @@ async def index_slack_messages( skipped_channels.append(f"{channel_name} (no LLM configured)") documents_skipped += 1 continue - + # Generate summary summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm - summary_result = await summary_chain.ainvoke({"document": combined_document_string}) + summary_result = await summary_chain.ainvoke( + {"document": combined_document_string} + ) summary_content = summary_result.content - summary_embedding = config.embedding_model_instance.embed(summary_content) - + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + # Process chunks chunks = [ - Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text)) + Chunk( + content=chunk.text, + embedding=config.embedding_model_instance.embed(chunk.text), + ) for chunk in config.chunker_instance.chunk(channel_content) ] - + # Create and store new document document = Document( search_space_id=search_space_id, @@ -309,20 +384,24 @@ async def index_slack_messages( "start_date": start_date_str, "end_date": end_date_str, "message_count": len(formatted_messages), - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), }, content=summary_content, embedding=summary_embedding, chunks=chunks, content_hash=content_hash, ) - + session.add(document) documents_indexed += 1 - logger.info(f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages") - + logger.info( + f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages" + ) + except SlackApiError as slack_error: - logger.error(f"Slack API error for channel {channel_name}: {str(slack_error)}") + logger.error( + f"Slack API error for channel {channel_name}: {str(slack_error)}" + ) skipped_channels.append(f"{channel_name} (Slack API error)") documents_skipped += 1 continue # Skip this channel and continue with others @@ -331,23 +410,23 @@ async def index_slack_messages( skipped_channels.append(f"{channel_name} (processing error)") documents_skipped += 1 continue # Skip this channel and continue with others - + # Update the last_indexed_at timestamp for the connector only if requested # and if we successfully indexed at least one channel total_processed = documents_indexed if update_last_indexed and total_processed > 0: connector.last_indexed_at = datetime.now() - + # Commit all changes await session.commit() - + # Prepare result message result_message = None if skipped_channels: result_message = f"Processed {total_processed} channels. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}" else: result_message = f"Processed {total_processed} channels." 
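The per-channel flow in this hunk (hash the rendered channel document, skip it if a document with that hash already exists, otherwise summarize, embed, chunk, and store it) is the same dedupe-then-index pattern the other connectors in this patch follow. Below is a minimal sketch of that loop body, not the module's actual function: `index_one_channel`, `sketch_content_hash`, and the injected `summarize`, `embed`, and `chunk` callables are hypothetical stand-ins for `generate_content_hash`, the summary chain, `config.embedding_model_instance`, and `config.chunker_instance`; `Document`, `Chunk`, and `select` mirror the imports used in this file.

import hashlib

from sqlalchemy.future import select

from app.db import Chunk, Document


def sketch_content_hash(text: str, search_space_id: int) -> str:
    # Stand-in for generate_content_hash; the real hashing scheme is assumed,
    # but it does take the search space into account so identical text in
    # different search spaces is not deduplicated across them.
    return hashlib.sha256(f"{search_space_id}:{text}".encode()).hexdigest()


async def index_one_channel(session, title: str, channel_doc: str,
                            search_space_id: int, summarize, embed, chunk) -> bool:
    """Return True if a new document was stored, False if it was a duplicate."""
    doc_hash = sketch_content_hash(channel_doc, search_space_id)

    # Skip content whose rendered document has not changed since the last run.
    existing = await session.execute(
        select(Document).where(Document.content_hash == doc_hash)
    )
    if existing.scalars().first():
        return False

    summary = await summarize(channel_doc)  # LLM summary of the whole channel
    session.add(Document(
        search_space_id=search_space_id,
        title=title,
        content=summary,
        content_hash=doc_hash,
        embedding=embed(summary),  # one embedding for the summary text
        chunks=[Chunk(content=c, embedding=embed(c)) for c in chunk(channel_doc)],
    ))
    return True

Because the hash is computed over the fully rendered document (metadata sections plus markdown), any change to the channel content or its formatting produces a new hash and the channel is re-indexed on the next run.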
- + # Log success await task_logger.log_task_success( log_entry, @@ -357,20 +436,22 @@ async def index_slack_messages( "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, "skipped_channels_count": len(skipped_channels), - "result_message": result_message - } + "result_message": result_message, + }, + ) + + logger.info( + f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped" ) - - logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped") return total_processed, result_message - + except SQLAlchemyError as db_error: await session.rollback() await task_logger.log_task_failure( log_entry, f"Database error during Slack indexing for connector {connector_id}", str(db_error), - {"error_type": "SQLAlchemyError"} + {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {str(db_error)}") return 0, f"Database error: {str(db_error)}" @@ -380,11 +461,12 @@ async def index_slack_messages( log_entry, f"Failed to index Slack messages for connector {connector_id}", str(e), - {"error_type": type(e).__name__} + {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Slack messages: {str(e)}") return 0, f"Failed to index Slack messages: {str(e)}" + async def index_notion_pages( session: AsyncSession, connector_id: int, @@ -392,56 +474,64 @@ async def index_notion_pages( user_id: str, start_date: str = None, end_date: str = None, - update_last_indexed: bool = True + update_last_indexed: bool = True, ) -> Tuple[int, Optional[str]]: """ Index Notion pages from all accessible pages. - + Args: session: Database session connector_id: ID of the Notion connector search_space_id: ID of the search space to store documents in update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) - + Returns: Tuple containing (number of documents indexed, error message or None) """ task_logger = TaskLoggingService(session, search_space_id) - + # Log task start log_entry = await task_logger.log_task_start( task_name="notion_pages_indexing", source="connector_indexing_task", message=f"Starting Notion pages indexing for connector {connector_id}", - metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date} + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "start_date": start_date, + "end_date": end_date, + }, ) - + try: # Get the connector await task_logger.log_task_progress( log_entry, f"Retrieving Notion connector {connector_id} from database", - {"stage": "connector_retrieval"} + {"stage": "connector_retrieval"}, ) - + result = await session.execute( - select(SearchSourceConnector) - .filter( + select(SearchSourceConnector).filter( SearchSourceConnector.id == connector_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR + SearchSourceConnector.connector_type + == SearchSourceConnectorType.NOTION_CONNECTOR, ) ) connector = result.scalars().first() - + if not connector: await task_logger.log_task_failure( log_entry, f"Connector with ID {connector_id} not found or is not a Notion connector", "Connector not found", - {"error_type": "ConnectorNotFound"} + {"error_type": "ConnectorNotFound"}, ) - return 0, f"Connector with ID {connector_id} not found or is not a Notion connector" - + return ( + 0, + f"Connector with ID {connector_id} not found or is not a Notion connector", + ) + # Get the Notion token from the connector config notion_token = 
connector.config.get("NOTION_INTEGRATION_TOKEN") if not notion_token: @@ -449,103 +539,119 @@ async def index_notion_pages( log_entry, f"Notion integration token not found in connector config for connector {connector_id}", "Missing Notion token", - {"error_type": "MissingToken"} + {"error_type": "MissingToken"}, ) return 0, "Notion integration token not found in connector config" - + # Initialize Notion client await task_logger.log_task_progress( log_entry, f"Initializing Notion client for connector {connector_id}", - {"stage": "client_initialization"} + {"stage": "client_initialization"}, ) - + logger.info(f"Initializing Notion client for connector {connector_id}") notion_client = NotionHistoryConnector(token=notion_token) - + # Calculate date range if start_date is None or end_date is None: # Fall back to calculating dates calculated_end_date = datetime.now() - calculated_start_date = calculated_end_date - timedelta(days=365) # Check for last 1 year of pages - + calculated_start_date = calculated_end_date - timedelta( + days=365 + ) # Check for last 1 year of pages + # Use calculated dates if not provided if start_date is None: start_date_iso = calculated_start_date.strftime("%Y-%m-%dT%H:%M:%SZ") else: # Convert YYYY-MM-DD to ISO format - start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ") - + start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + if end_date is None: end_date_iso = calculated_end_date.strftime("%Y-%m-%dT%H:%M:%SZ") else: # Convert YYYY-MM-DD to ISO format - end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ") + end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) else: # Convert provided dates to ISO format for Notion API - start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ") - end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ") - + start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}") - + await task_logger.log_task_progress( log_entry, f"Fetching Notion pages from {start_date_iso} to {end_date_iso}", - {"stage": "fetch_pages", "start_date": start_date_iso, "end_date": end_date_iso} + { + "stage": "fetch_pages", + "start_date": start_date_iso, + "end_date": end_date_iso, + }, ) - + # Get all pages try: - pages = notion_client.get_all_pages(start_date=start_date_iso, end_date=end_date_iso) + pages = notion_client.get_all_pages( + start_date=start_date_iso, end_date=end_date_iso + ) logger.info(f"Found {len(pages)} Notion pages") except Exception as e: await task_logger.log_task_failure( log_entry, f"Failed to get Notion pages for connector {connector_id}", str(e), - {"error_type": "PageFetchError"} + {"error_type": "PageFetchError"}, ) logger.error(f"Error fetching Notion pages: {str(e)}", exc_info=True) return 0, f"Failed to get Notion pages: {str(e)}" - + if not pages: await task_logger.log_task_success( log_entry, f"No Notion pages found for connector {connector_id}", - {"pages_found": 0} + {"pages_found": 0}, ) logger.info("No Notion pages found to index") return 0, "No Notion pages found" - + # Track the number of documents indexed documents_indexed = 0 documents_skipped = 0 skipped_pages = [] - + await task_logger.log_task_progress( 
log_entry, f"Starting to process {len(pages)} Notion pages", - {"stage": "process_pages", "total_pages": len(pages)} + {"stage": "process_pages", "total_pages": len(pages)}, ) - + # Process each page for page in pages: try: page_id = page.get("page_id") page_title = page.get("title", f"Untitled page ({page_id})") page_content = page.get("content", []) - + logger.info(f"Processing Notion page: {page_title} ({page_id})") - + if not page_content: logger.info(f"No content found in page {page_title}. Skipping.") skipped_pages.append(f"{page_title} (no content)") documents_skipped += 1 continue - + # Convert page content to markdown format markdown_content = f"# Notion Page: {page_title}\n\n" - + # Process blocks recursively def process_blocks(blocks, level=0): result = "" @@ -553,10 +659,10 @@ async def index_notion_pages( block_type = block.get("type") block_content = block.get("content", "") children = block.get("children", []) - + # Add indentation based on level indent = " " * level - + # Format based on block type if block_type in ["paragraph", "text"]: result += f"{indent}{block_content}\n\n" @@ -586,54 +692,62 @@ async def index_notion_pages( # Default for other block types if block_content: result += f"{indent}{block_content}\n\n" - + # Process children recursively if children: result += process_blocks(children, level + 1) - + return result - - logger.debug(f"Converting {len(page_content)} blocks to markdown for page {page_title}") + + logger.debug( + f"Converting {len(page_content)} blocks to markdown for page {page_title}" + ) markdown_content += process_blocks(page_content) - + # Format document metadata metadata_sections = [ - ("METADATA", [ - f"PAGE_TITLE: {page_title}", - f"PAGE_ID: {page_id}" - ]), - ("CONTENT", [ - "FORMAT: markdown", - "TEXT_START", - markdown_content, - "TEXT_END" - ]) + ("METADATA", [f"PAGE_TITLE: {page_title}", f"PAGE_ID: {page_id}"]), + ( + "CONTENT", + [ + "FORMAT: markdown", + "TEXT_START", + markdown_content, + "TEXT_END", + ], + ), ] - + # Build the document string document_parts = [] document_parts.append("") - + for section_title, section_content in metadata_sections: document_parts.append(f"<{section_title}>") document_parts.extend(section_content) document_parts.append(f"") - + document_parts.append("") - combined_document_string = '\n'.join(document_parts) - content_hash = generate_content_hash(combined_document_string, search_space_id) + combined_document_string = "\n".join(document_parts) + content_hash = generate_content_hash( + combined_document_string, search_space_id + ) # Check if document with this content hash already exists existing_doc_by_hash_result = await session.execute( select(Document).where(Document.content_hash == content_hash) ) - existing_document_by_hash = existing_doc_by_hash_result.scalars().first() - + existing_document_by_hash = ( + existing_doc_by_hash_result.scalars().first() + ) + if existing_document_by_hash: - logger.info(f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing.") + logger.info( + f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing." 
+ ) documents_skipped += 1 continue - + # Get user's long context LLM user_llm = await get_user_long_context_llm(session, user_id) if not user_llm: @@ -641,21 +755,28 @@ async def index_notion_pages( skipped_pages.append(f"{page_title} (no LLM configured)") documents_skipped += 1 continue - + # Generate summary logger.debug(f"Generating summary for page {page_title}") summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm - summary_result = await summary_chain.ainvoke({"document": combined_document_string}) + summary_result = await summary_chain.ainvoke( + {"document": combined_document_string} + ) summary_content = summary_result.content - summary_embedding = config.embedding_model_instance.embed(summary_content) - + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + # Process chunks logger.debug(f"Chunking content for page {page_title}") chunks = [ - Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text)) + Chunk( + content=chunk.text, + embedding=config.embedding_model_instance.embed(chunk.text), + ) for chunk in config.chunker_instance.chunk(markdown_content) ] - + # Create and store new document document = Document( search_space_id=search_space_id, @@ -664,41 +785,46 @@ async def index_notion_pages( document_metadata={ "page_title": page_title, "page_id": page_id, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), }, content=summary_content, content_hash=content_hash, embedding=summary_embedding, - chunks=chunks + chunks=chunks, ) - + session.add(document) documents_indexed += 1 logger.info(f"Successfully indexed new Notion page: {page_title}") - + except Exception as e: - logger.error(f"Error processing Notion page {page.get('title', 'Unknown')}: {str(e)}", exc_info=True) - skipped_pages.append(f"{page.get('title', 'Unknown')} (processing error)") + logger.error( + f"Error processing Notion page {page.get('title', 'Unknown')}: {str(e)}", + exc_info=True, + ) + skipped_pages.append( + f"{page.get('title', 'Unknown')} (processing error)" + ) documents_skipped += 1 continue # Skip this page and continue with others - + # Update the last_indexed_at timestamp for the connector only if requested # and if we successfully indexed at least one page total_processed = documents_indexed if update_last_indexed and total_processed > 0: connector.last_indexed_at = datetime.now() logger.info(f"Updated last_indexed_at for connector {connector_id}") - + # Commit all changes await session.commit() - + # Prepare result message result_message = None if skipped_pages: result_message = f"Processed {total_processed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}" else: result_message = f"Processed {total_processed} pages." 
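The recursive `process_blocks` helper in this hunk flattens Notion's nested block tree into indented markdown before chunking. A standalone sketch of the same idea is shown below, using a simplified block shape (`type`, `content`, `children`) and only the behavior visible in this hunk (paragraph/text handling, a fallback for other block types, and recursion into children); `blocks_to_markdown` is a hypothetical name, and the real helper formats additional block types such as headings and lists.

def blocks_to_markdown(blocks: list[dict], level: int = 0) -> str:
    """Flatten a nested list of Notion-style blocks into indented markdown."""
    out = ""
    indent = "    " * level  # indentation width is an assumption of this sketch
    for block in blocks:
        block_type = block.get("type")
        content = block.get("content", "")
        children = block.get("children", [])

        if block_type in ("paragraph", "text"):
            out += f"{indent}{content}\n\n"
        elif content:
            # Fallback for block types not handled explicitly.
            out += f"{indent}{content}\n\n"

        # Recurse one indentation level deeper for child blocks.
        if children:
            out += blocks_to_markdown(children, level + 1)
    return out


# Example: a paragraph with one nested child block.
print(blocks_to_markdown([
    {"type": "paragraph", "content": "Parent block",
     "children": [{"type": "text", "content": "Nested child"}]},
]))

The recursion depth simply tracks the nesting depth of the page, so the flattened markdown preserves the block hierarchy through indentation alone.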
- + # Log success await task_logger.log_task_success( log_entry, @@ -708,22 +834,26 @@ async def index_notion_pages( "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, "skipped_pages_count": len(skipped_pages), - "result_message": result_message - } + "result_message": result_message, + }, + ) + + logger.info( + f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped" ) - - logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped") return total_processed, result_message - + except SQLAlchemyError as db_error: await session.rollback() await task_logger.log_task_failure( log_entry, f"Database error during Notion indexing for connector {connector_id}", str(db_error), - {"error_type": "SQLAlchemyError"} + {"error_type": "SQLAlchemyError"}, + ) + logger.error( + f"Database error during Notion indexing: {str(db_error)}", exc_info=True ) - logger.error(f"Database error during Notion indexing: {str(db_error)}", exc_info=True) return 0, f"Database error: {str(db_error)}" except Exception as e: await session.rollback() @@ -731,11 +861,12 @@ async def index_notion_pages( log_entry, f"Failed to index Notion pages for connector {connector_id}", str(e), - {"error_type": type(e).__name__} + {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Notion pages: {str(e)}", exc_info=True) return 0, f"Failed to index Notion pages: {str(e)}" + async def index_github_repos( session: AsyncSession, connector_id: int, @@ -743,7 +874,7 @@ async def index_github_repos( user_id: str, start_date: str = None, end_date: str = None, - update_last_indexed: bool = True + update_last_indexed: bool = True, ) -> Tuple[int, Optional[str]]: """ Index code and documentation files from accessible GitHub repositories. @@ -758,15 +889,20 @@ async def index_github_repos( Tuple containing (number of documents indexed, error message or None) """ task_logger = TaskLoggingService(session, search_space_id) - + # Log task start log_entry = await task_logger.log_task_start( task_name="github_repos_indexing", source="connector_indexing_task", message=f"Starting GitHub repositories indexing for connector {connector_id}", - metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date} + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "start_date": start_date, + "end_date": end_date, + }, ) - + documents_processed = 0 errors = [] @@ -775,14 +911,14 @@ async def index_github_repos( await task_logger.log_task_progress( log_entry, f"Retrieving GitHub connector {connector_id} from database", - {"stage": "connector_retrieval"} + {"stage": "connector_retrieval"}, ) - + result = await session.execute( - select(SearchSourceConnector) - .filter( + select(SearchSourceConnector).filter( SearchSourceConnector.id == connector_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.GITHUB_CONNECTOR + SearchSourceConnector.connector_type + == SearchSourceConnectorType.GITHUB_CONNECTOR, ) ) connector = result.scalars().first() @@ -792,9 +928,12 @@ async def index_github_repos( log_entry, f"Connector with ID {connector_id} not found or is not a GitHub connector", "Connector not found", - {"error_type": "ConnectorNotFound"} + {"error_type": "ConnectorNotFound"}, + ) + return ( + 0, + f"Connector with ID {connector_id} not found or is not a GitHub connector", ) - return 0, f"Connector with ID {connector_id} not found or is not a GitHub connector" # 2. 
Get the GitHub PAT and selected repositories from the connector config github_pat = connector.config.get("GITHUB_PAT") @@ -805,16 +944,18 @@ async def index_github_repos( log_entry, f"GitHub Personal Access Token (PAT) not found in connector config for connector {connector_id}", "Missing GitHub PAT", - {"error_type": "MissingToken"} + {"error_type": "MissingToken"}, ) return 0, "GitHub Personal Access Token (PAT) not found in connector config" - - if not repo_full_names_to_index or not isinstance(repo_full_names_to_index, list): + + if not repo_full_names_to_index or not isinstance( + repo_full_names_to_index, list + ): await task_logger.log_task_failure( log_entry, f"'repo_full_names' not found or is not a list in connector config for connector {connector_id}", "Invalid repo configuration", - {"error_type": "InvalidConfiguration"} + {"error_type": "InvalidConfiguration"}, ) return 0, "'repo_full_names' not found or is not a list in connector config" @@ -822,9 +963,12 @@ async def index_github_repos( await task_logger.log_task_progress( log_entry, f"Initializing GitHub client for connector {connector_id}", - {"stage": "client_initialization", "repo_count": len(repo_full_names_to_index)} + { + "stage": "client_initialization", + "repo_count": len(repo_full_names_to_index), + }, ) - + try: github_client = GitHubConnector(token=github_pat) except ValueError as e: @@ -832,7 +976,7 @@ async def index_github_repos( log_entry, f"Failed to initialize GitHub client for connector {connector_id}", str(e), - {"error_type": "ClientInitializationError"} + {"error_type": "ClientInitializationError"}, ) return 0, f"Failed to initialize GitHub client: {str(e)}" @@ -842,12 +986,21 @@ async def index_github_repos( await task_logger.log_task_progress( log_entry, f"Starting indexing for {len(repo_full_names_to_index)} selected repositories", - {"stage": "repo_processing", "repo_count": len(repo_full_names_to_index), "start_date": start_date, "end_date": end_date} + { + "stage": "repo_processing", + "repo_count": len(repo_full_names_to_index), + "start_date": start_date, + "end_date": end_date, + }, + ) + + logger.info( + f"Starting indexing for {len(repo_full_names_to_index)} selected repositories." ) - - logger.info(f"Starting indexing for {len(repo_full_names_to_index)} selected repositories.") if start_date and end_date: - logger.info(f"Date range requested: {start_date} to {end_date} (Note: GitHub indexing processes all files regardless of dates)") + logger.info( + f"Date range requested: {start_date} to {end_date} (Note: GitHub indexing processes all files regardless of dates)" + ) # 6. 
Iterate through selected repositories and index files for repo_full_name in repo_full_names_to_index: @@ -859,65 +1012,92 @@ async def index_github_repos( try: files_to_index = github_client.get_repository_files(repo_full_name) if not files_to_index: - logger.info(f"No indexable files found in repository: {repo_full_name}") + logger.info( + f"No indexable files found in repository: {repo_full_name}" + ) continue - logger.info(f"Found {len(files_to_index)} files to process in {repo_full_name}") + logger.info( + f"Found {len(files_to_index)} files to process in {repo_full_name}" + ) for file_info in files_to_index: file_path = file_info.get("path") file_url = file_info.get("url") file_sha = file_info.get("sha") - file_type = file_info.get("type") # 'code' or 'doc' + file_type = file_info.get("type") # 'code' or 'doc' full_path_key = f"{repo_full_name}/{file_path}" if not file_path or not file_url or not file_sha: - logger.warning(f"Skipping file with missing info in {repo_full_name}: {file_info}") + logger.warning( + f"Skipping file with missing info in {repo_full_name}: {file_info}" + ) continue # Get file content - file_content = github_client.get_file_content(repo_full_name, file_path) + file_content = github_client.get_file_content( + repo_full_name, file_path + ) if file_content is None: - logger.warning(f"Could not retrieve content for {full_path_key}. Skipping.") - continue # Skip if content fetch failed - + logger.warning( + f"Could not retrieve content for {full_path_key}. Skipping." + ) + continue # Skip if content fetch failed + content_hash = generate_content_hash(file_content, search_space_id) # Check if document with this content hash already exists existing_doc_by_hash_result = await session.execute( select(Document).where(Document.content_hash == content_hash) ) - existing_document_by_hash = existing_doc_by_hash_result.scalars().first() - + existing_document_by_hash = ( + existing_doc_by_hash_result.scalars().first() + ) + if existing_document_by_hash: - logger.info(f"Document with content hash {content_hash} already exists for file {full_path_key}. Skipping processing.") + logger.info( + f"Document with content hash {content_hash} already exists for file {full_path_key}. Skipping processing." + ) continue - + # Use file_content directly for chunking, maybe summary for main content? # For now, let's use the full content for both, might need refinement - summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..." # Simple summary - summary_embedding = config.embedding_model_instance.embed(summary_content) + summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..." 
# Simple summary + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) # Chunk the content try: chunks_data = [ - Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text)) - for chunk in config.code_chunker_instance.chunk(file_content) + Chunk( + content=chunk.text, + embedding=config.embedding_model_instance.embed( + chunk.text + ), + ) + for chunk in config.code_chunker_instance.chunk( + file_content + ) ] except Exception as chunk_err: - logger.error(f"Failed to chunk file {full_path_key}: {chunk_err}") - errors.append(f"Chunking failed for {full_path_key}: {chunk_err}") - continue # Skip this file if chunking fails + logger.error( + f"Failed to chunk file {full_path_key}: {chunk_err}" + ) + errors.append( + f"Chunking failed for {full_path_key}: {chunk_err}" + ) + continue # Skip this file if chunking fails doc_metadata = { "repository_full_name": repo_full_name, "file_path": file_path, - "full_path": full_path_key, # For easier lookup + "full_path": full_path_key, # For easier lookup "url": file_url, "sha": file_sha, "type": file_type, - "indexed_at": datetime.now(timezone.utc).isoformat() + "indexed_at": datetime.now(timezone.utc).isoformat(), } # Create new document @@ -926,22 +1106,26 @@ async def index_github_repos( title=f"GitHub - {file_path}", document_type=DocumentType.GITHUB_CONNECTOR, document_metadata=doc_metadata, - content=summary_content, # Store summary + content=summary_content, # Store summary content_hash=content_hash, embedding=summary_embedding, search_space_id=search_space_id, - chunks=chunks_data # Associate chunks directly + chunks=chunks_data, # Associate chunks directly ) session.add(document) documents_processed += 1 except Exception as repo_err: - logger.error(f"Failed to process repository {repo_full_name}: {repo_err}") + logger.error( + f"Failed to process repository {repo_full_name}: {repo_err}" + ) errors.append(f"Failed processing {repo_full_name}: {repo_err}") - + # Commit all changes at the end await session.commit() - logger.info(f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files.") + logger.info( + f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files." 
+ ) # Log success await task_logger.log_task_success( @@ -950,8 +1134,8 @@ async def index_github_repos( { "documents_processed": documents_processed, "errors_count": len(errors), - "repo_count": len(repo_full_names_to_index) - } + "repo_count": len(repo_full_names_to_index), + }, ) except SQLAlchemyError as db_err: @@ -960,9 +1144,11 @@ async def index_github_repos( log_entry, f"Database error during GitHub indexing for connector {connector_id}", str(db_err), - {"error_type": "SQLAlchemyError"} + {"error_type": "SQLAlchemyError"}, + ) + logger.error( + f"Database error during GitHub indexing for connector {connector_id}: {db_err}" ) - logger.error(f"Database error during GitHub indexing for connector {connector_id}: {db_err}") errors.append(f"Database error: {db_err}") return documents_processed, "; ".join(errors) if errors else str(db_err) except Exception as e: @@ -971,15 +1157,19 @@ async def index_github_repos( log_entry, f"Unexpected error during GitHub indexing for connector {connector_id}", str(e), - {"error_type": type(e).__name__} + {"error_type": type(e).__name__}, + ) + logger.error( + f"Unexpected error during GitHub indexing for connector {connector_id}: {e}", + exc_info=True, ) - logger.error(f"Unexpected error during GitHub indexing for connector {connector_id}: {e}", exc_info=True) errors.append(f"Unexpected error: {e}") return documents_processed, "; ".join(errors) if errors else str(e) error_message = "; ".join(errors) if errors else None return documents_processed, error_message + async def index_linear_issues( session: AsyncSession, connector_id: int, @@ -987,56 +1177,64 @@ async def index_linear_issues( user_id: str, start_date: str = None, end_date: str = None, - update_last_indexed: bool = True + update_last_indexed: bool = True, ) -> Tuple[int, Optional[str]]: """ Index Linear issues and comments. 
- + Args: session: Database session connector_id: ID of the Linear connector search_space_id: ID of the search space to store documents in update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) - + Returns: Tuple containing (number of documents indexed, error message or None) """ task_logger = TaskLoggingService(session, search_space_id) - + # Log task start log_entry = await task_logger.log_task_start( task_name="linear_issues_indexing", source="connector_indexing_task", message=f"Starting Linear issues indexing for connector {connector_id}", - metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date} + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "start_date": start_date, + "end_date": end_date, + }, ) - + try: # Get the connector await task_logger.log_task_progress( log_entry, f"Retrieving Linear connector {connector_id} from database", - {"stage": "connector_retrieval"} + {"stage": "connector_retrieval"}, ) - + result = await session.execute( - select(SearchSourceConnector) - .filter( + select(SearchSourceConnector).filter( SearchSourceConnector.id == connector_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.LINEAR_CONNECTOR + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LINEAR_CONNECTOR, ) ) connector = result.scalars().first() - + if not connector: await task_logger.log_task_failure( log_entry, f"Connector with ID {connector_id} not found or is not a Linear connector", "Connector not found", - {"error_type": "ConnectorNotFound"} + {"error_type": "ConnectorNotFound"}, ) - return 0, f"Connector with ID {connector_id} not found or is not a Linear connector" - + return ( + 0, + f"Connector with ID {connector_id} not found or is not a Linear connector", + ) + # Get the Linear token from the connector config linear_token = connector.config.get("LINEAR_API_KEY") if not linear_token: @@ -1044,135 +1242,167 @@ async def index_linear_issues( log_entry, f"Linear API token not found in connector config for connector {connector_id}", "Missing Linear token", - {"error_type": "MissingToken"} + {"error_type": "MissingToken"}, ) return 0, "Linear API token not found in connector config" - + # Initialize Linear client await task_logger.log_task_progress( log_entry, f"Initializing Linear client for connector {connector_id}", - {"stage": "client_initialization"} + {"stage": "client_initialization"}, ) - + linear_client = LinearConnector(token=linear_token) - + # Calculate date range if start_date is None or end_date is None: # Fall back to calculating dates based on last_indexed_at calculated_end_date = datetime.now() - + # Use last_indexed_at as start date if available, otherwise use 365 days ago if connector.last_indexed_at: # Convert dates to be comparable (both timezone-naive) - last_indexed_naive = connector.last_indexed_at.replace(tzinfo=None) if connector.last_indexed_at.tzinfo else connector.last_indexed_at - + last_indexed_naive = ( + connector.last_indexed_at.replace(tzinfo=None) + if connector.last_indexed_at.tzinfo + else connector.last_indexed_at + ) + # Check if last_indexed_at is in the future or after end_date if last_indexed_naive > calculated_end_date: - logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead.") + logger.warning( + f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead." 
+ ) calculated_start_date = calculated_end_date - timedelta(days=365) else: calculated_start_date = last_indexed_naive - logger.info(f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date") + logger.info( + f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date" + ) else: - calculated_start_date = calculated_end_date - timedelta(days=365) # Use 365 days as default - logger.info(f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date") - + calculated_start_date = calculated_end_date - timedelta( + days=365 + ) # Use 365 days as default + logger.info( + f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date" + ) + # Use calculated dates if not provided - start_date_str = start_date if start_date else calculated_start_date.strftime("%Y-%m-%d") - end_date_str = end_date if end_date else calculated_end_date.strftime("%Y-%m-%d") + start_date_str = ( + start_date if start_date else calculated_start_date.strftime("%Y-%m-%d") + ) + end_date_str = ( + end_date if end_date else calculated_end_date.strftime("%Y-%m-%d") + ) else: # Use provided dates start_date_str = start_date end_date_str = end_date - + logger.info(f"Fetching Linear issues from {start_date_str} to {end_date_str}") - + await task_logger.log_task_progress( log_entry, f"Fetching Linear issues from {start_date_str} to {end_date_str}", - {"stage": "fetch_issues", "start_date": start_date_str, "end_date": end_date_str} + { + "stage": "fetch_issues", + "start_date": start_date_str, + "end_date": end_date_str, + }, ) - + # Get issues within date range try: issues, error = linear_client.get_issues_by_date_range( - start_date=start_date_str, - end_date=end_date_str, - include_comments=True + start_date=start_date_str, end_date=end_date_str, include_comments=True ) - + if error: logger.error(f"Failed to get Linear issues: {error}") - + # Don't treat "No issues found" as an error that should stop indexing if "No issues found" in error: - logger.info("No issues found is not a critical error, continuing with update") + logger.info( + "No issues found is not a critical error, continuing with update" + ) if update_last_indexed: connector.last_indexed_at = datetime.now() await session.commit() - logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found") + logger.info( + f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found" + ) return 0, None else: return 0, f"Failed to get Linear issues: {error}" - + logger.info(f"Retrieved {len(issues)} issues from Linear API") - + except Exception as e: logger.error(f"Exception when calling Linear API: {str(e)}", exc_info=True) return 0, f"Failed to get Linear issues: {str(e)}" - + if not issues: logger.info("No Linear issues found for the specified date range") if update_last_indexed: connector.last_indexed_at = datetime.now() await session.commit() - logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found") + logger.info( + f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found" + ) return 0, None # Return None instead of error message when no issues found - + # Log issue IDs and titles for debugging logger.info("Issues retrieved from Linear API:") for idx, issue in enumerate(issues[:10]): # Log first 10 issues - logger.info(f" {idx+1}. 
{issue.get('identifier', 'Unknown')} - {issue.get('title', 'Unknown')} - Created: {issue.get('createdAt', 'Unknown')} - Updated: {issue.get('updatedAt', 'Unknown')}") + logger.info( + f" {idx + 1}. {issue.get('identifier', 'Unknown')} - {issue.get('title', 'Unknown')} - Created: {issue.get('createdAt', 'Unknown')} - Updated: {issue.get('updatedAt', 'Unknown')}" + ) if len(issues) > 10: logger.info(f" ...and {len(issues) - 10} more issues") - + # Track the number of documents indexed documents_indexed = 0 documents_skipped = 0 skipped_issues = [] - + await task_logger.log_task_progress( log_entry, f"Starting to process {len(issues)} Linear issues", - {"stage": "process_issues", "total_issues": len(issues)} + {"stage": "process_issues", "total_issues": len(issues)}, ) - + # Process each issue for issue in issues: try: issue_id = issue.get("id") issue_identifier = issue.get("identifier", "") issue_title = issue.get("title", "") - + if not issue_id or not issue_title: - logger.warning(f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}") - skipped_issues.append(f"{issue_identifier or 'Unknown'} (missing data)") + logger.warning( + f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}" + ) + skipped_issues.append( + f"{issue_identifier or 'Unknown'} (missing data)" + ) documents_skipped += 1 continue - + # Format the issue first to get well-structured data formatted_issue = linear_client.format_issue(issue) - + # Convert issue to markdown format issue_content = linear_client.format_issue_to_markdown(formatted_issue) - + if not issue_content: - logger.warning(f"Skipping issue with no content: {issue_identifier} - {issue_title}") + logger.warning( + f"Skipping issue with no content: {issue_identifier} - {issue_title}" + ) skipped_issues.append(f"{issue_identifier} (no content)") documents_skipped += 1 continue - + # Create a short summary for the embedding # This avoids using the LLM and just uses the issue data directly state = formatted_issue.get("state", "Unknown") @@ -1180,40 +1410,51 @@ async def index_linear_issues( # Truncate description if it's too long for the summary if description and len(description) > 500: description = description[:497] + "..." - + # Create a simple summary from the issue data summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n" if description: summary_content += f"Description: {description}\n\n" - + # Add comment count comment_count = len(formatted_issue.get("comments", [])) summary_content += f"Comments: {comment_count}" - + content_hash = generate_content_hash(issue_content, search_space_id) # Check if document with this content hash already exists existing_doc_by_hash_result = await session.execute( select(Document).where(Document.content_hash == content_hash) ) - existing_document_by_hash = existing_doc_by_hash_result.scalars().first() - + existing_document_by_hash = ( + existing_doc_by_hash_result.scalars().first() + ) + if existing_document_by_hash: - logger.info(f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing.") + logger.info( + f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing." 
+ ) documents_skipped += 1 continue - + # Generate embedding for the summary - summary_embedding = config.embedding_model_instance.embed(summary_content) - + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + # Process chunks - using the full issue content with comments chunks = [ - Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text)) + Chunk( + content=chunk.text, + embedding=config.embedding_model_instance.embed(chunk.text), + ) for chunk in config.chunker_instance.chunk(issue_content) ] - + # Create and store new document - logger.info(f"Creating new document for issue {issue_identifier} - {issue_title}") + logger.info( + f"Creating new document for issue {issue_identifier} - {issue_title}" + ) document = Document( search_space_id=search_space_id, title=f"Linear - {issue_identifier}: {issue_title}", @@ -1224,34 +1465,41 @@ async def index_linear_issues( "issue_title": issue_title, "state": state, "comment_count": comment_count, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), }, content=summary_content, content_hash=content_hash, embedding=summary_embedding, - chunks=chunks + chunks=chunks, ) - + session.add(document) documents_indexed += 1 - logger.info(f"Successfully indexed new issue {issue_identifier} - {issue_title}") - + logger.info( + f"Successfully indexed new issue {issue_identifier} - {issue_title}" + ) + except Exception as e: - logger.error(f"Error processing issue {issue.get('identifier', 'Unknown')}: {str(e)}", exc_info=True) - skipped_issues.append(f"{issue.get('identifier', 'Unknown')} (processing error)") + logger.error( + f"Error processing issue {issue.get('identifier', 'Unknown')}: {str(e)}", + exc_info=True, + ) + skipped_issues.append( + f"{issue.get('identifier', 'Unknown')} (processing error)" + ) documents_skipped += 1 continue # Skip this issue and continue with others - + # Update the last_indexed_at timestamp for the connector only if requested total_processed = documents_indexed if update_last_indexed: connector.last_indexed_at = datetime.now() logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") - + # Commit all changes await session.commit() - logger.info(f"Successfully committed all Linear document changes to database") - + logger.info("Successfully committed all Linear document changes to database") + # Log success await task_logger.log_task_success( log_entry, @@ -1260,20 +1508,25 @@ async def index_linear_issues( "issues_processed": total_processed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, - "skipped_issues_count": len(skipped_issues) - } + "skipped_issues_count": len(skipped_issues), + }, ) - - logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped") - return total_processed, None # Return None as the error message to indicate success - + + logger.info( + f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped" + ) + return ( + total_processed, + None, + ) # Return None as the error message to indicate success + except SQLAlchemyError as db_error: await session.rollback() await task_logger.log_task_failure( log_entry, f"Database error during Linear indexing for connector {connector_id}", str(db_error), - {"error_type": "SQLAlchemyError"} + {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {str(db_error)}", exc_info=True) return 0, f"Database error: 
{str(db_error)}" @@ -1283,11 +1536,12 @@ async def index_linear_issues( log_entry, f"Failed to index Linear issues for connector {connector_id}", str(e), - {"error_type": type(e).__name__} + {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Linear issues: {str(e)}", exc_info=True) return 0, f"Failed to index Linear issues: {str(e)}" + async def index_discord_messages( session: AsyncSession, connector_id: int, @@ -1295,7 +1549,7 @@ async def index_discord_messages( user_id: str, start_date: str = None, end_date: str = None, - update_last_indexed: bool = True + update_last_indexed: bool = True, ) -> Tuple[int, Optional[str]]: """ Index Discord messages from all accessible channels. @@ -1310,28 +1564,33 @@ async def index_discord_messages( Tuple containing (number of documents indexed, error message or None) """ task_logger = TaskLoggingService(session, search_space_id) - + # Log task start log_entry = await task_logger.log_task_start( task_name="discord_messages_indexing", source="connector_indexing_task", message=f"Starting Discord messages indexing for connector {connector_id}", - metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date} + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "start_date": start_date, + "end_date": end_date, + }, ) - + try: # Get the connector await task_logger.log_task_progress( log_entry, f"Retrieving Discord connector {connector_id} from database", - {"stage": "connector_retrieval"} + {"stage": "connector_retrieval"}, ) - + result = await session.execute( - select(SearchSourceConnector) - .filter( + select(SearchSourceConnector).filter( SearchSourceConnector.id == connector_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR + SearchSourceConnector.connector_type + == SearchSourceConnectorType.DISCORD_CONNECTOR, ) ) connector = result.scalars().first() @@ -1341,9 +1600,12 @@ async def index_discord_messages( log_entry, f"Connector with ID {connector_id} not found or is not a Discord connector", "Connector not found", - {"error_type": "ConnectorNotFound"} + {"error_type": "ConnectorNotFound"}, + ) + return ( + 0, + f"Connector with ID {connector_id} not found or is not a Discord connector", ) - return 0, f"Connector with ID {connector_id} not found or is not a Discord connector" # Get the Discord token from the connector config discord_token = connector.config.get("DISCORD_BOT_TOKEN") @@ -1352,7 +1614,7 @@ async def index_discord_messages( log_entry, f"Discord token not found in connector config for connector {connector_id}", "Missing Discord token", - {"error_type": "MissingToken"} + {"error_type": "MissingToken"}, ) return 0, "Discord token not found in connector config" @@ -1362,9 +1624,9 @@ async def index_discord_messages( await task_logger.log_task_progress( log_entry, f"Initializing Discord client for connector {connector_id}", - {"stage": "client_initialization"} + {"stage": "client_initialization"}, ) - + discord_client = DiscordConnector(token=discord_token) # Calculate date range @@ -1374,30 +1636,54 @@ async def index_discord_messages( # Use last_indexed_at as start date if available, otherwise use 365 days ago if connector.last_indexed_at: - calculated_start_date = connector.last_indexed_at.replace(tzinfo=timezone.utc) - logger.info(f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date") + calculated_start_date = connector.last_indexed_at.replace( + tzinfo=timezone.utc + ) + 
logger.info( + f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date" + ) else: calculated_start_date = calculated_end_date - timedelta(days=365) - logger.info(f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date") + logger.info( + f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date" + ) # Use calculated dates if not provided, convert to ISO format for Discord API if start_date is None: start_date_iso = calculated_start_date.isoformat() else: # Convert YYYY-MM-DD to ISO format - start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat() - + start_date_iso = ( + datetime.strptime(start_date, "%Y-%m-%d") + .replace(tzinfo=timezone.utc) + .isoformat() + ) + if end_date is None: end_date_iso = calculated_end_date.isoformat() else: - # Convert YYYY-MM-DD to ISO format - end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat() + # Convert YYYY-MM-DD to ISO format + end_date_iso = ( + datetime.strptime(end_date, "%Y-%m-%d") + .replace(tzinfo=timezone.utc) + .isoformat() + ) else: # Convert provided dates to ISO format for Discord API - start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat() - end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat() - - logger.info(f"Indexing Discord messages from {start_date_iso} to {end_date_iso}") + start_date_iso = ( + datetime.strptime(start_date, "%Y-%m-%d") + .replace(tzinfo=timezone.utc) + .isoformat() + ) + end_date_iso = ( + datetime.strptime(end_date, "%Y-%m-%d") + .replace(tzinfo=timezone.utc) + .isoformat() + ) + + logger.info( + f"Indexing Discord messages from {start_date_iso} to {end_date_iso}" + ) documents_indexed = 0 documents_skipped = 0 @@ -1407,9 +1693,9 @@ async def index_discord_messages( await task_logger.log_task_progress( log_entry, f"Starting Discord bot and fetching guilds for connector {connector_id}", - {"stage": "fetch_guilds"} + {"stage": "fetch_guilds"}, ) - + logger.info("Starting Discord bot to fetch guilds") discord_client._bot_task = asyncio.create_task(discord_client.start_bot()) await discord_client._wait_until_ready() @@ -1422,7 +1708,7 @@ async def index_discord_messages( log_entry, f"Failed to get Discord guilds for connector {connector_id}", str(e), - {"error_type": "GuildFetchError"} + {"error_type": "GuildFetchError"}, ) logger.error(f"Failed to get Discord guilds: {str(e)}", exc_info=True) await discord_client.close_bot() @@ -1431,7 +1717,7 @@ async def index_discord_messages( await task_logger.log_task_success( log_entry, f"No Discord guilds found for connector {connector_id}", - {"guilds_found": 0} + {"guilds_found": 0}, ) logger.info("No Discord guilds found to index") await discord_client.close_bot() @@ -1441,9 +1727,9 @@ async def index_discord_messages( await task_logger.log_task_progress( log_entry, f"Starting to process {len(guilds)} Discord guilds", - {"stage": "process_guilds", "total_guilds": len(guilds)} + {"stage": "process_guilds", "total_guilds": len(guilds)}, ) - + for guild in guilds: guild_id = guild["id"] guild_name = guild["name"] @@ -1467,13 +1753,19 @@ async def index_discord_messages( end_date=end_date_iso, ) except Exception as e: - logger.error(f"Failed to get messages for channel {channel_name}: {str(e)}") - skipped_channels.append(f"{guild_name}#{channel_name} (fetch error)") + logger.error( 
+ f"Failed to get messages for channel {channel_name}: {str(e)}" + ) + skipped_channels.append( + f"{guild_name}#{channel_name} (fetch error)" + ) documents_skipped += 1 continue if not messages: - logger.info(f"No messages found in channel {channel_name} for the specified date range.") + logger.info( + f"No messages found in channel {channel_name} for the specified date range." + ) documents_skipped += 1 continue @@ -1486,33 +1778,45 @@ async def index_discord_messages( formatted_messages.append(msg) if not formatted_messages: - logger.info(f"No valid messages found in channel {channel_name} after filtering.") + logger.info( + f"No valid messages found in channel {channel_name} after filtering." + ) documents_skipped += 1 continue # Convert messages to markdown format - channel_content = f"# Discord Channel: {guild_name} / {channel_name}\n\n" + channel_content = ( + f"# Discord Channel: {guild_name} / {channel_name}\n\n" + ) for msg in formatted_messages: user_name = msg.get("author_name", "Unknown User") timestamp = msg.get("created_at", "Unknown Time") text = msg.get("content", "") - channel_content += f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n" + channel_content += ( + f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n" + ) # Format document metadata metadata_sections = [ - ("METADATA", [ - f"GUILD_NAME: {guild_name}", - f"GUILD_ID: {guild_id}", - f"CHANNEL_NAME: {channel_name}", - f"CHANNEL_ID: {channel_id}", - f"MESSAGE_COUNT: {len(formatted_messages)}" - ]), - ("CONTENT", [ - "FORMAT: markdown", - "TEXT_START", - channel_content, - "TEXT_END" - ]) + ( + "METADATA", + [ + f"GUILD_NAME: {guild_name}", + f"GUILD_ID: {guild_id}", + f"CHANNEL_NAME: {channel_name}", + f"CHANNEL_ID: {channel_id}", + f"MESSAGE_COUNT: {len(formatted_messages)}", + ], + ), + ( + "CONTENT", + [ + "FORMAT: markdown", + "TEXT_START", + channel_content, + "TEXT_END", + ], + ), ] # Build the document string @@ -1523,31 +1827,43 @@ async def index_discord_messages( document_parts.extend(section_content) document_parts.append(f"") document_parts.append("") - combined_document_string = '\n'.join(document_parts) - content_hash = generate_content_hash(combined_document_string, search_space_id) + combined_document_string = "\n".join(document_parts) + content_hash = generate_content_hash( + combined_document_string, search_space_id + ) # Check if document with this content hash already exists existing_doc_by_hash_result = await session.execute( select(Document).where(Document.content_hash == content_hash) ) - existing_document_by_hash = existing_doc_by_hash_result.scalars().first() + existing_document_by_hash = ( + existing_doc_by_hash_result.scalars().first() + ) if existing_document_by_hash: - logger.info(f"Document with content hash {content_hash} already exists for channel {guild_name}#{channel_name}. Skipping processing.") + logger.info( + f"Document with content hash {content_hash} already exists for channel {guild_name}#{channel_name}. Skipping processing." 
+ ) documents_skipped += 1 continue # Get user's long context LLM user_llm = await get_user_long_context_llm(session, user_id) if not user_llm: - logger.error(f"No long context LLM configured for user {user_id}") - skipped_channels.append(f"{guild_name}#{channel_name} (no LLM configured)") + logger.error( + f"No long context LLM configured for user {user_id}" + ) + skipped_channels.append( + f"{guild_name}#{channel_name} (no LLM configured)" + ) documents_skipped += 1 continue # Generate summary using summary_chain summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm - summary_result = await summary_chain.ainvoke({"document": combined_document_string}) + summary_result = await summary_chain.ainvoke( + {"document": combined_document_string} + ) summary_content = summary_result.content summary_embedding = await asyncio.to_thread( config.embedding_model_instance.embed, summary_content @@ -1555,14 +1871,17 @@ async def index_discord_messages( # Process chunks raw_chunks = await asyncio.to_thread( - config.chunker_instance.chunk, - channel_content + config.chunker_instance.chunk, channel_content ) - chunk_texts = [chunk.text for chunk in raw_chunks if chunk.text.strip()] + chunk_texts = [ + chunk.text for chunk in raw_chunks if chunk.text.strip() + ] chunk_embeddings = await asyncio.to_thread( - lambda texts: [config.embedding_model_instance.embed(t) for t in texts], - chunk_texts + lambda texts: [ + config.embedding_model_instance.embed(t) for t in texts + ], + chunk_texts, ) chunks = [ @@ -1583,20 +1902,26 @@ async def index_discord_messages( "message_count": len(formatted_messages), "start_date": start_date_iso, "end_date": end_date_iso, - "indexed_at": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + "indexed_at": datetime.now(timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S" + ), }, content=summary_content, content_hash=content_hash, embedding=summary_embedding, - chunks=chunks + chunks=chunks, ) session.add(document) documents_indexed += 1 - logger.info(f"Successfully indexed new channel {guild_name}#{channel_name} with {len(formatted_messages)} messages") + logger.info( + f"Successfully indexed new channel {guild_name}#{channel_name} with {len(formatted_messages)} messages" + ) except Exception as e: - logger.error(f"Error processing guild {guild_name}: {str(e)}", exc_info=True) + logger.error( + f"Error processing guild {guild_name}: {str(e)}", exc_info=True + ) skipped_channels.append(f"{guild_name} (processing error)") documents_skipped += 1 continue @@ -1625,11 +1950,13 @@ async def index_discord_messages( "documents_skipped": documents_skipped, "skipped_channels_count": len(skipped_channels), "guilds_processed": len(guilds), - "result_message": result_message - } + "result_message": result_message, + }, ) - logger.info(f"Discord indexing completed: {documents_indexed} new channels, {documents_skipped} skipped") + logger.info( + f"Discord indexing completed: {documents_indexed} new channels, {documents_skipped} skipped" + ) return documents_indexed, result_message except SQLAlchemyError as db_error: @@ -1638,9 +1965,11 @@ async def index_discord_messages( log_entry, f"Database error during Discord indexing for connector {connector_id}", str(db_error), - {"error_type": "SQLAlchemyError"} + {"error_type": "SQLAlchemyError"}, + ) + logger.error( + f"Database error during Discord indexing: {str(db_error)}", exc_info=True ) - logger.error(f"Database error during Discord indexing: {str(db_error)}", exc_info=True) return 0, f"Database error: {str(db_error)}" except Exception as e: await 
session.rollback() @@ -1648,7 +1977,7 @@ async def index_discord_messages( log_entry, f"Failed to index Discord messages for connector {connector_id}", str(e), - {"error_type": type(e).__name__} + {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True) return 0, f"Failed to index Discord messages: {str(e)}" @@ -1661,7 +1990,7 @@ async def index_jira_issues( user_id: str, start_date: str = None, end_date: str = None, - update_last_indexed: bool = True + update_last_indexed: bool = True, ) -> Tuple[int, Optional[str]]: """ Index Jira issues and comments. @@ -1685,13 +2014,20 @@ async def index_jira_issues( task_name="jira_issues_indexing", source="connector_indexing_task", message=f"Starting Jira issues indexing for connector {connector_id}", - metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date} + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "start_date": start_date, + "end_date": end_date, + }, ) try: # Get the connector from the database result = await session.execute( - select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id) + select(SearchSourceConnector).where( + SearchSourceConnector.id == connector_id + ) ) connector = result.scalar_one_or_none() @@ -1700,7 +2036,7 @@ async def index_jira_issues( log_entry, f"Connector with ID {connector_id} not found", "Connector not found", - {"error_type": "ConnectorNotFound"} + {"error_type": "ConnectorNotFound"}, ) return 0, f"Connector with ID {connector_id} not found" @@ -1713,7 +2049,7 @@ async def index_jira_issues( log_entry, f"Jira credentials not found in connector config for connector {connector_id}", "Missing Jira credentials", - {"error_type": "MissingCredentials"} + {"error_type": "MissingCredentials"}, ) return 0, "Jira credentials not found in connector config" @@ -1721,10 +2057,12 @@ async def index_jira_issues( await task_logger.log_task_progress( log_entry, f"Initializing Jira client for connector {connector_id}", - {"stage": "client_initialization"} + {"stage": "client_initialization"}, ) - jira_client = JiraConnector(base_url=jira_base_url, personal_access_token=jira_token) + jira_client = JiraConnector( + base_url=jira_base_url, personal_access_token=jira_token + ) # Calculate date range if start_date is None or end_date is None: @@ -1737,8 +2075,8 @@ async def index_jira_issues( # If never indexed, go back 30 days calculated_start_date = calculated_end_date - timedelta(days=30) - start_date_str = calculated_start_date.strftime('%Y-%m-%d') - end_date_str = calculated_end_date.strftime('%Y-%m-%d') + start_date_str = calculated_start_date.strftime("%Y-%m-%d") + end_date_str = calculated_end_date.strftime("%Y-%m-%d") else: start_date_str = start_date end_date_str = end_date @@ -1746,15 +2084,17 @@ async def index_jira_issues( await task_logger.log_task_progress( log_entry, f"Fetching Jira issues from {start_date_str} to {end_date_str}", - {"stage": "fetching_issues", "start_date": start_date_str, "end_date": end_date_str} + { + "stage": "fetching_issues", + "start_date": start_date_str, + "end_date": end_date_str, + }, ) # Get issues within date range try: issues, error = jira_client.get_issues_by_date_range( - start_date=start_date_str, - end_date=end_date_str, - include_comments=True + start_date=start_date_str, end_date=end_date_str, include_comments=True ) if error: @@ -1762,16 +2102,20 @@ async def index_jira_issues( # Don't treat "No issues found" as an error 
that should stop indexing if "No issues found" in error: - logger.info("No issues found is not a critical error, continuing with update") + logger.info( + "No issues found is not a critical error, continuing with update" + ) if update_last_indexed: connector.last_indexed_at = datetime.now() await session.commit() - logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found") + logger.info( + f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found" + ) await task_logger.log_task_completion( log_entry, f"No Jira issues found in date range {start_date_str} to {end_date_str}", - {"indexed_count": 0} + {"indexed_count": 0}, ) return 0, None else: @@ -1779,7 +2123,7 @@ async def index_jira_issues( log_entry, f"Failed to get Jira issues: {error}", "API Error", - {"error_type": "APIError"} + {"error_type": "APIError"}, ) return 0, f"Failed to get Jira issues: {error}" @@ -1788,7 +2132,7 @@ async def index_jira_issues( await task_logger.log_task_progress( log_entry, f"Retrieved {len(issues)} issues from Jira API", - {"stage": "processing_issues", "issue_count": len(issues)} + {"stage": "processing_issues", "issue_count": len(issues)}, ) except Exception as e: @@ -1796,7 +2140,7 @@ async def index_jira_issues( log_entry, f"Error fetching Jira issues: {str(e)}", "Fetch Error", - {"error_type": type(e).__name__} + {"error_type": type(e).__name__}, ) logger.error(f"Error fetching Jira issues: {str(e)}", exc_info=True) return 0, f"Error fetching Jira issues: {str(e)}" @@ -1820,14 +2164,20 @@ async def index_jira_issues( "priority": formatted_issue.get("priority", ""), "issue_type": formatted_issue.get("issue_type", ""), "project": formatted_issue.get("project", ""), - "assignee": formatted_issue.get("assignee", {}).get("display_name", "") if formatted_issue.get("assignee") else "", - "reporter": formatted_issue.get("reporter", {}).get("display_name", ""), + "assignee": ( + formatted_issue.get("assignee", {}).get("display_name", "") + if formatted_issue.get("assignee") + else "" + ), + "reporter": formatted_issue.get("reporter", {}).get( + "display_name", "" + ), "created_at": formatted_issue.get("created_at", ""), "updated_at": formatted_issue.get("updated_at", ""), "comment_count": len(formatted_issue.get("comments", [])), "connector_id": connector_id, "source": "jira", - "base_url": jira_base_url + "base_url": jira_base_url, } # Generate content hash @@ -1840,7 +2190,9 @@ async def index_jira_issues( existing_doc = existing_doc_result.scalar_one_or_none() if existing_doc: - logger.debug(f"Document with hash {content_hash} already exists, skipping") + logger.debug( + f"Document with hash {content_hash} already exists, skipping" + ) continue # Create new document @@ -1850,34 +2202,47 @@ async def index_jira_issues( document_metadata=metadata, content=issue_markdown, content_hash=content_hash, - search_space_id=search_space_id + search_space_id=search_space_id, ) # Generate embedding - embedding = await config.embedding_model_instance.get_embedding(issue_markdown) + embedding = await config.embedding_model_instance.get_embedding( + issue_markdown + ) document.embedding = embedding session.add(document) await session.flush() # Flush to get the document ID # Create chunks for the document - chunks = await config.chunking_model_instance.chunk_document(issue_markdown) + chunks = await config.chunking_model_instance.chunk_document( + issue_markdown + ) for chunk_content in chunks: - chunk_embedding = await 
config.embedding_model_instance.get_embedding(chunk_content) + chunk_embedding = ( + await config.embedding_model_instance.get_embedding( + chunk_content + ) + ) chunk = Chunk( content=chunk_content, embedding=chunk_embedding, - document_id=document.id + document_id=document.id, ) session.add(chunk) indexed_count += 1 - logger.debug(f"Indexed Jira issue: {formatted_issue.get('key', 'Unknown')}") + logger.debug( + f"Indexed Jira issue: {formatted_issue.get('key', 'Unknown')}" + ) except Exception as e: - logger.error(f"Error processing Jira issue {issue.get('key', 'Unknown')}: {str(e)}", exc_info=True) + logger.error( + f"Error processing Jira issue {issue.get('key', 'Unknown')}: {str(e)}", + exc_info=True, + ) continue # Commit all changes @@ -1892,7 +2257,7 @@ async def index_jira_issues( await task_logger.log_task_completion( log_entry, f"Successfully indexed {indexed_count} Jira issues", - {"indexed_count": indexed_count} + {"indexed_count": indexed_count}, ) logger.info(f"Successfully indexed {indexed_count} Jira issues") @@ -1903,7 +2268,7 @@ async def index_jira_issues( log_entry, f"Failed to index Jira issues: {str(e)}", str(e), - {"error_type": type(e).__name__} + {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Jira issues: {str(e)}", exc_info=True) return 0, f"Failed to index Jira issues: {str(e)}"
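The Linear hunk above builds the embedded summary for each issue directly from issue fields rather than calling an LLM: identifier, title, state, a description capped at 500 characters, and the comment count. A minimal sketch of that composition as a standalone helper (the function name is illustrative, not part of the patch):

def build_linear_summary(
    issue_identifier: str,
    issue_title: str,
    state: str,
    description: str,
    comment_count: int,
) -> str:
    """Compose the lightweight, LLM-free summary embedded for a Linear issue."""
    # Long descriptions are truncated to keep the summary compact (500-char cap).
    if description and len(description) > 500:
        description = description[:497] + "..."

    summary = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n"
    if description:
        summary += f"Description: {description}\n\n"
    summary += f"Comments: {comment_count}"
    return summary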
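All three indexers above (Linear, Discord, Jira) guard document creation with the same deduplication check: hash the rendered content together with the search space, look the hash up in Document.content_hash, and skip the item when a row already exists. A sketch of that guard, assuming generate_content_hash is a stable digest over the content plus the search space id (the real helper and the Document import path live elsewhere in the repo and are assumed here):

import hashlib

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import Document  # assumed import path


def generate_content_hash(content: str, search_space_id: int) -> str:
    # Assumption: a SHA-256 digest over the search space id plus the content.
    return hashlib.sha256(f"{search_space_id}:{content}".encode("utf-8")).hexdigest()


async def document_already_indexed(
    session: AsyncSession, content: str, search_space_id: int
) -> bool:
    """Return True when a document with the same content hash already exists."""
    content_hash = generate_content_hash(content, search_space_id)
    result = await session.execute(
        select(Document).where(Document.content_hash == content_hash)
    )
    return result.scalars().first() is not None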
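The Discord and Jira hunks derive their incremental window the same way: end at the current time, start at connector.last_indexed_at when one exists, and otherwise fall back a fixed number of days (365 for Discord, 30 for Jira); explicit start_date/end_date strings override the calculation. A compressed sketch of that logic, returning ISO strings as the Discord path does (the Jira path formats plain YYYY-MM-DD dates instead):

from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def resolve_indexing_window(
    last_indexed_at: Optional[datetime],
    start_date: Optional[str],
    end_date: Optional[str],
    fallback_days: int,
) -> Tuple[str, str]:
    """Return a (start_iso, end_iso) UTC window for an incremental indexing run."""
    now_utc = datetime.now(timezone.utc)

    if start_date is None:
        # Resume from the last successful run, or go back `fallback_days`.
        if last_indexed_at is not None:
            start_dt = last_indexed_at.replace(tzinfo=timezone.utc)
        else:
            start_dt = now_utc - timedelta(days=fallback_days)
    else:
        start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)

    if end_date is None:
        end_dt = now_utc
    else:
        end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)

    return start_dt.isoformat(), end_dt.isoformat()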
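Before hashing, the Discord hunk renders each channel into a markdown transcript and wraps it in a tagged METADATA/CONTENT envelope. The wrapper tags themselves appear not to survive in this rendering of the patch (the empty f"" appends), so the sketch below assumes generic section tags; everything else mirrors the diff:

from typing import Dict, List


def build_channel_document(
    guild_name: str,
    guild_id: str,
    channel_name: str,
    channel_id: str,
    messages: List[Dict],
) -> str:
    """Render Discord messages into the tagged document string that gets hashed."""
    channel_content = f"# Discord Channel: {guild_name} / {channel_name}\n\n"
    for msg in messages:
        user_name = msg.get("author_name", "Unknown User")
        timestamp = msg.get("created_at", "Unknown Time")
        text = msg.get("content", "")
        channel_content += f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n"

    metadata_sections = [
        (
            "METADATA",
            [
                f"GUILD_NAME: {guild_name}",
                f"GUILD_ID: {guild_id}",
                f"CHANNEL_NAME: {channel_name}",
                f"CHANNEL_ID: {channel_id}",
                f"MESSAGE_COUNT: {len(messages)}",
            ],
        ),
        ("CONTENT", ["FORMAT: markdown", "TEXT_START", channel_content, "TEXT_END"]),
    ]

    document_parts = ["<DOCUMENT>"]  # assumed outer tag
    for section_title, section_content in metadata_sections:
        document_parts.append(f"<{section_title}>")   # assumed opening tag
        document_parts.extend(section_content)
        document_parts.append(f"</{section_title}>")  # assumed closing tag
    document_parts.append("</DOCUMENT>")
    return "\n".join(document_parts)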
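To keep the event loop responsive, the Discord path pushes the synchronous chunker and embedding model into worker threads with asyncio.to_thread. A sketch of that pattern, assuming the interfaces used in the patch (chunker.chunk() yields objects with a .text attribute, embedder.embed() returns one vector per call):

import asyncio
from typing import List, Tuple


async def chunk_and_embed(chunker, embedder, content: str) -> Tuple[List[str], List[list]]:
    """Run the synchronous chunker and embedder off the event loop."""
    raw_chunks = await asyncio.to_thread(chunker.chunk, content)
    chunk_texts = [chunk.text for chunk in raw_chunks if chunk.text.strip()]

    # Embed all chunk texts inside a single worker thread, as the Discord hunk does.
    chunk_embeddings = await asyncio.to_thread(
        lambda texts: [embedder.embed(t) for t in texts],
        chunk_texts,
    )
    return chunk_texts, chunk_embeddings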
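Each Jira issue indexed by the new index_jira_issues function carries a flat metadata dict derived from the formatted issue. A sketch of that mapping as a standalone helper; the leading key/title/status fields sit just above the visible hunk context and are assumed here:

from typing import Dict


def build_jira_document_metadata(
    formatted_issue: Dict, connector_id: int, jira_base_url: str
) -> Dict:
    """Flatten a formatted Jira issue into the metadata stored on its Document row."""
    assignee = formatted_issue.get("assignee")
    return {
        "issue_key": formatted_issue.get("key", ""),      # assumed, outside hunk context
        "issue_title": formatted_issue.get("title", ""),  # assumed, outside hunk context
        "status": formatted_issue.get("status", ""),      # assumed, outside hunk context
        "priority": formatted_issue.get("priority", ""),
        "issue_type": formatted_issue.get("issue_type", ""),
        "project": formatted_issue.get("project", ""),
        "assignee": assignee.get("display_name", "") if assignee else "",
        "reporter": formatted_issue.get("reporter", {}).get("display_name", ""),
        "created_at": formatted_issue.get("created_at", ""),
        "updated_at": formatted_issue.get("updated_at", ""),
        "comment_count": len(formatted_issue.get("comments", [])),
        "connector_id": connector_id,
        "source": "jira",
        "base_url": jira_base_url,
    }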