diff --git a/node_modules/.cache/prettier/.prettier-caches/a2ecb2962bf19c1099cfe708e42daa0097f94976.json b/node_modules/.cache/prettier/.prettier-caches/a2ecb2962bf19c1099cfe708e42daa0097f94976.json new file mode 100644 index 0000000..e744e3a --- /dev/null +++ b/node_modules/.cache/prettier/.prettier-caches/a2ecb2962bf19c1099cfe708e42daa0097f94976.json @@ -0,0 +1 @@ +{"2d0ec64d93969318101ee479b664221b32241665":{"files":{"surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/page.tsx":["EHKKvlOK0vfy0GgHwlG/J2Bx5rw=",true]},"modified":1753426633288}} \ No newline at end of file diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py index 0fb9dc3..7919465 100644 --- a/surfsense_backend/app/agents/researcher/nodes.py +++ b/surfsense_backend/app/agents/researcher/nodes.py @@ -2,78 +2,79 @@ import asyncio import json from typing import Any, Dict, List +from app.db import Document, SearchSpace from app.services.connector_service import ConnectorService +from app.services.query_service import QueryService from langchain_core.messages import HumanMessage, SystemMessage from langchain_core.runnables import RunnableConfig - -from sqlalchemy.ext.asyncio import AsyncSession - -from .configuration import Configuration, SearchMode -from .prompts import get_answer_outline_system_prompt, get_further_questions_system_prompt -from .state import State -from .sub_section_writer.graph import graph as sub_section_writer_graph -from .sub_section_writer.configuration import SubSectionType -from .qna_agent.graph import graph as qna_agent_graph -from .utils import AnswerOutline, get_connector_emoji, get_connector_friendly_name - -from app.services.query_service import QueryService - from langgraph.types import StreamWriter +from sqlalchemy.ext.asyncio import AsyncSession # Additional imports for document fetching from sqlalchemy.future import select -from app.db import Document, SearchSpace + +from .configuration import Configuration, SearchMode +from .prompts import ( + get_answer_outline_system_prompt, + get_further_questions_system_prompt, +) +from .qna_agent.graph import graph as qna_agent_graph +from .state import State +from .sub_section_writer.configuration import SubSectionType +from .sub_section_writer.graph import graph as sub_section_writer_graph +from .utils import AnswerOutline, get_connector_emoji, get_connector_friendly_name + async def fetch_documents_by_ids( - document_ids: List[int], - user_id: str, - db_session: AsyncSession + document_ids: List[int], user_id: str, db_session: AsyncSession ) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: """ Fetch documents by their IDs with ownership check using DOCUMENTS mode approach. - + This function ensures that only documents belonging to the user are fetched, providing security by checking ownership through SearchSpace association. Similar to SearchMode.DOCUMENTS, it fetches full documents and concatenates their chunks. Also creates source objects for UI display, grouped by document type. 
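    Illustrative call (a sketch only — `session` and `current_user` stand in for
    an open AsyncSession and the authenticated user, which are not shown here):

        # In practice this helper is invoked from process_sections / handle_qna_workflow
        # with configuration.document_ids_to_add_in_context.
        sources, docs = await fetch_documents_by_ids(
            document_ids=[12, 57],          # hypothetical IDs picked in the UI
            user_id=str(current_user.id),   # ownership enforced via the SearchSpace join
            db_session=session,
        )
        # `sources` feeds the UI source panel; `docs` joins the retrieval pool.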
- + Args: document_ids: List of document IDs to fetch user_id: The user ID to check ownership db_session: The database session - + Returns: Tuple of (source_objects, document_chunks) - similar to ConnectorService pattern """ if not document_ids: return [], [] - + try: # Query documents with ownership check result = await db_session.execute( select(Document) .join(SearchSpace) - .filter( - Document.id.in_(document_ids), - SearchSpace.user_id == user_id - ) + .filter(Document.id.in_(document_ids), SearchSpace.user_id == user_id) ) documents = result.scalars().all() - + # Group documents by type for source object creation documents_by_type = {} formatted_documents = [] - + for doc in documents: # Fetch associated chunks for this document (similar to DocumentHybridSearchRetriever) from app.db import Chunk - chunks_query = select(Chunk).where(Chunk.document_id == doc.id).order_by(Chunk.id) + + chunks_query = ( + select(Chunk).where(Chunk.document_id == doc.id).order_by(Chunk.id) + ) chunks_result = await db_session.execute(chunks_query) chunks = chunks_result.scalars().all() - + # Concatenate chunks content (similar to SearchMode.DOCUMENTS approach) - concatenated_chunks_content = " ".join([chunk.content for chunk in chunks]) if chunks else doc.content - + concatenated_chunks_content = ( + " ".join([chunk.content for chunk in chunks]) if chunks else doc.content + ) + # Format to match connector service return format formatted_doc = { "chunk_id": f"user_doc_{doc.id}", @@ -82,102 +83,156 @@ async def fetch_documents_by_ids( "document": { "id": doc.id, "title": doc.title, - "document_type": doc.document_type.value if doc.document_type else "UNKNOWN", + "document_type": ( + doc.document_type.value if doc.document_type else "UNKNOWN" + ), "metadata": doc.document_metadata or {}, }, - "source": doc.document_type.value if doc.document_type else "UNKNOWN" + "source": doc.document_type.value if doc.document_type else "UNKNOWN", } formatted_documents.append(formatted_doc) - + # Group by document type for source objects doc_type = doc.document_type.value if doc.document_type else "UNKNOWN" if doc_type not in documents_by_type: documents_by_type[doc_type] = [] documents_by_type[doc_type].append(doc) - + # Create source objects for each document type (similar to ConnectorService) source_objects = [] - connector_id_counter = 100 # Start from 100 to avoid conflicts with regular connectors - + connector_id_counter = ( + 100 # Start from 100 to avoid conflicts with regular connectors + ) + for doc_type, docs in documents_by_type.items(): sources_list = [] - + for doc in docs: metadata = doc.document_metadata or {} - + # Create type-specific source formatting (similar to ConnectorService) if doc_type == "LINEAR_CONNECTOR": # Extract Linear-specific metadata - issue_identifier = metadata.get('issue_identifier', '') - issue_title = metadata.get('issue_title', doc.title) - issue_state = metadata.get('state', '') - comment_count = metadata.get('comment_count', 0) - + issue_identifier = metadata.get("issue_identifier", "") + issue_title = metadata.get("issue_title", doc.title) + issue_state = metadata.get("state", "") + comment_count = metadata.get("comment_count", 0) + # Create a more descriptive title for Linear issues - title = f"Linear: {issue_identifier} - {issue_title}" if issue_identifier else f"Linear: {issue_title}" + title = ( + f"Linear: {issue_identifier} - {issue_title}" + if issue_identifier + else f"Linear: {issue_title}" + ) if issue_state: title += f" ({issue_state})" - + # Create description - 
description = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content + description = ( + doc.content[:100] + "..." + if len(doc.content) > 100 + else doc.content + ) if comment_count: description += f" | Comments: {comment_count}" - + # Create URL - url = f"https://linear.app/issue/{issue_identifier}" if issue_identifier else "" - + url = ( + f"https://linear.app/issue/{issue_identifier}" + if issue_identifier + else "" + ) + elif doc_type == "SLACK_CONNECTOR": # Extract Slack-specific metadata - channel_name = metadata.get('channel_name', 'Unknown Channel') - channel_id = metadata.get('channel_id', '') - message_date = metadata.get('start_date', '') - + channel_name = metadata.get("channel_name", "Unknown Channel") + channel_id = metadata.get("channel_id", "") + message_date = metadata.get("start_date", "") + title = f"Slack: {channel_name}" if message_date: title += f" ({message_date})" - - description = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content - url = f"https://slack.com/app_redirect?channel={channel_id}" if channel_id else "" - + + description = ( + doc.content[:100] + "..." + if len(doc.content) > 100 + else doc.content + ) + url = ( + f"https://slack.com/app_redirect?channel={channel_id}" + if channel_id + else "" + ) + elif doc_type == "NOTION_CONNECTOR": # Extract Notion-specific metadata - page_title = metadata.get('page_title', doc.title) - page_id = metadata.get('page_id', '') - + page_title = metadata.get("page_title", doc.title) + page_id = metadata.get("page_id", "") + title = f"Notion: {page_title}" - description = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content - url = f"https://notion.so/{page_id.replace('-', '')}" if page_id else "" - + description = ( + doc.content[:100] + "..." + if len(doc.content) > 100 + else doc.content + ) + url = ( + f"https://notion.so/{page_id.replace('-', '')}" + if page_id + else "" + ) + elif doc_type == "GITHUB_CONNECTOR": title = f"GitHub: {doc.title}" - description = metadata.get('description', doc.content[:100] + "..." if len(doc.content) > 100 else doc.content) - url = metadata.get('url', '') - + description = metadata.get( + "description", + ( + doc.content[:100] + "..." + if len(doc.content) > 100 + else doc.content + ), + ) + url = metadata.get("url", "") + elif doc_type == "YOUTUBE_VIDEO": # Extract YouTube-specific metadata - video_title = metadata.get('video_title', doc.title) - video_id = metadata.get('video_id', '') - channel_name = metadata.get('channel_name', '') - + video_title = metadata.get("video_title", doc.title) + video_id = metadata.get("video_id", "") + channel_name = metadata.get("channel_name", "") + title = video_title if channel_name: title += f" - {channel_name}" - - description = metadata.get('description', doc.content[:100] + "..." if len(doc.content) > 100 else doc.content) - url = f"https://www.youtube.com/watch?v={video_id}" if video_id else "" - + + description = metadata.get( + "description", + ( + doc.content[:100] + "..." 
+ if len(doc.content) > 100 + else doc.content + ), + ) + url = ( + f"https://www.youtube.com/watch?v={video_id}" + if video_id + else "" + ) + elif doc_type == "DISCORD_CONNECTOR": # Extract Discord-specific metadata - channel_name = metadata.get('channel_name', 'Unknown Channel') - channel_id = metadata.get('channel_id', '') - guild_id = metadata.get('guild_id', '') - message_date = metadata.get('start_date', '') + channel_name = metadata.get("channel_name", "Unknown Channel") + channel_id = metadata.get("channel_id", "") + guild_id = metadata.get("guild_id", "") + message_date = metadata.get("start_date", "") title = f"Discord: {channel_name}" if message_date: title += f" ({message_date})" - description = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content + description = ( + doc.content[:100] + "..." + if len(doc.content) > 100 + else doc.content + ) if guild_id and channel_id: url = f"https://discord.com/channels/{guild_id}/{channel_id}" @@ -188,20 +243,28 @@ async def fetch_documents_by_ids( elif doc_type == "JIRA_CONNECTOR": # Extract Jira-specific metadata - issue_key = metadata.get('issue_key', 'Unknown Issue') - issue_title = metadata.get('issue_title', 'Untitled Issue') - status = metadata.get('status', '') - priority = metadata.get('priority', '') - issue_type = metadata.get('issue_type', '') + issue_key = metadata.get("issue_key", "Unknown Issue") + issue_title = metadata.get("issue_title", "Untitled Issue") + status = metadata.get("status", "") + priority = metadata.get("priority", "") + issue_type = metadata.get("issue_type", "") title = f"Jira: {issue_key} - {issue_title}" if status: title += f" ({status})" - description = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content + description = ( + doc.content[:100] + "..." + if len(doc.content) > 100 + else doc.content + ) + if priority: + description += f" | Priority: {priority}" + if issue_type: + description += f" | Type: {issue_type}" # Construct Jira URL if we have the base URL - base_url = metadata.get('base_url', '') + base_url = metadata.get("base_url", "") if base_url and issue_key: url = f"{base_url}/browse/{issue_key}" else: @@ -209,37 +272,61 @@ async def fetch_documents_by_ids( elif doc_type == "EXTENSION": # Extract Extension-specific metadata - webpage_title = metadata.get('VisitedWebPageTitle', doc.title) - webpage_url = metadata.get('VisitedWebPageURL', '') - visit_date = metadata.get('VisitedWebPageDateWithTimeInISOString', '') - + webpage_title = metadata.get("VisitedWebPageTitle", doc.title) + webpage_url = metadata.get("VisitedWebPageURL", "") + visit_date = metadata.get( + "VisitedWebPageDateWithTimeInISOString", "" + ) + title = webpage_title if visit_date: - formatted_date = visit_date.split('T')[0] if 'T' in visit_date else visit_date + formatted_date = ( + visit_date.split("T")[0] + if "T" in visit_date + else visit_date + ) title += f" (visited: {formatted_date})" - - description = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content + + description = ( + doc.content[:100] + "..." + if len(doc.content) > 100 + else doc.content + ) url = webpage_url - + elif doc_type == "CRAWLED_URL": title = doc.title - description = metadata.get('og:description', metadata.get('ogDescription', doc.content[:100] + "..." if len(doc.content) > 100 else doc.content)) - url = metadata.get('url', '') - + description = metadata.get( + "og:description", + metadata.get( + "ogDescription", + ( + doc.content[:100] + "..." 
+ if len(doc.content) > 100 + else doc.content + ), + ), + ) + url = metadata.get("url", "") + else: # FILE and other types title = doc.title - description = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content - url = metadata.get('url', '') - + description = ( + doc.content[:100] + "..." + if len(doc.content) > 100 + else doc.content + ) + url = metadata.get("url", "") + # Create source entry source = { "id": doc.id, "title": title, "description": description, - "url": url + "url": url, } sources_list.append(source) - + # Create source object for this document type friendly_type_names = { "LINEAR_CONNECTOR": "Linear Issues (Selected)", @@ -251,9 +338,9 @@ async def fetch_documents_by_ids( "JIRA_CONNECTOR": "Jira Issues (Selected)", "EXTENSION": "Browser Extension (Selected)", "CRAWLED_URL": "Web Pages (Selected)", - "FILE": "Files (Selected)" + "FILE": "Files (Selected)", } - + source_object = { "id": connector_id_counter, "name": friendly_type_names.get(doc_type, f"{doc_type} (Selected)"), @@ -262,31 +349,34 @@ async def fetch_documents_by_ids( } source_objects.append(source_object) connector_id_counter += 1 - - print(f"Fetched {len(formatted_documents)} user-selected documents (with concatenated chunks) from {len(document_ids)} requested IDs") + + print( + f"Fetched {len(formatted_documents)} user-selected documents (with concatenated chunks) from {len(document_ids)} requested IDs" + ) print(f"Created {len(source_objects)} source objects for UI display") - + return source_objects, formatted_documents - + except Exception as e: print(f"Error fetching documents by IDs: {str(e)}") return [], [] -async def write_answer_outline(state: State, config: RunnableConfig, writer: StreamWriter) -> Dict[str, Any]: +async def write_answer_outline( + state: State, config: RunnableConfig, writer: StreamWriter +) -> Dict[str, Any]: """ Create a structured answer outline based on the user query. - + This node takes the user query and number of sections from the configuration and uses an LLM to generate a comprehensive outline with logical sections and research questions for each section. - + Returns: Dict containing the answer outline in the "answer_outline" key for state update. """ from app.services.llm_service import get_user_strategic_llm - from app.db import get_async_session - + streaming_service = state.streaming_service writer( @@ -321,10 +411,10 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str # Create the human message content human_message_content = f""" Now Please create an answer outline for the following query: - + User Query: {reformulated_query} Number of Sections: {num_sections} - + Remember to format your response as valid JSON exactly matching this structure: {{ "answer_outline": [ @@ -338,7 +428,7 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str }} ] }} - + Your output MUST be valid JSON in exactly this format. Do not include any other text or explanation. 
""" @@ -353,9 +443,9 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str # Create messages for the LLM messages = [ SystemMessage(content=get_answer_outline_system_prompt()), - HumanMessage(content=human_message_content) + HumanMessage(content=human_message_content), ] - + # Call the LLM directly without using structured output writer( { @@ -366,26 +456,28 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str ) response = await llm.ainvoke(messages) - + # Parse the JSON response manually try: # Extract JSON content from the response content = response.content - + # Find the JSON in the content (handle case where LLM might add additional text) - json_start = content.find('{') - json_end = content.rfind('}') + 1 + json_start = content.find("{") + json_end = content.rfind("}") + 1 if json_start >= 0 and json_end > json_start: json_str = content[json_start:json_end] - + # Parse the JSON string parsed_data = json.loads(json_str) - + # Convert to Pydantic model answer_outline = AnswerOutline(**parsed_data) - - total_questions = sum(len(section.questions) for section in answer_outline.answer_outline) - + + total_questions = sum( + len(section.questions) for section in answer_outline.answer_outline + ) + writer( { "yield_value": streaming_service.format_terminal_info_delta( @@ -429,16 +521,16 @@ async def fetch_relevant_documents( top_k: int = 10, connector_service: ConnectorService = None, search_mode: SearchMode = SearchMode.CHUNKS, - user_selected_sources: List[Dict[str, Any]] = None + user_selected_sources: List[Dict[str, Any]] = None, ) -> List[Dict[str, Any]]: """ Fetch relevant documents for research questions using the provided connectors. - + This function searches across multiple data sources for information related to the research questions. It provides user-friendly feedback during the search process by displaying connector names (like "Web Search" instead of "TAVILY_API") and adding relevant emojis to indicate the type of source being searched. 
- + Args: research_questions: List of research questions to find documents for user_id: The user ID @@ -449,19 +541,21 @@ async def fetch_relevant_documents( state: The current state containing the streaming service top_k: Number of top results to retrieve per connector per question connector_service: An initialized connector service to use for searching - + Returns: List of relevant documents """ # Initialize services # connector_service = ConnectorService(db_session) - + # Only use streaming if both writer and state are provided streaming_service = state.streaming_service if state is not None else None # Stream initial status update if streaming_service and writer: - connector_names = [get_connector_friendly_name(connector) for connector in connectors_to_search] + connector_names = [ + get_connector_friendly_name(connector) for connector in connectors_to_search + ] connector_names_str = ", ".join(connector_names) writer( { @@ -473,7 +567,7 @@ async def fetch_relevant_documents( all_raw_documents = [] # Store all raw documents all_sources = [] # Store all sources - + for i, user_query in enumerate(research_questions): # Stream question being researched if streaming_service and writer: @@ -487,7 +581,7 @@ async def fetch_relevant_documents( # Use original research question as the query reformulated_query = user_query - + # Process each selected connector for connector in connectors_to_search: # Stream connector being searched @@ -504,19 +598,22 @@ async def fetch_relevant_documents( try: if connector == "YOUTUBE_VIDEO": - source_object, youtube_chunks = await connector_service.search_youtube( + ( + source_object, + youtube_chunks, + ) = await connector_service.search_youtube( user_query=reformulated_query, user_id=user_id, search_space_id=search_space_id, top_k=top_k, - search_mode=search_mode + search_mode=search_mode, ) - + # Add to sources and raw documents if source_object: all_sources.append(source_object) all_raw_documents.extend(youtube_chunks) - + # Stream found document count if streaming_service and writer: writer( @@ -528,19 +625,22 @@ async def fetch_relevant_documents( ) elif connector == "EXTENSION": - source_object, extension_chunks = await connector_service.search_extension( + ( + source_object, + extension_chunks, + ) = await connector_service.search_extension( user_query=reformulated_query, user_id=user_id, search_space_id=search_space_id, top_k=top_k, - search_mode=search_mode + search_mode=search_mode, ) - + # Add to sources and raw documents if source_object: all_sources.append(source_object) all_raw_documents.extend(extension_chunks) - + # Stream found document count if streaming_service and writer: writer( @@ -552,19 +652,22 @@ async def fetch_relevant_documents( ) elif connector == "CRAWLED_URL": - source_object, crawled_urls_chunks = await connector_service.search_crawled_urls( + ( + source_object, + crawled_urls_chunks, + ) = await connector_service.search_crawled_urls( user_query=reformulated_query, user_id=user_id, search_space_id=search_space_id, top_k=top_k, - search_mode=search_mode + search_mode=search_mode, ) - + # Add to sources and raw documents if source_object: all_sources.append(source_object) all_raw_documents.extend(crawled_urls_chunks) - + # Stream found document count if streaming_service and writer: writer( @@ -581,14 +684,14 @@ async def fetch_relevant_documents( user_id=user_id, search_space_id=search_space_id, top_k=top_k, - search_mode=search_mode + search_mode=search_mode, ) - + # Add to sources and raw documents if source_object: 
all_sources.append(source_object) all_raw_documents.extend(files_chunks) - + # Stream found document count if streaming_service and writer: writer( @@ -605,14 +708,14 @@ async def fetch_relevant_documents( user_id=user_id, search_space_id=search_space_id, top_k=top_k, - search_mode=search_mode + search_mode=search_mode, ) - + # Add to sources and raw documents if source_object: all_sources.append(source_object) all_raw_documents.extend(slack_chunks) - + # Stream found document count if streaming_service and writer: writer( @@ -624,19 +727,22 @@ async def fetch_relevant_documents( ) elif connector == "NOTION_CONNECTOR": - source_object, notion_chunks = await connector_service.search_notion( + ( + source_object, + notion_chunks, + ) = await connector_service.search_notion( user_query=reformulated_query, user_id=user_id, search_space_id=search_space_id, top_k=top_k, - search_mode=search_mode + search_mode=search_mode, ) - + # Add to sources and raw documents if source_object: all_sources.append(source_object) all_raw_documents.extend(notion_chunks) - + # Stream found document count if streaming_service and writer: writer( @@ -648,19 +754,22 @@ async def fetch_relevant_documents( ) elif connector == "GITHUB_CONNECTOR": - source_object, github_chunks = await connector_service.search_github( + ( + source_object, + github_chunks, + ) = await connector_service.search_github( user_query=reformulated_query, user_id=user_id, search_space_id=search_space_id, top_k=top_k, - search_mode=search_mode + search_mode=search_mode, ) - + # Add to sources and raw documents if source_object: all_sources.append(source_object) all_raw_documents.extend(github_chunks) - + # Stream found document count if streaming_service and writer: writer( @@ -672,19 +781,22 @@ async def fetch_relevant_documents( ) elif connector == "LINEAR_CONNECTOR": - source_object, linear_chunks = await connector_service.search_linear( + ( + source_object, + linear_chunks, + ) = await connector_service.search_linear( user_query=reformulated_query, user_id=user_id, search_space_id=search_space_id, top_k=top_k, - search_mode=search_mode + search_mode=search_mode, ) - + # Add to sources and raw documents if source_object: all_sources.append(source_object) all_raw_documents.extend(linear_chunks) - + # Stream found document count if streaming_service and writer: writer( @@ -696,17 +808,18 @@ async def fetch_relevant_documents( ) elif connector == "TAVILY_API": - source_object, tavily_chunks = await connector_service.search_tavily( - user_query=reformulated_query, - user_id=user_id, - top_k=top_k + ( + source_object, + tavily_chunks, + ) = await connector_service.search_tavily( + user_query=reformulated_query, user_id=user_id, top_k=top_k ) - + # Add to sources and raw documents if source_object: all_sources.append(source_object) all_raw_documents.extend(tavily_chunks) - + # Stream found document count if streaming_service and writer: writer( @@ -723,14 +836,14 @@ async def fetch_relevant_documents( source_object, linkup_chunks = await connector_service.search_linkup( user_query=reformulated_query, user_id=user_id, - mode=linkup_mode - ) - + mode=linkup_mode, + ) + # Add to sources and raw documents if source_object: all_sources.append(source_object) - all_raw_documents.extend(linkup_chunks) - + all_raw_documents.extend(linkup_chunks) + # Stream found document count if streaming_service and writer: writer( @@ -742,12 +855,15 @@ async def fetch_relevant_documents( ) elif connector == "DISCORD_CONNECTOR": - source_object, discord_chunks = await 
connector_service.search_discord( + ( + source_object, + discord_chunks, + ) = await connector_service.search_discord( user_query=reformulated_query, user_id=user_id, search_space_id=search_space_id, top_k=top_k, - search_mode=search_mode + search_mode=search_mode, ) # Add to sources and raw documents if source_object: @@ -769,7 +885,7 @@ async def fetch_relevant_documents( user_id=user_id, search_space_id=search_space_id, top_k=top_k, - search_mode=search_mode + search_mode=search_mode, ) # Add to sources and raw documents @@ -790,7 +906,7 @@ async def fetch_relevant_documents( except Exception as e: error_message = f"Error searching connector {connector}: {str(e)}" print(error_message) - + # Stream error message if streaming_service and writer: friendly_name = get_connector_friendly_name(connector) @@ -804,17 +920,17 @@ async def fetch_relevant_documents( # Continue with other connectors on error continue - + # Deduplicate source objects by ID before streaming deduplicated_sources = [] seen_source_keys = set() - + # First add user-selected sources (if any) if user_selected_sources: for source_obj in user_selected_sources: - source_id = source_obj.get('id') - source_type = source_obj.get('type') - + source_id = source_obj.get("id") + source_type = source_obj.get("type") + if source_id and source_type: source_key = f"{source_type}_{source_id}" if source_key not in seen_source_keys: @@ -822,14 +938,14 @@ async def fetch_relevant_documents( deduplicated_sources.append(source_obj) else: deduplicated_sources.append(source_obj) - + # Then add connector sources for source_obj in all_sources: # Use combination of source ID and type as a unique identifier # This ensures we don't accidentally deduplicate sources from different connectors - source_id = source_obj.get('id') - source_type = source_obj.get('type') - + source_id = source_obj.get("id") + source_type = source_obj.get("type") + if source_id and source_type: source_key = f"{source_type}_{source_id}" current_sources_count = len(source_obj.get('sources', [])) @@ -877,28 +993,36 @@ async def fetch_relevant_documents( # After all sources are collected and deduplicated, stream them if streaming_service and writer: - writer({"yield_value": streaming_service.format_sources_delta(deduplicated_sources)}) + writer( + { + "yield_value": streaming_service.format_sources_delta( + deduplicated_sources + ) + } + ) # Deduplicate raw documents based on chunk_id or content seen_chunk_ids = set() seen_content_hashes = set() deduplicated_docs = [] - + for doc in all_raw_documents: chunk_id = doc.get("chunk_id") content = doc.get("content", "") content_hash = hash(content) - + # Skip if we've seen this chunk_id or content before - if (chunk_id and chunk_id in seen_chunk_ids) or content_hash in seen_content_hashes: + if ( + chunk_id and chunk_id in seen_chunk_ids + ) or content_hash in seen_content_hashes: continue - + # Add to our tracking sets and keep this document if chunk_id: seen_chunk_ids.add(chunk_id) seen_content_hashes.add(content_hash) deduplicated_docs.append(doc) - + # Stream info about deduplicated documents if streaming_service and writer: writer( @@ -913,14 +1037,16 @@ async def fetch_relevant_documents( return deduplicated_docs -async def process_sections(state: State, config: RunnableConfig, writer: StreamWriter) -> Dict[str, Any]: +async def process_sections( + state: State, config: RunnableConfig, writer: StreamWriter +) -> Dict[str, Any]: """ Process all sections in parallel and combine the results. 
- - This node takes the answer outline from the previous step, fetches relevant documents - for all questions across all sections once, and then processes each section in parallel + + This node takes the answer outline from the previous step, fetches relevant documents + for all questions across all sections once, and then processes each section in parallel using the sub_section_writer graph with the shared document pool. - + Returns: Dict containing the final written report in the "final_written_report" key. """ @@ -928,7 +1054,7 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW configuration = Configuration.from_runnable_config(config) answer_outline = state.answer_outline streaming_service = state.streaming_service - + # Initialize a dictionary to track content for all sections # This is used to maintain section content while streaming multiple sections section_contents = {} @@ -942,19 +1068,19 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW ) print(f"Processing sections from outline: {answer_outline is not None}") - + if not answer_outline: error_message = "No answer outline was provided. Cannot generate report." writer({"yield_value": streaming_service.format_error(error_message)}) return { "final_written_report": "No answer outline was provided. Cannot generate final report." } - + # Collect all questions from all sections all_questions = [] for section in answer_outline.answer_outline: all_questions.extend(section.questions) - + print(f"Collected {len(all_questions)} questions from all sections") writer( { @@ -981,11 +1107,11 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW TOP_K = 30 else: TOP_K = 10 - + relevant_documents = [] user_selected_documents = [] user_selected_sources = [] - + try: # First, fetch user-selected documents if any if configuration.document_ids_to_add_in_context: @@ -997,12 +1123,15 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW } ) - user_selected_sources, user_selected_documents = await fetch_documents_by_ids( + ( + user_selected_sources, + user_selected_documents, + ) = await fetch_documents_by_ids( document_ids=configuration.document_ids_to_add_in_context, user_id=configuration.user_id, - db_session=state.db_session + db_session=state.db_session, ) - + if user_selected_documents: writer( { @@ -1013,9 +1142,11 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW ) # Create connector service using state db_session - connector_service = ConnectorService(state.db_session, user_id=configuration.user_id) + connector_service = ConnectorService( + state.db_session, user_id=configuration.user_id + ) await connector_service.initialize_counter() - + relevant_documents = await fetch_relevant_documents( research_questions=all_questions, user_id=configuration.user_id, @@ -1027,7 +1158,7 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW top_k=TOP_K, connector_service=connector_service, search_mode=configuration.search_mode, - user_selected_sources=user_selected_sources + user_selected_sources=user_selected_sources, ) except Exception as e: error_message = f"Error fetching relevant documents: {str(e)}" @@ -1036,12 +1167,14 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW # Log the error and continue with an empty list of documents # This allows the process to continue, but the report might lack information relevant_documents = [] - + 
# Combine user-selected documents with connector-fetched documents all_documents = user_selected_documents + relevant_documents - + print(f"Fetched {len(relevant_documents)} relevant documents for all sections") - print(f"Added {len(user_selected_documents)} user-selected documents for all sections") + print( + f"Added {len(user_selected_documents)} user-selected documents for all sections" + ) print(f"Total documents for sections: {len(all_documents)}") writer( @@ -1069,14 +1202,14 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW sub_section_type = SubSectionType.END else: sub_section_type = SubSectionType.MIDDLE - + # Initialize the section_contents entry for this section section_contents[i] = { "title": section.section_title, "content": "", - "index": i + "index": i, } - + section_tasks.append( process_section_with_documents( section_id=i, @@ -1089,10 +1222,10 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW state=state, writer=writer, sub_section_type=sub_section_type, - section_contents=section_contents + section_contents=section_contents, ) ) - + # Run all section processing tasks in parallel print(f"Running {len(section_tasks)} section processing tasks in parallel") writer( @@ -1104,7 +1237,7 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW ) section_results = await asyncio.gather(*section_tasks, return_exceptions=True) - + # Handle any exceptions in the results writer( { @@ -1124,22 +1257,25 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW processed_results.append(error_message) else: processed_results.append(result) - + # Combine the results into a final report with section titles final_report = [] - for i, (section, content) in enumerate(zip(answer_outline.answer_outline, processed_results)): + for i, (section, content) in enumerate( + zip(answer_outline.answer_outline, processed_results) + ): # Skip adding the section header since the content already contains the title final_report.append(content) - final_report.append("\n") - + final_report.append("\n") + # Stream each section with its title writer( { - "yield_value": state.streaming_service.format_text_chunk(f"# {section.section_title}\n\n{content}") + "yield_value": state.streaming_service.format_text_chunk( + f"# {section.section_title}\n\n{content}" + ) } ) - # Join all sections with newlines final_written_report = "\n".join(final_report) print(f"Generated final report with {len(final_report)} parts") @@ -1156,26 +1292,26 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW # Since all sections used the same document pool, we can use it directly return { "final_written_report": final_written_report, - "reranked_documents": all_documents + "reranked_documents": all_documents, } async def process_section_with_documents( section_id: int, - section_title: str, + section_title: str, section_questions: List[str], - user_id: str, - search_space_id: int, + user_id: str, + search_space_id: int, relevant_documents: List[Dict[str, Any]], user_query: str, state: State = None, writer: StreamWriter = None, sub_section_type: SubSectionType = SubSectionType.MIDDLE, - section_contents: Dict[int, Dict[str, Any]] = None + section_contents: Dict[int, Dict[str, Any]] = None, ) -> str: """ Process a single section using pre-fetched documents. 
- + Args: section_id: The ID of the section section_title: The title of the section @@ -1187,14 +1323,14 @@ async def process_section_with_documents( writer: StreamWriter for sending progress updates sub_section_type: The type of section (start, middle, end) section_contents: Dictionary to track content across multiple sections - + Returns: The written section content """ try: # Use the provided documents documents_to_use = relevant_documents - + # Send status update via streaming if available if state and state.streaming_service and writer: writer( @@ -1221,7 +1357,7 @@ async def process_section_with_documents( {"content": f"No specific information was found for: {question}"} for question in section_questions ] - + # Call the sub_section_writer graph with the appropriate config config = { "configurable": { @@ -1234,13 +1370,10 @@ async def process_section_with_documents( "search_space_id": search_space_id, } } - + # Create the initial state with db_session and chat_history - sub_state = { - "db_session": state.db_session, - "chat_history": state.chat_history - } - + sub_state = {"db_session": state.db_session, "chat_history": state.chat_history} + # Invoke the sub-section writer graph with streaming print(f"Invoking sub_section_writer for: {section_title}") if state and state.streaming_service and writer: @@ -1254,17 +1387,19 @@ async def process_section_with_documents( # Variables to track streaming state complete_content = "" # Tracks the complete content received so far - - async for chunk_type, chunk in sub_section_writer_graph.astream(sub_state, config, stream_mode=["values"]): + + async for chunk_type, chunk in sub_section_writer_graph.astream( + sub_state, config, stream_mode=["values"] + ): if "final_answer" in chunk: new_content = chunk["final_answer"] if new_content and new_content != complete_content: # Extract only the new content (delta) - delta = new_content[len(complete_content):] - + delta = new_content[len(complete_content) :] + # Update what we've processed so far complete_content = new_content - + # Only stream if there's actual new content if delta and state and state.streaming_service and writer: # Update terminal with real-time progress indicator @@ -1278,26 +1413,29 @@ async def process_section_with_documents( # Update section_contents with just the new delta section_contents[section_id]["content"] += delta - + # Build UI-friendly content for all sections complete_answer = [] for i in range(len(section_contents)): if i in section_contents and section_contents[i]["content"]: # Add section header - complete_answer.append(f"# {section_contents[i]['title']}") + complete_answer.append( + f"# {section_contents[i]['title']}" + ) complete_answer.append("") # Empty line after title - + # Add section content - content_lines = section_contents[i]["content"].split("\n") + content_lines = section_contents[i]["content"].split( + "\n" + ) complete_answer.extend(content_lines) complete_answer.append("") # Empty line after content - # Set default if no content was received if not complete_content: complete_content = "No content was generated for this section." 
section_contents[section_id]["content"] = complete_content - + # Final terminal update if state and state.streaming_service and writer: writer( @@ -1311,7 +1449,7 @@ async def process_section_with_documents( return complete_content except Exception as e: print(f"Error processing section '{section_title}': {str(e)}") - + # Send error update via streaming if available if state and state.streaming_service and writer: writer( @@ -1325,37 +1463,46 @@ async def process_section_with_documents( return f"Error processing section: {section_title}. Details: {str(e)}" -async def reformulate_user_query(state: State, config: RunnableConfig, writer: StreamWriter) -> Dict[str, Any]: +async def reformulate_user_query( + state: State, config: RunnableConfig, writer: StreamWriter +) -> Dict[str, Any]: """ Reforms the user query based on the chat history. """ - + configuration = Configuration.from_runnable_config(config) user_query = configuration.user_query - chat_history_str = await QueryService.langchain_chat_history_to_str(state.chat_history) - if len(state.chat_history) == 0: + chat_history_str = await QueryService.langchain_chat_history_to_str( + state.chat_history + ) + if len(state.chat_history) == 0: reformulated_query = user_query else: - reformulated_query = await QueryService.reformulate_query_with_chat_history(user_query=user_query, session=state.db_session, user_id=configuration.user_id, chat_history_str=chat_history_str) - - return { - "reformulated_query": reformulated_query - } + reformulated_query = await QueryService.reformulate_query_with_chat_history( + user_query=user_query, + session=state.db_session, + user_id=configuration.user_id, + chat_history_str=chat_history_str, + ) + + return {"reformulated_query": reformulated_query} -async def handle_qna_workflow(state: State, config: RunnableConfig, writer: StreamWriter) -> Dict[str, Any]: +async def handle_qna_workflow( + state: State, config: RunnableConfig, writer: StreamWriter +) -> Dict[str, Any]: """ Handle the QNA research workflow. - + This node fetches relevant documents for the user query and then uses the QNA agent to generate a comprehensive answer with proper citations. - + Returns: Dict containing the final answer in the "final_written_report" key for consistency. 
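    A sketch of the state update this node emits (keys mirror the return
    statements below; the values shown are illustrative):

        {
            "final_written_report": "<streamed QNA answer text>",
            "reranked_documents": [...],  # captured from the QNA agent and
                                          # forwarded to generate_further_questions
        }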
""" streaming_service = state.streaming_service configuration = Configuration.from_runnable_config(config) - + reformulated_query = state.reformulated_query user_query = configuration.user_query @@ -1386,11 +1533,11 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre # Use a reasonable top_k for QNA - not too many documents to avoid overwhelming the LLM TOP_K = 15 - + relevant_documents = [] user_selected_documents = [] user_selected_sources = [] - + try: # First, fetch user-selected documents if any if configuration.document_ids_to_add_in_context: @@ -1402,12 +1549,15 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre } ) - user_selected_sources, user_selected_documents = await fetch_documents_by_ids( + ( + user_selected_sources, + user_selected_documents, + ) = await fetch_documents_by_ids( document_ids=configuration.document_ids_to_add_in_context, user_id=configuration.user_id, - db_session=state.db_session + db_session=state.db_session, ) - + if user_selected_documents: writer( { @@ -1418,12 +1568,14 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre ) # Create connector service using state db_session - connector_service = ConnectorService(state.db_session, user_id=configuration.user_id) + connector_service = ConnectorService( + state.db_session, user_id=configuration.user_id + ) await connector_service.initialize_counter() - + # Use the reformulated query as a single research question research_questions = [reformulated_query, user_query] - + relevant_documents = await fetch_relevant_documents( research_questions=research_questions, user_id=configuration.user_id, @@ -1435,7 +1587,7 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre top_k=TOP_K, connector_service=connector_service, search_mode=configuration.search_mode, - user_selected_sources=user_selected_sources + user_selected_sources=user_selected_sources, ) except Exception as e: error_message = f"Error fetching relevant documents for QNA: {str(e)}" @@ -1443,10 +1595,10 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre writer({"yield_value": streaming_service.format_error(error_message)}) # Continue with empty documents - the QNA agent will handle this gracefully relevant_documents = [] - + # Combine user-selected documents with connector-fetched documents all_documents = user_selected_documents + relevant_documents - + print(f"Fetched {len(relevant_documents)} relevant documents for QNA") print(f"Added {len(user_selected_documents)} user-selected documents for QNA") print(f"Total documents for QNA: {len(all_documents)}") @@ -1466,16 +1618,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre "reformulated_query": reformulated_query, "relevant_documents": all_documents, # Use combined documents "user_id": configuration.user_id, - "search_space_id": configuration.search_space_id + "search_space_id": configuration.search_space_id, } } - + # Create the state for the QNA agent (it has a different state structure) - qna_state = { - "db_session": state.db_session, - "chat_history": state.chat_history - } - + qna_state = {"db_session": state.db_session, "chat_history": state.chat_history} + try: writer( { @@ -1488,16 +1637,18 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre # Track streaming content for real-time updates complete_content = "" captured_reranked_documents = [] - + # Call the QNA agent with 
streaming - async for _chunk_type, chunk in qna_agent_graph.astream(qna_state, qna_config, stream_mode=["values"]): + async for _chunk_type, chunk in qna_agent_graph.astream( + qna_state, qna_config, stream_mode=["values"] + ): if "final_answer" in chunk: new_content = chunk["final_answer"] if new_content and new_content != complete_content: # Extract only the new content (delta) - delta = new_content[len(complete_content):] + delta = new_content[len(complete_content) :] complete_content = new_content - + # Stream the real-time answer if there's new content if delta: # Update terminal with progress @@ -1517,7 +1668,7 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre # Capture reranked documents from QNA agent for further question generation if "reranked_documents" in chunk: captured_reranked_documents = chunk["reranked_documents"] - + # Set default if no content was received if not complete_content: complete_content = "I couldn't find relevant information in your knowledge base to answer this question." @@ -1533,9 +1684,9 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre # Return the final answer and captured reranked documents for further question generation return { "final_written_report": complete_content, - "reranked_documents": captured_reranked_documents + "reranked_documents": captured_reranked_documents, } - + except Exception as e: error_message = f"Error generating QNA answer: {str(e)}" print(error_message) @@ -1544,27 +1695,29 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre return {"final_written_report": f"Error generating answer: {str(e)}"} -async def generate_further_questions(state: State, config: RunnableConfig, writer: StreamWriter) -> Dict[str, Any]: +async def generate_further_questions( + state: State, config: RunnableConfig, writer: StreamWriter +) -> Dict[str, Any]: """ Generate contextually relevant follow-up questions based on chat history and available documents. - + This node takes the chat history and reranked documents from sub-agents (qna_agent or sub_section_writer) and uses an LLM to generate follow-up questions that would naturally extend the conversation and provide additional value to the user. - + Returns: Dict containing the further questions in the "further_questions" key for state update. 
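    Illustrative shape of the update (the "further_questions" key is what the
    parsing code below extracts; the per-item fields shown are assumed, not guaranteed):

        {"further_questions": [
            {"question": "How do the selected connectors differ?"},  # hypothetical item
            {"question": "Can you expand on the second section?"},
        ]}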
""" from app.services.llm_service import get_user_fast_llm - + # Get configuration and state data configuration = Configuration.from_runnable_config(config) chat_history = state.chat_history user_id = configuration.user_id streaming_service = state.streaming_service - + # Get reranked documents from the state (will be populated by sub-agents) - reranked_documents = getattr(state, 'reranked_documents', None) or [] + reranked_documents = getattr(state, "reranked_documents", None) or [] writer( { @@ -1584,11 +1737,11 @@ async def generate_further_questions(state: State, config: RunnableConfig, write # Stream empty further questions to UI writer({"yield_value": streaming_service.format_further_questions_delta([])}) return {"further_questions": []} - + # Format chat history for the prompt chat_history_xml = "\n" for message in chat_history: - if hasattr(message, 'type'): + if hasattr(message, "type"): if message.type == "human": chat_history_xml += f"{message.content}\n" elif message.type == "ai": @@ -1597,7 +1750,7 @@ async def generate_further_questions(state: State, config: RunnableConfig, write # Handle other message types if needed chat_history_xml += f"{str(message)}\n" chat_history_xml += "" - + # Format available documents for the prompt documents_xml = "\n" for i, doc in enumerate(reranked_documents): @@ -1605,24 +1758,24 @@ async def generate_further_questions(state: State, config: RunnableConfig, write source_id = document_info.get("id", f"doc_{i}") source_type = document_info.get("document_type", "UNKNOWN") content = doc.get("content", "") - - documents_xml += f"\n" - documents_xml += f"\n" + + documents_xml += "\n" + documents_xml += "\n" documents_xml += f"{source_id}\n" documents_xml += f"{source_type}\n" - documents_xml += f"\n" + documents_xml += "\n" documents_xml += f"\n{content}\n" - documents_xml += f"\n" + documents_xml += "\n" documents_xml += "" - + # Create the human message content human_message_content = f""" {chat_history_xml} - + {documents_xml} - + Based on the chat history and available documents above, generate 3-5 contextually relevant follow-up questions that would naturally extend the conversation and provide additional value to the user. Make sure the questions can be reasonably answered using the available documents or knowledge base. - + Your response MUST be valid JSON in exactly this format: {{ "further_questions": [ @@ -1636,7 +1789,7 @@ async def generate_further_questions(state: State, config: RunnableConfig, write }} ] }} - + Do not include any other text or explanation. Only return the JSON. 
""" @@ -1651,25 +1804,25 @@ async def generate_further_questions(state: State, config: RunnableConfig, write # Create messages for the LLM messages = [ SystemMessage(content=get_further_questions_system_prompt()), - HumanMessage(content=human_message_content) + HumanMessage(content=human_message_content), ] - + try: # Call the LLM response = await llm.ainvoke(messages) - + # Parse the JSON response content = response.content - + # Find the JSON in the content - json_start = content.find('{') - json_end = content.rfind('}') + 1 + json_start = content.find("{") + json_end = content.rfind("}") + 1 if json_start >= 0 and json_end > json_start: json_str = content[json_start:json_end] - + # Parse the JSON string parsed_data = json.loads(json_str) - + # Extract the further_questions array further_questions = parsed_data.get("further_questions", []) @@ -1691,7 +1844,7 @@ async def generate_further_questions(state: State, config: RunnableConfig, write ) print(f"Successfully generated {len(further_questions)} further questions") - + return {"further_questions": further_questions} else: # If JSON structure not found, return empty list @@ -1712,7 +1865,7 @@ async def generate_further_questions(state: State, config: RunnableConfig, write {"yield_value": streaming_service.format_further_questions_delta([])} ) return {"further_questions": []} - + except (json.JSONDecodeError, ValueError) as e: # Log the error and return empty list error_message = f"Error parsing further questions response: {str(e)}" @@ -1724,7 +1877,7 @@ async def generate_further_questions(state: State, config: RunnableConfig, write # Stream empty further questions to UI writer({"yield_value": streaming_service.format_further_questions_delta([])}) return {"further_questions": []} - + except Exception as e: # Handle any other errors error_message = f"Error generating further questions: {str(e)}" diff --git a/surfsense_backend/app/connectors/jira_connector.py b/surfsense_backend/app/connectors/jira_connector.py index eeecb0f..7d810d7 100644 --- a/surfsense_backend/app/connectors/jira_connector.py +++ b/surfsense_backend/app/connectors/jira_connector.py @@ -6,7 +6,6 @@ Allows fetching issue lists and their comments, projects and more. 
""" import base64 -import json from datetime import datetime from typing import Any, Dict, List, Optional @@ -119,8 +118,6 @@ class JiraConnector: response = requests.get(url, headers=headers, params=params, timeout=500) - print(json.dumps(response.json(), indent=2)) - if response.status_code == 200: return response.json() else: @@ -227,6 +224,7 @@ class JiraConnector: date_filter = ( f"(createdDate >= '{start_date}' AND createdDate <= '{end_date}')" ) + # TODO : This JQL needs some improvement to work as expected jql = f"{date_filter}" if project_key: @@ -252,7 +250,7 @@ class JiraConnector: fields.append("comment") params = { - "jql": "", + "jql": "", # TODO : Add a JQL query to filter from a date range "fields": ",".join(fields), "maxResults": 100, "startAt": 0, @@ -263,10 +261,8 @@ class JiraConnector: while True: params["startAt"] = start_at - print(json.dumps(params, indent=2)) - result = self.make_api_request("search", params) - print(json.dumps(result, indent=2)) + result = self.make_api_request("search", params) if not isinstance(result, dict) or "issues" not in result: return [], "Invalid response from Jira API" diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 33366ff..838b81a 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -9,35 +9,58 @@ POST /search-source-connectors/{connector_id}/index - Index content from a conne Note: Each user can have only one connector of each type (SERPER_API, TAVILY_API, SLACK_CONNECTOR, NOTION_CONNECTOR, GITHUB_CONNECTOR, LINEAR_CONNECTOR, DISCORD_CONNECTOR). """ -from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks, Body -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select -from sqlalchemy.exc import IntegrityError -from typing import List, Dict, Any -from app.db import get_async_session, User, SearchSourceConnector, SearchSourceConnectorType, SearchSpace, async_session_maker -from app.schemas import SearchSourceConnectorCreate, SearchSourceConnectorUpdate, SearchSourceConnectorRead, SearchSourceConnectorBase + +import logging +from datetime import datetime, timedelta +from typing import Any, Dict, List + +from app.connectors.github_connector import GitHubConnector +from app.db import ( + SearchSourceConnector, + SearchSourceConnectorType, + SearchSpace, + User, + async_session_maker, + get_async_session, +) +from app.schemas import ( + SearchSourceConnectorBase, + SearchSourceConnectorCreate, + SearchSourceConnectorRead, + SearchSourceConnectorUpdate, +) +from app.tasks.connectors_indexing_tasks import ( + index_discord_messages, + index_github_repos, + index_jira_issues, + index_linear_issues, + index_notion_pages, + index_slack_messages, +) from app.users import current_active_user from app.utils.check_ownership import check_ownership +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query from pydantic import BaseModel, Field, ValidationError -from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues, index_discord_messages, index_jira_issues -from app.connectors.github_connector import GitHubConnector -from datetime import datetime, timedelta -import logging +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select # Set up 
logging logger = logging.getLogger(__name__) router = APIRouter() + # Use Pydantic's BaseModel here class GitHubPATRequest(BaseModel): github_pat: str = Field(..., description="GitHub Personal Access Token") + # --- New Endpoint to list GitHub Repositories --- @router.post("/github/repositories/", response_model=List[Dict[str, Any]]) async def list_github_repositories( pat_request: GitHubPATRequest, - user: User = Depends(current_active_user) # Ensure the user is logged in + user: User = Depends(current_active_user), # Ensure the user is logged in ): """ Fetches a list of repositories accessible by the provided GitHub PAT. @@ -54,35 +77,39 @@ async def list_github_repositories( logger.error(f"GitHub PAT validation failed for user {user.id}: {str(e)}") raise HTTPException(status_code=400, detail=f"Invalid GitHub PAT: {str(e)}") except Exception as e: - logger.error(f"Failed to fetch GitHub repositories for user {user.id}: {str(e)}") - raise HTTPException(status_code=500, detail="Failed to fetch GitHub repositories.") + logger.error( + f"Failed to fetch GitHub repositories for user {user.id}: {str(e)}" + ) + raise HTTPException( + status_code=500, detail="Failed to fetch GitHub repositories." + ) + @router.post("/search-source-connectors/", response_model=SearchSourceConnectorRead) async def create_search_source_connector( connector: SearchSourceConnectorCreate, session: AsyncSession = Depends(get_async_session), - user: User = Depends(current_active_user) + user: User = Depends(current_active_user), ): """ Create a new search source connector. - + Each user can have only one connector of each type (SERPER_API, TAVILY_API, SLACK_CONNECTOR, etc.). The config must contain the appropriate keys for the connector type. """ try: # Check if a connector with the same type already exists for this user result = await session.execute( - select(SearchSourceConnector) - .filter( + select(SearchSourceConnector).filter( SearchSourceConnector.user_id == user.id, - SearchSourceConnector.connector_type == connector.connector_type + SearchSourceConnector.connector_type == connector.connector_type, ) ) existing_connector = result.scalars().first() if existing_connector: raise HTTPException( status_code=409, - detail=f"A connector with type {connector.connector_type} already exists. Each user can have only one connector of each type." + detail=f"A connector with type {connector.connector_type} already exists. Each user can have only one connector of each type.", ) db_connector = SearchSourceConnector(**connector.model_dump(), user_id=user.id) session.add(db_connector) @@ -91,15 +118,12 @@ async def create_search_source_connector( return db_connector except ValidationError as e: await session.rollback() - raise HTTPException( - status_code=422, - detail=f"Validation error: {str(e)}" - ) + raise HTTPException(status_code=422, detail=f"Validation error: {str(e)}") except IntegrityError as e: await session.rollback() raise HTTPException( status_code=409, - detail=f"Integrity error: A connector with this type already exists. {str(e)}" + detail=f"Integrity error: A connector with this type already exists. 
{str(e)}", ) except HTTPException: await session.rollback() @@ -109,38 +133,44 @@ async def create_search_source_connector( await session.rollback() raise HTTPException( status_code=500, - detail=f"Failed to create search source connector: {str(e)}" + detail=f"Failed to create search source connector: {str(e)}", ) -@router.get("/search-source-connectors/", response_model=List[SearchSourceConnectorRead]) + +@router.get( + "/search-source-connectors/", response_model=List[SearchSourceConnectorRead] +) async def read_search_source_connectors( skip: int = 0, limit: int = 100, search_space_id: int = None, session: AsyncSession = Depends(get_async_session), - user: User = Depends(current_active_user) + user: User = Depends(current_active_user), ): """List all search source connectors for the current user.""" try: - query = select(SearchSourceConnector).filter(SearchSourceConnector.user_id == user.id) - - # No need to filter by search_space_id as connectors are user-owned, not search space specific - - result = await session.execute( - query.offset(skip).limit(limit) + query = select(SearchSourceConnector).filter( + SearchSourceConnector.user_id == user.id ) + + # No need to filter by search_space_id as connectors are user-owned, not search space specific + + result = await session.execute(query.offset(skip).limit(limit)) return result.scalars().all() except Exception as e: raise HTTPException( status_code=500, - detail=f"Failed to fetch search source connectors: {str(e)}" + detail=f"Failed to fetch search source connectors: {str(e)}", ) -@router.get("/search-source-connectors/{connector_id}", response_model=SearchSourceConnectorRead) + +@router.get( + "/search-source-connectors/{connector_id}", response_model=SearchSourceConnectorRead +) async def read_search_source_connector( connector_id: int, session: AsyncSession = Depends(get_async_session), - user: User = Depends(current_active_user) + user: User = Depends(current_active_user), ): """Get a specific search source connector by ID.""" try: @@ -149,31 +179,37 @@ async def read_search_source_connector( raise except Exception as e: raise HTTPException( - status_code=500, - detail=f"Failed to fetch search source connector: {str(e)}" + status_code=500, detail=f"Failed to fetch search source connector: {str(e)}" ) -@router.put("/search-source-connectors/{connector_id}", response_model=SearchSourceConnectorRead) + +@router.put( + "/search-source-connectors/{connector_id}", response_model=SearchSourceConnectorRead +) async def update_search_source_connector( connector_id: int, connector_update: SearchSourceConnectorUpdate, session: AsyncSession = Depends(get_async_session), - user: User = Depends(current_active_user) + user: User = Depends(current_active_user), ): """ Update a search source connector. Handles partial updates, including merging changes into the 'config' field. 
""" - db_connector = await check_ownership(session, SearchSourceConnector, connector_id, user) - + db_connector = await check_ownership( + session, SearchSourceConnector, connector_id, user + ) + # Convert the sparse update data (only fields present in request) to a dict update_data = connector_update.model_dump(exclude_unset=True) # Special handling for 'config' field if "config" in update_data: - incoming_config = update_data["config"] # Config data from the request - existing_config = db_connector.config if db_connector.config else {} # Current config from DB - + incoming_config = update_data["config"] # Config data from the request + existing_config = ( + db_connector.config if db_connector.config else {} + ) # Current config from DB + # Merge incoming config into existing config # This preserves existing keys (like GITHUB_PAT) if they are not in the incoming data merged_config = existing_config.copy() @@ -182,26 +218,29 @@ async def update_search_source_connector( # -- Validation after merging -- # Validate the *merged* config based on the connector type # We need the connector type - use the one from the update if provided, else the existing one - current_connector_type = connector_update.connector_type if connector_update.connector_type is not None else db_connector.connector_type - + current_connector_type = ( + connector_update.connector_type + if connector_update.connector_type is not None + else db_connector.connector_type + ) + try: # We can reuse the base validator by creating a temporary base model instance # Note: This assumes 'name' and 'is_indexable' are not crucial for config validation itself temp_data_for_validation = { - "name": db_connector.name, # Use existing name + "name": db_connector.name, # Use existing name "connector_type": current_connector_type, - "is_indexable": db_connector.is_indexable, # Use existing value - "last_indexed_at": db_connector.last_indexed_at, # Not used by validator - "config": merged_config + "is_indexable": db_connector.is_indexable, # Use existing value + "last_indexed_at": db_connector.last_indexed_at, # Not used by validator + "config": merged_config, } SearchSourceConnectorBase.model_validate(temp_data_for_validation) except ValidationError as e: # Raise specific validation error for the merged config raise HTTPException( - status_code=422, - detail=f"Validation error for merged config: {str(e)}" + status_code=422, detail=f"Validation error for merged config: {str(e)}" ) - + # If validation passes, update the main update_data dict with the merged config update_data["config"] = merged_config @@ -210,20 +249,19 @@ async def update_search_source_connector( # Prevent changing connector_type if it causes a duplicate (check moved here) if key == "connector_type" and value != db_connector.connector_type: result = await session.execute( - select(SearchSourceConnector) - .filter( + select(SearchSourceConnector).filter( SearchSourceConnector.user_id == user.id, SearchSourceConnector.connector_type == value, - SearchSourceConnector.id != connector_id + SearchSourceConnector.id != connector_id, ) ) existing_connector = result.scalars().first() if existing_connector: raise HTTPException( status_code=409, - detail=f"A connector with type {value} already exists. Each user can have only one connector of each type." + detail=f"A connector with type {value} already exists. 
Each user can have only one connector of each type.", ) - + setattr(db_connector, key, value) try: @@ -234,26 +272,31 @@ async def update_search_source_connector( await session.rollback() # This might occur if connector_type constraint is violated somehow after the check raise HTTPException( - status_code=409, - detail=f"Database integrity error during update: {str(e)}" + status_code=409, detail=f"Database integrity error during update: {str(e)}" ) except Exception as e: await session.rollback() - logger.error(f"Failed to update search source connector {connector_id}: {e}", exc_info=True) + logger.error( + f"Failed to update search source connector {connector_id}: {e}", + exc_info=True, + ) raise HTTPException( status_code=500, - detail=f"Failed to update search source connector: {str(e)}" + detail=f"Failed to update search source connector: {str(e)}", ) + @router.delete("/search-source-connectors/{connector_id}", response_model=dict) async def delete_search_source_connector( connector_id: int, session: AsyncSession = Depends(get_async_session), - user: User = Depends(current_active_user) + user: User = Depends(current_active_user), ): """Delete a search source connector.""" try: - db_connector = await check_ownership(session, SearchSourceConnector, connector_id, user) + db_connector = await check_ownership( + session, SearchSourceConnector, connector_id, user + ) await session.delete(db_connector) await session.commit() return {"message": "Search source connector deleted successfully"} @@ -263,22 +306,33 @@ async def delete_search_source_connector( await session.rollback() raise HTTPException( status_code=500, - detail=f"Failed to delete search source connector: {str(e)}" + detail=f"Failed to delete search source connector: {str(e)}", ) -@router.post("/search-source-connectors/{connector_id}/index", response_model=Dict[str, Any]) + +@router.post( + "/search-source-connectors/{connector_id}/index", response_model=Dict[str, Any] +) async def index_connector_content( connector_id: int, - search_space_id: int = Query(..., description="ID of the search space to store indexed content"), - start_date: str = Query(None, description="Start date for indexing (YYYY-MM-DD format). If not provided, uses last_indexed_at or defaults to 365 days ago"), - end_date: str = Query(None, description="End date for indexing (YYYY-MM-DD format). If not provided, uses today's date"), + search_space_id: int = Query( + ..., description="ID of the search space to store indexed content" + ), + start_date: str = Query( + None, + description="Start date for indexing (YYYY-MM-DD format). If not provided, uses last_indexed_at or defaults to 365 days ago", + ), + end_date: str = Query( + None, + description="End date for indexing (YYYY-MM-DD format). If not provided, uses today's date", + ), session: AsyncSession = Depends(get_async_session), user: User = Depends(current_active_user), - background_tasks: BackgroundTasks = None + background_tasks: BackgroundTasks = None, ): """ Index content from a connector to a search space. 
- + Currently supports: - SLACK_CONNECTOR: Indexes messages from all accessible Slack channels - NOTION_CONNECTOR: Indexes pages from all accessible Notion pages @@ -286,26 +340,30 @@ async def index_connector_content( - LINEAR_CONNECTOR: Indexes issues and comments from Linear - JIRA_CONNECTOR: Indexes issues and comments from Jira - DISCORD_CONNECTOR: Indexes messages from all accessible Discord channels - + Args: connector_id: ID of the connector to use search_space_id: ID of the search space to store indexed content background_tasks: FastAPI background tasks - + Returns: Dictionary with indexing status """ try: # Check if the connector belongs to the user - connector = await check_ownership(session, SearchSourceConnector, connector_id, user) - + connector = await check_ownership( + session, SearchSourceConnector, connector_id, user + ) + # Check if the search space belongs to the user - search_space = await check_ownership(session, SearchSpace, search_space_id, user) - + search_space = await check_ownership( + session, SearchSpace, search_space_id, user + ) + # Handle different connector types response_message = "" today_str = datetime.now().strftime("%Y-%m-%d") - + # Determine the actual date range to use if start_date is None: # Use last_indexed_at or default to 365 days ago @@ -317,10 +375,12 @@ async def index_connector_content( else: indexing_from = connector.last_indexed_at.strftime("%Y-%m-%d") else: - indexing_from = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d") + indexing_from = (datetime.now() - timedelta(days=365)).strftime( + "%Y-%m-%d" + ) else: indexing_from = start_date - + if end_date is None: indexing_to = today_str else: @@ -328,32 +388,77 @@ async def index_connector_content( if connector.connector_type == SearchSourceConnectorType.SLACK_CONNECTOR: # Run indexing in background - logger.info(f"Triggering Slack indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}") - background_tasks.add_task(run_slack_indexing_with_new_session, connector_id, search_space_id, str(user.id), indexing_from, indexing_to) + logger.info( + f"Triggering Slack indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + background_tasks.add_task( + run_slack_indexing_with_new_session, + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, + ) response_message = "Slack indexing started in the background." elif connector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR: # Run indexing in background - logger.info(f"Triggering Notion indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}") - background_tasks.add_task(run_notion_indexing_with_new_session, connector_id, search_space_id, str(user.id), indexing_from, indexing_to) + logger.info( + f"Triggering Notion indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + background_tasks.add_task( + run_notion_indexing_with_new_session, + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, + ) response_message = "Notion indexing started in the background." 
- + elif connector.connector_type == SearchSourceConnectorType.GITHUB_CONNECTOR: # Run indexing in background - logger.info(f"Triggering GitHub indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}") - background_tasks.add_task(run_github_indexing_with_new_session, connector_id, search_space_id, str(user.id), indexing_from, indexing_to) + logger.info( + f"Triggering GitHub indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + background_tasks.add_task( + run_github_indexing_with_new_session, + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, + ) response_message = "GitHub indexing started in the background." - + elif connector.connector_type == SearchSourceConnectorType.LINEAR_CONNECTOR: # Run indexing in background - logger.info(f"Triggering Linear indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}") - background_tasks.add_task(run_linear_indexing_with_new_session, connector_id, search_space_id, str(user.id), indexing_from, indexing_to) + logger.info( + f"Triggering Linear indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + background_tasks.add_task( + run_linear_indexing_with_new_session, + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, + ) response_message = "Linear indexing started in the background." elif connector.connector_type == SearchSourceConnectorType.JIRA_CONNECTOR: # Run indexing in background - logger.info(f"Triggering Jira indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}") - background_tasks.add_task(run_jira_indexing_with_new_session, connector_id, search_space_id, str(user.id), indexing_from, indexing_to) + logger.info( + f"Triggering Jira indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + background_tasks.add_task( + run_jira_indexing_with_new_session, + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, + ) response_message = "Jira indexing started in the background." elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR: @@ -362,71 +467,83 @@ async def index_connector_content( f"Triggering Discord indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" ) background_tasks.add_task( - run_discord_indexing_with_new_session, connector_id, search_space_id, str(user.id), indexing_from, indexing_to + run_discord_indexing_with_new_session, + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, ) response_message = "Discord indexing started in the background." 
else: raise HTTPException( status_code=400, - detail=f"Indexing not supported for connector type: {connector.connector_type}" + detail=f"Indexing not supported for connector type: {connector.connector_type}", ) return { - "message": response_message, - "connector_id": connector_id, + "message": response_message, + "connector_id": connector_id, "search_space_id": search_space_id, "indexing_from": indexing_from, - "indexing_to": indexing_to + "indexing_to": indexing_to, } except HTTPException: raise except Exception as e: - logger.error(f"Failed to initiate indexing for connector {connector_id}: {e}", exc_info=True) - raise HTTPException( - status_code=500, - detail=f"Failed to initiate indexing: {str(e)}" + logger.error( + f"Failed to initiate indexing for connector {connector_id}: {e}", + exc_info=True, ) - -async def update_connector_last_indexed( - session: AsyncSession, - connector_id: int -): + raise HTTPException( + status_code=500, detail=f"Failed to initiate indexing: {str(e)}" + ) + + +async def update_connector_last_indexed(session: AsyncSession, connector_id: int): """ Update the last_indexed_at timestamp for a connector. - + Args: session: Database session connector_id: ID of the connector to update """ try: result = await session.execute( - select(SearchSourceConnector) - .filter(SearchSourceConnector.id == connector_id) + select(SearchSourceConnector).filter( + SearchSourceConnector.id == connector_id + ) ) connector = result.scalars().first() - + if connector: connector.last_indexed_at = datetime.now() await session.commit() logger.info(f"Updated last_indexed_at for connector {connector_id}") except Exception as e: - logger.error(f"Failed to update last_indexed_at for connector {connector_id}: {str(e)}") + logger.error( + f"Failed to update last_indexed_at for connector {connector_id}: {str(e)}" + ) await session.rollback() + async def run_slack_indexing_with_new_session( connector_id: int, search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """ Create a new session and run the Slack indexing task. This prevents session leaks by creating a dedicated session for the background task. """ async with async_session_maker() as session: - await run_slack_indexing(session, connector_id, search_space_id, user_id, start_date, end_date) + await run_slack_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) + async def run_slack_indexing( session: AsyncSession, @@ -434,11 +551,11 @@ async def run_slack_indexing( search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """ Background task to run Slack indexing. 
- + Args: session: Database session connector_id: ID of the Slack connector @@ -456,31 +573,39 @@ async def run_slack_indexing( user_id=user_id, start_date=start_date, end_date=end_date, - update_last_indexed=False # Don't update timestamp in the indexing function + update_last_indexed=False, # Don't update timestamp in the indexing function ) - + # Only update last_indexed_at if indexing was successful (either new docs or updated docs) if documents_processed > 0: await update_connector_last_indexed(session, connector_id) - logger.info(f"Slack indexing completed successfully: {documents_processed} documents processed") + logger.info( + f"Slack indexing completed successfully: {documents_processed} documents processed" + ) else: - logger.error(f"Slack indexing failed or no documents processed: {error_or_warning}") + logger.error( + f"Slack indexing failed or no documents processed: {error_or_warning}" + ) except Exception as e: logger.error(f"Error in background Slack indexing task: {str(e)}") + async def run_notion_indexing_with_new_session( connector_id: int, search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """ Create a new session and run the Notion indexing task. This prevents session leaks by creating a dedicated session for the background task. """ async with async_session_maker() as session: - await run_notion_indexing(session, connector_id, search_space_id, user_id, start_date, end_date) + await run_notion_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) + async def run_notion_indexing( session: AsyncSession, @@ -488,11 +613,11 @@ async def run_notion_indexing( search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """ Background task to run Notion indexing. 
- + Args: session: Database session connector_id: ID of the Notion connector @@ -510,112 +635,158 @@ async def run_notion_indexing( user_id=user_id, start_date=start_date, end_date=end_date, - update_last_indexed=False # Don't update timestamp in the indexing function + update_last_indexed=False, # Don't update timestamp in the indexing function ) - + # Only update last_indexed_at if indexing was successful (either new docs or updated docs) if documents_processed > 0: await update_connector_last_indexed(session, connector_id) - logger.info(f"Notion indexing completed successfully: {documents_processed} documents processed") + logger.info( + f"Notion indexing completed successfully: {documents_processed} documents processed" + ) else: - logger.error(f"Notion indexing failed or no documents processed: {error_or_warning}") + logger.error( + f"Notion indexing failed or no documents processed: {error_or_warning}" + ) except Exception as e: logger.error(f"Error in background Notion indexing task: {str(e)}") + # Add new helper functions for GitHub indexing async def run_github_indexing_with_new_session( connector_id: int, search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """Wrapper to run GitHub indexing with its own database session.""" - logger.info(f"Background task started: Indexing GitHub connector {connector_id} into space {search_space_id} from {start_date} to {end_date}") + logger.info( + f"Background task started: Indexing GitHub connector {connector_id} into space {search_space_id} from {start_date} to {end_date}" + ) async with async_session_maker() as session: - await run_github_indexing(session, connector_id, search_space_id, user_id, start_date, end_date) + await run_github_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) logger.info(f"Background task finished: Indexing GitHub connector {connector_id}") + async def run_github_indexing( session: AsyncSession, connector_id: int, search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """Runs the GitHub indexing task and updates the timestamp.""" try: indexed_count, error_message = await index_github_repos( - session, connector_id, search_space_id, user_id, start_date, end_date, update_last_indexed=False + session, + connector_id, + search_space_id, + user_id, + start_date, + end_date, + update_last_indexed=False, ) if error_message: - logger.error(f"GitHub indexing failed for connector {connector_id}: {error_message}") + logger.error( + f"GitHub indexing failed for connector {connector_id}: {error_message}" + ) # Optionally update status in DB to indicate failure else: - logger.info(f"GitHub indexing successful for connector {connector_id}. Indexed {indexed_count} documents.") + logger.info( + f"GitHub indexing successful for connector {connector_id}. Indexed {indexed_count} documents." 
+ ) # Update the last indexed timestamp only on success await update_connector_last_indexed(session, connector_id) - await session.commit() # Commit timestamp update + await session.commit() # Commit timestamp update except Exception as e: await session.rollback() - logger.error(f"Critical error in run_github_indexing for connector {connector_id}: {e}", exc_info=True) + logger.error( + f"Critical error in run_github_indexing for connector {connector_id}: {e}", + exc_info=True, + ) # Optionally update status in DB to indicate failure + # Add new helper functions for Linear indexing async def run_linear_indexing_with_new_session( connector_id: int, search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """Wrapper to run Linear indexing with its own database session.""" - logger.info(f"Background task started: Indexing Linear connector {connector_id} into space {search_space_id} from {start_date} to {end_date}") + logger.info( + f"Background task started: Indexing Linear connector {connector_id} into space {search_space_id} from {start_date} to {end_date}" + ) async with async_session_maker() as session: - await run_linear_indexing(session, connector_id, search_space_id, user_id, start_date, end_date) + await run_linear_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) logger.info(f"Background task finished: Indexing Linear connector {connector_id}") + async def run_linear_indexing( session: AsyncSession, connector_id: int, search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """Runs the Linear indexing task and updates the timestamp.""" try: indexed_count, error_message = await index_linear_issues( - session, connector_id, search_space_id, user_id, start_date, end_date, update_last_indexed=False + session, + connector_id, + search_space_id, + user_id, + start_date, + end_date, + update_last_indexed=False, ) if error_message: - logger.error(f"Linear indexing failed for connector {connector_id}: {error_message}") + logger.error( + f"Linear indexing failed for connector {connector_id}: {error_message}" + ) # Optionally update status in DB to indicate failure else: - logger.info(f"Linear indexing successful for connector {connector_id}. Indexed {indexed_count} documents.") + logger.info( + f"Linear indexing successful for connector {connector_id}. Indexed {indexed_count} documents." + ) # Update the last indexed timestamp only on success await update_connector_last_indexed(session, connector_id) - await session.commit() # Commit timestamp update + await session.commit() # Commit timestamp update except Exception as e: await session.rollback() - logger.error(f"Critical error in run_linear_indexing for connector {connector_id}: {e}", exc_info=True) + logger.error( + f"Critical error in run_linear_indexing for connector {connector_id}: {e}", + exc_info=True, + ) # Optionally update status in DB to indicate failure + # Add new helper functions for discord indexing async def run_discord_indexing_with_new_session( connector_id: int, search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """ Create a new session and run the Discord indexing task. This prevents session leaks by creating a dedicated session for the background task. 
""" async with async_session_maker() as session: - await run_discord_indexing(session, connector_id, search_space_id, user_id, start_date, end_date) + await run_discord_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) + async def run_discord_indexing( session: AsyncSession, @@ -623,7 +794,7 @@ async def run_discord_indexing( search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """ Background task to run Discord indexing. @@ -644,15 +815,19 @@ async def run_discord_indexing( user_id=user_id, start_date=start_date, end_date=end_date, - update_last_indexed=False # Don't update timestamp in the indexing function + update_last_indexed=False, # Don't update timestamp in the indexing function ) # Only update last_indexed_at if indexing was successful (either new docs or updated docs) if documents_processed > 0: await update_connector_last_indexed(session, connector_id) - logger.info(f"Discord indexing completed successfully: {documents_processed} documents processed") + logger.info( + f"Discord indexing completed successfully: {documents_processed} documents processed" + ) else: - logger.error(f"Discord indexing failed or no documents processed: {error_or_warning}") + logger.error( + f"Discord indexing failed or no documents processed: {error_or_warning}" + ) except Exception as e: logger.error(f"Error in background Discord indexing task: {str(e)}") @@ -663,36 +838,53 @@ async def run_jira_indexing_with_new_session( search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """Wrapper to run Jira indexing with its own database session.""" - logger.info(f"Background task started: Indexing Jira connector {connector_id} into space {search_space_id} from {start_date} to {end_date}") + logger.info( + f"Background task started: Indexing Jira connector {connector_id} into space {search_space_id} from {start_date} to {end_date}" + ) async with async_session_maker() as session: - await run_jira_indexing(session, connector_id, search_space_id, user_id, start_date, end_date) + await run_jira_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) logger.info(f"Background task finished: Indexing Jira connector {connector_id}") + async def run_jira_indexing( session: AsyncSession, connector_id: int, search_space_id: int, user_id: str, start_date: str, - end_date: str + end_date: str, ): """Runs the Jira indexing task and updates the timestamp.""" try: indexed_count, error_message = await index_jira_issues( - session, connector_id, search_space_id, user_id, start_date, end_date, update_last_indexed=False + session, + connector_id, + search_space_id, + user_id, + start_date, + end_date, + update_last_indexed=False, ) if error_message: - logger.error(f"Jira indexing failed for connector {connector_id}: {error_message}") + logger.error( + f"Jira indexing failed for connector {connector_id}: {error_message}" + ) # Optionally update status in DB to indicate failure else: - logger.info(f"Jira indexing successful for connector {connector_id}. Indexed {indexed_count} documents.") + logger.info( + f"Jira indexing successful for connector {connector_id}. Indexed {indexed_count} documents." 
+ ) # Update the last indexed timestamp only on success await update_connector_last_indexed(session, connector_id) - await session.commit() # Commit timestamp update + await session.commit() # Commit timestamp update except Exception as e: - await session.rollback() - logger.error(f"Critical error in run_jira_indexing for connector {connector_id}: {e}", exc_info=True) - # Optionally update status in DB to indicate failure \ No newline at end of file + logger.error( + f"Critical error in run_jira_indexing for connector {connector_id}: {e}", + exc_info=True, + ) + # Optionally update status in DB to indicate failure diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 4529366..b0071ba 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -992,7 +992,7 @@ class ConnectorService: # Early return if no results if not jira_chunks: return { - "id": 10, + "id": 30, "name": "Jira Issues", "type": "JIRA_CONNECTOR", "sources": [], diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/page.tsx index 3a4bd33..1b66684 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/page.tsx @@ -60,7 +60,7 @@ import { IconBrandSlack, IconBrandYoutube, IconLayoutKanban, - IconBrandTrello, + IconTicket, } from "@tabler/icons-react"; import { ColumnDef, @@ -178,7 +178,7 @@ const documentTypeIcons = { YOUTUBE_VIDEO: IconBrandYoutube, GITHUB_CONNECTOR: IconBrandGithub, LINEAR_CONNECTOR: IconLayoutKanban, - JIRA_CONNECTOR: IconBrandTrello, + JIRA_CONNECTOR: IconTicket, DISCORD_CONNECTOR: IconBrandDiscord, } as const;
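For reference, here is a minimal client-side sketch of exercising the indexing route touched above, POST /search-source-connectors/{connector_id}/index. Only the path, the query parameters (search_space_id, optional start_date/end_date in YYYY-MM-DD), and the response keys are taken from the route in this diff; the base URL, the bearer-token auth, any API prefix the router may be mounted under, and the connector/search-space IDs are assumptions for illustration.

# Minimal sketch: trigger background indexing for a connector into a search space.
# Assumptions: the backend is reachable at http://localhost:8000 with no extra
# router prefix, and authentication is a bearer token for the logged-in user
# (current_active_user); connector_id=1 and search_space_id=2 are placeholders.
import requests

BASE_URL = "http://localhost:8000"   # assumed local backend URL
TOKEN = "<access token>"             # assumed bearer token

connector_id = 1      # hypothetical connector ID (e.g. a JIRA_CONNECTOR)
search_space_id = 2   # hypothetical search space ID

response = requests.post(
    f"{BASE_URL}/search-source-connectors/{connector_id}/index",
    params={
        "search_space_id": search_space_id,
        # Optional YYYY-MM-DD range; if omitted, the route falls back to
        # last_indexed_at (or 365 days ago) for the start and today for the end.
        "start_date": "2024-01-01",
        "end_date": "2024-06-30",
    },
    headers={"Authorization": f"Bearer {TOKEN}"},
    timeout=30,
)
response.raise_for_status()
# Expected shape per the route: message, connector_id, search_space_id,
# indexing_from, indexing_to.
print(response.json())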