feat: update Elasticsearch integration and logging

- bumped the ElasticSearch connector enum migration to revision 31 (revises 30)
- added `TaskLoggingService` start/progress/success/failure logging to elasticsearch_indexer
- added the ELASTICSEARCH_CONNECTOR entry to the connector lists in prompts.py
- hardened scroll search in the Elasticsearch connector (safe hit access, guarded clear_scroll)
- switched the indexer to two-step deduplication (unique identifier hash, then content hash)
Anish Sarkar 2025-10-17 02:21:56 +05:30
parent 8e1e81ebae
commit 0ff1b586a2
6 changed files with 116 additions and 32 deletions

View file

@@ -1,7 +1,7 @@
 """Add ElasticSearch connector enums
-Revision ID: 30
-Revises: 29
+Revision ID: 31
+Revises: 30
 """
 from collections.abc import Sequence
@@ -9,8 +9,8 @@ from collections.abc import Sequence
 from alembic import op
 
 # revision identifiers
-revision: str = "30"
-down_revision: str | None = "29"
+revision: str = "31"
+down_revision: str | None = "30"
 branch_labels: str | Sequence[str] | None = None
 depends_on: str | Sequence[str] | None = None
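
Note: only the revision identifiers change in this migration; it re-slots behind a new revision 30 on the parent branch. The upgrade() body is outside these hunks. For orientation, an enum-extension migration of this shape is typically a one-liner like the sketch below; the enum type name searchsourceconnectortype is an assumption, not taken from this diff.

    # hypothetical upgrade() for an enum-extension migration (type name assumed)
    def upgrade() -> None:
        op.execute(
            "ALTER TYPE searchsourceconnectortype "
            "ADD VALUE IF NOT EXISTS 'ELASTICSEARCH_CONNECTOR'"
        )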

View file

@@ -34,6 +34,7 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel
 - NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management)
 - YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos)
 - GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions)
+- ELASTICSEARCH_CONNECTOR: "Elasticsearch indexed documents and data" (personal Elasticsearch instances and custom data sources)
 - LINEAR_CONNECTOR: "Linear project issues and discussions" (personal project management)
 - JIRA_CONNECTOR: "Jira project issues, tickets, and comments" (personal project tracking)
 - CONFLUENCE_CONNECTOR: "Confluence pages and comments" (personal project documentation)

View file

@@ -35,6 +35,7 @@ You are SurfSense, an advanced AI research assistant that synthesizes informatio
 - NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management)
 - YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos)
 - GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions)
+- ELASTICSEARCH_CONNECTOR: "Elasticsearch documents and indices (indexed content from your ES connector)" (personal search index)
 - LINEAR_CONNECTOR: "Linear project issues and discussions" (personal project management)
 - JIRA_CONNECTOR: "Jira project issues, tickets, and comments" (personal project tracking)
 - CONFLUENCE_CONNECTOR: "Confluence pages and comments" (personal project documentation)

View file

@@ -117,8 +117,11 @@ class ElasticsearchConnector:
             response = await self.client.search(index=index, body=search_body)
+            total_hits = response.get("hits", {}).get("total", {})
+            # normalize total value (could be dict or int depending on server)
+            total_val = total_hits.get("value", total_hits) if isinstance(total_hits, dict) else total_hits
             logger.info(
-                f"Successfully searched index '{index}', found {response['hits']['total']['value']} results"
+                f"Successfully searched index '{index}', found {total_val} results"
             )
             return response
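
Note on the normalization above: Elasticsearch 7+ reports hits.total as an object such as {"value": 42, "relation": "eq"}, while older servers, and requests made with rest_total_hits_as_int=true, return a plain integer. The same logic as a standalone sketch:

    # minimal sketch of the hits.total normalization introduced above
    def normalize_total(total):
        """Return the numeric hit count from either response shape."""
        if isinstance(total, dict):  # ES 7+: {"value": 42, "relation": "eq"}
            return total.get("value", total)
        return total  # plain int from older servers or rest_total_hits_as_int

    assert normalize_total({"value": 42, "relation": "eq"}) == 42
    assert normalize_total(42) == 42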
@@ -202,7 +205,7 @@ class ElasticsearchConnector:
             )
             scroll_id = response.get("_scroll_id")
-            hits = response["hits"]["hits"]
+            hits = response.get("hits", {}).get("hits", [])
 
             while hits:
                 for hit in hits:
@@ -210,18 +213,19 @@ class ElasticsearchConnector:
                 # Continue scrolling
                 if scroll_id:
-                    response = await self.client.scroll(
-                        scroll_id=scroll_id, scroll=scroll_timeout
-                    )
+                    response = await self.client.scroll(scroll_id=scroll_id, scroll=scroll_timeout)
                     scroll_id = response.get("_scroll_id")
-                    hits = response["hits"]["hits"]
+                    hits = response.get("hits", {}).get("hits", [])
 
             # Clear scroll
             if scroll_id:
-                await self.client.clear_scroll(scroll_id=scroll_id)
+                try:
+                    await self.client.clear_scroll(scroll_id=scroll_id)
+                except Exception:
+                    logger.debug("Failed to clear scroll id (non-fatal)")
 
         except Exception as e:
-            logger.error(f"Scroll search failed: {e}")
+            logger.error(f"Scroll search failed: {e}", exc_info=True)
             raise
 
     async def count_documents(
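
For context, this is the scroll pattern the hunk hardens: the initial search opens a scroll context, each scroll() call returns the next page plus a fresh _scroll_id, and clear_scroll() releases the server-side context. A minimal self-contained sketch, assuming an elasticsearch-py AsyncElasticsearch client:

    # minimal scroll loop, assuming `client` is an AsyncElasticsearch instance
    async def scroll_all(client, index, query, scroll_timeout="5m", page_size=1000):
        response = await client.search(
            index=index, body={"query": query}, scroll=scroll_timeout, size=page_size
        )
        scroll_id = response.get("_scroll_id")
        try:
            while hits := response.get("hits", {}).get("hits", []):
                for hit in hits:
                    yield hit
                if not scroll_id:
                    break
                response = await client.scroll(scroll_id=scroll_id, scroll=scroll_timeout)
                scroll_id = response.get("_scroll_id")
        finally:
            if scroll_id:
                await client.clear_scroll(scroll_id=scroll_id)  # best-effort cleanup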

View file

@@ -13,9 +13,14 @@ from sqlalchemy.future import select
 from app.connectors.elasticsearch_connector import ElasticsearchConnector
 from app.db import Document, DocumentType, SearchSourceConnector
-from app.utils.document_converters import create_document_chunks
+from app.services.task_logging_service import TaskLoggingService
+from app.utils.document_converters import (
+    create_document_chunks,
+    generate_content_hash,
+    generate_unique_identifier_hash,
+)
 
-from .base import check_document_by_unique_identifier
+from .base import check_document_by_unique_identifier, check_duplicate_document_by_hash
 
 logger = logging.getLogger(__name__)
@@ -43,6 +48,19 @@ async def index_elasticsearch_documents(
     Returns:
         Tuple of (number of documents processed, error message if any)
     """
+    task_logger = TaskLoggingService(session, search_space_id)
+    log_entry = await task_logger.log_task_start(
+        task_name="elasticsearch_indexing",
+        source="connector_indexing_task",
+        message=f"Starting Elasticsearch indexing for connector {connector_id}",
+        metadata={
+            "connector_id": connector_id,
+            "user_id": str(user_id),
+            "index": None,
+            "start_date": start_date,
+            "end_date": end_date,
+        },
+    )
     es_connector = None
     try:
         # Get the connector configuration
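
The TaskLoggingService calls added throughout this file follow a start → progress → success/failure lifecycle against a single log entry. The interface below is inferred from the call sites in this diff; the real signatures live in app/services/task_logging_service.py and may differ:

    # interface sketch inferred from call sites; not the actual implementation
    class TaskLoggingService:
        def __init__(self, session, search_space_id): ...

        async def log_task_start(self, task_name, source, message, metadata): ...  # returns a log entry
        async def log_task_progress(self, log_entry, message, metadata): ...
        async def log_task_success(self, log_entry, message, metadata): ...
        async def log_task_failure(self, log_entry, error_title, error_message, metadata): ...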
@@ -56,6 +74,9 @@ async def index_elasticsearch_documents(
         if not connector:
             error_msg = f"Elasticsearch connector with ID {connector_id} not found"
             logger.error(error_msg)
+            await task_logger.log_task_failure(
+                log_entry, "Connector not found", error_msg, {"connector_id": connector_id}
+            )
             return 0, error_msg
 
         # Get connector configuration
@@ -75,6 +96,11 @@ async def index_elasticsearch_documents(
             logger.info(
                 "ELASTICSEARCH_INDEX missing or empty in connector config; defaulting to '*' (search all indices)"
             )
+            await task_logger.log_task_progress(
+                log_entry,
+                "Using default index",
+                {"index": index_name, "stage": "index_defaulted"},
+            )
 
         # Check authentication - must have either API key or username+password
         has_api_key = (
@@ -104,6 +130,11 @@ async def index_elasticsearch_documents(
             verify_certs=config.get("ELASTICSEARCH_VERIFY_CERTS", True),
             ca_certs=config.get("ELASTICSEARCH_CA_CERTS"),
         )
+        await task_logger.log_task_progress(
+            log_entry,
+            "Initialized Elasticsearch connector",
+            {"index": index_name, "stage": "connector_initialized"},
+        )
 
         # Build query based on configuration
         query = _build_elasticsearch_query(config)
@@ -118,6 +149,11 @@ async def index_elasticsearch_documents(
         documents_processed = 0
 
         try:
+            await task_logger.log_task_progress(
+                log_entry,
+                "Starting scroll search",
+                {"index": index_name, "stage": "scroll_start", "max_documents": max_documents},
+            )
             # Use scroll search for large result sets
             async for hit in es_connector.scroll_search(
                 index=index_name,
@@ -154,7 +190,7 @@ async def index_elasticsearch_documents(
                     continue
 
                 # Create content hash
-                content_hash = hashlib.sha256(content.encode()).hexdigest()
+                content_hash = generate_content_hash(content, search_space_id)
 
                 # Build metadata
                 metadata = {
@@ -171,23 +207,51 @@ async def index_elasticsearch_documents(
                     if field in source:
                         metadata[f"es_{field}"] = source[field]
 
-                # Check if document already exists
-                existing_doc = await check_document_by_unique_identifier(
-                    session,
-                    DocumentType.ELASTICSEARCH_CONNECTOR,
-                    content_hash,
-                    search_space_id,
+                # Build source-unique identifier and hash (prefer source id dedupe)
+                source_identifier = f"{hit.get('_index', index_name)}:{doc_id}"
+                unique_identifier_hash = generate_unique_identifier_hash(
+                    DocumentType.ELASTICSEARCH_CONNECTOR, source_identifier, search_space_id
                 )
-                if existing_doc:
-                    logger.debug(f"Document {doc_id} already exists, skipping")
-                    continue
+
+                # Two-step duplicate detection: first by source-unique id, then by content hash
+                existing_doc = await check_document_by_unique_identifier(
+                    session, unique_identifier_hash
+                )
+                if not existing_doc:
+                    existing_doc = await check_duplicate_document_by_hash(
+                        session, content_hash
+                    )
+                if existing_doc:
+                    # If content is unchanged, skip. Otherwise update the existing document.
+                    if existing_doc.content_hash == content_hash:
+                        logger.info(
+                            f"Skipping ES doc {doc_id} — already indexed (doc id {existing_doc.id})"
+                        )
+                        continue
+                    else:
+                        logger.info(
+                            f"Updating existing document {existing_doc.id} for ES doc {doc_id}"
+                        )
+                        existing_doc.title = title
+                        existing_doc.content = content
+                        existing_doc.content_hash = content_hash
+                        existing_doc.document_metadata = metadata
+                        existing_doc.unique_identifier_hash = unique_identifier_hash
+                        chunks = await create_document_chunks(content)
+                        existing_doc.chunks = chunks
+                        await session.flush()
+                        documents_processed += 1
+                        if documents_processed % 10 == 0:
+                            await session.commit()
+                        continue
 
                 # Create document
                 document = Document(
                     title=title,
                     content=content,
+                    content_hash=content_hash,
+                    unique_identifier_hash=unique_identifier_hash,
                     document_type=DocumentType.ELASTICSEARCH_CONNECTOR,
                     document_metadata=metadata,
                     search_space_id=search_space_id,
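
The two hashing helpers are imported from app.utils.document_converters but not shown in this diff. A sketch of what they plausibly compute, assuming sha256 over search-space-scoped fields (the exact separators and field order are assumptions):

    # hypothetical shapes for the helpers; real ones live in document_converters.py
    import hashlib

    def generate_content_hash(content: str, search_space_id: int) -> str:
        # scoped to the search space so identical content in different
        # spaces does not collide across tenants
        return hashlib.sha256(f"{search_space_id}:{content}".encode()).hexdigest()

    def generate_unique_identifier_hash(document_type, source_identifier: str, search_space_id: int) -> str:
        # stable per (type, source id, space); unchanged when content is edited
        key = f"{document_type.value}:{source_identifier}:{search_space_id}"
        return hashlib.sha256(key.encode()).hexdigest()

The split matters: the identifier hash stays stable when a source document's content changes (enabling the update path above), while the content hash catches identical content arriving under a different source id (enabling the skip path).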
@@ -208,28 +272,38 @@ async def index_elasticsearch_documents(
                         await session.commit()
 
             except Exception as e:
-                logger.error(
-                    f"Error processing Elasticsearch document {hit.get('_id', 'unknown')}: {e}"
-                )
+                msg = f"Error processing Elasticsearch document {hit.get('_id', 'unknown')}: {e}"
+                logger.error(msg)
+                await task_logger.log_task_failure(
+                    log_entry,
+                    "Document processing error",
+                    msg,
+                    {"document_id": hit.get("_id", "unknown"), "error_type": type(e).__name__},
+                )
                 continue
 
         # Final commit
        await session.commit()
 
+        await task_logger.log_task_success(
+            log_entry,
+            f"Successfully indexed {documents_processed} documents from Elasticsearch",
+            {"documents_indexed": documents_processed, "index": index_name},
+        )
         logger.info(
             f"Successfully indexed {documents_processed} documents from Elasticsearch"
         )
 
         # Update last indexed timestamp if requested
         if update_last_indexed and documents_processed > 0:
-            connector.last_indexed_at = datetime.now()
+            # store ISO-8601 UTC timestamp with 'Z' suffix, e.g. 2025-10-09T22:04:53.599658Z
+            connector.last_indexed_at = (
+                datetime.now(UTC).isoformat().replace("+00:00", "Z")
+            )
+            await task_logger.log_task_progress(
+                log_entry,
+                "Updated connector.last_indexed_at",
+                {"last_indexed_at": connector.last_indexed_at},
+            )
             await session.commit()
 
         return documents_processed, None
@@ -241,6 +315,9 @@ async def index_elasticsearch_documents(
     except Exception as e:
         error_msg = f"Error indexing Elasticsearch documents: {e}"
         logger.error(error_msg, exc_info=True)
+        await task_logger.log_task_failure(
+            log_entry, "Indexing failed", error_msg, {"error_type": type(e).__name__}
+        )
         await session.rollback()
         if es_connector:
             await es_connector.close()
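
A note on the last_indexed_at change: datetime.now() produced a naive local-time value, while the new code stores an ISO-8601 UTC string with a 'Z' suffix. A quick demonstration (datetime.UTC requires Python 3.11+; use timezone.utc on older versions):

    # demo of the timestamp normalization used above (Python 3.11+)
    from datetime import UTC, datetime

    ts = datetime.now(UTC).isoformat().replace("+00:00", "Z")
    print(ts)  # e.g. 2025-10-09T22:04:53.599658Z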

View file

@@ -64,6 +64,7 @@ const connectorCategories: ConnectorCategory[] = [
 			title: "Elasticsearch",
 			description: "Connect to Elasticsearch to index and search documents, logs and metrics.",
 			icon: getConnectorIcon(EnumConnectorName.ELASTICSEARCH_CONNECTOR, "h-6 w-6"),
+			status: "available",
 		},
 		{
 			id: "baidu-search-api",