refactor: implement UploadDocumentAdapter for file indexing and reindexing

This commit is contained in:
Anish Sarkar 2026-02-28 01:38:32 +05:30
parent 1e4b8d3e89
commit 23a98d802c
4 changed files with 227 additions and 97 deletions

View file

@ -1,47 +1,83 @@
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import DocumentStatus, DocumentType
from app.db import Document, DocumentStatus, DocumentType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
async def index_uploaded_file(
markdown_content: str,
filename: str,
etl_service: str,
search_space_id: int,
user_id: str,
session: AsyncSession,
llm,
should_summarize: bool = False,
) -> None:
connector_doc = ConnectorDocument(
title=filename,
source_markdown=markdown_content,
unique_id=filename,
document_type=DocumentType.FILE,
search_space_id=search_space_id,
created_by_id=user_id,
connector_id=None,
should_summarize=should_summarize,
should_use_code_chunker=False,
fallback_summary=markdown_content[:4000],
metadata={
"FILE_NAME": filename,
"ETL_SERVICE": etl_service,
},
)
class UploadDocumentAdapter:
def __init__(self, session: AsyncSession) -> None:
self._session = session
self._service = IndexingPipelineService(session)
service = IndexingPipelineService(session)
documents = await service.prepare_for_indexing([connector_doc])
async def index(
self,
markdown_content: str,
filename: str,
etl_service: str,
search_space_id: int,
user_id: str,
llm,
should_summarize: bool = False,
) -> None:
connector_doc = ConnectorDocument(
title=filename,
source_markdown=markdown_content,
unique_id=filename,
document_type=DocumentType.FILE,
search_space_id=search_space_id,
created_by_id=user_id,
connector_id=None,
should_summarize=should_summarize,
should_use_code_chunker=False,
fallback_summary=markdown_content[:4000],
metadata={
"FILE_NAME": filename,
"ETL_SERVICE": etl_service,
},
)
if not documents:
raise RuntimeError("prepare_for_indexing returned no documents")
documents = await self._service.prepare_for_indexing([connector_doc])
indexed = await service.index(documents[0], connector_doc, llm)
if not documents:
raise RuntimeError("prepare_for_indexing returned no documents")
if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY):
raise RuntimeError(indexed.status.get("reason", "Indexing failed"))
indexed = await self._service.index(documents[0], connector_doc, llm)
indexed.content_needs_reindexing = False
await session.commit()
if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY):
raise RuntimeError(indexed.status.get("reason", "Indexing failed"))
indexed.content_needs_reindexing = False
await self._session.commit()
async def reindex(self, document: Document, llm) -> None:
"""Re-index an existing document after its source_markdown has been updated."""
if not document.source_markdown:
raise RuntimeError("Document has no source_markdown to reindex")
metadata = document.document_metadata or {}
connector_doc = ConnectorDocument(
title=document.title,
source_markdown=document.source_markdown,
unique_id=document.title,
document_type=document.document_type,
search_space_id=document.search_space_id,
created_by_id=str(document.created_by_id),
connector_id=document.connector_id,
should_summarize=True,
should_use_code_chunker=False,
fallback_summary=document.source_markdown[:4000],
metadata=metadata,
)
document.content_hash = compute_content_hash(connector_doc)
indexed = await self._service.index(document, connector_doc, llm)
if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY):
raise RuntimeError(indexed.status.get("reason", "Reindexing failed"))
indexed.content_needs_reindexing = False
await self._session.commit()

View file

@ -2,7 +2,7 @@
import logging
from sqlalchemy import delete, select
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import selectinload
@ -11,12 +11,9 @@ from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
from app.db import Document
from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
generate_document_summary,
)
logger = logging.getLogger(__name__)
@ -54,7 +51,6 @@ def reindex_document_task(self, document_id: int, user_id: str):
async def _reindex_document(document_id: int, user_id: str):
"""Async function to reindex a document."""
async with get_celery_session_maker()() as session:
# First, get the document to get search_space_id for logging
result = await session.execute(
select(Document)
.options(selectinload(Document.chunks))
@ -66,10 +62,8 @@ async def _reindex_document(document_id: int, user_id: str):
logger.error(f"Document {document_id} not found")
return
# Initialize task logger
task_logger = TaskLoggingService(session, document.search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="document_reindex",
source="editor",
@ -83,10 +77,7 @@ async def _reindex_document(document_id: int, user_id: str):
)
try:
# Read markdown directly from source_markdown
markdown_content = document.source_markdown
if not markdown_content:
if not document.source_markdown:
await task_logger.log_task_failure(
log_entry,
f"Document {document_id} has no source_markdown to reindex",
@ -97,51 +88,17 @@ async def _reindex_document(document_id: int, user_id: str):
logger.info(f"Reindexing document {document_id} ({document.title})")
# 1. Delete old chunks explicitly
from app.db import Chunk
await session.execute(delete(Chunk).where(Chunk.document_id == document_id))
await session.flush() # Ensure old chunks are deleted
# 2. Create new chunks from source_markdown
new_chunks = await create_document_chunks(markdown_content)
# 3. Add new chunks to session
for chunk in new_chunks:
chunk.document_id = document_id
session.add(chunk)
logger.info(f"Created {len(new_chunks)} chunks for document {document_id}")
# 4. Regenerate summary
user_llm = await get_user_long_context_llm(
session, user_id, document.search_space_id
)
document_metadata = {
"title": document.title,
"document_type": document.document_type.value,
}
adapter = UploadDocumentAdapter(session)
await adapter.reindex(document=document, llm=user_llm)
summary_content, summary_embedding = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
# 5. Update document
document.content = summary_content
document.embedding = summary_embedding
document.content_needs_reindexing = False
await session.commit()
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully reindexed document: {document.title}",
{
"chunks_created": len(new_chunks),
"document_id": document_id,
},
{"document_id": document_id},
)
logger.info(f"Successfully reindexed document {document_id}")

View file

@ -18,7 +18,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config as app_config
from app.db import Document, DocumentStatus, DocumentType, Log, Notification
from app.indexing_pipeline.adapters.file_upload_adapter import index_uploaded_file
from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter
from app.services.llm_service import get_user_long_context_llm
from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
@ -1871,13 +1871,13 @@ async def process_file_in_background_with_document(
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
await index_uploaded_file(
adapter = UploadDocumentAdapter(session)
await adapter.index(
markdown_content=markdown_content,
filename=filename,
etl_service=etl_service,
search_space_id=search_space_id,
user_id=user_id,
session=session,
llm=user_llm,
should_summarize=should_summarize,
)

View file

@ -2,7 +2,7 @@ import pytest
from sqlalchemy import select
from app.db import Chunk, Document, DocumentStatus
from app.indexing_pipeline.adapters.file_upload_adapter import index_uploaded_file
from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter
pytestmark = pytest.mark.integration
@ -12,13 +12,13 @@ pytestmark = pytest.mark.integration
)
async def test_sets_status_ready(db_session, db_search_space, db_user, mocker):
"""Document status is READY after successful indexing."""
await index_uploaded_file(
adapter = UploadDocumentAdapter(db_session)
await adapter.index(
markdown_content="## Hello\n\nSome content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
session=db_session,
llm=mocker.Mock(),
)
@ -35,13 +35,13 @@ async def test_sets_status_ready(db_session, db_search_space, db_user, mocker):
)
async def test_content_is_summary(db_session, db_search_space, db_user, mocker):
"""Document content is set to the LLM-generated summary."""
await index_uploaded_file(
adapter = UploadDocumentAdapter(db_session)
await adapter.index(
markdown_content="## Hello\n\nSome content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
session=db_session,
llm=mocker.Mock(),
)
@ -58,13 +58,13 @@ async def test_content_is_summary(db_session, db_search_space, db_user, mocker):
)
async def test_chunks_written_to_db(db_session, db_search_space, db_user, mocker):
"""Chunks derived from the source markdown are persisted in the DB."""
await index_uploaded_file(
adapter = UploadDocumentAdapter(db_session)
await adapter.index(
markdown_content="## Hello\n\nSome content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
session=db_session,
llm=mocker.Mock(),
)
@ -87,13 +87,150 @@ async def test_chunks_written_to_db(db_session, db_search_space, db_user, mocker
)
async def test_raises_on_indexing_failure(db_session, db_search_space, db_user, mocker):
"""RuntimeError is raised when the indexing step fails so the caller can fire a failure notification."""
adapter = UploadDocumentAdapter(db_session)
with pytest.raises(RuntimeError):
await index_uploaded_file(
await adapter.index(
markdown_content="## Hello\n\nSome content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
session=db_session,
llm=mocker.Mock(),
)
# ---------------------------------------------------------------------------
# reindex() tests
# ---------------------------------------------------------------------------
@pytest.mark.usefixtures(
"patched_summarize", "patched_embed_text", "patched_chunk_text"
)
async def test_reindex_sets_status_ready(db_session, db_search_space, db_user, mocker):
"""Document status is READY after successful reindexing."""
adapter = UploadDocumentAdapter(db_session)
await adapter.index(
markdown_content="## Original\n\nOriginal content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
llm=mocker.Mock(),
)
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
document = result.scalars().first()
document.source_markdown = "## Edited\n\nNew content after user edit."
await db_session.flush()
await adapter.reindex(document=document, llm=mocker.Mock())
await db_session.refresh(document)
assert DocumentStatus.is_state(document.status, DocumentStatus.READY)
@pytest.mark.usefixtures(
"patched_summarize", "patched_embed_text", "patched_chunk_text"
)
async def test_reindex_replaces_chunks(db_session, db_search_space, db_user, mocker):
"""Reindexing replaces old chunks rather than appending new ones."""
adapter = UploadDocumentAdapter(db_session)
await adapter.index(
markdown_content="## Original\n\nOriginal content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
llm=mocker.Mock(),
)
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
document = result.scalars().first()
document_id = document.id
document.source_markdown = "## Edited\n\nNew content after user edit."
await db_session.flush()
await adapter.reindex(document=document, llm=mocker.Mock())
chunks_result = await db_session.execute(
select(Chunk).filter(Chunk.document_id == document_id)
)
chunks = chunks_result.scalars().all()
assert len(chunks) == 1
@pytest.mark.usefixtures(
"patched_summarize", "patched_embed_text", "patched_chunk_text"
)
async def test_reindex_clears_reindexing_flag(
db_session, db_search_space, db_user, mocker
):
"""After successful reindex, content_needs_reindexing is False."""
adapter = UploadDocumentAdapter(db_session)
await adapter.index(
markdown_content="## Original\n\nOriginal content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
llm=mocker.Mock(),
)
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
document = result.scalars().first()
document.source_markdown = "## Edited\n\nNew content after user edit."
document.content_needs_reindexing = True
await db_session.flush()
await adapter.reindex(document=document, llm=mocker.Mock())
await db_session.refresh(document)
assert document.content_needs_reindexing is False
@pytest.mark.usefixtures("patched_embed_text", "patched_chunk_text")
async def test_reindex_raises_on_failure(
db_session, db_search_space, db_user, mocker
):
"""RuntimeError is raised when reindexing fails so the caller can handle it."""
mocker.patch(
"app.indexing_pipeline.indexing_pipeline_service.summarize_document",
return_value="Mocked summary.",
)
adapter = UploadDocumentAdapter(db_session)
await adapter.index(
markdown_content="## Original\n\nOriginal content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
llm=mocker.Mock(),
)
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
document = result.scalars().first()
document.source_markdown = "## Edited\n\nNew content after user edit."
await db_session.flush()
mocker.patch(
"app.indexing_pipeline.indexing_pipeline_service.summarize_document",
side_effect=RuntimeError("LLM unavailable"),
)
with pytest.raises(RuntimeError):
await adapter.reindex(document=document, llm=mocker.Mock())