diff --git a/surfsense_backend/README.md b/surfsense_backend/README.md
index 879fa43..f78ec7d 100644
--- a/surfsense_backend/README.md
+++ b/surfsense_backend/README.md
@@ -110,7 +110,6 @@ See pyproject.toml for detailed dependency information. Key dependencies include
 - fastapi and related packages
 - fastapi-users: Authentication and user management
 - firecrawl-py: Web crawling capabilities
-- gpt-researcher: Advanced research capabilities
 - langchain components for AI workflows
 - litellm: LLM model integration
 - pgvector: Vector similarity search in PostgreSQL
diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py
index 1b42d71..4c3bc72 100644
--- a/surfsense_backend/app/agents/researcher/nodes.py
+++ b/surfsense_backend/app/agents/researcher/nodes.py
@@ -143,7 +143,8 @@ async def fetch_relevant_documents(
     connectors_to_search: List[str],
     writer: StreamWriter = None,
     state: State = None,
-    top_k: int = 10
+    top_k: int = 10,
+    connector_service: ConnectorService = None
 ) -> List[Dict[str, Any]]:
     """
     Fetch relevant documents for research questions using the provided connectors.
@@ -162,7 +163,7 @@ async def fetch_relevant_documents(
         List of relevant documents
     """
     # Initialize services
-    connector_service = ConnectorService(db_session)
+    # connector_service is now created by the caller and passed in, sharing the caller's db_session

     # Only use streaming if both writer and state are provided
     streaming_service = state.streaming_service if state is not None else None
@@ -494,10 +495,12 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     elif configuration.num_sections == 6:
         TOP_K = 30

-    relevant_documents = []
     async with async_session_maker() as db_session:
         try:
+            # Create connector service inside the db_session scope
+            connector_service = ConnectorService(db_session)
+
             relevant_documents = await fetch_relevant_documents(
                 research_questions=all_questions,
                 user_id=configuration.user_id,
@@ -506,7 +509,8 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
                 connectors_to_search=configuration.connectors_to_search,
                 writer=writer,
                 state=state,
-                top_k=TOP_K
+                top_k=TOP_K,
+                connector_service=connector_service
             )
         except Exception as e:
             error_message = f"Error fetching relevant documents: {str(e)}"
diff --git a/surfsense_backend/app/agents/researcher/sub_section_writer/nodes.py b/surfsense_backend/app/agents/researcher/sub_section_writer/nodes.py
index 0bec461..f1d50ae 100644
--- a/surfsense_backend/app/agents/researcher/sub_section_writer/nodes.py
+++ b/surfsense_backend/app/agents/researcher/sub_section_writer/nodes.py
@@ -102,7 +102,7 @@ async def write_sub_section(state: State, config: RunnableConfig) -> Dict[str, A
         # Extract content and metadata
         content = doc.get("content", "")
         doc_info = doc.get("document", {})
-        document_id = doc_info.get("id", f"{i+1}")  # Use document ID or index+1 as source_id
+        document_id = doc_info.get("id")  # Use document ID

         # Format document according to the citation system prompt's expected format
         formatted_doc = f"""
diff --git a/surfsense_backend/app/utils/connector_service.py b/surfsense_backend/app/utils/connector_service.py
index 23e3035..c7ad692 100644
--- a/surfsense_backend/app/utils/connector_service.py
+++ b/surfsense_backend/app/utils/connector_service.py
@@ -1,5 +1,6 @@
 import json
 from typing import List, Dict, Any, Optional, Tuple
+import asyncio
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.future import select
 from app.retriver.chunks_hybrid_search import ChucksHybridSearchRetriever
@@ -13,6 +14,7 @@ class ConnectorService:
         self.session = session
         self.retriever = ChucksHybridSearchRetriever(session)
         self.source_id_counter = 1
+        self.counter_lock = asyncio.Lock()  # Lock to serialize counter access across concurrent coroutines

     async def search_crawled_urls(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20) -> tuple:
         """
@@ -29,25 +31,35 @@ class ConnectorService:
             document_type="CRAWLED_URL"
         )

+        # Early return if no results
+        if not crawled_urls_chunks:
+            return {
+                "id": 1,
+                "name": "Crawled URLs",
+                "type": "CRAWLED_URL",
+                "sources": [],
+            }, []
+
         # Process each chunk and create sources directly without deduplication
         sources_list = []
-        for i, chunk in enumerate(crawled_urls_chunks):
-            # Fix for UI
-            crawled_urls_chunks[i]['document']['id'] = self.source_id_counter
-            # Extract document metadata
-            document = chunk.get('document', {})
-            metadata = document.get('metadata', {})
+        async with self.counter_lock:
+            for i, chunk in enumerate(crawled_urls_chunks):
+                # Fix for UI
+                crawled_urls_chunks[i]['document']['id'] = self.source_id_counter
+                # Extract document metadata
+                document = chunk.get('document', {})
+                metadata = document.get('metadata', {})

-            # Create a source entry
-            source = {
-                "id": self.source_id_counter,
-                "title": document.get('title', 'Untitled Document'),
-                "description": metadata.get('og:description', metadata.get('ogDescription', chunk.get('content', '')[:100])),
-                "url": metadata.get('url', '')
-            }
+                # Create a source entry
+                source = {
+                    "id": self.source_id_counter,
+                    "title": document.get('title', 'Untitled Document'),
+                    "description": metadata.get('og:description', metadata.get('ogDescription', chunk.get('content', '')[:100])),
+                    "url": metadata.get('url', '')
+                }

-            self.source_id_counter += 1
-            sources_list.append(source)
+                self.source_id_counter += 1
+                sources_list.append(source)

         # Create result object
         result_object = {
@@ -73,26 +85,36 @@ class ConnectorService:
             search_space_id=search_space_id,
             document_type="FILE"
         )
+
+        # Early return if no results
+        if not files_chunks:
+            return {
+                "id": 2,
+                "name": "Files",
+                "type": "FILE",
+                "sources": [],
+            }, []

         # Process each chunk and create sources directly without deduplication
         sources_list = []
-        for i, chunk in enumerate(files_chunks):
-            # Fix for UI
-            files_chunks[i]['document']['id'] = self.source_id_counter
-            # Extract document metadata
-            document = chunk.get('document', {})
-            metadata = document.get('metadata', {})
+        async with self.counter_lock:
+            for i, chunk in enumerate(files_chunks):
+                # Fix for UI
+                files_chunks[i]['document']['id'] = self.source_id_counter
+                # Extract document metadata
+                document = chunk.get('document', {})
+                metadata = document.get('metadata', {})

-            # Create a source entry
-            source = {
-                "id": self.source_id_counter,
-                "title": document.get('title', 'Untitled Document'),
-                "description": metadata.get('og:description', metadata.get('ogDescription', chunk.get('content', '')[:100])),
-                "url": metadata.get('url', '')
-            }
+                # Create a source entry
+                source = {
+                    "id": self.source_id_counter,
+                    "title": document.get('title', 'Untitled Document'),
+                    "description": metadata.get('og:description', metadata.get('ogDescription', chunk.get('content', '')[:100])),
+                    "url": metadata.get('url', '')
+                }

-            self.source_id_counter += 1
-            sources_list.append(source)
+                self.source_id_counter += 1
+                sources_list.append(source)

         # Create result object
         result_object = {
@@ -163,39 +185,49 @@ class ConnectorService:
         # Extract results from Tavily response
         tavily_results = response.get("results", [])

+        # Early return if no results
+        if not tavily_results:
+            return {
+                "id": 3,
+                "name": "Tavily Search",
+                "type": "TAVILY_API",
+                "sources": [],
+            }, []
+
         # Process each result and create sources directly without deduplication
         sources_list = []
         documents = []
-        for i, result in enumerate(tavily_results):
-
-            # Create a source entry
-            source = {
-                "id": self.source_id_counter,
-                "title": result.get("title", "Tavily Result"),
-                "description": result.get("content", "")[:100],
-                "url": result.get("url", "")
-            }
-            sources_list.append(source)
-
-            # Create a document entry
-            document = {
-                "chunk_id": f"tavily_chunk_{i}",
-                "content": result.get("content", ""),
-                "score": result.get("score", 0.0),
-                "document": {
+        async with self.counter_lock:
+            for i, result in enumerate(tavily_results):
+
+                # Create a source entry
+                source = {
                     "id": self.source_id_counter,
                     "title": result.get("title", "Tavily Result"),
-                    "document_type": "TAVILY_API",
-                    "metadata": {
-                        "url": result.get("url", ""),
-                        "published_date": result.get("published_date", ""),
-                        "source": "TAVILY_API"
+                    "description": result.get("content", "")[:100],
+                    "url": result.get("url", "")
+                }
+                sources_list.append(source)
+
+                # Create a document entry
+                document = {
+                    "chunk_id": f"tavily_chunk_{i}",
+                    "content": result.get("content", ""),
+                    "score": result.get("score", 0.0),
+                    "document": {
+                        "id": self.source_id_counter,
+                        "title": result.get("title", "Tavily Result"),
+                        "document_type": "TAVILY_API",
+                        "metadata": {
+                            "url": result.get("url", ""),
+                            "published_date": result.get("published_date", ""),
+                            "source": "TAVILY_API"
+                        }
                     }
                 }
-            }
-            documents.append(document)
-            self.source_id_counter += 1
+                documents.append(document)
+                self.source_id_counter += 1

         # Create result object
         result_object = {
@@ -231,45 +263,55 @@ class ConnectorService:
             search_space_id=search_space_id,
             document_type="SLACK_CONNECTOR"
         )
+
+        # Early return if no results
+        if not slack_chunks:
+            return {
+                "id": 4,
+                "name": "Slack",
+                "type": "SLACK_CONNECTOR",
+                "sources": [],
+            }, []

         # Process each chunk and create sources directly without deduplication
         sources_list = []
-        for i, chunk in enumerate(slack_chunks):
-            # Fix for UI
-            slack_chunks[i]['document']['id'] = self.source_id_counter
-            # Extract document metadata
-            document = chunk.get('document', {})
-            metadata = document.get('metadata', {})
+        async with self.counter_lock:
+            for i, chunk in enumerate(slack_chunks):
+                # Fix for UI
+                slack_chunks[i]['document']['id'] = self.source_id_counter
+                # Extract document metadata
+                document = chunk.get('document', {})
+                metadata = document.get('metadata', {})

-            # Create a mapped source entry with Slack-specific metadata
-            channel_name = metadata.get('channel_name', 'Unknown Channel')
-            channel_id = metadata.get('channel_id', '')
-            message_date = metadata.get('start_date', '')
-
-            # Create a more descriptive title for Slack messages
-            title = f"Slack: {channel_name}"
-            if message_date:
-                title += f" ({message_date})"
+                # Create a mapped source entry with Slack-specific metadata
+                channel_name = metadata.get('channel_name', 'Unknown Channel')
+                channel_id = metadata.get('channel_id', '')
+                message_date = metadata.get('start_date', '')

-            # Create a more descriptive description for Slack messages
-            description = chunk.get('content', '')[:100]
-            if len(description) == 100:
-                description += "..."
-
-            # For URL, we can use a placeholder or construct a URL to the Slack channel if available
-            url = ""
-            if channel_id:
-                url = f"https://slack.com/app_redirect?channel={channel_id}"
+                # Create a more descriptive title for Slack messages
+                title = f"Slack: {channel_name}"
+                if message_date:
+                    title += f" ({message_date})"
+
+                # Create a more descriptive description for Slack messages
+                description = chunk.get('content', '')[:100]
+                if len(description) == 100:
+                    description += "..."
+
+                # For URL, we can use a placeholder or construct a URL to the Slack channel if available
+                url = ""
+                if channel_id:
+                    url = f"https://slack.com/app_redirect?channel={channel_id}"

-            source = {
-                "id": self.source_id_counter,
-                "title": title,
-                "description": description,
-                "url": url,
-            }
+                source = {
+                    "id": self.source_id_counter,
+                    "title": title,
+                    "description": description,
+                    "url": url,
+                }

-            self.source_id_counter += 1
-            sources_list.append(source)
+                self.source_id_counter += 1
+                sources_list.append(source)

         # Create result object
         result_object = {
@@ -301,47 +343,57 @@ class ConnectorService:
             search_space_id=search_space_id,
             document_type="NOTION_CONNECTOR"
         )
+
+        # Early return if no results
+        if not notion_chunks:
+            return {
+                "id": 5,
+                "name": "Notion",
+                "type": "NOTION_CONNECTOR",
+                "sources": [],
+            }, []

         # Process each chunk and create sources directly without deduplication
         sources_list = []
-        for i, chunk in enumerate(notion_chunks):
-            # Fix for UI
-            notion_chunks[i]['document']['id'] = self.source_id_counter
-
-            # Extract document metadata
-            document = chunk.get('document', {})
-            metadata = document.get('metadata', {})
-
-            # Create a mapped source entry with Notion-specific metadata
-            page_title = metadata.get('page_title', 'Untitled Page')
-            page_id = metadata.get('page_id', '')
-            indexed_at = metadata.get('indexed_at', '')
-
-            # Create a more descriptive title for Notion pages
-            title = f"Notion: {page_title}"
-            if indexed_at:
-                title += f" (indexed: {indexed_at})"
+        async with self.counter_lock:
+            for i, chunk in enumerate(notion_chunks):
+                # Fix for UI
+                notion_chunks[i]['document']['id'] = self.source_id_counter

-            # Create a more descriptive description for Notion pages
-            description = chunk.get('content', '')[:100]
-            if len(description) == 100:
-                description += "..."
+                # Extract document metadata
+                document = chunk.get('document', {})
+                metadata = document.get('metadata', {})
+
+                # Create a mapped source entry with Notion-specific metadata
+                page_title = metadata.get('page_title', 'Untitled Page')
+                page_id = metadata.get('page_id', '')
+                indexed_at = metadata.get('indexed_at', '')

-            # For URL, we can use a placeholder or construct a URL to the Notion page if available
-            url = ""
-            if page_id:
-                # Notion page URLs follow this format
-                url = f"https://notion.so/{page_id.replace('-', '')}"
+                # Create a more descriptive title for Notion pages
+                title = f"Notion: {page_title}"
+                if indexed_at:
+                    title += f" (indexed: {indexed_at})"
+
+                # Create a more descriptive description for Notion pages
+                description = chunk.get('content', '')[:100]
+                if len(description) == 100:
+                    description += "..."
+
+                # For URL, we can use a placeholder or construct a URL to the Notion page if available
+                url = ""
+                if page_id:
+                    # Notion page URLs follow this format
+                    url = f"https://notion.so/{page_id.replace('-', '')}"

-            source = {
-                "id": self.source_id_counter,
-                "title": title,
-                "description": description,
-                "url": url,
-            }
+                source = {
+                    "id": self.source_id_counter,
+                    "title": title,
+                    "description": description,
+                    "url": url,
+                }

-            self.source_id_counter += 1
-            sources_list.append(source)
+                self.source_id_counter += 1
+                sources_list.append(source)

         # Create result object
         result_object = {
@@ -373,65 +425,75 @@ class ConnectorService:
             search_space_id=search_space_id,
             document_type="EXTENSION"
         )
+
+        # Early return if no results
+        if not extension_chunks:
+            return {
+                "id": 6,
+                "name": "Extension",
+                "type": "EXTENSION",
+                "sources": [],
+            }, []

         # Process each chunk and create sources directly without deduplication
         sources_list = []
-        for i, chunk in enumerate(extension_chunks):
-            # Fix for UI
-            extension_chunks[i]['document']['id'] = self.source_id_counter
-
-            # Extract document metadata
-            document = chunk.get('document', {})
-            metadata = document.get('metadata', {})
+        async with self.counter_lock:
+            for i, chunk in enumerate(extension_chunks):
+                # Fix for UI
+                extension_chunks[i]['document']['id'] = self.source_id_counter
+
+                # Extract document metadata
+                document = chunk.get('document', {})
+                metadata = document.get('metadata', {})

-            # Extract extension-specific metadata
-            webpage_title = metadata.get('VisitedWebPageTitle', 'Untitled Page')
-            webpage_url = metadata.get('VisitedWebPageURL', '')
-            visit_date = metadata.get('VisitedWebPageDateWithTimeInISOString', '')
-            visit_duration = metadata.get('VisitedWebPageVisitDurationInMilliseconds', '')
-            browsing_session_id = metadata.get('BrowsingSessionId', '')
-
-            # Create a more descriptive title for extension data
-            title = webpage_title
-            if visit_date:
-                # Format the date for display (simplified)
-                try:
-                    # Just extract the date part for display
-                    formatted_date = visit_date.split('T')[0] if 'T' in visit_date else visit_date
-                    title += f" (visited: {formatted_date})"
-                except:
-                    # Fallback if date parsing fails
-                    title += f" (visited: {visit_date})"
+                # Extract extension-specific metadata
+                webpage_title = metadata.get('VisitedWebPageTitle', 'Untitled Page')
+                webpage_url = metadata.get('VisitedWebPageURL', '')
+                visit_date = metadata.get('VisitedWebPageDateWithTimeInISOString', '')
+                visit_duration = metadata.get('VisitedWebPageVisitDurationInMilliseconds', '')
+                browsing_session_id = metadata.get('BrowsingSessionId', '')

-            # Create a more descriptive description for extension data
-            description = chunk.get('content', '')[:100]
-            if len(description) == 100:
-                description += "..."
-
-            # Add visit duration if available
-            if visit_duration:
-                try:
-                    duration_seconds = int(visit_duration) / 1000
-                    if duration_seconds < 60:
-                        duration_text = f"{duration_seconds:.1f} seconds"
-                    else:
-                        duration_text = f"{duration_seconds/60:.1f} minutes"
+                # Create a more descriptive title for extension data
+                title = webpage_title
+                if visit_date:
+                    # Format the date for display (simplified)
+                    try:
+                        # Just extract the date part for display
+                        formatted_date = visit_date.split('T')[0] if 'T' in visit_date else visit_date
+                        title += f" (visited: {formatted_date})"
+                    except:
+                        # Fallback if date parsing fails
+                        title += f" (visited: {visit_date})"

-                    if description:
-                        description += f" | Duration: {duration_text}"
-                except:
-                    # Fallback if duration parsing fails
-                    pass
+                # Create a more descriptive description for extension data
+                description = chunk.get('content', '')[:100]
+                if len(description) == 100:
+                    description += "..."
+
+                # Add visit duration if available
+                if visit_duration:
+                    try:
+                        duration_seconds = int(visit_duration) / 1000
+                        if duration_seconds < 60:
+                            duration_text = f"{duration_seconds:.1f} seconds"
+                        else:
+                            duration_text = f"{duration_seconds/60:.1f} minutes"
+
+                        if description:
+                            description += f" | Duration: {duration_text}"
+                    except:
+                        # Fallback if duration parsing fails
+                        pass

-            source = {
-                "id": self.source_id_counter,
-                "title": title,
-                "description": description,
-                "url": webpage_url
-            }
+                source = {
+                    "id": self.source_id_counter,
+                    "title": title,
+                    "description": description,
+                    "url": webpage_url
+                }

-            self.source_id_counter += 1
-            sources_list.append(source)
+                self.source_id_counter += 1
+                sources_list.append(source)

         # Create result object
         result_object = {
@@ -463,47 +525,57 @@ class ConnectorService:
             search_space_id=search_space_id,
             document_type="YOUTUBE_VIDEO"
         )
+
+        # Early return if no results
+        if not youtube_chunks:
+            return {
+                "id": 7,
+                "name": "YouTube Videos",
+                "type": "YOUTUBE_VIDEO",
+                "sources": [],
+            }, []

         # Process each chunk and create sources directly without deduplication
         sources_list = []
-        for i, chunk in enumerate(youtube_chunks):
-            # Fix for UI
-            youtube_chunks[i]['document']['id'] = self.source_id_counter
-
-            # Extract document metadata
-            document = chunk.get('document', {})
-            metadata = document.get('metadata', {})
-
-            # Extract YouTube-specific metadata
-            video_title = metadata.get('video_title', 'Untitled Video')
-            video_id = metadata.get('video_id', '')
-            channel_name = metadata.get('channel_name', '')
-            published_date = metadata.get('published_date', '')
-
-            # Create a more descriptive title for YouTube videos
-            title = video_title
-            if channel_name:
-                title += f" - {channel_name}"
+        async with self.counter_lock:
+            for i, chunk in enumerate(youtube_chunks):
+                # Fix for UI
+                youtube_chunks[i]['document']['id'] = self.source_id_counter

-            # Create a more descriptive description for YouTube videos
-            description = metadata.get('description', chunk.get('content', '')[:100])
-            if len(description) == 100:
-                description += "..."
+                # Extract document metadata
+                document = chunk.get('document', {})
+                metadata = document.get('metadata', {})
+
+                # Extract YouTube-specific metadata
+                video_title = metadata.get('video_title', 'Untitled Video')
+                video_id = metadata.get('video_id', '')
+                channel_name = metadata.get('channel_name', '')
+                published_date = metadata.get('published_date', '')

-            # For URL, construct a URL to the YouTube video
-            url = f"https://www.youtube.com/watch?v={video_id}" if video_id else ""
+                # Create a more descriptive title for YouTube videos
+                title = video_title
+                if channel_name:
+                    title += f" - {channel_name}"
+
+                # Create a more descriptive description for YouTube videos
+                description = metadata.get('description', chunk.get('content', '')[:100])
+                if len(description) == 100:
+                    description += "..."
+
+                # For URL, construct a URL to the YouTube video
+                url = f"https://www.youtube.com/watch?v={video_id}" if video_id else ""

-            source = {
-                "id": self.source_id_counter,
-                "title": title,
-                "description": description,
-                "url": url,
-                "video_id": video_id,  # Additional field for YouTube videos
-                "channel_name": channel_name  # Additional field for YouTube videos
-            }
+                source = {
+                    "id": self.source_id_counter,
+                    "title": title,
+                    "description": description,
+                    "url": url,
+                    "video_id": video_id,  # Additional field for YouTube videos
+                    "channel_name": channel_name  # Additional field for YouTube videos
+                }

-            self.source_id_counter += 1
-            sources_list.append(source)
+                self.source_id_counter += 1
+                sources_list.append(source)

         # Create result object
         result_object = {
@@ -529,27 +601,37 @@ class ConnectorService:
             search_space_id=search_space_id,
             document_type="GITHUB_CONNECTOR"
         )
+
+        # Early return if no results
+        if not github_chunks:
+            return {
+                "id": 8,
+                "name": "GitHub",
+                "type": "GITHUB_CONNECTOR",
+                "sources": [],
+            }, []

         # Process each chunk and create sources directly without deduplication
         sources_list = []
-        for i, chunk in enumerate(github_chunks):
-            # Fix for UI - assign a unique ID for citation/source tracking
-            github_chunks[i]['document']['id'] = self.source_id_counter
-
-            # Extract document metadata
-            document = chunk.get('document', {})
-            metadata = document.get('metadata', {})
+        async with self.counter_lock:
+            for i, chunk in enumerate(github_chunks):
+                # Fix for UI - assign a unique ID for citation/source tracking
+                github_chunks[i]['document']['id'] = self.source_id_counter
+
+                # Extract document metadata
+                document = chunk.get('document', {})
+                metadata = document.get('metadata', {})

-            # Create a source entry
-            source = {
-                "id": self.source_id_counter,
-                "title": document.get('title', 'GitHub Document'),  # Use specific title if available
-                "description": metadata.get('description', chunk.get('content', '')[:100]),  # Use description or content preview
-                "url": metadata.get('url', '')  # Use URL if available in metadata
-            }
+                # Create a source entry
+                source = {
+                    "id": self.source_id_counter,
+                    "title": document.get('title', 'GitHub Document'),  # Use specific title if available
+                    "description": metadata.get('description', chunk.get('content', '')[:100]),  # Use description or content preview
+                    "url": metadata.get('url', '')  # Use URL if available in metadata
+                }

-            self.source_id_counter += 1
-            sources_list.append(source)
+                self.source_id_counter += 1
+                sources_list.append(source)

         # Create result object
         result_object = {
@@ -581,59 +663,69 @@ class ConnectorService:
             search_space_id=search_space_id,
             document_type="LINEAR_CONNECTOR"
         )
+
+        # Early return if no results
+        if not linear_chunks:
+            return {
+                "id": 9,
+                "name": "Linear Issues",
+                "type": "LINEAR_CONNECTOR",
+                "sources": [],
+            }, []

         # Process each chunk and create sources directly without deduplication
         sources_list = []
-        for i, chunk in enumerate(linear_chunks):
-            # Fix for UI
-            linear_chunks[i]['document']['id'] = self.source_id_counter
-
-            # Extract document metadata
-            document = chunk.get('document', {})
-            metadata = document.get('metadata', {})
-
-            # Extract Linear-specific metadata
-            issue_identifier = metadata.get('issue_identifier', '')
-            issue_title = metadata.get('issue_title', 'Untitled Issue')
-            issue_state = metadata.get('state', '')
-            comment_count = metadata.get('comment_count', 0)
-
-            # Create a more descriptive title for Linear issues
-            title = f"Linear: {issue_identifier} - {issue_title}"
-            if issue_state:
-                title += f" ({issue_state})"
+        async with self.counter_lock:
+            for i, chunk in enumerate(linear_chunks):
+                # Fix for UI
+                linear_chunks[i]['document']['id'] = self.source_id_counter

-            # Create a more descriptive description for Linear issues
-            description = chunk.get('content', '')[:100]
-            if len(description) == 100:
-                description += "..."
-
-            # Add comment count info to description
-            if comment_count:
-                if description:
-                    description += f" | Comments: {comment_count}"
-                else:
-                    description = f"Comments: {comment_count}"
-
-            # For URL, we could construct a URL to the Linear issue if we have the workspace info
-            # For now, use a generic placeholder
-            url = ""
-            if issue_identifier:
-                # This is a generic format, may need to be adjusted based on actual Linear workspace
-                url = f"https://linear.app/issue/{issue_identifier}"
+                # Extract document metadata
+                document = chunk.get('document', {})
+                metadata = document.get('metadata', {})

-            source = {
-                "id": self.source_id_counter,
-                "title": title,
-                "description": description,
-                "url": url,
-                "issue_identifier": issue_identifier,
-                "state": issue_state,
-                "comment_count": comment_count
-            }
+                # Extract Linear-specific metadata
+                issue_identifier = metadata.get('issue_identifier', '')
+                issue_title = metadata.get('issue_title', 'Untitled Issue')
+                issue_state = metadata.get('state', '')
+                comment_count = metadata.get('comment_count', 0)
+
+                # Create a more descriptive title for Linear issues
+                title = f"Linear: {issue_identifier} - {issue_title}"
+                if issue_state:
+                    title += f" ({issue_state})"
+
+                # Create a more descriptive description for Linear issues
+                description = chunk.get('content', '')[:100]
+                if len(description) == 100:
+                    description += "..."
+
+                # Add comment count info to description
+                if comment_count:
+                    if description:
+                        description += f" | Comments: {comment_count}"
+                    else:
+                        description = f"Comments: {comment_count}"
+
+                # For URL, we could construct a URL to the Linear issue if we have the workspace info
+                # For now, use a generic placeholder
+                url = ""
+                if issue_identifier:
+                    # This is a generic format, may need to be adjusted based on actual Linear workspace
+                    url = f"https://linear.app/issue/{issue_identifier}"

-            self.source_id_counter += 1
-            sources_list.append(source)
+                source = {
+                    "id": self.source_id_counter,
+                    "title": title,
+                    "description": description,
+                    "url": url,
+                    "issue_identifier": issue_identifier,
+                    "state": issue_state,
+                    "comment_count": comment_count
+                }
+
+                self.source_id_counter += 1
+                sources_list.append(source)

         # Create result object
         result_object = {
@@ -697,38 +789,39 @@ class ConnectorService:
         sources_list = []
         documents = []
-        for i, result in enumerate(linkup_results):
-            # Only process results that have content
-            if not hasattr(result, 'content') or not result.content:
-                continue
-
-            # Create a source entry
-            source = {
-                "id": self.source_id_counter,
-                "title": result.name if hasattr(result, 'name') else "Linkup Result",
-                "description": result.content[:100] if hasattr(result, 'content') else "",
-                "url": result.url if hasattr(result, 'url') else ""
-            }
-            sources_list.append(source)
-
-            # Create a document entry
-            document = {
-                "chunk_id": f"linkup_chunk_{i}",
-                "content": result.content if hasattr(result, 'content') else "",
-                "score": 1.0,  # Default score since not provided by Linkup
-                "document": {
+        async with self.counter_lock:
+            for i, result in enumerate(linkup_results):
+                # Only process results that have content
+                if not hasattr(result, 'content') or not result.content:
+                    continue
+
+                # Create a source entry
+                source = {
                     "id": self.source_id_counter,
                     "title": result.name if hasattr(result, 'name') else "Linkup Result",
-                    "document_type": "LINKUP_API",
-                    "metadata": {
-                        "url": result.url if hasattr(result, 'url') else "",
-                        "type": result.type if hasattr(result, 'type') else "",
-                        "source": "LINKUP_API"
+                    "description": result.content[:100] if hasattr(result, 'content') else "",
+                    "url": result.url if hasattr(result, 'url') else ""
+                }
+                sources_list.append(source)
+
+                # Create a document entry
+                document = {
+                    "chunk_id": f"linkup_chunk_{i}",
+                    "content": result.content if hasattr(result, 'content') else "",
+                    "score": 1.0,  # Default score since not provided by Linkup
+                    "document": {
+                        "id": self.source_id_counter,
+                        "title": result.name if hasattr(result, 'name') else "Linkup Result",
+                        "document_type": "LINKUP_API",
+                        "metadata": {
+                            "url": result.url if hasattr(result, 'url') else "",
+                            "type": result.type if hasattr(result, 'type') else "",
+                            "source": "LINKUP_API"
+                        }
                     }
                 }
-            }
-            documents.append(document)
-            self.source_id_counter += 1
+                documents.append(document)
+                self.source_id_counter += 1

         # Create result object
         result_object = {
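Reviewer note: the recurring change in `connector_service.py` above is one pattern applied per connector. The shared `source_id_counter`, which every search method bumps to hand the UI unique citation IDs, is now guarded by an `asyncio.Lock` so that connector searches running concurrently (e.g. via `asyncio.gather` in the researcher agent) cannot interleave increments and assign duplicate IDs. A minimal, self-contained sketch of the idea follows; the `SourceIdAllocator` name and `allocate` helper are illustrative only, not part of this codebase:

```python
import asyncio


class SourceIdAllocator:
    """Illustrative stand-in for ConnectorService's source_id_counter + counter_lock."""

    def __init__(self) -> None:
        self.source_id_counter = 1
        self.counter_lock = asyncio.Lock()

    async def allocate(self, count: int) -> list[int]:
        # Reserve `count` consecutive IDs atomically with respect to other
        # coroutines on the same event loop (asyncio.Lock is not thread-safe).
        async with self.counter_lock:
            ids = list(range(self.source_id_counter, self.source_id_counter + count))
            self.source_id_counter += count
            return ids


async def main() -> None:
    allocator = SourceIdAllocator()
    # Simulate several connector searches claiming ID blocks concurrently.
    blocks = await asyncio.gather(*(allocator.allocate(3) for _ in range(4)))
    all_ids = [i for block in blocks for i in block]
    assert len(set(all_ids)) == len(all_ids)  # every ID is unique
    print(blocks)


if __name__ == "__main__":
    asyncio.run(main())
```

Note that an `asyncio.Lock` only serializes coroutines on a single event loop; if a `ConnectorService` instance were ever shared across threads, a `threading.Lock` (or per-task service instances) would be needed instead, which is why the in-diff comment says "concurrent coroutines" rather than "multithreaded environments".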