diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py
index 30d572a..0fb9dc3 100644
--- a/surfsense_backend/app/agents/researcher/nodes.py
+++ b/surfsense_backend/app/agents/researcher/nodes.py
@@ -172,20 +172,41 @@ async def fetch_documents_by_ids(
channel_id = metadata.get('channel_id', '')
guild_id = metadata.get('guild_id', '')
message_date = metadata.get('start_date', '')
-
+
title = f"Discord: {channel_name}"
if message_date:
title += f" ({message_date})"
-
+
description = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content
-
+
if guild_id and channel_id:
url = f"https://discord.com/channels/{guild_id}/{channel_id}"
elif channel_id:
url = f"https://discord.com/channels/@me/{channel_id}"
else:
url = ""
-
+
+ elif doc_type == "JIRA_CONNECTOR":
+ # Extract Jira-specific metadata
+ issue_key = metadata.get('issue_key', 'Unknown Issue')
+ issue_title = metadata.get('issue_title', 'Untitled Issue')
+ status = metadata.get('status', '')
+ priority = metadata.get('priority', '')
+ issue_type = metadata.get('issue_type', '')
+
+ title = f"Jira: {issue_key} - {issue_title}"
+ if status:
+ title += f" ({status})"
+
+ description = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content
+
+ # Construct Jira URL if we have the base URL
+ base_url = metadata.get('base_url', '')
+ if base_url and issue_key:
+ url = f"{base_url}/browse/{issue_key}"
+ else:
+ url = ""
+
elif doc_type == "EXTENSION":
# Extract Extension-specific metadata
webpage_title = metadata.get('VisitedWebPageTitle', doc.title)
@@ -227,6 +248,7 @@ async def fetch_documents_by_ids(
"GITHUB_CONNECTOR": "GitHub (Selected)",
"YOUTUBE_VIDEO": "YouTube Videos (Selected)",
"DISCORD_CONNECTOR": "Discord (Selected)",
+ "JIRA_CONNECTOR": "Jira Issues (Selected)",
"EXTENSION": "Browser Extension (Selected)",
"CRAWLED_URL": "Web Pages (Selected)",
"FILE": "Files (Selected)"
@@ -741,6 +763,30 @@ async def fetch_relevant_documents(
}
)
+ elif connector == "JIRA_CONNECTOR":
+ source_object, jira_chunks = await connector_service.search_jira(
+ user_query=reformulated_query,
+ user_id=user_id,
+ search_space_id=search_space_id,
+ top_k=top_k,
+ search_mode=search_mode
+ )
+
+ # Add to sources and raw documents
+ if source_object:
+ all_sources.append(source_object)
+ all_raw_documents.extend(jira_chunks)
+
+ # Stream found document count
+ if streaming_service and writer:
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"🎫 Found {len(jira_chunks)} Jira issues related to your query"
+ )
+ }
+ )
+
except Exception as e:
error_message = f"Error searching connector {connector}: {str(e)}"
print(error_message)
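
Note: below is a minimal, self-contained sketch of the source-mapping logic added in the JIRA_CONNECTOR branch above. The helper name and example values are illustrative (they are not part of the patch), but the metadata keys and the title/description/url formatting mirror the diff.

    def build_jira_source(metadata: dict, content: str) -> dict:
        """Illustrative only: mirrors how the JIRA_CONNECTOR branch builds a source entry."""
        issue_key = metadata.get("issue_key", "Unknown Issue")
        issue_title = metadata.get("issue_title", "Untitled Issue")
        status = metadata.get("status", "")
        base_url = metadata.get("base_url", "")

        title = f"Jira: {issue_key} - {issue_title}"
        if status:
            title += f" ({status})"

        description = content[:100] + "..." if len(content) > 100 else content
        url = f"{base_url}/browse/{issue_key}" if base_url and issue_key else ""
        return {"title": title, "description": description, "url": url}

    # Example (hypothetical values):
    # build_jira_source(
    #     {"issue_key": "TEST-1", "issue_title": "Login bug", "status": "Open",
    #      "base_url": "https://example.atlassian.net"},
    #     "Users cannot log in after the latest deploy.",
    # )
    # -> {"title": "Jira: TEST-1 - Login bug (Open)",
    #     "description": "Users cannot log in after the latest deploy.",
    #     "url": "https://example.atlassian.net/browse/TEST-1"}
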
diff --git a/surfsense_backend/app/agents/researcher/qna_agent/prompts.py b/surfsense_backend/app/agents/researcher/qna_agent/prompts.py
index eed0722..d726dfd 100644
--- a/surfsense_backend/app/agents/researcher/qna_agent/prompts.py
+++ b/surfsense_backend/app/agents/researcher/qna_agent/prompts.py
@@ -15,6 +15,7 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel
- YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos)
- GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions)
- LINEAR_CONNECTOR: "Linear project issues and discussions" (personal project management)
+- JIRA_CONNECTOR: "Jira project issues, tickets, and comments" (personal project tracking)
- DISCORD_CONNECTOR: "Discord server messages and channels" (personal community interactions)
- TAVILY_API: "Tavily search API results" (personalized search results)
- LINKUP_API: "Linkup search API results" (personalized search results)
diff --git a/surfsense_backend/app/agents/researcher/utils.py b/surfsense_backend/app/agents/researcher/utils.py
index c4991cc..647e000 100644
--- a/surfsense_backend/app/agents/researcher/utils.py
+++ b/surfsense_backend/app/agents/researcher/utils.py
@@ -33,6 +33,8 @@ def get_connector_emoji(connector_name: str) -> str:
"NOTION_CONNECTOR": "📘",
"GITHUB_CONNECTOR": "🐙",
"LINEAR_CONNECTOR": "📊",
+ "JIRA_CONNECTOR": "🎫",
+ "DISCORD_CONNECTOR": "🗨️",
"TAVILY_API": "🔍",
"LINKUP_API": "🔗"
}
@@ -50,6 +52,8 @@ def get_connector_friendly_name(connector_name: str) -> str:
"NOTION_CONNECTOR": "Notion",
"GITHUB_CONNECTOR": "GitHub",
"LINEAR_CONNECTOR": "Linear",
+ "JIRA_CONNECTOR": "Jira",
+ "DISCORD_CONNECTOR": "Discord",
"TAVILY_API": "Tavily Search",
"LINKUP_API": "Linkup Search"
}
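
A brief usage note for the two lookups extended above (return values taken directly from the mappings; fallback behavior for unknown connector names is whatever the existing functions already provide):

    # Illustrative usage:
    # get_connector_emoji("JIRA_CONNECTOR")         -> "🎫"
    # get_connector_friendly_name("JIRA_CONNECTOR") -> "Jira"
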
diff --git a/surfsense_backend/app/connectors/test_jira_connector.py b/surfsense_backend/app/connectors/test_jira_connector.py
new file mode 100644
index 0000000..c9b7551
--- /dev/null
+++ b/surfsense_backend/app/connectors/test_jira_connector.py
@@ -0,0 +1,218 @@
+import unittest
+from unittest.mock import patch, Mock
+from datetime import datetime
+
+# Import the JiraConnector
+from .jira_connector import JiraConnector
+
+
+class TestJiraConnector(unittest.TestCase):
+
+ def setUp(self):
+ """Set up test fixtures."""
+ self.base_url = "https://test.atlassian.net"
+ self.token = "test_token"
+ self.connector = JiraConnector(base_url=self.base_url, personal_access_token=self.token)
+
+ def test_init(self):
+ """Test JiraConnector initialization."""
+ self.assertEqual(self.connector.base_url, self.base_url)
+ self.assertEqual(self.connector.personal_access_token, self.token)
+ self.assertEqual(self.connector.api_version, "3")
+
+ def test_init_with_trailing_slash(self):
+ """Test JiraConnector initialization with trailing slash in URL."""
+ connector = JiraConnector(base_url="https://test.atlassian.net/", personal_access_token=self.token)
+ self.assertEqual(connector.base_url, "https://test.atlassian.net")
+
+ def test_set_credentials(self):
+ """Test setting credentials."""
+ new_url = "https://newtest.atlassian.net/"
+ new_token = "new_token"
+
+ self.connector.set_credentials(new_url, new_token)
+
+ self.assertEqual(self.connector.base_url, "https://newtest.atlassian.net")
+ self.assertEqual(self.connector.personal_access_token, new_token)
+
+ def test_get_headers(self):
+ """Test header generation."""
+ headers = self.connector.get_headers()
+
+ self.assertIn('Content-Type', headers)
+ self.assertIn('Authorization', headers)
+ self.assertIn('Accept', headers)
+ self.assertEqual(headers['Content-Type'], 'application/json')
+ self.assertEqual(headers['Accept'], 'application/json')
+ self.assertTrue(headers['Authorization'].startswith('Bearer '))
+
+ def test_get_headers_no_credentials(self):
+ """Test header generation without credentials."""
+ connector = JiraConnector()
+
+ with self.assertRaises(ValueError) as context:
+ connector.get_headers()
+
+ self.assertIn("Jira credentials not initialized", str(context.exception))
+
+ @patch('requests.get')
+ def test_make_api_request_success(self, mock_get):
+ """Test successful API request."""
+ mock_response = Mock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {"test": "data"}
+ mock_get.return_value = mock_response
+
+ result = self.connector.make_api_request("test/endpoint")
+
+ self.assertEqual(result, {"test": "data"})
+ mock_get.assert_called_once()
+
+ @patch('requests.get')
+ def test_make_api_request_failure(self, mock_get):
+ """Test failed API request."""
+ mock_response = Mock()
+ mock_response.status_code = 401
+ mock_response.text = "Unauthorized"
+ mock_get.return_value = mock_response
+
+ with self.assertRaises(Exception) as context:
+ self.connector.make_api_request("test/endpoint")
+
+ self.assertIn("API request failed with status code 401", str(context.exception))
+
+ @patch.object(JiraConnector, 'make_api_request')
+ def test_get_all_projects(self, mock_api_request):
+ """Test getting all projects."""
+ mock_api_request.return_value = {
+ "values": [
+ {"id": "1", "key": "TEST", "name": "Test Project"},
+ {"id": "2", "key": "DEMO", "name": "Demo Project"}
+ ]
+ }
+
+ projects = self.connector.get_all_projects()
+
+ self.assertEqual(len(projects), 2)
+ self.assertEqual(projects[0]["key"], "TEST")
+ self.assertEqual(projects[1]["key"], "DEMO")
+ mock_api_request.assert_called_once_with("project")
+
+ @patch.object(JiraConnector, 'make_api_request')
+ def test_get_all_issues(self, mock_api_request):
+ """Test getting all issues."""
+ mock_api_request.return_value = {
+ "issues": [
+ {
+ "id": "1",
+ "key": "TEST-1",
+ "fields": {
+ "summary": "Test Issue",
+ "description": "Test Description",
+ "status": {"name": "Open"},
+ "priority": {"name": "High"},
+ "issuetype": {"name": "Bug"},
+ "project": {"key": "TEST"},
+ "created": "2023-01-01T10:00:00.000+0000",
+ "updated": "2023-01-01T12:00:00.000+0000"
+ }
+ }
+ ],
+ "total": 1
+ }
+
+ issues = self.connector.get_all_issues()
+
+ self.assertEqual(len(issues), 1)
+ self.assertEqual(issues[0]["key"], "TEST-1")
+ self.assertEqual(issues[0]["fields"]["summary"], "Test Issue")
+
+ def test_format_issue(self):
+ """Test issue formatting."""
+ raw_issue = {
+ "id": "1",
+ "key": "TEST-1",
+ "fields": {
+ "summary": "Test Issue",
+ "description": "Test Description",
+ "status": {"name": "Open", "statusCategory": {"name": "To Do"}},
+ "priority": {"name": "High"},
+ "issuetype": {"name": "Bug"},
+ "project": {"key": "TEST"},
+ "created": "2023-01-01T10:00:00.000+0000",
+ "updated": "2023-01-01T12:00:00.000+0000",
+ "reporter": {
+ "accountId": "123",
+ "displayName": "John Doe",
+ "emailAddress": "john@example.com"
+ },
+ "assignee": {
+ "accountId": "456",
+ "displayName": "Jane Smith",
+ "emailAddress": "jane@example.com"
+ }
+ }
+ }
+
+ formatted = self.connector.format_issue(raw_issue)
+
+ self.assertEqual(formatted["id"], "1")
+ self.assertEqual(formatted["key"], "TEST-1")
+ self.assertEqual(formatted["title"], "Test Issue")
+ self.assertEqual(formatted["status"], "Open")
+ self.assertEqual(formatted["priority"], "High")
+ self.assertEqual(formatted["issue_type"], "Bug")
+ self.assertEqual(formatted["project"], "TEST")
+ self.assertEqual(formatted["reporter"]["display_name"], "John Doe")
+ self.assertEqual(formatted["assignee"]["display_name"], "Jane Smith")
+
+ def test_format_date(self):
+ """Test date formatting."""
+ iso_date = "2023-01-01T10:30:00.000+0000"
+ formatted_date = JiraConnector.format_date(iso_date)
+
+ self.assertEqual(formatted_date, "2023-01-01 10:30:00")
+
+ def test_format_date_invalid(self):
+ """Test date formatting with invalid input."""
+ formatted_date = JiraConnector.format_date("invalid-date")
+ self.assertEqual(formatted_date, "invalid-date")
+
+ formatted_date = JiraConnector.format_date("")
+ self.assertEqual(formatted_date, "Unknown date")
+
+ formatted_date = JiraConnector.format_date(None)
+ self.assertEqual(formatted_date, "Unknown date")
+
+ def test_format_issue_to_markdown(self):
+ """Test issue to markdown conversion."""
+ formatted_issue = {
+ "key": "TEST-1",
+ "title": "Test Issue",
+ "status": "Open",
+ "priority": "High",
+ "issue_type": "Bug",
+ "project": "TEST",
+ "assignee": {"display_name": "Jane Smith"},
+ "reporter": {"display_name": "John Doe"},
+ "created_at": "2023-01-01T10:00:00.000+0000",
+ "updated_at": "2023-01-01T12:00:00.000+0000",
+ "description": "Test Description",
+ "comments": []
+ }
+
+ markdown = self.connector.format_issue_to_markdown(formatted_issue)
+
+ self.assertIn("# TEST-1: Test Issue", markdown)
+ self.assertIn("**Status:** Open", markdown)
+ self.assertIn("**Priority:** High", markdown)
+ self.assertIn("**Type:** Bug", markdown)
+ self.assertIn("**Project:** TEST", markdown)
+ self.assertIn("**Assignee:** Jane Smith", markdown)
+ self.assertIn("**Reporter:** John Doe", markdown)
+ self.assertIn("## Description", markdown)
+ self.assertIn("Test Description", markdown)
+
+
+if __name__ == '__main__':
+ unittest.main()
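
For context, these tests exercise a JiraConnector class whose implementation is not part of this diff. The following is a rough sketch of the slice of the interface the first few assertions assume; names, defaults, and error messages are inferred from the tests, not copied from jira_connector.py.

    import requests


    class JiraConnector:
        """Assumed interface only -- inferred from the tests above."""

        def __init__(self, base_url: str = None, personal_access_token: str = None):
            # Trailing slash is stripped so URLs can be joined safely.
            self.base_url = base_url.rstrip("/") if base_url else None
            self.personal_access_token = personal_access_token
            self.api_version = "3"

        def get_headers(self) -> dict:
            if not self.base_url or not self.personal_access_token:
                raise ValueError("Jira credentials not initialized")
            return {
                "Content-Type": "application/json",
                "Accept": "application/json",
                "Authorization": f"Bearer {self.personal_access_token}",
            }

        def make_api_request(self, endpoint: str, params: dict = None) -> dict:
            url = f"{self.base_url}/rest/api/{self.api_version}/{endpoint}"
            response = requests.get(url, headers=self.get_headers(), params=params)
            if response.status_code != 200:
                raise Exception(
                    f"API request failed with status code {response.status_code}: {response.text}"
                )
            return response.json()
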
diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py
index 54f97d6..33366ff 100644
--- a/surfsense_backend/app/routes/search_source_connectors_routes.py
+++ b/surfsense_backend/app/routes/search_source_connectors_routes.py
@@ -19,7 +19,7 @@ from app.schemas import SearchSourceConnectorCreate, SearchSourceConnectorUpdate
from app.users import current_active_user
from app.utils.check_ownership import check_ownership
from pydantic import BaseModel, Field, ValidationError
-from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues, index_discord_messages
+from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues, index_discord_messages, index_jira_issues
from app.connectors.github_connector import GitHubConnector
from datetime import datetime, timedelta
import logging
@@ -284,6 +284,7 @@ async def index_connector_content(
- NOTION_CONNECTOR: Indexes pages from all accessible Notion pages
- GITHUB_CONNECTOR: Indexes code and documentation from GitHub repositories
- LINEAR_CONNECTOR: Indexes issues and comments from Linear
+ - JIRA_CONNECTOR: Indexes issues and comments from Jira
- DISCORD_CONNECTOR: Indexes messages from all accessible Discord channels
Args:
@@ -349,6 +350,12 @@ async def index_connector_content(
background_tasks.add_task(run_linear_indexing_with_new_session, connector_id, search_space_id, str(user.id), indexing_from, indexing_to)
response_message = "Linear indexing started in the background."
+ elif connector.connector_type == SearchSourceConnectorType.JIRA_CONNECTOR:
+ # Run indexing in background
+ logger.info(f"Triggering Jira indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}")
+ background_tasks.add_task(run_jira_indexing_with_new_session, connector_id, search_space_id, str(user.id), indexing_from, indexing_to)
+ response_message = "Jira indexing started in the background."
+
elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR:
# Run indexing in background
logger.info(
@@ -647,4 +654,45 @@ async def run_discord_indexing(
else:
logger.error(f"Discord indexing failed or no documents processed: {error_or_warning}")
except Exception as e:
- logger.error(f"Error in background Discord indexing task: {str(e)}")
\ No newline at end of file
+ logger.error(f"Error in background Discord indexing task: {str(e)}")
+
+
+# Add new helper functions for Jira indexing
+async def run_jira_indexing_with_new_session(
+ connector_id: int,
+ search_space_id: int,
+ user_id: str,
+ start_date: str,
+ end_date: str
+):
+ """Wrapper to run Jira indexing with its own database session."""
+ logger.info(f"Background task started: Indexing Jira connector {connector_id} into space {search_space_id} from {start_date} to {end_date}")
+ async with async_session_maker() as session:
+ await run_jira_indexing(session, connector_id, search_space_id, user_id, start_date, end_date)
+ logger.info(f"Background task finished: Indexing Jira connector {connector_id}")
+
+async def run_jira_indexing(
+ session: AsyncSession,
+ connector_id: int,
+ search_space_id: int,
+ user_id: str,
+ start_date: str,
+ end_date: str
+):
+ """Runs the Jira indexing task and updates the timestamp."""
+ try:
+ indexed_count, error_message = await index_jira_issues(
+ session, connector_id, search_space_id, user_id, start_date, end_date, update_last_indexed=False
+ )
+ if error_message:
+ logger.error(f"Jira indexing failed for connector {connector_id}: {error_message}")
+ # Optionally update status in DB to indicate failure
+ else:
+ logger.info(f"Jira indexing successful for connector {connector_id}. Indexed {indexed_count} documents.")
+ # Update the last indexed timestamp only on success
+ await update_connector_last_indexed(session, connector_id)
+ await session.commit() # Commit timestamp update
+ except Exception as e:
+ await session.rollback()
+ logger.error(f"Critical error in run_jira_indexing for connector {connector_id}: {e}", exc_info=True)
+ # Optionally update status in DB to indicate failure
\ No newline at end of file
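
run_jira_indexing above relies on the update_connector_last_indexed helper already defined elsewhere in this routes module. A rough sketch of what such a helper typically does (assumed shape, not the actual implementation):

    from datetime import datetime, timezone

    from app.db import SearchSourceConnector
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.future import select


    async def update_connector_last_indexed(session: AsyncSession, connector_id: int) -> None:
        # Assumed behavior: stamp the connector row with the current UTC time;
        # the caller commits the session afterwards, as run_jira_indexing does.
        result = await session.execute(
            select(SearchSourceConnector).filter(SearchSourceConnector.id == connector_id)
        )
        connector = result.scalars().first()
        if connector:
            connector.last_indexed_at = datetime.now(timezone.utc)
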
diff --git a/surfsense_backend/app/schemas/search_source_connector.py b/surfsense_backend/app/schemas/search_source_connector.py
index 1225d54..17f1867 100644
--- a/surfsense_backend/app/schemas/search_source_connector.py
+++ b/surfsense_backend/app/schemas/search_source_connector.py
@@ -101,6 +101,19 @@ class SearchSourceConnectorBase(BaseModel):
# Ensure the bot token is not empty
if not config.get("DISCORD_BOT_TOKEN"):
raise ValueError("DISCORD_BOT_TOKEN cannot be empty")
+ elif connector_type == SearchSourceConnectorType.JIRA_CONNECTOR:
+ # For JIRA_CONNECTOR, allow JIRA_PERSONAL_ACCESS_TOKEN and JIRA_BASE_URL
+ allowed_keys = ["JIRA_PERSONAL_ACCESS_TOKEN", "JIRA_BASE_URL"]
+ if set(config.keys()) != set(allowed_keys):
+ raise ValueError(f"For JIRA_CONNECTOR connector type, config must only contain these keys: {allowed_keys}")
+
+ # Ensure the token is not empty
+ if not config.get("JIRA_PERSONAL_ACCESS_TOKEN"):
+ raise ValueError("JIRA_PERSONAL_ACCESS_TOKEN cannot be empty")
+
+ # Ensure the base URL is not empty
+ if not config.get("JIRA_BASE_URL"):
+ raise ValueError("JIRA_BASE_URL cannot be empty")
return config
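
For illustration, here are configs that would pass or fail the new JIRA_CONNECTOR validation above (all values are hypothetical):

    # Passes: exactly the two allowed keys, both non-empty.
    valid_jira_config = {
        "JIRA_PERSONAL_ACCESS_TOKEN": "your-token-here",
        "JIRA_BASE_URL": "https://example.atlassian.net",
    }

    # Raises ValueError: extra key, since the key set must match allowed_keys exactly.
    invalid_jira_config = {
        "JIRA_PERSONAL_ACCESS_TOKEN": "your-token-here",
        "JIRA_BASE_URL": "https://example.atlassian.net",
        "JIRA_EMAIL": "someone@example.com",
    }

    # Also raises ValueError: JIRA_BASE_URL is present but empty.
    empty_url_config = {
        "JIRA_PERSONAL_ACCESS_TOKEN": "your-token-here",
        "JIRA_BASE_URL": "",
    }
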
diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py
index 8c6f99c..4529366 100644
--- a/surfsense_backend/app/services/connector_service.py
+++ b/surfsense_backend/app/services/connector_service.py
@@ -1,15 +1,21 @@
-from typing import List, Dict, Optional
import asyncio
-from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy.future import select
-from app.retriver.chunks_hybrid_search import ChucksHybridSearchRetriever
-from app.retriver.documents_hybrid_search import DocumentHybridSearchRetriever
-from app.db import SearchSourceConnector, SearchSourceConnectorType, Chunk, Document, SearchSpace
-from tavily import TavilyClient
-from linkup import LinkupClient
-from sqlalchemy import func
+from typing import Dict, List, Optional
from app.agents.researcher.configuration import SearchMode
+from app.db import (
+ Chunk,
+ Document,
+ SearchSourceConnector,
+ SearchSourceConnectorType,
+ SearchSpace,
+)
+from app.retriver.chunks_hybrid_search import ChucksHybridSearchRetriever
+from app.retriver.documents_hybrid_search import DocumentHybridSearchRetriever
+from linkup import LinkupClient
+from sqlalchemy import func
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+from tavily import TavilyClient
class ConnectorService:
@@ -18,9 +24,13 @@ class ConnectorService:
self.chunk_retriever = ChucksHybridSearchRetriever(session)
self.document_retriever = DocumentHybridSearchRetriever(session)
self.user_id = user_id
- self.source_id_counter = 100000 # High starting value to avoid collisions with existing IDs
- self.counter_lock = asyncio.Lock() # Lock to protect counter in multithreaded environments
-
+ self.source_id_counter = (
+ 100000 # High starting value to avoid collisions with existing IDs
+ )
+ self.counter_lock = (
+ asyncio.Lock()
+ ) # Lock to protect counter in multithreaded environments
+
async def initialize_counter(self):
"""
Initialize the source_id_counter based on the total number of chunks for the user.
@@ -38,16 +48,25 @@ class ConnectorService:
)
chunk_count = result.scalar() or 0
self.source_id_counter = chunk_count + 1
- print(f"Initialized source_id_counter to {self.source_id_counter} for user {self.user_id}")
+ print(
+ f"Initialized source_id_counter to {self.source_id_counter} for user {self.user_id}"
+ )
except Exception as e:
print(f"Error initializing source_id_counter: {str(e)}")
# Fallback to default value
self.source_id_counter = 1
-
- async def search_crawled_urls(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+
+ async def search_crawled_urls(
+ self,
+ user_query: str,
+ user_id: str,
+ search_space_id: int,
+ top_k: int = 20,
+ search_mode: SearchMode = SearchMode.CHUNKS,
+ ) -> tuple:
"""
Search for crawled URLs and return both the source information and langchain documents
-
+
Returns:
tuple: (sources_info, langchain_documents)
"""
@@ -57,7 +76,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="CRAWLED_URL"
+ document_type="CRAWLED_URL",
)
elif search_mode == SearchMode.DOCUMENTS:
crawled_urls_chunks = await self.document_retriever.hybrid_search(
@@ -65,7 +84,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="CRAWLED_URL"
+ document_type="CRAWLED_URL",
)
# Transform document retriever results to match expected format
crawled_urls_chunks = self._transform_document_results(crawled_urls_chunks)
@@ -84,20 +103,23 @@ class ConnectorService:
async with self.counter_lock:
for _i, chunk in enumerate(crawled_urls_chunks):
# Extract document metadata
- document = chunk.get('document', {})
- metadata = document.get('metadata', {})
+ document = chunk.get("document", {})
+ metadata = document.get("metadata", {})
# Create a source entry
source = {
- "id": document.get('id', self.source_id_counter),
- "title": document.get('title', 'Untitled Document'),
- "description": metadata.get('og:description', metadata.get('ogDescription', chunk.get('content', '')[:100])),
- "url": metadata.get('url', '')
+ "id": document.get("id", self.source_id_counter),
+ "title": document.get("title", "Untitled Document"),
+ "description": metadata.get(
+ "og:description",
+ metadata.get("ogDescription", chunk.get("content", "")[:100]),
+ ),
+ "url": metadata.get("url", ""),
}
self.source_id_counter += 1
sources_list.append(source)
-
+
# Create result object
result_object = {
"id": 1,
@@ -105,13 +127,20 @@ class ConnectorService:
"type": "CRAWLED_URL",
"sources": sources_list,
}
-
+
return result_object, crawled_urls_chunks
-
- async def search_files(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+
+ async def search_files(
+ self,
+ user_query: str,
+ user_id: str,
+ search_space_id: int,
+ top_k: int = 20,
+ search_mode: SearchMode = SearchMode.CHUNKS,
+ ) -> tuple:
"""
Search for files and return both the source information and langchain documents
-
+
Returns:
tuple: (sources_info, langchain_documents)
"""
@@ -121,7 +150,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="FILE"
+ document_type="FILE",
)
elif search_mode == SearchMode.DOCUMENTS:
files_chunks = await self.document_retriever.hybrid_search(
@@ -129,11 +158,11 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="FILE"
+ document_type="FILE",
)
# Transform document retriever results to match expected format
files_chunks = self._transform_document_results(files_chunks)
-
+
# Early return if no results
if not files_chunks:
return {
@@ -148,20 +177,23 @@ class ConnectorService:
async with self.counter_lock:
for _i, chunk in enumerate(files_chunks):
# Extract document metadata
- document = chunk.get('document', {})
- metadata = document.get('metadata', {})
+ document = chunk.get("document", {})
+ metadata = document.get("metadata", {})
# Create a source entry
source = {
- "id": document.get('id', self.source_id_counter),
- "title": document.get('title', 'Untitled Document'),
- "description": metadata.get('og:description', metadata.get('ogDescription', chunk.get('content', '')[:100])),
- "url": metadata.get('url', '')
+ "id": document.get("id", self.source_id_counter),
+ "title": document.get("title", "Untitled Document"),
+ "description": metadata.get(
+ "og:description",
+ metadata.get("ogDescription", chunk.get("content", "")[:100]),
+ ),
+ "url": metadata.get("url", ""),
}
self.source_id_counter += 1
sources_list.append(source)
-
+
# Create result object
result_object = {
"id": 2,
@@ -169,69 +201,76 @@ class ConnectorService:
"type": "FILE",
"sources": sources_list,
}
-
+
return result_object, files_chunks
-
+
def _transform_document_results(self, document_results: List[Dict]) -> List[Dict]:
"""
Transform results from document_retriever.hybrid_search() to match the format
expected by the processing code.
-
+
Args:
document_results: Results from document_retriever.hybrid_search()
-
+
Returns:
List of transformed results in the format expected by the processing code
"""
transformed_results = []
for doc in document_results:
- transformed_results.append({
- 'document': {
- 'id': doc.get('document_id'),
- 'title': doc.get('title', 'Untitled Document'),
- 'document_type': doc.get('document_type'),
- 'metadata': doc.get('metadata', {}),
- },
- 'content': doc.get('chunks_content', doc.get('content', '')),
- 'score': doc.get('score', 0.0)
- })
+ transformed_results.append(
+ {
+ "document": {
+ "id": doc.get("document_id"),
+ "title": doc.get("title", "Untitled Document"),
+ "document_type": doc.get("document_type"),
+ "metadata": doc.get("metadata", {}),
+ },
+ "content": doc.get("chunks_content", doc.get("content", "")),
+ "score": doc.get("score", 0.0),
+ }
+ )
return transformed_results
-
- async def get_connector_by_type(self, user_id: str, connector_type: SearchSourceConnectorType) -> Optional[SearchSourceConnector]:
+
+ async def get_connector_by_type(
+ self, user_id: str, connector_type: SearchSourceConnectorType
+ ) -> Optional[SearchSourceConnector]:
"""
Get a connector by type for a specific user
-
+
Args:
user_id: The user's ID
connector_type: The connector type to retrieve
-
+
Returns:
Optional[SearchSourceConnector]: The connector if found, None otherwise
"""
result = await self.session.execute(
- select(SearchSourceConnector)
- .filter(
+ select(SearchSourceConnector).filter(
SearchSourceConnector.user_id == user_id,
- SearchSourceConnector.connector_type == connector_type
+ SearchSourceConnector.connector_type == connector_type,
)
)
return result.scalars().first()
-
- async def search_tavily(self, user_query: str, user_id: str, top_k: int = 20) -> tuple:
+
+ async def search_tavily(
+ self, user_query: str, user_id: str, top_k: int = 20
+ ) -> tuple:
"""
Search using Tavily API and return both the source information and documents
-
+
Args:
user_query: The user's query
user_id: The user's ID
top_k: Maximum number of results to return
-
+
Returns:
tuple: (sources_info, documents)
"""
# Get Tavily connector configuration
- tavily_connector = await self.get_connector_by_type(user_id, SearchSourceConnectorType.TAVILY_API)
-
+ tavily_connector = await self.get_connector_by_type(
+ user_id, SearchSourceConnectorType.TAVILY_API
+ )
+
if not tavily_connector:
# Return empty results if no Tavily connector is configured
return {
@@ -240,22 +279,22 @@ class ConnectorService:
"type": "TAVILY_API",
"sources": [],
}, []
-
+
# Initialize Tavily client with API key from connector config
tavily_api_key = tavily_connector.config.get("TAVILY_API_KEY")
tavily_client = TavilyClient(api_key=tavily_api_key)
-
+
# Perform search with Tavily
try:
response = tavily_client.search(
query=user_query,
max_results=top_k,
- search_depth="advanced" # Use advanced search for better results
+ search_depth="advanced", # Use advanced search for better results
)
-
+
# Extract results from Tavily response
tavily_results = response.get("results", [])
-
+
# Early return if no results
if not tavily_results:
return {
@@ -264,23 +303,22 @@ class ConnectorService:
"type": "TAVILY_API",
"sources": [],
}, []
-
+
# Process each result and create sources directly without deduplication
sources_list = []
documents = []
-
+
async with self.counter_lock:
for i, result in enumerate(tavily_results):
-
# Create a source entry
source = {
"id": self.source_id_counter,
"title": result.get("title", "Tavily Result"),
"description": result.get("content", "")[:100],
- "url": result.get("url", "")
+ "url": result.get("url", ""),
}
sources_list.append(source)
-
+
# Create a document entry
document = {
"chunk_id": f"tavily_chunk_{i}",
@@ -293,9 +331,9 @@ class ConnectorService:
"metadata": {
"url": result.get("url", ""),
"published_date": result.get("published_date", ""),
- "source": "TAVILY_API"
- }
- }
+ "source": "TAVILY_API",
+ },
+ },
}
documents.append(document)
self.source_id_counter += 1
@@ -307,9 +345,9 @@ class ConnectorService:
"type": "TAVILY_API",
"sources": sources_list,
}
-
+
return result_object, documents
-
+
except Exception as e:
# Log the error and return empty results
print(f"Error searching with Tavily: {str(e)}")
@@ -319,11 +357,18 @@ class ConnectorService:
"type": "TAVILY_API",
"sources": [],
}, []
-
- async def search_slack(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+
+ async def search_slack(
+ self,
+ user_query: str,
+ user_id: str,
+ search_space_id: int,
+ top_k: int = 20,
+ search_mode: SearchMode = SearchMode.CHUNKS,
+ ) -> tuple:
"""
Search for slack and return both the source information and langchain documents
-
+
Returns:
tuple: (sources_info, langchain_documents)
"""
@@ -333,7 +378,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="SLACK_CONNECTOR"
+ document_type="SLACK_CONNECTOR",
)
elif search_mode == SearchMode.DOCUMENTS:
slack_chunks = await self.document_retriever.hybrid_search(
@@ -341,11 +386,11 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="SLACK_CONNECTOR"
+ document_type="SLACK_CONNECTOR",
)
# Transform document retriever results to match expected format
slack_chunks = self._transform_document_results(slack_chunks)
-
+
# Early return if no results
if not slack_chunks:
return {
@@ -360,31 +405,31 @@ class ConnectorService:
async with self.counter_lock:
for _i, chunk in enumerate(slack_chunks):
# Extract document metadata
- document = chunk.get('document', {})
- metadata = document.get('metadata', {})
+ document = chunk.get("document", {})
+ metadata = document.get("metadata", {})
# Create a mapped source entry with Slack-specific metadata
- channel_name = metadata.get('channel_name', 'Unknown Channel')
- channel_id = metadata.get('channel_id', '')
- message_date = metadata.get('start_date', '')
-
+ channel_name = metadata.get("channel_name", "Unknown Channel")
+ channel_id = metadata.get("channel_id", "")
+ message_date = metadata.get("start_date", "")
+
# Create a more descriptive title for Slack messages
title = f"Slack: {channel_name}"
if message_date:
title += f" ({message_date})"
-
+
# Create a more descriptive description for Slack messages
- description = chunk.get('content', '')[:100]
+ description = chunk.get("content", "")[:100]
if len(description) == 100:
description += "..."
-
+
# For URL, we can use a placeholder or construct a URL to the Slack channel if available
url = ""
if channel_id:
url = f"https://slack.com/app_redirect?channel={channel_id}"
source = {
- "id": document.get('id', self.source_id_counter),
+ "id": document.get("id", self.source_id_counter),
"title": title,
"description": description,
"url": url,
@@ -392,7 +437,7 @@ class ConnectorService:
self.source_id_counter += 1
sources_list.append(source)
-
+
# Create result object
result_object = {
"id": 4,
@@ -400,19 +445,26 @@ class ConnectorService:
"type": "SLACK_CONNECTOR",
"sources": sources_list,
}
-
+
return result_object, slack_chunks
-
- async def search_notion(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+
+ async def search_notion(
+ self,
+ user_query: str,
+ user_id: str,
+ search_space_id: int,
+ top_k: int = 20,
+ search_mode: SearchMode = SearchMode.CHUNKS,
+ ) -> tuple:
"""
Search for Notion pages and return both the source information and langchain documents
-
+
Args:
user_query: The user's query
user_id: The user's ID
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
-
+
Returns:
tuple: (sources_info, langchain_documents)
"""
@@ -422,7 +474,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="NOTION_CONNECTOR"
+ document_type="NOTION_CONNECTOR",
)
elif search_mode == SearchMode.DOCUMENTS:
notion_chunks = await self.document_retriever.hybrid_search(
@@ -430,11 +482,11 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="NOTION_CONNECTOR"
+ document_type="NOTION_CONNECTOR",
)
# Transform document retriever results to match expected format
notion_chunks = self._transform_document_results(notion_chunks)
-
+
# Early return if no results
if not notion_chunks:
return {
@@ -449,24 +501,24 @@ class ConnectorService:
async with self.counter_lock:
for _i, chunk in enumerate(notion_chunks):
# Extract document metadata
- document = chunk.get('document', {})
- metadata = document.get('metadata', {})
+ document = chunk.get("document", {})
+ metadata = document.get("metadata", {})
# Create a mapped source entry with Notion-specific metadata
- page_title = metadata.get('page_title', 'Untitled Page')
- page_id = metadata.get('page_id', '')
- indexed_at = metadata.get('indexed_at', '')
-
+ page_title = metadata.get("page_title", "Untitled Page")
+ page_id = metadata.get("page_id", "")
+ indexed_at = metadata.get("indexed_at", "")
+
# Create a more descriptive title for Notion pages
title = f"Notion: {page_title}"
if indexed_at:
title += f" (indexed: {indexed_at})"
-
+
# Create a more descriptive description for Notion pages
- description = chunk.get('content', '')[:100]
+ description = chunk.get("content", "")[:100]
if len(description) == 100:
description += "..."
-
+
# For URL, we can use a placeholder or construct a URL to the Notion page if available
url = ""
if page_id:
@@ -474,7 +526,7 @@ class ConnectorService:
url = f"https://notion.so/{page_id.replace('-', '')}"
source = {
- "id": document.get('id', self.source_id_counter),
+ "id": document.get("id", self.source_id_counter),
"title": title,
"description": description,
"url": url,
@@ -482,7 +534,7 @@ class ConnectorService:
self.source_id_counter += 1
sources_list.append(source)
-
+
# Create result object
result_object = {
"id": 5,
@@ -490,19 +542,26 @@ class ConnectorService:
"type": "NOTION_CONNECTOR",
"sources": sources_list,
}
-
+
return result_object, notion_chunks
-
- async def search_extension(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+
+ async def search_extension(
+ self,
+ user_query: str,
+ user_id: str,
+ search_space_id: int,
+ top_k: int = 20,
+ search_mode: SearchMode = SearchMode.CHUNKS,
+ ) -> tuple:
"""
Search for extension data and return both the source information and langchain documents
-
+
Args:
user_query: The user's query
user_id: The user's ID
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
-
+
Returns:
tuple: (sources_info, langchain_documents)
"""
@@ -512,7 +571,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="EXTENSION"
+ document_type="EXTENSION",
)
elif search_mode == SearchMode.DOCUMENTS:
extension_chunks = await self.document_retriever.hybrid_search(
@@ -520,7 +579,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="EXTENSION"
+ document_type="EXTENSION",
)
# Transform document retriever results to match expected format
extension_chunks = self._transform_document_results(extension_chunks)
@@ -539,33 +598,39 @@ class ConnectorService:
async with self.counter_lock:
for i, chunk in enumerate(extension_chunks):
# Extract document metadata
- document = chunk.get('document', {})
- metadata = document.get('metadata', {})
+ document = chunk.get("document", {})
+ metadata = document.get("metadata", {})
# Extract extension-specific metadata
- webpage_title = metadata.get('VisitedWebPageTitle', 'Untitled Page')
- webpage_url = metadata.get('VisitedWebPageURL', '')
- visit_date = metadata.get('VisitedWebPageDateWithTimeInISOString', '')
- visit_duration = metadata.get('VisitedWebPageVisitDurationInMilliseconds', '')
- browsing_session_id = metadata.get('BrowsingSessionId', '')
-
+ webpage_title = metadata.get("VisitedWebPageTitle", "Untitled Page")
+ webpage_url = metadata.get("VisitedWebPageURL", "")
+ visit_date = metadata.get("VisitedWebPageDateWithTimeInISOString", "")
+ visit_duration = metadata.get(
+ "VisitedWebPageVisitDurationInMilliseconds", ""
+ )
+ browsing_session_id = metadata.get("BrowsingSessionId", "")
+
# Create a more descriptive title for extension data
title = webpage_title
if visit_date:
# Format the date for display (simplified)
try:
# Just extract the date part for display
- formatted_date = visit_date.split('T')[0] if 'T' in visit_date else visit_date
+ formatted_date = (
+ visit_date.split("T")[0]
+ if "T" in visit_date
+ else visit_date
+ )
title += f" (visited: {formatted_date})"
except:
# Fallback if date parsing fails
title += f" (visited: {visit_date})"
-
+
# Create a more descriptive description for extension data
- description = chunk.get('content', '')[:100]
+ description = chunk.get("content", "")[:100]
if len(description) == 100:
description += "..."
-
+
# Add visit duration if available
if visit_duration:
try:
@@ -573,8 +638,8 @@ class ConnectorService:
if duration_seconds < 60:
duration_text = f"{duration_seconds:.1f} seconds"
else:
- duration_text = f"{duration_seconds/60:.1f} minutes"
-
+ duration_text = f"{duration_seconds / 60:.1f} minutes"
+
if description:
description += f" | Duration: {duration_text}"
except:
@@ -582,15 +647,15 @@ class ConnectorService:
pass
source = {
- "id": document.get('id', self.source_id_counter),
+ "id": document.get("id", self.source_id_counter),
"title": title,
"description": description,
- "url": webpage_url
+ "url": webpage_url,
}
self.source_id_counter += 1
sources_list.append(source)
-
+
# Create result object
result_object = {
"id": 6,
@@ -598,19 +663,26 @@ class ConnectorService:
"type": "EXTENSION",
"sources": sources_list,
}
-
+
return result_object, extension_chunks
-
- async def search_youtube(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+
+ async def search_youtube(
+ self,
+ user_query: str,
+ user_id: str,
+ search_space_id: int,
+ top_k: int = 20,
+ search_mode: SearchMode = SearchMode.CHUNKS,
+ ) -> tuple:
"""
Search for YouTube videos and return both the source information and langchain documents
-
+
Args:
user_query: The user's query
user_id: The user's ID
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
-
+
Returns:
tuple: (sources_info, langchain_documents)
"""
@@ -620,7 +692,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="YOUTUBE_VIDEO"
+ document_type="YOUTUBE_VIDEO",
)
elif search_mode == SearchMode.DOCUMENTS:
youtube_chunks = await self.document_retriever.hybrid_search(
@@ -628,11 +700,11 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="YOUTUBE_VIDEO"
+ document_type="YOUTUBE_VIDEO",
)
# Transform document retriever results to match expected format
youtube_chunks = self._transform_document_results(youtube_chunks)
-
+
# Early return if no results
if not youtube_chunks:
return {
@@ -647,40 +719,42 @@ class ConnectorService:
async with self.counter_lock:
for _i, chunk in enumerate(youtube_chunks):
# Extract document metadata
- document = chunk.get('document', {})
- metadata = document.get('metadata', {})
+ document = chunk.get("document", {})
+ metadata = document.get("metadata", {})
# Extract YouTube-specific metadata
- video_title = metadata.get('video_title', 'Untitled Video')
- video_id = metadata.get('video_id', '')
- channel_name = metadata.get('channel_name', '')
+ video_title = metadata.get("video_title", "Untitled Video")
+ video_id = metadata.get("video_id", "")
+ channel_name = metadata.get("channel_name", "")
# published_date = metadata.get('published_date', '')
-
+
# Create a more descriptive title for YouTube videos
title = video_title
if channel_name:
title += f" - {channel_name}"
-
+
# Create a more descriptive description for YouTube videos
- description = metadata.get('description', chunk.get('content', '')[:100])
+ description = metadata.get(
+ "description", chunk.get("content", "")[:100]
+ )
if len(description) == 100:
description += "..."
-
+
# For URL, construct a URL to the YouTube video
url = f"https://www.youtube.com/watch?v={video_id}" if video_id else ""
source = {
- "id": document.get('id', self.source_id_counter),
+ "id": document.get("id", self.source_id_counter),
"title": title,
"description": description,
"url": url,
"video_id": video_id, # Additional field for YouTube videos
- "channel_name": channel_name # Additional field for YouTube videos
+ "channel_name": channel_name, # Additional field for YouTube videos
}
self.source_id_counter += 1
sources_list.append(source)
-
+
# Create result object
result_object = {
"id": 7, # Assign a unique ID for the YouTube connector
@@ -688,13 +762,20 @@ class ConnectorService:
"type": "YOUTUBE_VIDEO",
"sources": sources_list,
}
-
+
return result_object, youtube_chunks
- async def search_github(self, user_query: str, user_id: int, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+ async def search_github(
+ self,
+ user_query: str,
+ user_id: int,
+ search_space_id: int,
+ top_k: int = 20,
+ search_mode: SearchMode = SearchMode.CHUNKS,
+ ) -> tuple:
"""
Search for GitHub documents and return both the source information and langchain documents
-
+
Returns:
tuple: (sources_info, langchain_documents)
"""
@@ -704,7 +785,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="GITHUB_CONNECTOR"
+ document_type="GITHUB_CONNECTOR",
)
elif search_mode == SearchMode.DOCUMENTS:
github_chunks = await self.document_retriever.hybrid_search(
@@ -712,11 +793,11 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="GITHUB_CONNECTOR"
+ document_type="GITHUB_CONNECTOR",
)
# Transform document retriever results to match expected format
github_chunks = self._transform_document_results(github_chunks)
-
+
# Early return if no results
if not github_chunks:
return {
@@ -731,20 +812,24 @@ class ConnectorService:
async with self.counter_lock:
for _i, chunk in enumerate(github_chunks):
# Extract document metadata
- document = chunk.get('document', {})
- metadata = document.get('metadata', {})
+ document = chunk.get("document", {})
+ metadata = document.get("metadata", {})
# Create a source entry
source = {
- "id": document.get('id', self.source_id_counter),
- "title": document.get('title', 'GitHub Document'), # Use specific title if available
- "description": metadata.get('description', chunk.get('content', '')[:100]), # Use description or content preview
- "url": metadata.get('url', '') # Use URL if available in metadata
+ "id": document.get("id", self.source_id_counter),
+ "title": document.get(
+ "title", "GitHub Document"
+ ), # Use specific title if available
+ "description": metadata.get(
+ "description", chunk.get("content", "")[:100]
+ ), # Use description or content preview
+ "url": metadata.get("url", ""), # Use URL if available in metadata
}
self.source_id_counter += 1
sources_list.append(source)
-
+
# Create result object
result_object = {
"id": 8,
@@ -752,19 +837,26 @@ class ConnectorService:
"type": "GITHUB_CONNECTOR",
"sources": sources_list,
}
-
+
return result_object, github_chunks
- async def search_linear(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+ async def search_linear(
+ self,
+ user_query: str,
+ user_id: str,
+ search_space_id: int,
+ top_k: int = 20,
+ search_mode: SearchMode = SearchMode.CHUNKS,
+ ) -> tuple:
"""
Search for Linear issues and comments and return both the source information and langchain documents
-
+
Args:
user_query: The user's query
user_id: The user's ID
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
-
+
Returns:
tuple: (sources_info, langchain_documents)
"""
@@ -774,7 +866,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="LINEAR_CONNECTOR"
+ document_type="LINEAR_CONNECTOR",
)
elif search_mode == SearchMode.DOCUMENTS:
linear_chunks = await self.document_retriever.hybrid_search(
@@ -782,7 +874,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="LINEAR_CONNECTOR"
+ document_type="LINEAR_CONNECTOR",
)
# Transform document retriever results to match expected format
linear_chunks = self._transform_document_results(linear_chunks)
@@ -801,32 +893,32 @@ class ConnectorService:
async with self.counter_lock:
for _i, chunk in enumerate(linear_chunks):
# Extract document metadata
- document = chunk.get('document', {})
- metadata = document.get('metadata', {})
+ document = chunk.get("document", {})
+ metadata = document.get("metadata", {})
# Extract Linear-specific metadata
- issue_identifier = metadata.get('issue_identifier', '')
- issue_title = metadata.get('issue_title', 'Untitled Issue')
- issue_state = metadata.get('state', '')
- comment_count = metadata.get('comment_count', 0)
-
+ issue_identifier = metadata.get("issue_identifier", "")
+ issue_title = metadata.get("issue_title", "Untitled Issue")
+ issue_state = metadata.get("state", "")
+ comment_count = metadata.get("comment_count", 0)
+
# Create a more descriptive title for Linear issues
title = f"Linear: {issue_identifier} - {issue_title}"
if issue_state:
title += f" ({issue_state})"
-
+
# Create a more descriptive description for Linear issues
- description = chunk.get('content', '')[:100]
+ description = chunk.get("content", "")[:100]
if len(description) == 100:
description += "..."
-
+
# Add comment count info to description
if comment_count:
if description:
description += f" | Comments: {comment_count}"
else:
description = f"Comments: {comment_count}"
-
+
# For URL, we could construct a URL to the Linear issue if we have the workspace info
# For now, use a generic placeholder
url = ""
@@ -835,18 +927,18 @@ class ConnectorService:
url = f"https://linear.app/issue/{issue_identifier}"
source = {
- "id": document.get('id', self.source_id_counter),
+ "id": document.get("id", self.source_id_counter),
"title": title,
"description": description,
"url": url,
"issue_identifier": issue_identifier,
"state": issue_state,
- "comment_count": comment_count
+ "comment_count": comment_count,
}
self.source_id_counter += 1
sources_list.append(source)
-
+
# Create result object
result_object = {
"id": 9, # Assign a unique ID for the Linear connector
@@ -854,10 +946,17 @@ class ConnectorService:
"type": "LINEAR_CONNECTOR",
"sources": sources_list,
}
-
+
return result_object, linear_chunks
- async def search_jira(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+ async def search_jira(
+ self,
+ user_query: str,
+ user_id: str,
+ search_space_id: int,
+ top_k: int = 20,
+ search_mode: SearchMode = SearchMode.CHUNKS,
+ ) -> tuple:
"""
Search for Jira issues and comments and return both the source information and langchain documents
@@ -877,7 +976,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="JIRA_CONNECTOR"
+ document_type="JIRA_CONNECTOR",
)
elif search_mode == SearchMode.DOCUMENTS:
jira_chunks = await self.document_retriever.hybrid_search(
@@ -885,7 +984,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="JIRA_CONNECTOR"
+ document_type="JIRA_CONNECTOR",
)
# Transform document retriever results to match expected format
jira_chunks = self._transform_document_results(jira_chunks)
@@ -904,16 +1003,16 @@ class ConnectorService:
async with self.counter_lock:
for _i, chunk in enumerate(jira_chunks):
# Extract document metadata
- document = chunk.get('document', {})
- metadata = document.get('metadata', {})
+ document = chunk.get("document", {})
+ metadata = document.get("metadata", {})
# Extract Jira-specific metadata
- issue_key = metadata.get('issue_key', '')
- issue_title = metadata.get('issue_title', 'Untitled Issue')
- status = metadata.get('status', '')
- priority = metadata.get('priority', '')
- issue_type = metadata.get('issue_type', '')
- comment_count = metadata.get('comment_count', 0)
+ issue_key = metadata.get("issue_key", "")
+ issue_title = metadata.get("issue_title", "Untitled Issue")
+ status = metadata.get("status", "")
+ priority = metadata.get("priority", "")
+ issue_type = metadata.get("issue_type", "")
+ comment_count = metadata.get("comment_count", 0)
# Create a more descriptive title for Jira issues
title = f"Jira: {issue_key} - {issue_title}"
@@ -921,7 +1020,7 @@ class ConnectorService:
title += f" ({status})"
# Create a more descriptive description for Jira issues
- description = chunk.get('content', '')[:100]
+ description = chunk.get("content", "")[:100]
if len(description) == 100:
description += "..."
@@ -938,16 +1037,16 @@ class ConnectorService:
if description:
description += f" | {' | '.join(info_parts)}"
else:
- description = ' | '.join(info_parts)
+ description = " | ".join(info_parts)
# For URL, we could construct a URL to the Jira issue if we have the base URL
# For now, use a generic placeholder
url = ""
- if issue_key and metadata.get('base_url'):
+ if issue_key and metadata.get("base_url"):
url = f"{metadata.get('base_url')}/browse/{issue_key}"
source = {
- "id": document.get('id', self.source_id_counter),
+ "id": document.get("id", self.source_id_counter),
"title": title,
"description": description,
"url": url,
@@ -955,7 +1054,7 @@ class ConnectorService:
"status": status,
"priority": priority,
"issue_type": issue_type,
- "comment_count": comment_count
+ "comment_count": comment_count,
}
self.source_id_counter += 1
@@ -971,21 +1070,25 @@ class ConnectorService:
return result_object, jira_chunks
- async def search_linkup(self, user_query: str, user_id: str, mode: str = "standard") -> tuple:
+ async def search_linkup(
+ self, user_query: str, user_id: str, mode: str = "standard"
+ ) -> tuple:
"""
Search using Linkup API and return both the source information and documents
-
+
Args:
user_query: The user's query
user_id: The user's ID
mode: Search depth mode, can be "standard" or "deep"
-
+
Returns:
tuple: (sources_info, documents)
"""
# Get Linkup connector configuration
- linkup_connector = await self.get_connector_by_type(user_id, SearchSourceConnectorType.LINKUP_API)
-
+ linkup_connector = await self.get_connector_by_type(
+ user_id, SearchSourceConnectorType.LINKUP_API
+ )
+
if not linkup_connector:
# Return empty results if no Linkup connector is configured
return {
@@ -994,11 +1097,11 @@ class ConnectorService:
"type": "LINKUP_API",
"sources": [],
}, []
-
+
# Initialize Linkup client with API key from connector config
linkup_api_key = linkup_connector.config.get("LINKUP_API_KEY")
linkup_client = LinkupClient(api_key=linkup_api_key)
-
+
# Perform search with Linkup
try:
response = linkup_client.search(
@@ -1006,10 +1109,10 @@ class ConnectorService:
depth=mode, # Use the provided mode ("standard" or "deep")
output_type="searchResults", # Default to search results
)
-
+
# Extract results from Linkup response - access as attribute instead of using .get()
- linkup_results = response.results if hasattr(response, 'results') else []
-
+ linkup_results = response.results if hasattr(response, "results") else []
+
# Only proceed if we have results
if not linkup_results:
return {
@@ -1018,41 +1121,49 @@ class ConnectorService:
"type": "LINKUP_API",
"sources": [],
}, []
-
+
# Process each result and create sources directly without deduplication
sources_list = []
documents = []
-
+
async with self.counter_lock:
for i, result in enumerate(linkup_results):
# Only process results that have content
- if not hasattr(result, 'content') or not result.content:
+ if not hasattr(result, "content") or not result.content:
continue
-
+
# Create a source entry
source = {
"id": self.source_id_counter,
- "title": result.name if hasattr(result, 'name') else "Linkup Result",
- "description": result.content[:100] if hasattr(result, 'content') else "",
- "url": result.url if hasattr(result, 'url') else ""
+ "title": (
+ result.name if hasattr(result, "name") else "Linkup Result"
+ ),
+ "description": (
+ result.content[:100] if hasattr(result, "content") else ""
+ ),
+ "url": result.url if hasattr(result, "url") else "",
}
sources_list.append(source)
-
+
# Create a document entry
document = {
"chunk_id": f"linkup_chunk_{i}",
- "content": result.content if hasattr(result, 'content') else "",
+ "content": result.content if hasattr(result, "content") else "",
"score": 1.0, # Default score since not provided by Linkup
"document": {
"id": self.source_id_counter,
- "title": result.name if hasattr(result, 'name') else "Linkup Result",
+ "title": (
+ result.name
+ if hasattr(result, "name")
+ else "Linkup Result"
+ ),
"document_type": "LINKUP_API",
"metadata": {
- "url": result.url if hasattr(result, 'url') else "",
- "type": result.type if hasattr(result, 'type') else "",
- "source": "LINKUP_API"
- }
- }
+ "url": result.url if hasattr(result, "url") else "",
+ "type": result.type if hasattr(result, "type") else "",
+ "source": "LINKUP_API",
+ },
+ },
}
documents.append(document)
self.source_id_counter += 1
@@ -1064,9 +1175,9 @@ class ConnectorService:
"type": "LINKUP_API",
"sources": sources_list,
}
-
+
return result_object, documents
-
+
except Exception as e:
# Log the error and return empty results
print(f"Error searching with Linkup: {str(e)}")
@@ -1076,17 +1187,24 @@ class ConnectorService:
"type": "LINKUP_API",
"sources": [],
}, []
-
- async def search_discord(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+
+ async def search_discord(
+ self,
+ user_query: str,
+ user_id: str,
+ search_space_id: int,
+ top_k: int = 20,
+ search_mode: SearchMode = SearchMode.CHUNKS,
+ ) -> tuple:
"""
Search for Discord messages and return both the source information and langchain documents
-
+
Args:
user_query: The user's query
user_id: The user's ID
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
-
+
Returns:
tuple: (sources_info, langchain_documents)
"""
@@ -1096,7 +1214,7 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="DISCORD_CONNECTOR"
+ document_type="DISCORD_CONNECTOR",
)
elif search_mode == SearchMode.DOCUMENTS:
discord_chunks = await self.document_retriever.hybrid_search(
@@ -1104,11 +1222,11 @@ class ConnectorService:
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
- document_type="DISCORD_CONNECTOR"
+ document_type="DISCORD_CONNECTOR",
)
# Transform document retriever results to match expected format
discord_chunks = self._transform_document_results(discord_chunks)
-
+
# Early return if no results
if not discord_chunks:
return {
@@ -1123,26 +1241,26 @@ class ConnectorService:
async with self.counter_lock:
for i, chunk in enumerate(discord_chunks):
# Extract document metadata
- document = chunk.get('document', {})
- metadata = document.get('metadata', {})
+ document = chunk.get("document", {})
+ metadata = document.get("metadata", {})
# Create a mapped source entry with Discord-specific metadata
- channel_name = metadata.get('channel_name', 'Unknown Channel')
- channel_id = metadata.get('channel_id', '')
- message_date = metadata.get('start_date', '')
-
+ channel_name = metadata.get("channel_name", "Unknown Channel")
+ channel_id = metadata.get("channel_id", "")
+ message_date = metadata.get("start_date", "")
+
# Create a more descriptive title for Discord messages
title = f"Discord: {channel_name}"
if message_date:
title += f" ({message_date})"
-
+
# Create a more descriptive description for Discord messages
- description = chunk.get('content', '')[:100]
+ description = chunk.get("content", "")[:100]
if len(description) == 100:
description += "..."
-
+
url = ""
- guild_id = metadata.get('guild_id', '')
+ guild_id = metadata.get("guild_id", "")
if guild_id and channel_id:
url = f"https://discord.com/channels/{guild_id}/{channel_id}"
elif channel_id:
@@ -1150,7 +1268,7 @@ class ConnectorService:
url = f"https://discord.com/channels/@me/{channel_id}"
source = {
- "id": document.get('id', self.source_id_counter),
+ "id": document.get("id", self.source_id_counter),
"title": title,
"description": description,
"url": url,
@@ -1158,7 +1276,7 @@ class ConnectorService:
self.source_id_counter += 1
sources_list.append(source)
-
+
# Create result object
result_object = {
"id": 11,
@@ -1166,7 +1284,5 @@ class ConnectorService:
"type": "DISCORD_CONNECTOR",
"sources": sources_list,
}
-
+
return result_object, discord_chunks
-
-
diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
index ab3bc85..f4ae139 100644
--- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py
+++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
@@ -1,28 +1,35 @@
-from typing import Optional, Tuple
-from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy.exc import SQLAlchemyError
-from sqlalchemy.future import select
+import asyncio
+import logging
from datetime import datetime, timedelta, timezone
-from app.db import Document, DocumentType, Chunk, SearchSourceConnector, SearchSourceConnectorType, SearchSpace
+from typing import Optional, Tuple
+
from app.config import config
+from app.connectors.discord_connector import DiscordConnector
+from app.connectors.github_connector import GitHubConnector
+from app.connectors.jira_connector import JiraConnector
+from app.connectors.linear_connector import LinearConnector
+from app.connectors.notion_history import NotionHistoryConnector
+from app.connectors.slack_history import SlackHistory
+from app.db import (
+ Chunk,
+ Document,
+ DocumentType,
+ SearchSourceConnector,
+ SearchSourceConnectorType,
+)
from app.prompts import SUMMARY_PROMPT_TEMPLATE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
-from app.connectors.slack_history import SlackHistory
-from app.connectors.notion_history import NotionHistoryConnector
-from app.connectors.github_connector import GitHubConnector
-from app.connectors.linear_connector import LinearConnector
-from app.connectors.discord_connector import DiscordConnector
-from app.connectors.jira_connector import JiraConnector
-from slack_sdk.errors import SlackApiError
-import logging
-import asyncio
-
from app.utils.document_converters import generate_content_hash
+from slack_sdk.errors import SlackApiError
+from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
# Set up logging
logger = logging.getLogger(__name__)
+
async def index_slack_messages(
session: AsyncSession,
connector_id: int,
@@ -30,56 +37,64 @@ async def index_slack_messages(
user_id: str,
start_date: str = None,
end_date: str = None,
- update_last_indexed: bool = True
+ update_last_indexed: bool = True,
) -> Tuple[int, Optional[str]]:
"""
Index Slack messages from all accessible channels.
-
+
Args:
session: Database session
connector_id: ID of the Slack connector
search_space_id: ID of the search space to store documents in
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
-
+
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
-
+
# Log task start
log_entry = await task_logger.log_task_start(
task_name="slack_messages_indexing",
source="connector_indexing_task",
message=f"Starting Slack messages indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ metadata={
+ "connector_id": connector_id,
+ "user_id": str(user_id),
+ "start_date": start_date,
+ "end_date": end_date,
+ },
)
-
+
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving Slack connector {connector_id} from database",
- {"stage": "connector_retrieval"}
+ {"stage": "connector_retrieval"},
)
-
+
result = await session.execute(
- select(SearchSourceConnector)
- .filter(
+ select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
- SearchSourceConnector.connector_type == SearchSourceConnectorType.SLACK_CONNECTOR
+ SearchSourceConnector.connector_type
+ == SearchSourceConnectorType.SLACK_CONNECTOR,
)
)
connector = result.scalars().first()
-
+
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a Slack connector",
"Connector not found",
- {"error_type": "ConnectorNotFound"}
+ {"error_type": "ConnectorNotFound"},
)
- return 0, f"Connector with ID {connector_id} not found or is not a Slack connector"
-
+ return (
+ 0,
+ f"Connector with ID {connector_id} not found or is not a Slack connector",
+ )
+
# Get the Slack token from the connector config
slack_token = connector.config.get("SLACK_BOT_TOKEN")
if not slack_token:
@@ -87,62 +102,86 @@ async def index_slack_messages(
log_entry,
f"Slack token not found in connector config for connector {connector_id}",
"Missing Slack token",
- {"error_type": "MissingToken"}
+ {"error_type": "MissingToken"},
)
return 0, "Slack token not found in connector config"
-
+
# Initialize Slack client
await task_logger.log_task_progress(
log_entry,
f"Initializing Slack client for connector {connector_id}",
- {"stage": "client_initialization"}
+ {"stage": "client_initialization"},
)
-
+
slack_client = SlackHistory(token=slack_token)
-
+
# Calculate date range
await task_logger.log_task_progress(
log_entry,
- f"Calculating date range for Slack indexing",
- {"stage": "date_calculation", "provided_start_date": start_date, "provided_end_date": end_date}
+ "Calculating date range for Slack indexing",
+ {
+ "stage": "date_calculation",
+ "provided_start_date": start_date,
+ "provided_end_date": end_date,
+ },
)
-
+
if start_date is None or end_date is None:
# Fall back to calculating dates based on last_indexed_at
calculated_end_date = datetime.now()
-
+
# Use last_indexed_at as start date if available, otherwise use 365 days ago
if connector.last_indexed_at:
# Convert dates to be comparable (both timezone-naive)
- last_indexed_naive = connector.last_indexed_at.replace(tzinfo=None) if connector.last_indexed_at.tzinfo else connector.last_indexed_at
-
+ last_indexed_naive = (
+ connector.last_indexed_at.replace(tzinfo=None)
+ if connector.last_indexed_at.tzinfo
+ else connector.last_indexed_at
+ )
+
# Check if last_indexed_at is in the future or after end_date
if last_indexed_naive > calculated_end_date:
- logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead.")
+ logger.warning(
+ f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead."
+ )
calculated_start_date = calculated_end_date - timedelta(days=365)
else:
calculated_start_date = last_indexed_naive
- logger.info(f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date")
+ logger.info(
+ f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date"
+ )
else:
- calculated_start_date = calculated_end_date - timedelta(days=365) # Use 365 days as default
- logger.info(f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date")
-
+ calculated_start_date = calculated_end_date - timedelta(
+ days=365
+ ) # Use 365 days as default
+ logger.info(
+ f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date"
+ )
+
# Use calculated dates if not provided
- start_date_str = start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
- end_date_str = end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
+ start_date_str = (
+ start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
+ )
+ end_date_str = (
+ end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
+ )
else:
# Use provided dates
start_date_str = start_date
end_date_str = end_date
-
+
logger.info(f"Indexing Slack messages from {start_date_str} to {end_date_str}")
-
+
await task_logger.log_task_progress(
log_entry,
f"Fetching Slack channels from {start_date_str} to {end_date_str}",
- {"stage": "fetch_channels", "start_date": start_date_str, "end_date": end_date_str}
+ {
+ "stage": "fetch_channels",
+ "start_date": start_date_str,
+ "end_date": end_date_str,
+ },
)
-
+
# Get all channels
try:
channels = slack_client.get_all_channels()
@@ -151,133 +190,162 @@ async def index_slack_messages(
log_entry,
f"Failed to get Slack channels for connector {connector_id}",
str(e),
- {"error_type": "ChannelFetchError"}
+ {"error_type": "ChannelFetchError"},
)
return 0, f"Failed to get Slack channels: {str(e)}"
-
+
if not channels:
await task_logger.log_task_success(
log_entry,
f"No Slack channels found for connector {connector_id}",
- {"channels_found": 0}
+ {"channels_found": 0},
)
return 0, "No Slack channels found"
-
+
# Track the number of documents indexed
documents_indexed = 0
documents_skipped = 0
skipped_channels = []
-
+
await task_logger.log_task_progress(
log_entry,
f"Starting to process {len(channels)} Slack channels",
- {"stage": "process_channels", "total_channels": len(channels)}
+ {"stage": "process_channels", "total_channels": len(channels)},
)
-
+
# Process each channel
- for channel_obj in channels: # Modified loop to iterate over list of channel objects
+ for (
+ channel_obj
+ ) in channels: # Modified loop to iterate over list of channel objects
channel_id = channel_obj["id"]
channel_name = channel_obj["name"]
is_private = channel_obj["is_private"]
- is_member = channel_obj["is_member"] # This might be False for public channels too
+ is_member = channel_obj[
+ "is_member"
+ ] # This might be False for public channels too
try:
# If it's a private channel and the bot is not a member, skip.
# For public channels, if they are listed by conversations.list, the bot can typically read history.
# The `not_in_channel` error in get_conversation_history will be the ultimate gatekeeper if history is inaccessible.
if is_private and not is_member:
- logger.warning(f"Bot is not a member of private channel {channel_name} ({channel_id}). Skipping.")
- skipped_channels.append(f"{channel_name} (private, bot not a member)")
+ logger.warning(
+ f"Bot is not a member of private channel {channel_name} ({channel_id}). Skipping."
+ )
+ skipped_channels.append(
+ f"{channel_name} (private, bot not a member)"
+ )
documents_skipped += 1
continue
-
+
# Get messages for this channel
-                # The get_history_by_date_range now uses get_conversation_history, 
+                # The get_history_by_date_range now uses get_conversation_history,
+ # The get_history_by_date_range now uses get_conversation_history,
# which handles 'not_in_channel' by returning [] and logging.
messages, error = slack_client.get_history_by_date_range(
channel_id=channel_id,
start_date=start_date_str,
end_date=end_date_str,
- limit=1000 # Limit to 1000 messages per channel
+ limit=1000, # Limit to 1000 messages per channel
)
-
+
if error:
- logger.warning(f"Error getting messages from channel {channel_name}: {error}")
+ logger.warning(
+ f"Error getting messages from channel {channel_name}: {error}"
+ )
skipped_channels.append(f"{channel_name} (error: {error})")
documents_skipped += 1
continue # Skip this channel if there's an error
-
+
if not messages:
- logger.info(f"No messages found in channel {channel_name} for the specified date range.")
+ logger.info(
+ f"No messages found in channel {channel_name} for the specified date range."
+ )
documents_skipped += 1
continue # Skip if no messages
-
+
# Format messages with user info
formatted_messages = []
for msg in messages:
# Skip bot messages and system messages
- if msg.get("subtype") in ["bot_message", "channel_join", "channel_leave"]:
+ if msg.get("subtype") in [
+ "bot_message",
+ "channel_join",
+ "channel_leave",
+ ]:
continue
-
- formatted_msg = slack_client.format_message(msg, include_user_info=True)
+
+ formatted_msg = slack_client.format_message(
+ msg, include_user_info=True
+ )
formatted_messages.append(formatted_msg)
-
+
if not formatted_messages:
- logger.info(f"No valid messages found in channel {channel_name} after filtering.")
+ logger.info(
+ f"No valid messages found in channel {channel_name} after filtering."
+ )
documents_skipped += 1
continue # Skip if no valid messages after filtering
-
+
# Convert messages to markdown format
channel_content = f"# Slack Channel: {channel_name}\n\n"
-
+
for msg in formatted_messages:
user_name = msg.get("user_name", "Unknown User")
timestamp = msg.get("datetime", "Unknown Time")
text = msg.get("text", "")
-
- channel_content += f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n"
-
+
+ channel_content += (
+ f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n"
+ )
+
# Format document metadata
metadata_sections = [
- ("METADATA", [
- f"CHANNEL_NAME: {channel_name}",
- f"CHANNEL_ID: {channel_id}",
- # f"START_DATE: {start_date_str}",
- # f"END_DATE: {end_date_str}",
- f"MESSAGE_COUNT: {len(formatted_messages)}"
- ]),
- ("CONTENT", [
- "FORMAT: markdown",
- "TEXT_START",
- channel_content,
- "TEXT_END"
- ])
+ (
+ "METADATA",
+ [
+ f"CHANNEL_NAME: {channel_name}",
+ f"CHANNEL_ID: {channel_id}",
+ # f"START_DATE: {start_date_str}",
+ # f"END_DATE: {end_date_str}",
+ f"MESSAGE_COUNT: {len(formatted_messages)}",
+ ],
+ ),
+ (
+ "CONTENT",
+ ["FORMAT: markdown", "TEXT_START", channel_content, "TEXT_END"],
+ ),
]
-
+
# Build the document string
document_parts = []
document_parts.append("")
-
+
for section_title, section_content in metadata_sections:
document_parts.append(f"<{section_title}>")
document_parts.extend(section_content)
document_parts.append(f"{section_title}>")
-
+
document_parts.append("")
- combined_document_string = '\n'.join(document_parts)
- content_hash = generate_content_hash(combined_document_string, search_space_id)
+ combined_document_string = "\n".join(document_parts)
+ content_hash = generate_content_hash(
+ combined_document_string, search_space_id
+ )
# Check if document with this content hash already exists
existing_doc_by_hash_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
- existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
-
+ existing_document_by_hash = (
+ existing_doc_by_hash_result.scalars().first()
+ )
+
if existing_document_by_hash:
- logger.info(f"Document with content hash {content_hash} already exists for channel {channel_name}. Skipping processing.")
+ logger.info(
+ f"Document with content hash {content_hash} already exists for channel {channel_name}. Skipping processing."
+ )
documents_skipped += 1
continue
-
+
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id)
if not user_llm:
@@ -285,19 +353,26 @@ async def index_slack_messages(
skipped_channels.append(f"{channel_name} (no LLM configured)")
documents_skipped += 1
continue
-
+
# Generate summary
summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
- summary_result = await summary_chain.ainvoke({"document": combined_document_string})
+ summary_result = await summary_chain.ainvoke(
+ {"document": combined_document_string}
+ )
summary_content = summary_result.content
- summary_embedding = config.embedding_model_instance.embed(summary_content)
-
+ summary_embedding = config.embedding_model_instance.embed(
+ summary_content
+ )
+
# Process chunks
chunks = [
- Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
+ Chunk(
+ content=chunk.text,
+ embedding=config.embedding_model_instance.embed(chunk.text),
+ )
for chunk in config.chunker_instance.chunk(channel_content)
]
-
+
# Create and store new document
document = Document(
search_space_id=search_space_id,
@@ -309,20 +384,24 @@ async def index_slack_messages(
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(formatted_messages),
- "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
-
+
session.add(document)
documents_indexed += 1
- logger.info(f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages")
-
+ logger.info(
+ f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages"
+ )
+
except SlackApiError as slack_error:
- logger.error(f"Slack API error for channel {channel_name}: {str(slack_error)}")
+ logger.error(
+ f"Slack API error for channel {channel_name}: {str(slack_error)}"
+ )
skipped_channels.append(f"{channel_name} (Slack API error)")
documents_skipped += 1
continue # Skip this channel and continue with others
@@ -331,23 +410,23 @@ async def index_slack_messages(
skipped_channels.append(f"{channel_name} (processing error)")
documents_skipped += 1
continue # Skip this channel and continue with others
-
+
# Update the last_indexed_at timestamp for the connector only if requested
# and if we successfully indexed at least one channel
total_processed = documents_indexed
if update_last_indexed and total_processed > 0:
connector.last_indexed_at = datetime.now()
-
+
# Commit all changes
await session.commit()
-
+
# Prepare result message
result_message = None
if skipped_channels:
result_message = f"Processed {total_processed} channels. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
else:
result_message = f"Processed {total_processed} channels."
-
+
# Log success
await task_logger.log_task_success(
log_entry,
@@ -357,20 +436,22 @@ async def index_slack_messages(
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_channels_count": len(skipped_channels),
- "result_message": result_message
- }
+ "result_message": result_message,
+ },
+ )
+
+ logger.info(
+ f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped"
)
-
- logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
return total_processed, result_message
-
+
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during Slack indexing for connector {connector_id}",
str(db_error),
- {"error_type": "SQLAlchemyError"}
+ {"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {str(db_error)}")
return 0, f"Database error: {str(db_error)}"
@@ -380,11 +461,12 @@ async def index_slack_messages(
log_entry,
f"Failed to index Slack messages for connector {connector_id}",
str(e),
- {"error_type": type(e).__name__}
+ {"error_type": type(e).__name__},
)
logger.error(f"Failed to index Slack messages: {str(e)}")
return 0, f"Failed to index Slack messages: {str(e)}"
+
async def index_notion_pages(
session: AsyncSession,
connector_id: int,
@@ -392,56 +474,64 @@ async def index_notion_pages(
user_id: str,
start_date: str = None,
end_date: str = None,
- update_last_indexed: bool = True
+ update_last_indexed: bool = True,
) -> Tuple[int, Optional[str]]:
"""
Index Notion pages from all accessible pages.
-
+
Args:
session: Database session
connector_id: ID of the Notion connector
search_space_id: ID of the search space to store documents in
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
-
+
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
-
+
# Log task start
log_entry = await task_logger.log_task_start(
task_name="notion_pages_indexing",
source="connector_indexing_task",
message=f"Starting Notion pages indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ metadata={
+ "connector_id": connector_id,
+ "user_id": str(user_id),
+ "start_date": start_date,
+ "end_date": end_date,
+ },
)
-
+
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving Notion connector {connector_id} from database",
- {"stage": "connector_retrieval"}
+ {"stage": "connector_retrieval"},
)
-
+
result = await session.execute(
- select(SearchSourceConnector)
- .filter(
+ select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
- SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR
+ SearchSourceConnector.connector_type
+ == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
connector = result.scalars().first()
-
+
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a Notion connector",
"Connector not found",
- {"error_type": "ConnectorNotFound"}
+ {"error_type": "ConnectorNotFound"},
)
- return 0, f"Connector with ID {connector_id} not found or is not a Notion connector"
-
+ return (
+ 0,
+ f"Connector with ID {connector_id} not found or is not a Notion connector",
+ )
+
# Get the Notion token from the connector config
notion_token = connector.config.get("NOTION_INTEGRATION_TOKEN")
if not notion_token:
@@ -449,103 +539,119 @@ async def index_notion_pages(
log_entry,
f"Notion integration token not found in connector config for connector {connector_id}",
"Missing Notion token",
- {"error_type": "MissingToken"}
+ {"error_type": "MissingToken"},
)
return 0, "Notion integration token not found in connector config"
-
+
# Initialize Notion client
await task_logger.log_task_progress(
log_entry,
f"Initializing Notion client for connector {connector_id}",
- {"stage": "client_initialization"}
+ {"stage": "client_initialization"},
)
-
+
logger.info(f"Initializing Notion client for connector {connector_id}")
notion_client = NotionHistoryConnector(token=notion_token)
-
+
# Calculate date range
if start_date is None or end_date is None:
# Fall back to calculating dates
calculated_end_date = datetime.now()
- calculated_start_date = calculated_end_date - timedelta(days=365) # Check for last 1 year of pages
-
+ calculated_start_date = calculated_end_date - timedelta(
+ days=365
+ ) # Check for last 1 year of pages
+
# Use calculated dates if not provided
if start_date is None:
start_date_iso = calculated_start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
else:
# Convert YYYY-MM-DD to ISO format
- start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ")
-
+ start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").strftime(
+ "%Y-%m-%dT%H:%M:%SZ"
+ )
+
if end_date is None:
end_date_iso = calculated_end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
else:
# Convert YYYY-MM-DD to ISO format
- end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ")
+ end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").strftime(
+ "%Y-%m-%dT%H:%M:%SZ"
+ )
else:
# Convert provided dates to ISO format for Notion API
- start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ")
- end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ")
-
+ start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").strftime(
+ "%Y-%m-%dT%H:%M:%SZ"
+ )
+ end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").strftime(
+ "%Y-%m-%dT%H:%M:%SZ"
+ )
+
logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}")
-
+
await task_logger.log_task_progress(
log_entry,
f"Fetching Notion pages from {start_date_iso} to {end_date_iso}",
- {"stage": "fetch_pages", "start_date": start_date_iso, "end_date": end_date_iso}
+ {
+ "stage": "fetch_pages",
+ "start_date": start_date_iso,
+ "end_date": end_date_iso,
+ },
)
-
+
# Get all pages
try:
- pages = notion_client.get_all_pages(start_date=start_date_iso, end_date=end_date_iso)
+ pages = notion_client.get_all_pages(
+ start_date=start_date_iso, end_date=end_date_iso
+ )
logger.info(f"Found {len(pages)} Notion pages")
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to get Notion pages for connector {connector_id}",
str(e),
- {"error_type": "PageFetchError"}
+ {"error_type": "PageFetchError"},
)
logger.error(f"Error fetching Notion pages: {str(e)}", exc_info=True)
return 0, f"Failed to get Notion pages: {str(e)}"
-
+
if not pages:
await task_logger.log_task_success(
log_entry,
f"No Notion pages found for connector {connector_id}",
- {"pages_found": 0}
+ {"pages_found": 0},
)
logger.info("No Notion pages found to index")
return 0, "No Notion pages found"
-
+
# Track the number of documents indexed
documents_indexed = 0
documents_skipped = 0
skipped_pages = []
-
+
await task_logger.log_task_progress(
log_entry,
f"Starting to process {len(pages)} Notion pages",
- {"stage": "process_pages", "total_pages": len(pages)}
+ {"stage": "process_pages", "total_pages": len(pages)},
)
-
+
# Process each page
for page in pages:
try:
page_id = page.get("page_id")
page_title = page.get("title", f"Untitled page ({page_id})")
page_content = page.get("content", [])
-
+
logger.info(f"Processing Notion page: {page_title} ({page_id})")
-
+
if not page_content:
logger.info(f"No content found in page {page_title}. Skipping.")
skipped_pages.append(f"{page_title} (no content)")
documents_skipped += 1
continue
-
+
# Convert page content to markdown format
markdown_content = f"# Notion Page: {page_title}\n\n"
-
+
# Process blocks recursively
def process_blocks(blocks, level=0):
result = ""
@@ -553,10 +659,10 @@ async def index_notion_pages(
block_type = block.get("type")
block_content = block.get("content", "")
children = block.get("children", [])
-
+
# Add indentation based on level
indent = " " * level
-
+
# Format based on block type
if block_type in ["paragraph", "text"]:
result += f"{indent}{block_content}\n\n"
@@ -586,54 +692,62 @@ async def index_notion_pages(
# Default for other block types
if block_content:
result += f"{indent}{block_content}\n\n"
-
+
# Process children recursively
if children:
result += process_blocks(children, level + 1)
-
+
return result
-
- logger.debug(f"Converting {len(page_content)} blocks to markdown for page {page_title}")
+
+ logger.debug(
+ f"Converting {len(page_content)} blocks to markdown for page {page_title}"
+ )
markdown_content += process_blocks(page_content)
-
+
# Format document metadata
metadata_sections = [
- ("METADATA", [
- f"PAGE_TITLE: {page_title}",
- f"PAGE_ID: {page_id}"
- ]),
- ("CONTENT", [
- "FORMAT: markdown",
- "TEXT_START",
- markdown_content,
- "TEXT_END"
- ])
+ ("METADATA", [f"PAGE_TITLE: {page_title}", f"PAGE_ID: {page_id}"]),
+ (
+ "CONTENT",
+ [
+ "FORMAT: markdown",
+ "TEXT_START",
+ markdown_content,
+ "TEXT_END",
+ ],
+ ),
]
-
+
# Build the document string
document_parts = []
document_parts.append("")
-
+
for section_title, section_content in metadata_sections:
document_parts.append(f"<{section_title}>")
document_parts.extend(section_content)
document_parts.append(f"{section_title}>")
-
+
document_parts.append("")
- combined_document_string = '\n'.join(document_parts)
- content_hash = generate_content_hash(combined_document_string, search_space_id)
+ combined_document_string = "\n".join(document_parts)
+ content_hash = generate_content_hash(
+ combined_document_string, search_space_id
+ )
# Check if document with this content hash already exists
existing_doc_by_hash_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
- existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
-
+ existing_document_by_hash = (
+ existing_doc_by_hash_result.scalars().first()
+ )
+
if existing_document_by_hash:
- logger.info(f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing.")
+ logger.info(
+ f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing."
+ )
documents_skipped += 1
continue
-
+
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id)
if not user_llm:
@@ -641,21 +755,28 @@ async def index_notion_pages(
skipped_pages.append(f"{page_title} (no LLM configured)")
documents_skipped += 1
continue
-
+
# Generate summary
logger.debug(f"Generating summary for page {page_title}")
summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
- summary_result = await summary_chain.ainvoke({"document": combined_document_string})
+ summary_result = await summary_chain.ainvoke(
+ {"document": combined_document_string}
+ )
summary_content = summary_result.content
- summary_embedding = config.embedding_model_instance.embed(summary_content)
-
+ summary_embedding = config.embedding_model_instance.embed(
+ summary_content
+ )
+
# Process chunks
logger.debug(f"Chunking content for page {page_title}")
chunks = [
- Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
+ Chunk(
+ content=chunk.text,
+ embedding=config.embedding_model_instance.embed(chunk.text),
+ )
for chunk in config.chunker_instance.chunk(markdown_content)
]
-
+
# Create and store new document
document = Document(
search_space_id=search_space_id,
@@ -664,41 +785,46 @@ async def index_notion_pages(
document_metadata={
"page_title": page_title,
"page_id": page_id,
- "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
},
content=summary_content,
content_hash=content_hash,
embedding=summary_embedding,
- chunks=chunks
+ chunks=chunks,
)
-
+
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new Notion page: {page_title}")
-
+
except Exception as e:
- logger.error(f"Error processing Notion page {page.get('title', 'Unknown')}: {str(e)}", exc_info=True)
- skipped_pages.append(f"{page.get('title', 'Unknown')} (processing error)")
+ logger.error(
+ f"Error processing Notion page {page.get('title', 'Unknown')}: {str(e)}",
+ exc_info=True,
+ )
+ skipped_pages.append(
+ f"{page.get('title', 'Unknown')} (processing error)"
+ )
documents_skipped += 1
continue # Skip this page and continue with others
-
+
# Update the last_indexed_at timestamp for the connector only if requested
# and if we successfully indexed at least one page
total_processed = documents_indexed
if update_last_indexed and total_processed > 0:
connector.last_indexed_at = datetime.now()
logger.info(f"Updated last_indexed_at for connector {connector_id}")
-
+
# Commit all changes
await session.commit()
-
+
# Prepare result message
result_message = None
if skipped_pages:
result_message = f"Processed {total_processed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}"
else:
result_message = f"Processed {total_processed} pages."
-
+
# Log success
await task_logger.log_task_success(
log_entry,
@@ -708,22 +834,26 @@ async def index_notion_pages(
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_pages_count": len(skipped_pages),
- "result_message": result_message
- }
+ "result_message": result_message,
+ },
+ )
+
+ logger.info(
+ f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped"
)
-
- logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped")
return total_processed, result_message
-
+
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during Notion indexing for connector {connector_id}",
str(db_error),
- {"error_type": "SQLAlchemyError"}
+ {"error_type": "SQLAlchemyError"},
+ )
+ logger.error(
+ f"Database error during Notion indexing: {str(db_error)}", exc_info=True
)
- logger.error(f"Database error during Notion indexing: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
@@ -731,11 +861,12 @@ async def index_notion_pages(
log_entry,
f"Failed to index Notion pages for connector {connector_id}",
str(e),
- {"error_type": type(e).__name__}
+ {"error_type": type(e).__name__},
)
logger.error(f"Failed to index Notion pages: {str(e)}", exc_info=True)
return 0, f"Failed to index Notion pages: {str(e)}"
+
async def index_github_repos(
session: AsyncSession,
connector_id: int,
@@ -743,7 +874,7 @@ async def index_github_repos(
user_id: str,
start_date: str = None,
end_date: str = None,
- update_last_indexed: bool = True
+ update_last_indexed: bool = True,
) -> Tuple[int, Optional[str]]:
"""
Index code and documentation files from accessible GitHub repositories.
@@ -758,15 +889,20 @@ async def index_github_repos(
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
-
+
# Log task start
log_entry = await task_logger.log_task_start(
task_name="github_repos_indexing",
source="connector_indexing_task",
message=f"Starting GitHub repositories indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ metadata={
+ "connector_id": connector_id,
+ "user_id": str(user_id),
+ "start_date": start_date,
+ "end_date": end_date,
+ },
)
-
+
documents_processed = 0
errors = []
@@ -775,14 +911,14 @@ async def index_github_repos(
await task_logger.log_task_progress(
log_entry,
f"Retrieving GitHub connector {connector_id} from database",
- {"stage": "connector_retrieval"}
+ {"stage": "connector_retrieval"},
)
-
+
result = await session.execute(
- select(SearchSourceConnector)
- .filter(
+ select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
- SearchSourceConnector.connector_type == SearchSourceConnectorType.GITHUB_CONNECTOR
+ SearchSourceConnector.connector_type
+ == SearchSourceConnectorType.GITHUB_CONNECTOR,
)
)
connector = result.scalars().first()
@@ -792,9 +928,12 @@ async def index_github_repos(
log_entry,
f"Connector with ID {connector_id} not found or is not a GitHub connector",
"Connector not found",
- {"error_type": "ConnectorNotFound"}
+ {"error_type": "ConnectorNotFound"},
+ )
+ return (
+ 0,
+ f"Connector with ID {connector_id} not found or is not a GitHub connector",
)
- return 0, f"Connector with ID {connector_id} not found or is not a GitHub connector"
# 2. Get the GitHub PAT and selected repositories from the connector config
github_pat = connector.config.get("GITHUB_PAT")
@@ -805,16 +944,18 @@ async def index_github_repos(
log_entry,
f"GitHub Personal Access Token (PAT) not found in connector config for connector {connector_id}",
"Missing GitHub PAT",
- {"error_type": "MissingToken"}
+ {"error_type": "MissingToken"},
)
return 0, "GitHub Personal Access Token (PAT) not found in connector config"
-
- if not repo_full_names_to_index or not isinstance(repo_full_names_to_index, list):
+
+ if not repo_full_names_to_index or not isinstance(
+ repo_full_names_to_index, list
+ ):
await task_logger.log_task_failure(
log_entry,
f"'repo_full_names' not found or is not a list in connector config for connector {connector_id}",
"Invalid repo configuration",
- {"error_type": "InvalidConfiguration"}
+ {"error_type": "InvalidConfiguration"},
)
return 0, "'repo_full_names' not found or is not a list in connector config"
@@ -822,9 +963,12 @@ async def index_github_repos(
await task_logger.log_task_progress(
log_entry,
f"Initializing GitHub client for connector {connector_id}",
- {"stage": "client_initialization", "repo_count": len(repo_full_names_to_index)}
+ {
+ "stage": "client_initialization",
+ "repo_count": len(repo_full_names_to_index),
+ },
)
-
+
try:
github_client = GitHubConnector(token=github_pat)
except ValueError as e:
@@ -832,7 +976,7 @@ async def index_github_repos(
log_entry,
f"Failed to initialize GitHub client for connector {connector_id}",
str(e),
- {"error_type": "ClientInitializationError"}
+ {"error_type": "ClientInitializationError"},
)
return 0, f"Failed to initialize GitHub client: {str(e)}"
@@ -842,12 +986,21 @@ async def index_github_repos(
await task_logger.log_task_progress(
log_entry,
f"Starting indexing for {len(repo_full_names_to_index)} selected repositories",
- {"stage": "repo_processing", "repo_count": len(repo_full_names_to_index), "start_date": start_date, "end_date": end_date}
+ {
+ "stage": "repo_processing",
+ "repo_count": len(repo_full_names_to_index),
+ "start_date": start_date,
+ "end_date": end_date,
+ },
+ )
+
+ logger.info(
+ f"Starting indexing for {len(repo_full_names_to_index)} selected repositories."
)
-
- logger.info(f"Starting indexing for {len(repo_full_names_to_index)} selected repositories.")
if start_date and end_date:
- logger.info(f"Date range requested: {start_date} to {end_date} (Note: GitHub indexing processes all files regardless of dates)")
+ logger.info(
+ f"Date range requested: {start_date} to {end_date} (Note: GitHub indexing processes all files regardless of dates)"
+ )
# 6. Iterate through selected repositories and index files
for repo_full_name in repo_full_names_to_index:
@@ -859,65 +1012,92 @@ async def index_github_repos(
try:
files_to_index = github_client.get_repository_files(repo_full_name)
if not files_to_index:
- logger.info(f"No indexable files found in repository: {repo_full_name}")
+ logger.info(
+ f"No indexable files found in repository: {repo_full_name}"
+ )
continue
- logger.info(f"Found {len(files_to_index)} files to process in {repo_full_name}")
+ logger.info(
+ f"Found {len(files_to_index)} files to process in {repo_full_name}"
+ )
for file_info in files_to_index:
file_path = file_info.get("path")
file_url = file_info.get("url")
file_sha = file_info.get("sha")
- file_type = file_info.get("type") # 'code' or 'doc'
+ file_type = file_info.get("type") # 'code' or 'doc'
full_path_key = f"{repo_full_name}/{file_path}"
if not file_path or not file_url or not file_sha:
- logger.warning(f"Skipping file with missing info in {repo_full_name}: {file_info}")
+ logger.warning(
+ f"Skipping file with missing info in {repo_full_name}: {file_info}"
+ )
continue
# Get file content
- file_content = github_client.get_file_content(repo_full_name, file_path)
+ file_content = github_client.get_file_content(
+ repo_full_name, file_path
+ )
if file_content is None:
- logger.warning(f"Could not retrieve content for {full_path_key}. Skipping.")
- continue # Skip if content fetch failed
-
+ logger.warning(
+ f"Could not retrieve content for {full_path_key}. Skipping."
+ )
+ continue # Skip if content fetch failed
+
content_hash = generate_content_hash(file_content, search_space_id)
# Check if document with this content hash already exists
existing_doc_by_hash_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
- existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
-
+ existing_document_by_hash = (
+ existing_doc_by_hash_result.scalars().first()
+ )
+
if existing_document_by_hash:
- logger.info(f"Document with content hash {content_hash} already exists for file {full_path_key}. Skipping processing.")
+ logger.info(
+ f"Document with content hash {content_hash} already exists for file {full_path_key}. Skipping processing."
+ )
continue
-
+
# Use file_content directly for chunking, maybe summary for main content?
# For now, let's use the full content for both, might need refinement
- summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..." # Simple summary
- summary_embedding = config.embedding_model_instance.embed(summary_content)
+ summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..." # Simple summary
+ summary_embedding = config.embedding_model_instance.embed(
+ summary_content
+ )
# Chunk the content
try:
chunks_data = [
- Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
- for chunk in config.code_chunker_instance.chunk(file_content)
+ Chunk(
+ content=chunk.text,
+ embedding=config.embedding_model_instance.embed(
+ chunk.text
+ ),
+ )
+ for chunk in config.code_chunker_instance.chunk(
+ file_content
+ )
]
except Exception as chunk_err:
- logger.error(f"Failed to chunk file {full_path_key}: {chunk_err}")
- errors.append(f"Chunking failed for {full_path_key}: {chunk_err}")
- continue # Skip this file if chunking fails
+ logger.error(
+ f"Failed to chunk file {full_path_key}: {chunk_err}"
+ )
+ errors.append(
+ f"Chunking failed for {full_path_key}: {chunk_err}"
+ )
+ continue # Skip this file if chunking fails
doc_metadata = {
"repository_full_name": repo_full_name,
"file_path": file_path,
- "full_path": full_path_key, # For easier lookup
+ "full_path": full_path_key, # For easier lookup
"url": file_url,
"sha": file_sha,
"type": file_type,
- "indexed_at": datetime.now(timezone.utc).isoformat()
+ "indexed_at": datetime.now(timezone.utc).isoformat(),
}
# Create new document
@@ -926,22 +1106,26 @@ async def index_github_repos(
title=f"GitHub - {file_path}",
document_type=DocumentType.GITHUB_CONNECTOR,
document_metadata=doc_metadata,
- content=summary_content, # Store summary
+ content=summary_content, # Store summary
content_hash=content_hash,
embedding=summary_embedding,
search_space_id=search_space_id,
- chunks=chunks_data # Associate chunks directly
+ chunks=chunks_data, # Associate chunks directly
)
session.add(document)
documents_processed += 1
except Exception as repo_err:
- logger.error(f"Failed to process repository {repo_full_name}: {repo_err}")
+ logger.error(
+ f"Failed to process repository {repo_full_name}: {repo_err}"
+ )
errors.append(f"Failed processing {repo_full_name}: {repo_err}")
-
+
# Commit all changes at the end
await session.commit()
- logger.info(f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files.")
+ logger.info(
+ f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files."
+ )
# Log success
await task_logger.log_task_success(
@@ -950,8 +1134,8 @@ async def index_github_repos(
{
"documents_processed": documents_processed,
"errors_count": len(errors),
- "repo_count": len(repo_full_names_to_index)
- }
+ "repo_count": len(repo_full_names_to_index),
+ },
)
except SQLAlchemyError as db_err:
@@ -960,9 +1144,11 @@ async def index_github_repos(
log_entry,
f"Database error during GitHub indexing for connector {connector_id}",
str(db_err),
- {"error_type": "SQLAlchemyError"}
+ {"error_type": "SQLAlchemyError"},
+ )
+ logger.error(
+ f"Database error during GitHub indexing for connector {connector_id}: {db_err}"
)
- logger.error(f"Database error during GitHub indexing for connector {connector_id}: {db_err}")
errors.append(f"Database error: {db_err}")
return documents_processed, "; ".join(errors) if errors else str(db_err)
except Exception as e:
@@ -971,15 +1157,19 @@ async def index_github_repos(
log_entry,
f"Unexpected error during GitHub indexing for connector {connector_id}",
str(e),
- {"error_type": type(e).__name__}
+ {"error_type": type(e).__name__},
+ )
+ logger.error(
+ f"Unexpected error during GitHub indexing for connector {connector_id}: {e}",
+ exc_info=True,
)
- logger.error(f"Unexpected error during GitHub indexing for connector {connector_id}: {e}", exc_info=True)
errors.append(f"Unexpected error: {e}")
return documents_processed, "; ".join(errors) if errors else str(e)
error_message = "; ".join(errors) if errors else None
return documents_processed, error_message
+
async def index_linear_issues(
session: AsyncSession,
connector_id: int,
@@ -987,56 +1177,64 @@ async def index_linear_issues(
user_id: str,
start_date: str = None,
end_date: str = None,
- update_last_indexed: bool = True
+ update_last_indexed: bool = True,
) -> Tuple[int, Optional[str]]:
"""
Index Linear issues and comments.
-
+
Args:
session: Database session
connector_id: ID of the Linear connector
search_space_id: ID of the search space to store documents in
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
-
+
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
-
+
# Log task start
log_entry = await task_logger.log_task_start(
task_name="linear_issues_indexing",
source="connector_indexing_task",
message=f"Starting Linear issues indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ metadata={
+ "connector_id": connector_id,
+ "user_id": str(user_id),
+ "start_date": start_date,
+ "end_date": end_date,
+ },
)
-
+
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving Linear connector {connector_id} from database",
- {"stage": "connector_retrieval"}
+ {"stage": "connector_retrieval"},
)
-
+
result = await session.execute(
- select(SearchSourceConnector)
- .filter(
+ select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
- SearchSourceConnector.connector_type == SearchSourceConnectorType.LINEAR_CONNECTOR
+ SearchSourceConnector.connector_type
+ == SearchSourceConnectorType.LINEAR_CONNECTOR,
)
)
connector = result.scalars().first()
-
+
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a Linear connector",
"Connector not found",
- {"error_type": "ConnectorNotFound"}
+ {"error_type": "ConnectorNotFound"},
)
- return 0, f"Connector with ID {connector_id} not found or is not a Linear connector"
-
+ return (
+ 0,
+ f"Connector with ID {connector_id} not found or is not a Linear connector",
+ )
+
# Get the Linear token from the connector config
linear_token = connector.config.get("LINEAR_API_KEY")
if not linear_token:
@@ -1044,135 +1242,167 @@ async def index_linear_issues(
log_entry,
f"Linear API token not found in connector config for connector {connector_id}",
"Missing Linear token",
- {"error_type": "MissingToken"}
+ {"error_type": "MissingToken"},
)
return 0, "Linear API token not found in connector config"
-
+
# Initialize Linear client
await task_logger.log_task_progress(
log_entry,
f"Initializing Linear client for connector {connector_id}",
- {"stage": "client_initialization"}
+ {"stage": "client_initialization"},
)
-
+
linear_client = LinearConnector(token=linear_token)
-
+
# Calculate date range
if start_date is None or end_date is None:
# Fall back to calculating dates based on last_indexed_at
calculated_end_date = datetime.now()
-
+
# Use last_indexed_at as start date if available, otherwise use 365 days ago
if connector.last_indexed_at:
# Convert dates to be comparable (both timezone-naive)
- last_indexed_naive = connector.last_indexed_at.replace(tzinfo=None) if connector.last_indexed_at.tzinfo else connector.last_indexed_at
-
+ last_indexed_naive = (
+ connector.last_indexed_at.replace(tzinfo=None)
+ if connector.last_indexed_at.tzinfo
+ else connector.last_indexed_at
+ )
+
# Check if last_indexed_at is in the future or after end_date
if last_indexed_naive > calculated_end_date:
- logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead.")
+ logger.warning(
+ f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead."
+ )
calculated_start_date = calculated_end_date - timedelta(days=365)
else:
calculated_start_date = last_indexed_naive
- logger.info(f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date")
+ logger.info(
+ f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date"
+ )
else:
- calculated_start_date = calculated_end_date - timedelta(days=365) # Use 365 days as default
- logger.info(f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date")
-
+ calculated_start_date = calculated_end_date - timedelta(
+ days=365
+ ) # Use 365 days as default
+ logger.info(
+ f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date"
+ )
+
# Use calculated dates if not provided
- start_date_str = start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
- end_date_str = end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
+ start_date_str = (
+ start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
+ )
+ end_date_str = (
+ end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
+ )
else:
# Use provided dates
start_date_str = start_date
end_date_str = end_date
-
+
logger.info(f"Fetching Linear issues from {start_date_str} to {end_date_str}")
-
+
await task_logger.log_task_progress(
log_entry,
f"Fetching Linear issues from {start_date_str} to {end_date_str}",
- {"stage": "fetch_issues", "start_date": start_date_str, "end_date": end_date_str}
+ {
+ "stage": "fetch_issues",
+ "start_date": start_date_str,
+ "end_date": end_date_str,
+ },
)
-
+
# Get issues within date range
try:
issues, error = linear_client.get_issues_by_date_range(
- start_date=start_date_str,
- end_date=end_date_str,
- include_comments=True
+ start_date=start_date_str, end_date=end_date_str, include_comments=True
)
-
+
if error:
logger.error(f"Failed to get Linear issues: {error}")
-
+
# Don't treat "No issues found" as an error that should stop indexing
if "No issues found" in error:
- logger.info("No issues found is not a critical error, continuing with update")
+ logger.info(
+ "No issues found is not a critical error, continuing with update"
+ )
if update_last_indexed:
connector.last_indexed_at = datetime.now()
await session.commit()
- logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found")
+ logger.info(
+ f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found"
+ )
return 0, None
else:
return 0, f"Failed to get Linear issues: {error}"
-
+
logger.info(f"Retrieved {len(issues)} issues from Linear API")
-
+
except Exception as e:
logger.error(f"Exception when calling Linear API: {str(e)}", exc_info=True)
return 0, f"Failed to get Linear issues: {str(e)}"
-
+
if not issues:
logger.info("No Linear issues found for the specified date range")
if update_last_indexed:
connector.last_indexed_at = datetime.now()
await session.commit()
- logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found")
+ logger.info(
+ f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found"
+ )
return 0, None # Return None instead of error message when no issues found
-
+
# Log issue IDs and titles for debugging
logger.info("Issues retrieved from Linear API:")
for idx, issue in enumerate(issues[:10]): # Log first 10 issues
- logger.info(f" {idx+1}. {issue.get('identifier', 'Unknown')} - {issue.get('title', 'Unknown')} - Created: {issue.get('createdAt', 'Unknown')} - Updated: {issue.get('updatedAt', 'Unknown')}")
+ logger.info(
+ f" {idx + 1}. {issue.get('identifier', 'Unknown')} - {issue.get('title', 'Unknown')} - Created: {issue.get('createdAt', 'Unknown')} - Updated: {issue.get('updatedAt', 'Unknown')}"
+ )
if len(issues) > 10:
logger.info(f" ...and {len(issues) - 10} more issues")
-
+
# Track the number of documents indexed
documents_indexed = 0
documents_skipped = 0
skipped_issues = []
-
+
await task_logger.log_task_progress(
log_entry,
f"Starting to process {len(issues)} Linear issues",
- {"stage": "process_issues", "total_issues": len(issues)}
+ {"stage": "process_issues", "total_issues": len(issues)},
)
-
+
# Process each issue
for issue in issues:
try:
issue_id = issue.get("id")
issue_identifier = issue.get("identifier", "")
issue_title = issue.get("title", "")
-
+
if not issue_id or not issue_title:
- logger.warning(f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}")
- skipped_issues.append(f"{issue_identifier or 'Unknown'} (missing data)")
+ logger.warning(
+ f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}"
+ )
+ skipped_issues.append(
+ f"{issue_identifier or 'Unknown'} (missing data)"
+ )
documents_skipped += 1
continue
-
+
# Format the issue first to get well-structured data
formatted_issue = linear_client.format_issue(issue)
-
+
# Convert issue to markdown format
issue_content = linear_client.format_issue_to_markdown(formatted_issue)
-
+
if not issue_content:
- logger.warning(f"Skipping issue with no content: {issue_identifier} - {issue_title}")
+ logger.warning(
+ f"Skipping issue with no content: {issue_identifier} - {issue_title}"
+ )
skipped_issues.append(f"{issue_identifier} (no content)")
documents_skipped += 1
continue
-
+
# Create a short summary for the embedding
# This avoids using the LLM and just uses the issue data directly
state = formatted_issue.get("state", "Unknown")
@@ -1180,40 +1410,51 @@ async def index_linear_issues(
# Truncate description if it's too long for the summary
if description and len(description) > 500:
description = description[:497] + "..."
-
+
# Create a simple summary from the issue data
summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n"
if description:
summary_content += f"Description: {description}\n\n"
-
+
# Add comment count
comment_count = len(formatted_issue.get("comments", []))
summary_content += f"Comments: {comment_count}"
-
+
content_hash = generate_content_hash(issue_content, search_space_id)
# Check if document with this content hash already exists
existing_doc_by_hash_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
- existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
-
+ existing_document_by_hash = (
+ existing_doc_by_hash_result.scalars().first()
+ )
+
if existing_document_by_hash:
- logger.info(f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing.")
+ logger.info(
+ f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing."
+ )
documents_skipped += 1
continue
-
+
# Generate embedding for the summary
- summary_embedding = config.embedding_model_instance.embed(summary_content)
-
+ summary_embedding = config.embedding_model_instance.embed(
+ summary_content
+ )
+
# Process chunks - using the full issue content with comments
chunks = [
- Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
+ Chunk(
+ content=chunk.text,
+ embedding=config.embedding_model_instance.embed(chunk.text),
+ )
for chunk in config.chunker_instance.chunk(issue_content)
]
-
+
# Create and store new document
- logger.info(f"Creating new document for issue {issue_identifier} - {issue_title}")
+ logger.info(
+ f"Creating new document for issue {issue_identifier} - {issue_title}"
+ )
document = Document(
search_space_id=search_space_id,
title=f"Linear - {issue_identifier}: {issue_title}",
@@ -1224,34 +1465,41 @@ async def index_linear_issues(
"issue_title": issue_title,
"state": state,
"comment_count": comment_count,
- "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
},
content=summary_content,
content_hash=content_hash,
embedding=summary_embedding,
- chunks=chunks
+ chunks=chunks,
)
-
+
session.add(document)
documents_indexed += 1
- logger.info(f"Successfully indexed new issue {issue_identifier} - {issue_title}")
-
+ logger.info(
+ f"Successfully indexed new issue {issue_identifier} - {issue_title}"
+ )
+
except Exception as e:
- logger.error(f"Error processing issue {issue.get('identifier', 'Unknown')}: {str(e)}", exc_info=True)
- skipped_issues.append(f"{issue.get('identifier', 'Unknown')} (processing error)")
+ logger.error(
+ f"Error processing issue {issue.get('identifier', 'Unknown')}: {str(e)}",
+ exc_info=True,
+ )
+ skipped_issues.append(
+ f"{issue.get('identifier', 'Unknown')} (processing error)"
+ )
documents_skipped += 1
continue # Skip this issue and continue with others
-
+
# Update the last_indexed_at timestamp for the connector only if requested
total_processed = documents_indexed
if update_last_indexed:
connector.last_indexed_at = datetime.now()
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
-
+
# Commit all changes
await session.commit()
- logger.info(f"Successfully committed all Linear document changes to database")
-
+ logger.info("Successfully committed all Linear document changes to database")
+
# Log success
await task_logger.log_task_success(
log_entry,
@@ -1260,20 +1508,25 @@ async def index_linear_issues(
"issues_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
- "skipped_issues_count": len(skipped_issues)
- }
+ "skipped_issues_count": len(skipped_issues),
+ },
)
-
- logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped")
- return total_processed, None # Return None as the error message to indicate success
-
+
+ logger.info(
+ f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped"
+ )
+ return (
+ total_processed,
+ None,
+ ) # Return None as the error message to indicate success
+
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during Linear indexing for connector {connector_id}",
str(db_error),
- {"error_type": "SQLAlchemyError"}
+ {"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
@@ -1283,11 +1536,12 @@ async def index_linear_issues(
log_entry,
f"Failed to index Linear issues for connector {connector_id}",
str(e),
- {"error_type": type(e).__name__}
+ {"error_type": type(e).__name__},
)
logger.error(f"Failed to index Linear issues: {str(e)}", exc_info=True)
return 0, f"Failed to index Linear issues: {str(e)}"
+
async def index_discord_messages(
session: AsyncSession,
connector_id: int,
@@ -1295,7 +1549,7 @@ async def index_discord_messages(
user_id: str,
start_date: str = None,
end_date: str = None,
- update_last_indexed: bool = True
+ update_last_indexed: bool = True,
) -> Tuple[int, Optional[str]]:
"""
Index Discord messages from all accessible channels.
@@ -1310,28 +1564,33 @@ async def index_discord_messages(
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
-
+
# Log task start
log_entry = await task_logger.log_task_start(
task_name="discord_messages_indexing",
source="connector_indexing_task",
message=f"Starting Discord messages indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ metadata={
+ "connector_id": connector_id,
+ "user_id": str(user_id),
+ "start_date": start_date,
+ "end_date": end_date,
+ },
)
-
+
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving Discord connector {connector_id} from database",
- {"stage": "connector_retrieval"}
+ {"stage": "connector_retrieval"},
)
-
+
result = await session.execute(
- select(SearchSourceConnector)
- .filter(
+ select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
- SearchSourceConnector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR
+ SearchSourceConnector.connector_type
+ == SearchSourceConnectorType.DISCORD_CONNECTOR,
)
)
connector = result.scalars().first()
@@ -1341,9 +1600,12 @@ async def index_discord_messages(
log_entry,
f"Connector with ID {connector_id} not found or is not a Discord connector",
"Connector not found",
- {"error_type": "ConnectorNotFound"}
+ {"error_type": "ConnectorNotFound"},
+ )
+ return (
+ 0,
+ f"Connector with ID {connector_id} not found or is not a Discord connector",
)
- return 0, f"Connector with ID {connector_id} not found or is not a Discord connector"
# Get the Discord token from the connector config
discord_token = connector.config.get("DISCORD_BOT_TOKEN")
@@ -1352,7 +1614,7 @@ async def index_discord_messages(
log_entry,
f"Discord token not found in connector config for connector {connector_id}",
"Missing Discord token",
- {"error_type": "MissingToken"}
+ {"error_type": "MissingToken"},
)
return 0, "Discord token not found in connector config"
@@ -1362,9 +1624,9 @@ async def index_discord_messages(
await task_logger.log_task_progress(
log_entry,
f"Initializing Discord client for connector {connector_id}",
- {"stage": "client_initialization"}
+ {"stage": "client_initialization"},
)
-
+
discord_client = DiscordConnector(token=discord_token)
# Calculate date range
@@ -1374,30 +1636,54 @@ async def index_discord_messages(
# Use last_indexed_at as start date if available, otherwise use 365 days ago
if connector.last_indexed_at:
- calculated_start_date = connector.last_indexed_at.replace(tzinfo=timezone.utc)
- logger.info(f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date")
+ calculated_start_date = connector.last_indexed_at.replace(
+ tzinfo=timezone.utc
+ )
+ logger.info(
+ f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date"
+ )
else:
calculated_start_date = calculated_end_date - timedelta(days=365)
- logger.info(f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date")
+ logger.info(
+ f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date"
+ )
# Use calculated dates if not provided, convert to ISO format for Discord API
if start_date is None:
start_date_iso = calculated_start_date.isoformat()
else:
# Convert YYYY-MM-DD to ISO format
- start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat()
-
+ start_date_iso = (
+ datetime.strptime(start_date, "%Y-%m-%d")
+ .replace(tzinfo=timezone.utc)
+ .isoformat()
+ )
+
if end_date is None:
end_date_iso = calculated_end_date.isoformat()
else:
- # Convert YYYY-MM-DD to ISO format
- end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat()
+ # Convert YYYY-MM-DD to ISO format
+ end_date_iso = (
+ datetime.strptime(end_date, "%Y-%m-%d")
+ .replace(tzinfo=timezone.utc)
+ .isoformat()
+ )
else:
# Convert provided dates to ISO format for Discord API
- start_date_iso = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat()
- end_date_iso = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).isoformat()
-
- logger.info(f"Indexing Discord messages from {start_date_iso} to {end_date_iso}")
+ start_date_iso = (
+ datetime.strptime(start_date, "%Y-%m-%d")
+ .replace(tzinfo=timezone.utc)
+ .isoformat()
+ )
+ end_date_iso = (
+ datetime.strptime(end_date, "%Y-%m-%d")
+ .replace(tzinfo=timezone.utc)
+ .isoformat()
+ )
+
+ logger.info(
+ f"Indexing Discord messages from {start_date_iso} to {end_date_iso}"
+ )
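+        # Counters used for the end-of-run indexing summary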
documents_indexed = 0
documents_skipped = 0
@@ -1407,9 +1693,9 @@ async def index_discord_messages(
await task_logger.log_task_progress(
log_entry,
f"Starting Discord bot and fetching guilds for connector {connector_id}",
- {"stage": "fetch_guilds"}
+ {"stage": "fetch_guilds"},
)
-
+
logger.info("Starting Discord bot to fetch guilds")
discord_client._bot_task = asyncio.create_task(discord_client.start_bot())
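+        # Wait for the bot's gateway connection to be ready before requesting guilds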
await discord_client._wait_until_ready()
@@ -1422,7 +1708,7 @@ async def index_discord_messages(
log_entry,
f"Failed to get Discord guilds for connector {connector_id}",
str(e),
- {"error_type": "GuildFetchError"}
+ {"error_type": "GuildFetchError"},
)
logger.error(f"Failed to get Discord guilds: {str(e)}", exc_info=True)
await discord_client.close_bot()
@@ -1431,7 +1717,7 @@ async def index_discord_messages(
await task_logger.log_task_success(
log_entry,
f"No Discord guilds found for connector {connector_id}",
- {"guilds_found": 0}
+ {"guilds_found": 0},
)
logger.info("No Discord guilds found to index")
await discord_client.close_bot()
@@ -1441,9 +1727,9 @@ async def index_discord_messages(
await task_logger.log_task_progress(
log_entry,
f"Starting to process {len(guilds)} Discord guilds",
- {"stage": "process_guilds", "total_guilds": len(guilds)}
+ {"stage": "process_guilds", "total_guilds": len(guilds)},
)
-
+
for guild in guilds:
guild_id = guild["id"]
guild_name = guild["name"]
@@ -1467,13 +1753,19 @@ async def index_discord_messages(
end_date=end_date_iso,
)
except Exception as e:
- logger.error(f"Failed to get messages for channel {channel_name}: {str(e)}")
- skipped_channels.append(f"{guild_name}#{channel_name} (fetch error)")
+ logger.error(
+ f"Failed to get messages for channel {channel_name}: {str(e)}"
+ )
+ skipped_channels.append(
+ f"{guild_name}#{channel_name} (fetch error)"
+ )
documents_skipped += 1
continue
if not messages:
- logger.info(f"No messages found in channel {channel_name} for the specified date range.")
+ logger.info(
+ f"No messages found in channel {channel_name} for the specified date range."
+ )
documents_skipped += 1
continue
@@ -1486,33 +1778,45 @@ async def index_discord_messages(
formatted_messages.append(msg)
if not formatted_messages:
- logger.info(f"No valid messages found in channel {channel_name} after filtering.")
+ logger.info(
+ f"No valid messages found in channel {channel_name} after filtering."
+ )
documents_skipped += 1
continue
# Convert messages to markdown format
- channel_content = f"# Discord Channel: {guild_name} / {channel_name}\n\n"
+ channel_content = (
+ f"# Discord Channel: {guild_name} / {channel_name}\n\n"
+ )
for msg in formatted_messages:
user_name = msg.get("author_name", "Unknown User")
timestamp = msg.get("created_at", "Unknown Time")
text = msg.get("content", "")
- channel_content += f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n"
+ channel_content += (
+ f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n"
+ )
# Format document metadata
metadata_sections = [
- ("METADATA", [
- f"GUILD_NAME: {guild_name}",
- f"GUILD_ID: {guild_id}",
- f"CHANNEL_NAME: {channel_name}",
- f"CHANNEL_ID: {channel_id}",
- f"MESSAGE_COUNT: {len(formatted_messages)}"
- ]),
- ("CONTENT", [
- "FORMAT: markdown",
- "TEXT_START",
- channel_content,
- "TEXT_END"
- ])
+ (
+ "METADATA",
+ [
+ f"GUILD_NAME: {guild_name}",
+ f"GUILD_ID: {guild_id}",
+ f"CHANNEL_NAME: {channel_name}",
+ f"CHANNEL_ID: {channel_id}",
+ f"MESSAGE_COUNT: {len(formatted_messages)}",
+ ],
+ ),
+ (
+ "CONTENT",
+ [
+ "FORMAT: markdown",
+ "TEXT_START",
+ channel_content,
+ "TEXT_END",
+ ],
+ ),
]
# Build the document string
@@ -1523,31 +1827,43 @@ async def index_discord_messages(
document_parts.extend(section_content)
document_parts.append(f"{section_title}>")
document_parts.append("")
- combined_document_string = '\n'.join(document_parts)
- content_hash = generate_content_hash(combined_document_string, search_space_id)
+ combined_document_string = "\n".join(document_parts)
+ content_hash = generate_content_hash(
+ combined_document_string, search_space_id
+ )
# Check if document with this content hash already exists
existing_doc_by_hash_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
- existing_document_by_hash = existing_doc_by_hash_result.scalars().first()
+ existing_document_by_hash = (
+ existing_doc_by_hash_result.scalars().first()
+ )
if existing_document_by_hash:
- logger.info(f"Document with content hash {content_hash} already exists for channel {guild_name}#{channel_name}. Skipping processing.")
+ logger.info(
+ f"Document with content hash {content_hash} already exists for channel {guild_name}#{channel_name}. Skipping processing."
+ )
documents_skipped += 1
continue
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id)
if not user_llm:
- logger.error(f"No long context LLM configured for user {user_id}")
- skipped_channels.append(f"{guild_name}#{channel_name} (no LLM configured)")
+ logger.error(
+ f"No long context LLM configured for user {user_id}"
+ )
+ skipped_channels.append(
+ f"{guild_name}#{channel_name} (no LLM configured)"
+ )
documents_skipped += 1
continue
# Generate summary using summary_chain
summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
- summary_result = await summary_chain.ainvoke({"document": combined_document_string})
+ summary_result = await summary_chain.ainvoke(
+ {"document": combined_document_string}
+ )
summary_content = summary_result.content
summary_embedding = await asyncio.to_thread(
config.embedding_model_instance.embed, summary_content
@@ -1555,14 +1871,17 @@ async def index_discord_messages(
# Process chunks
raw_chunks = await asyncio.to_thread(
- config.chunker_instance.chunk,
- channel_content
+ config.chunker_instance.chunk, channel_content
)
- chunk_texts = [chunk.text for chunk in raw_chunks if chunk.text.strip()]
+ chunk_texts = [
+ chunk.text for chunk in raw_chunks if chunk.text.strip()
+ ]
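+                        # Offload the synchronous embedding calls to a worker thread so the event loop is not blocked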
chunk_embeddings = await asyncio.to_thread(
- lambda texts: [config.embedding_model_instance.embed(t) for t in texts],
- chunk_texts
+ lambda texts: [
+ config.embedding_model_instance.embed(t) for t in texts
+ ],
+ chunk_texts,
)
chunks = [
@@ -1583,20 +1902,26 @@ async def index_discord_messages(
"message_count": len(formatted_messages),
"start_date": start_date_iso,
"end_date": end_date_iso,
- "indexed_at": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
+ "indexed_at": datetime.now(timezone.utc).strftime(
+ "%Y-%m-%d %H:%M:%S"
+ ),
},
content=summary_content,
content_hash=content_hash,
embedding=summary_embedding,
- chunks=chunks
+ chunks=chunks,
)
session.add(document)
documents_indexed += 1
- logger.info(f"Successfully indexed new channel {guild_name}#{channel_name} with {len(formatted_messages)} messages")
+ logger.info(
+ f"Successfully indexed new channel {guild_name}#{channel_name} with {len(formatted_messages)} messages"
+ )
except Exception as e:
- logger.error(f"Error processing guild {guild_name}: {str(e)}", exc_info=True)
+ logger.error(
+ f"Error processing guild {guild_name}: {str(e)}", exc_info=True
+ )
skipped_channels.append(f"{guild_name} (processing error)")
documents_skipped += 1
continue
@@ -1625,11 +1950,13 @@ async def index_discord_messages(
"documents_skipped": documents_skipped,
"skipped_channels_count": len(skipped_channels),
"guilds_processed": len(guilds),
- "result_message": result_message
- }
+ "result_message": result_message,
+ },
)
- logger.info(f"Discord indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
+ logger.info(
+ f"Discord indexing completed: {documents_indexed} new channels, {documents_skipped} skipped"
+ )
return documents_indexed, result_message
except SQLAlchemyError as db_error:
@@ -1638,9 +1965,11 @@ async def index_discord_messages(
log_entry,
f"Database error during Discord indexing for connector {connector_id}",
str(db_error),
- {"error_type": "SQLAlchemyError"}
+ {"error_type": "SQLAlchemyError"},
+ )
+ logger.error(
+ f"Database error during Discord indexing: {str(db_error)}", exc_info=True
)
- logger.error(f"Database error during Discord indexing: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
@@ -1648,7 +1977,7 @@ async def index_discord_messages(
log_entry,
f"Failed to index Discord messages for connector {connector_id}",
str(e),
- {"error_type": type(e).__name__}
+ {"error_type": type(e).__name__},
)
logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True)
return 0, f"Failed to index Discord messages: {str(e)}"
@@ -1661,7 +1990,7 @@ async def index_jira_issues(
user_id: str,
start_date: str = None,
end_date: str = None,
- update_last_indexed: bool = True
+ update_last_indexed: bool = True,
) -> Tuple[int, Optional[str]]:
"""
Index Jira issues and comments.
@@ -1685,13 +2014,20 @@ async def index_jira_issues(
task_name="jira_issues_indexing",
source="connector_indexing_task",
message=f"Starting Jira issues indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ metadata={
+ "connector_id": connector_id,
+ "user_id": str(user_id),
+ "start_date": start_date,
+ "end_date": end_date,
+ },
)
try:
# Get the connector from the database
result = await session.execute(
- select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id)
+ select(SearchSourceConnector).where(
+ SearchSourceConnector.id == connector_id
+ )
)
connector = result.scalar_one_or_none()
@@ -1700,7 +2036,7 @@ async def index_jira_issues(
log_entry,
f"Connector with ID {connector_id} not found",
"Connector not found",
- {"error_type": "ConnectorNotFound"}
+ {"error_type": "ConnectorNotFound"},
)
return 0, f"Connector with ID {connector_id} not found"
@@ -1713,7 +2049,7 @@ async def index_jira_issues(
log_entry,
f"Jira credentials not found in connector config for connector {connector_id}",
"Missing Jira credentials",
- {"error_type": "MissingCredentials"}
+ {"error_type": "MissingCredentials"},
)
return 0, "Jira credentials not found in connector config"
@@ -1721,10 +2057,12 @@ async def index_jira_issues(
await task_logger.log_task_progress(
log_entry,
f"Initializing Jira client for connector {connector_id}",
- {"stage": "client_initialization"}
+ {"stage": "client_initialization"},
)
- jira_client = JiraConnector(base_url=jira_base_url, personal_access_token=jira_token)
+ jira_client = JiraConnector(
+ base_url=jira_base_url, personal_access_token=jira_token
+ )
# Calculate date range
if start_date is None or end_date is None:
@@ -1737,8 +2075,8 @@ async def index_jira_issues(
# If never indexed, go back 30 days
calculated_start_date = calculated_end_date - timedelta(days=30)
- start_date_str = calculated_start_date.strftime('%Y-%m-%d')
- end_date_str = calculated_end_date.strftime('%Y-%m-%d')
+ start_date_str = calculated_start_date.strftime("%Y-%m-%d")
+ end_date_str = calculated_end_date.strftime("%Y-%m-%d")
else:
start_date_str = start_date
end_date_str = end_date
@@ -1746,15 +2084,17 @@ async def index_jira_issues(
await task_logger.log_task_progress(
log_entry,
f"Fetching Jira issues from {start_date_str} to {end_date_str}",
- {"stage": "fetching_issues", "start_date": start_date_str, "end_date": end_date_str}
+ {
+ "stage": "fetching_issues",
+ "start_date": start_date_str,
+ "end_date": end_date_str,
+ },
)
# Get issues within date range
try:
issues, error = jira_client.get_issues_by_date_range(
- start_date=start_date_str,
- end_date=end_date_str,
- include_comments=True
+ start_date=start_date_str, end_date=end_date_str, include_comments=True
)
if error:
@@ -1762,16 +2102,20 @@ async def index_jira_issues(
# Don't treat "No issues found" as an error that should stop indexing
if "No issues found" in error:
- logger.info("No issues found is not a critical error, continuing with update")
+ logger.info(
+                        "No issues found in the date range; continuing and updating last_indexed_at"
+ )
if update_last_indexed:
connector.last_indexed_at = datetime.now()
await session.commit()
- logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found")
+ logger.info(
+ f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found"
+ )
await task_logger.log_task_completion(
log_entry,
f"No Jira issues found in date range {start_date_str} to {end_date_str}",
- {"indexed_count": 0}
+ {"indexed_count": 0},
)
return 0, None
else:
@@ -1779,7 +2123,7 @@ async def index_jira_issues(
log_entry,
f"Failed to get Jira issues: {error}",
"API Error",
- {"error_type": "APIError"}
+ {"error_type": "APIError"},
)
return 0, f"Failed to get Jira issues: {error}"
@@ -1788,7 +2132,7 @@ async def index_jira_issues(
await task_logger.log_task_progress(
log_entry,
f"Retrieved {len(issues)} issues from Jira API",
- {"stage": "processing_issues", "issue_count": len(issues)}
+ {"stage": "processing_issues", "issue_count": len(issues)},
)
except Exception as e:
@@ -1796,7 +2140,7 @@ async def index_jira_issues(
log_entry,
f"Error fetching Jira issues: {str(e)}",
"Fetch Error",
- {"error_type": type(e).__name__}
+ {"error_type": type(e).__name__},
)
logger.error(f"Error fetching Jira issues: {str(e)}", exc_info=True)
return 0, f"Error fetching Jira issues: {str(e)}"
@@ -1820,14 +2164,20 @@ async def index_jira_issues(
"priority": formatted_issue.get("priority", ""),
"issue_type": formatted_issue.get("issue_type", ""),
"project": formatted_issue.get("project", ""),
- "assignee": formatted_issue.get("assignee", {}).get("display_name", "") if formatted_issue.get("assignee") else "",
- "reporter": formatted_issue.get("reporter", {}).get("display_name", ""),
+ "assignee": (
+ formatted_issue.get("assignee", {}).get("display_name", "")
+ if formatted_issue.get("assignee")
+ else ""
+ ),
+ "reporter": formatted_issue.get("reporter", {}).get(
+ "display_name", ""
+ ),
"created_at": formatted_issue.get("created_at", ""),
"updated_at": formatted_issue.get("updated_at", ""),
"comment_count": len(formatted_issue.get("comments", [])),
"connector_id": connector_id,
"source": "jira",
- "base_url": jira_base_url
+ "base_url": jira_base_url,
}
# Generate content hash
@@ -1840,7 +2190,9 @@ async def index_jira_issues(
existing_doc = existing_doc_result.scalar_one_or_none()
if existing_doc:
- logger.debug(f"Document with hash {content_hash} already exists, skipping")
+ logger.debug(
+ f"Document with hash {content_hash} already exists, skipping"
+ )
continue
# Create new document
@@ -1850,34 +2202,47 @@ async def index_jira_issues(
document_metadata=metadata,
content=issue_markdown,
content_hash=content_hash,
- search_space_id=search_space_id
+ search_space_id=search_space_id,
)
# Generate embedding
- embedding = await config.embedding_model_instance.get_embedding(issue_markdown)
+ embedding = await config.embedding_model_instance.get_embedding(
+ issue_markdown
+ )
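+                # Embedding of the full issue markdown is stored on the parent document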
document.embedding = embedding
session.add(document)
await session.flush() # Flush to get the document ID
# Create chunks for the document
- chunks = await config.chunking_model_instance.chunk_document(issue_markdown)
+ chunks = await config.chunking_model_instance.chunk_document(
+ issue_markdown
+ )
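+                # Each chunk gets its own embedding and is linked to the document via document_id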
for chunk_content in chunks:
- chunk_embedding = await config.embedding_model_instance.get_embedding(chunk_content)
+ chunk_embedding = (
+ await config.embedding_model_instance.get_embedding(
+ chunk_content
+ )
+ )
chunk = Chunk(
content=chunk_content,
embedding=chunk_embedding,
- document_id=document.id
+ document_id=document.id,
)
session.add(chunk)
indexed_count += 1
- logger.debug(f"Indexed Jira issue: {formatted_issue.get('key', 'Unknown')}")
+ logger.debug(
+ f"Indexed Jira issue: {formatted_issue.get('key', 'Unknown')}"
+ )
except Exception as e:
- logger.error(f"Error processing Jira issue {issue.get('key', 'Unknown')}: {str(e)}", exc_info=True)
+ logger.error(
+ f"Error processing Jira issue {issue.get('key', 'Unknown')}: {str(e)}",
+ exc_info=True,
+ )
continue
# Commit all changes
@@ -1892,7 +2257,7 @@ async def index_jira_issues(
await task_logger.log_task_completion(
log_entry,
f"Successfully indexed {indexed_count} Jira issues",
- {"indexed_count": indexed_count}
+ {"indexed_count": indexed_count},
)
logger.info(f"Successfully indexed {indexed_count} Jira issues")
@@ -1903,7 +2268,7 @@ async def index_jira_issues(
log_entry,
f"Failed to index Jira issues: {str(e)}",
str(e),
- {"error_type": type(e).__name__}
+ {"error_type": type(e).__name__},
)
logger.error(f"Failed to index Jira issues: {str(e)}", exc_info=True)
return 0, f"Failed to index Jira issues: {str(e)}"