"""
|
|
Unified document save/update logic for file processors.
|
|
"""

import logging

from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import Document, DocumentStatus, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.utils.document_converters import (
    create_document_chunks,
    embed_text,
    generate_content_hash,
    generate_document_summary,
)

from ._helpers import (
    find_existing_document_with_migration,
    get_google_drive_unique_identifier,
    handle_existing_document_update,
)
from .base import get_current_timestamp, safe_set_chunks

# ---------------------------------------------------------------------------
# Summary generation
# ---------------------------------------------------------------------------


async def _generate_summary(
    markdown_content: str,
    file_name: str,
    etl_service: str,
    user_llm,
    enable_summary: bool,
) -> tuple[str, list[float]]:
    """
    Generate a document summary and embedding.

    Docling uses its own large-document summary strategy; other ETL services
    use the standard ``generate_document_summary`` helper.
    """
    if not enable_summary:
        summary = f"File: {file_name}\n\n{markdown_content[:4000]}"
        return summary, embed_text(summary)

    if etl_service == "DOCLING":
        from app.services.docling_service import create_docling_service

        docling_service = create_docling_service()
        summary_text = await docling_service.process_large_document_summary(
            content=markdown_content, llm=user_llm, document_title=file_name
        )

        meta = {
            "file_name": file_name,
            "etl_service": etl_service,
            "document_type": "File Document",
        }
        parts = ["# DOCUMENT METADATA"]
        for key, value in meta.items():
            if value:
                formatted_key = key.replace("_", " ").title()
                parts.append(f"**{formatted_key}:** {value}")
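
        # The enhanced content ends up looking like (file name illustrative):
        #
        #   # DOCUMENT METADATA
        #   **File Name:** report.pdf
        #   **Etl Service:** DOCLING
        #   **Document Type:** File Document
        #
        #   # DOCUMENT SUMMARY
        #
        #   <summary text>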
        enhanced = "\n".join(parts) + "\n\n# DOCUMENT SUMMARY\n\n" + summary_text
        return enhanced, embed_text(enhanced)

    # Standard summary (Unstructured / LlamaCloud / others)
    meta = {
        "file_name": file_name,
        "etl_service": etl_service,
        "document_type": "File Document",
    }
    return await generate_document_summary(markdown_content, user_llm, meta)


# ---------------------------------------------------------------------------
# Unified save function
# ---------------------------------------------------------------------------


async def save_file_document(
    session: AsyncSession,
    file_name: str,
    markdown_content: str,
    search_space_id: int,
    user_id: str,
    etl_service: str,
    connector: dict | None = None,
    enable_summary: bool = True,
) -> Document | None:
    """
    Process and store a file document with deduplication and migration support.

    Handles both creating new documents and updating existing ones. This is
    the single implementation behind the per-ETL-service wrapper functions.

    Args:
        session: Database session.
        file_name: Name of the processed file.
        markdown_content: Markdown content to store.
        search_space_id: ID of the search space.
        user_id: ID of the user.
        etl_service: Name of the ETL service (UNSTRUCTURED, LLAMACLOUD, DOCLING).
        connector: Optional connector info for Google Drive files.
        enable_summary: Whether to generate an AI summary.

    Returns:
        Document object if successful, None if a duplicate is detected.
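
    Example (illustrative; assumes an active ``AsyncSession``; the file name
    and values below are hypothetical):

        document = await save_file_document(
            session=session,
            file_name="report.pdf",
            markdown_content=markdown,
            search_space_id=1,
            user_id="user-123",
            etl_service="DOCLING",
        )
        if document is None:
            ...  # duplicate content; nothing new was stored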
    """
    try:
        primary_hash, legacy_hash = get_google_drive_unique_identifier(
            connector, file_name, search_space_id
        )
        content_hash = generate_content_hash(markdown_content, search_space_id)
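
        # Look up by the stable unique identifier first (migrating legacy
        # hashes where needed), then fall back to the content hash.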
        existing_document = await find_existing_document_with_migration(
            session, primary_hash, legacy_hash, content_hash
        )

        if existing_document:
            should_skip, doc = await handle_existing_document_update(
                session,
                existing_document,
                content_hash,
                connector,
                file_name,
                primary_hash,
            )
            if should_skip:
                return doc

        user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
        if not user_llm:
            raise RuntimeError(
                f"No long context LLM configured for user {user_id} "
                f"in search space {search_space_id}"
            )

        summary_content, summary_embedding = await _generate_summary(
            markdown_content, file_name, etl_service, user_llm, enable_summary
        )
        chunks = await create_document_chunks(markdown_content)
        doc_metadata = {"FILE_NAME": file_name, "ETL_SERVICE": etl_service}
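
        # A document matched above but with changed content: update it in
        # place rather than creating a new row.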
        if existing_document:
            existing_document.title = file_name
            existing_document.content = summary_content
            existing_document.content_hash = content_hash
            existing_document.embedding = summary_embedding
            existing_document.document_metadata = doc_metadata
            await safe_set_chunks(session, existing_document, chunks)
            existing_document.source_markdown = markdown_content
            existing_document.content_needs_reindexing = False
            existing_document.updated_at = get_current_timestamp()
            existing_document.status = DocumentStatus.ready()

            await session.commit()
            await session.refresh(existing_document)
            return existing_document
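
        # New document: Google Drive uploads keep their connector-specific
        # document type; everything else is stored as a plain file.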
        doc_type = DocumentType.FILE
        if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
            doc_type = DocumentType.GOOGLE_DRIVE_FILE

        document = Document(
            search_space_id=search_space_id,
            title=file_name,
            document_type=doc_type,
            document_metadata=doc_metadata,
            content=summary_content,
            embedding=summary_embedding,
            chunks=chunks,
            content_hash=content_hash,
            unique_identifier_hash=primary_hash,
            source_markdown=markdown_content,
            content_needs_reindexing=False,
            updated_at=get_current_timestamp(),
            created_by_id=user_id,
            connector_id=connector.get("connector_id") if connector else None,
            status=DocumentStatus.ready(),
        )
        session.add(document)
        await session.commit()
        await session.refresh(document)
        return document

    except SQLAlchemyError as db_error:
        await session.rollback()
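        # A duplicate can still slip past the earlier check (e.g. a
        # concurrent insert); treat a unique-constraint hit on content_hash
        # as "already stored" rather than an error.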
        if "ix_documents_content_hash" in str(db_error):
            logging.warning(
                "content_hash collision during commit for %s (%s). Skipping.",
                file_name,
                etl_service,
            )
            return None
        raise db_error
    except Exception as e:
        await session.rollback()
        raise RuntimeError(
            f"Failed to process file document using {etl_service}: {e!s}"
        ) from e
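

# ---------------------------------------------------------------------------
# Illustrative wrapper (sketch)
# ---------------------------------------------------------------------------
# A minimal sketch of a per-ETL-service wrapper delegating to
# ``save_file_document``, as described in its docstring. The wrapper name is
# hypothetical and not part of the original module.
async def save_docling_document(
    session: AsyncSession,
    file_name: str,
    markdown_content: str,
    search_space_id: int,
    user_id: str,
    connector: dict | None = None,
) -> Document | None:
    # Delegate to the unified implementation, pinning the ETL service name.
    return await save_file_document(
        session,
        file_name,
        markdown_content,
        search_space_id,
        user_id,
        etl_service="DOCLING",
        connector=connector,
    )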