diff --git a/surfsense_backend/alembic/versions/30_add_elasticsearch_connector_enums.py b/surfsense_backend/alembic/versions/31_add_elasticsearch_connector_enums.py
similarity index 94%
rename from surfsense_backend/alembic/versions/30_add_elasticsearch_connector_enums.py
rename to surfsense_backend/alembic/versions/31_add_elasticsearch_connector_enums.py
index 698d6d9a0..82311054b 100644
--- a/surfsense_backend/alembic/versions/30_add_elasticsearch_connector_enums.py
+++ b/surfsense_backend/alembic/versions/31_add_elasticsearch_connector_enums.py
@@ -1,7 +1,7 @@
 """Add ElasticSearch connector enums
 
-Revision ID: 30
-Revises: 29
+Revision ID: 31
+Revises: 30
 
 """
 from collections.abc import Sequence
@@ -9,8 +9,8 @@ from collections.abc import Sequence
 from alembic import op
 
 # revision identifiers
-revision: str = "30"
-down_revision: str | None = "29"
+revision: str = "31"
+down_revision: str | None = "30"
 branch_labels: str | Sequence[str] | None = None
 depends_on: str | Sequence[str] | None = None
diff --git a/surfsense_backend/app/agents/researcher/qna_agent/prompts.py b/surfsense_backend/app/agents/researcher/qna_agent/prompts.py
index 9c35f90cc..bad0fa813 100644
--- a/surfsense_backend/app/agents/researcher/qna_agent/prompts.py
+++ b/surfsense_backend/app/agents/researcher/qna_agent/prompts.py
@@ -34,6 +34,7 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel
 - NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management)
 - YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos)
 - GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions)
+- ELASTICSEARCH_CONNECTOR: "Elasticsearch indexed documents and data" (personal Elasticsearch instances and custom data sources)
 - LINEAR_CONNECTOR: "Linear project issues and discussions" (personal project management)
 - JIRA_CONNECTOR: "Jira project issues, tickets, and comments" (personal project tracking)
 - CONFLUENCE_CONNECTOR: "Confluence pages and comments" (personal project documentation)
diff --git a/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py b/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py
index 3c34eb474..a0134bf7d 100644
--- a/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py
+++ b/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py
@@ -35,6 +35,7 @@ You are SurfSense, an advanced AI research assistant that synthesizes informatio
 - NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management)
 - YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos)
 - GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions)
+- ELASTICSEARCH_CONNECTOR: "Elasticsearch indexed documents and data" (personal Elasticsearch instances and custom data sources)
 - LINEAR_CONNECTOR: "Linear project issues and discussions" (personal project management)
 - JIRA_CONNECTOR: "Jira project issues, tickets, and comments" (personal project tracking)
 - CONFLUENCE_CONNECTOR: "Confluence pages and comments" (personal project documentation)
diff --git a/surfsense_backend/app/connectors/elasticsearch_connector.py b/surfsense_backend/app/connectors/elasticsearch_connector.py
index af5e30608..23b58c4f6 100644
--- a/surfsense_backend/app/connectors/elasticsearch_connector.py
+++ b/surfsense_backend/app/connectors/elasticsearch_connector.py
@@ -117,8 +117,11 @@ class ElasticsearchConnector:
             response = await self.client.search(index=index, body=search_body)
 
+            total_hits = response.get("hits", {}).get("total", {})
+            # Normalize hits.total (an object on ES 7+, a plain int on older servers)
+            total_val = total_hits.get("value", total_hits) if isinstance(total_hits, dict) else total_hits
             logger.info(
-                f"Successfully searched index '{index}', found {response['hits']['total']['value']} results"
+                f"Successfully searched index '{index}', found {total_val} results"
             )
 
             return response
 
@@ -202,7 +205,7 @@ class ElasticsearchConnector:
             )
 
             scroll_id = response.get("_scroll_id")
-            hits = response["hits"]["hits"]
+            hits = response.get("hits", {}).get("hits", [])
 
             while hits:
                 for hit in hits:
@@ -210,18 +213,19 @@ class ElasticsearchConnector:
 
                 # Continue scrolling
                 if scroll_id:
-                    response = await self.client.scroll(
-                        scroll_id=scroll_id, scroll=scroll_timeout
-                    )
+                    response = await self.client.scroll(scroll_id=scroll_id, scroll=scroll_timeout)
                     scroll_id = response.get("_scroll_id")
-                    hits = response["hits"]["hits"]
+                    hits = response.get("hits", {}).get("hits", [])
 
             # Clear scroll
             if scroll_id:
-                await self.client.clear_scroll(scroll_id=scroll_id)
+                try:
+                    await self.client.clear_scroll(scroll_id=scroll_id)
+                except Exception:
+                    logger.debug("Failed to clear scroll id (non-fatal)")
 
         except Exception as e:
-            logger.error(f"Scroll search failed: {e}")
+            logger.error(f"Scroll search failed: {e}", exc_info=True)
             raise
 
     async def count_documents(
diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py
index ba81a53f4..43dccf093 100644
--- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py
@@ -13,9 +13,14 @@ from sqlalchemy.future import select
 
 from app.connectors.elasticsearch_connector import ElasticsearchConnector
 from app.db import Document, DocumentType, SearchSourceConnector
-from app.utils.document_converters import create_document_chunks
+from app.services.task_logging_service import TaskLoggingService
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_unique_identifier_hash,
+)
 
-from .base import check_document_by_unique_identifier
+from .base import check_document_by_unique_identifier, check_duplicate_document_by_hash
 
 logger = logging.getLogger(__name__)
 
@@ -43,6 +48,19 @@ async def index_elasticsearch_documents(
     Returns:
         Tuple of (number of documents processed, error message if any)
     """
+    task_logger = TaskLoggingService(session, search_space_id)
+    log_entry = await task_logger.log_task_start(
+        task_name="elasticsearch_indexing",
+        source="connector_indexing_task",
+        message=f"Starting Elasticsearch indexing for connector {connector_id}",
+        metadata={
+            "connector_id": connector_id,
+            "user_id": str(user_id),
+            "index": None,
+            "start_date": start_date,
+            "end_date": end_date,
+        },
+    )
     es_connector = None
     try:
         # Get the connector configuration
@@ -56,6 +74,9 @@
         if not connector:
             error_msg = f"Elasticsearch connector with ID {connector_id} not found"
             logger.error(error_msg)
+            await task_logger.log_task_failure(
+                log_entry, "Connector not found", error_msg, {"connector_id": connector_id}
+            )
             return 0, error_msg
 
         # Get connector configuration
@@ -75,6 +96,11 @@
             logger.info(
                 "ELASTICSEARCH_INDEX missing or empty in connector config; defaulting to '*' (search all indices)"
             )
+            await task_logger.log_task_progress(
+                log_entry,
+                "Using default index",
+                {"index": index_name, "stage": "index_defaulted"},
+            )
 
         # Check authentication - must have either API key or username+password
         has_api_key = (
@@ -104,6 +130,11 @@
             verify_certs=config.get("ELASTICSEARCH_VERIFY_CERTS", True),
             ca_certs=config.get("ELASTICSEARCH_CA_CERTS"),
         )
+        await task_logger.log_task_progress(
+            log_entry,
+            "Initialized Elasticsearch connector",
+            {"index": index_name, "stage": "connector_initialized"},
+        )
 
         # Build query based on configuration
        query = _build_elasticsearch_query(config)
@@ -118,6 +149,11 @@
         documents_processed = 0
 
         try:
+            await task_logger.log_task_progress(
+                log_entry,
+                "Starting scroll search",
+                {"index": index_name, "stage": "scroll_start", "max_documents": max_documents},
+            )
             # Use scroll search for large result sets
             async for hit in es_connector.scroll_search(
                 index=index_name,
@@ -154,7 +190,7 @@
                         continue
 
                     # Create content hash
-                    content_hash = hashlib.sha256(content.encode()).hexdigest()
+                    content_hash = generate_content_hash(content, search_space_id)
 
                     # Build metadata
                     metadata = {
@@ -171,23 +207,51 @@
                         if field in source:
                             metadata[f"es_{field}"] = source[field]
 
-                    # Check if document already exists
-                    existing_doc = await check_document_by_unique_identifier(
-                        session,
-                        DocumentType.ELASTICSEARCH_CONNECTOR,
-                        content_hash,
-                        search_space_id,
+                    # Build a source-unique identifier and hash (prefer source-id dedupe)
+                    source_identifier = f"{hit.get('_index', index_name)}:{doc_id}"
+                    unique_identifier_hash = generate_unique_identifier_hash(
+                        DocumentType.ELASTICSEARCH_CONNECTOR, source_identifier, search_space_id
                     )
 
-                    if existing_doc:
-                        logger.debug(f"Document {doc_id} already exists, skipping")
-                        continue
+                    # Two-step duplicate detection: first by source-unique id, then by content hash
+                    existing_doc = await check_document_by_unique_identifier(
+                        session, unique_identifier_hash
+                    )
+                    if not existing_doc:
+                        existing_doc = await check_duplicate_document_by_hash(
+                            session, content_hash
+                        )
+                    if existing_doc:
+                        # If content is unchanged, skip; otherwise update the existing document.
+                        if existing_doc.content_hash == content_hash:
+                            logger.info(
+                                f"Skipping ES doc {doc_id}: already indexed (doc id {existing_doc.id})"
+                            )
+                            continue
+                        else:
+                            logger.info(
+                                f"Updating existing document {existing_doc.id} for ES doc {doc_id}"
+                            )
+                            existing_doc.title = title
+                            existing_doc.content = content
+                            existing_doc.content_hash = content_hash
+                            existing_doc.document_metadata = metadata
+                            existing_doc.unique_identifier_hash = unique_identifier_hash
+                            chunks = await create_document_chunks(content)
+                            existing_doc.chunks = chunks
+                            await session.flush()
+                            documents_processed += 1
+                            if documents_processed % 10 == 0:
+                                await session.commit()
+                            continue
+
                     # Create document
                     document = Document(
                         title=title,
                         content=content,
                         content_hash=content_hash,
+                        unique_identifier_hash=unique_identifier_hash,
                         document_type=DocumentType.ELASTICSEARCH_CONNECTOR,
                         document_metadata=metadata,
                         search_space_id=search_space_id,
@@ -208,28 +272,38 @@
                            await session.commit()
 
                 except Exception as e:
-                    logger.error(
-                        f"Error processing Elasticsearch document {hit.get('_id', 'unknown')}: {e}"
+                    msg = f"Error processing Elasticsearch document {hit.get('_id', 'unknown')}: {e}"
+                    logger.error(msg)
+                    await task_logger.log_task_failure(
+                        log_entry,
+                        "Document processing error",
+                        msg,
+                        {"document_id": hit.get("_id", "unknown"), "error_type": type(e).__name__},
                     )
                     continue
 
            # Final commit
            await session.commit()
+            await task_logger.log_task_success(
+                log_entry,
+                f"Successfully indexed {documents_processed} documents from Elasticsearch",
+                {"documents_indexed": documents_processed, "index": index_name},
+            )
            logger.info(
                f"Successfully indexed {documents_processed} documents from Elasticsearch"
            )
 
            # Update last indexed timestamp if requested
            if update_last_indexed and documents_processed > 0:
-                connector.last_indexed_at = datetime.now()
+                connector.last_indexed_at = datetime.now(UTC).isoformat().replace("+00:00", "Z")
                await session.commit()
-
-            if update_last_indexed and documents_processed > 0:
-                # store ISO-8601 UTC timestamp with 'Z' suffix, e.g. 2025-10-09T22:04:53.599658Z
-                connector.last_indexed_at = (
-                    datetime.now(UTC).isoformat().replace("+00:00", "Z")
+                await task_logger.log_task_progress(
+                    log_entry,
+                    "Updated connector.last_indexed_at",
+                    {"last_indexed_at": connector.last_indexed_at},
                )
-                await session.commit()
 
            return documents_processed, None
 
@@ -241,6 +315,9 @@
     except Exception as e:
         error_msg = f"Error indexing Elasticsearch documents: {e}"
         logger.error(error_msg, exc_info=True)
+        await task_logger.log_task_failure(
+            log_entry, "Indexing failed", error_msg, {"error_type": type(e).__name__}
+        )
         await session.rollback()
         if es_connector:
             await es_connector.close()
diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx
index 4f34b9c22..ccf02f107 100644
--- a/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx
+++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx
@@ -64,6 +64,7 @@ const connectorCategories: ConnectorCategory[] = [
 			title: "Elasticsearch",
 			description: "Connect to Elasticsearch to index and search documents, logs and metrics.",
 			icon: getConnectorIcon(EnumConnectorName.ELASTICSEARCH_CONNECTOR, "h-6 w-6"),
+			status: "available",
 		},
 		{
 			id: "baidu-search-api",
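Reviewer notes (the sketches below are illustrative, not part of the patch):

On the hits.total normalization in elasticsearch_connector.py: Elasticsearch 7+ returns "total" as an object ({"value": 123, "relation": "eq"}), while 6.x servers, or any server queried with rest_total_hits_as_int=true, return a plain integer. The same normalization as a standalone helper (function name here is illustrative only):

def normalize_total_hits(response: dict) -> int:
    """Return the hit count whether hits.total is an object (ES 7+) or an int (ES 6.x)."""
    total = response.get("hits", {}).get("total", 0)
    if isinstance(total, dict):
        # ES 7+ object form: {"value": 42, "relation": "eq" | "gte"}
        return int(total.get("value", 0))
    return int(total or 0)

assert normalize_total_hits({"hits": {"total": {"value": 42, "relation": "eq"}}}) == 42
assert normalize_total_hits({"hits": {"total": 42}}) == 42

Note that relation "gte" means Elasticsearch stopped counting at the track_total_hits cutoff (10,000 by default), so the value logged by the patch can be a lower bound rather than an exact result count.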
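On the two-step duplicate detection in elasticsearch_indexer.py: each hit is keyed by "<index>:<_id>" before falling back to a content-hash match. A minimal sketch of the intended shape, assuming generate_unique_identifier_hash combines the document type, source identifier, and search space into one SHA-256 digest; the real helper lives in app/utils/document_converters.py and may differ:

import hashlib

def unique_identifier_hash(document_type: str, source_identifier: str, search_space_id: int) -> str:
    # Hypothetical shape: scope the hash to (type, source id, search space)
    key = f"{document_type}:{source_identifier}:{search_space_id}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()

# An ES hit from index "logs-2024" with _id "abc123" in search space 7:
print(unique_identifier_hash("ELASTICSEARCH_CONNECTOR", "logs-2024:abc123", 7))

Because this identifier is stable across indexing runs, a re-indexed document whose content changed updates the existing row instead of inserting a duplicate, while the content-hash fallback still catches rows created before unique_identifier_hash existed.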
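On the last_indexed_at change: the patch stores an ISO-8601 UTC timestamp with a "Z" suffix instead of a naive datetime.now(), which carried no timezone at all. A quick sketch of the format (datetime.UTC requires Python 3.11+):

from datetime import UTC, datetime

stamp = datetime.now(UTC).isoformat().replace("+00:00", "Z")
# e.g. "2025-10-09T22:04:53.599658Z"; an aware UTC datetime serializes with
# "+00:00", which the replace() call rewrites to the conventional "Z" suffix
assert stamp.endswith("Z")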