fix: run ruff formatter to clean up code formatting

Anish Sarkar 2025-11-30 04:15:38 +05:30
parent b98c312fb1
commit e419702ebd
4 changed files with 77 additions and 66 deletions
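
For context, `ruff format` is Ruff's Black-compatible formatter: it splits calls that overflow the configured line length and normalizes trailing commas in multi-line literals, which is all the hunks below contain. Reproducing the first hunk with its indentation restored makes the trigger visible — at its real nesting depth the call exceeds the limit (the 88-character default is an assumption; the project's Ruff config is not shown):

        # Before: inside upgrade()'s try block this line is 91 characters wide
        print("✓ Queued Celery task to populate blocknote_document for existing documents")

        # After `ruff format`: the argument moves to its own line, back under the limit
        print(
            "✓ Queued Celery task to populate blocknote_document for existing documents"
        )

The same rule explains every other hunk here, including the trailing comma added after `isoformat()` in the return dict of save_document.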

View file

@@ -48,15 +48,19 @@ def upgrade() -> None:
from app.tasks.celery_tasks.blocknote_migration_tasks import (
populate_blocknote_for_documents_task,
)
# Queue the task to run asynchronously
populate_blocknote_for_documents_task.apply_async()
print("✓ Queued Celery task to populate blocknote_document for existing documents")
print(
"✓ Queued Celery task to populate blocknote_document for existing documents"
)
except Exception as e:
# If Celery is not available or task queueing fails, log but don't fail the migration
print(f"⚠ Warning: Could not queue blocknote population task: {e}")
print(" You can manually trigger it later with:")
print(" celery -A app.celery_app call app.tasks.celery_tasks.blocknote_migration_tasks.populate_blocknote_for_documents_task")
print(
" celery -A app.celery_app call app.tasks.celery_tasks.blocknote_migration_tasks.populate_blocknote_for_documents_task"
)
def downgrade() -> None:

View file

@@ -33,7 +33,7 @@ async def get_editor_content(
attempts to generate it from chunks (lazy migration).
"""
from sqlalchemy.orm import selectinload
result = await session.execute(
select(Document)
.options(selectinload(Document.chunks))
@@ -58,39 +58,39 @@ async def get_editor_content(
# Lazy migration: Try to generate blocknote_document from chunks
from app.utils.blocknote_converter import convert_markdown_to_blocknote
chunks = sorted(document.chunks, key=lambda c: c.id)
if not chunks:
raise HTTPException(
status_code=400,
detail="This document has no chunks and cannot be edited. Please re-upload to enable editing.",
)
# Reconstruct markdown from chunks
markdown_content = "\n\n".join(chunk.content for chunk in chunks)
if not markdown_content.strip():
raise HTTPException(
status_code=400,
detail="This document has empty content and cannot be edited.",
)
# Convert to BlockNote
blocknote_json = await convert_markdown_to_blocknote(markdown_content)
if not blocknote_json:
raise HTTPException(
status_code=500,
detail="Failed to convert document to editable format. Please try again later.",
)
# Save the generated blocknote_document (lazy migration)
document.blocknote_document = blocknote_json
document.content_needs_reindexing = False
document.last_edited_at = None
await session.commit()
return {
"document_id": document.id,
"title": document.title,
@@ -111,7 +111,7 @@ async def save_document(
Called when user clicks 'Save & Exit'.
"""
from app.tasks.celery_tasks.document_reindex_tasks import reindex_document_task
# Verify ownership
result = await session.execute(
select(Document)
@@ -119,27 +119,27 @@ async def save_document(
.filter(Document.id == document_id, SearchSpace.user_id == user.id)
)
document = result.scalars().first()
if not document:
raise HTTPException(status_code=404, detail="Document not found")
blocknote_document = data.get("blocknote_document")
if not blocknote_document:
raise HTTPException(status_code=400, detail="blocknote_document is required")
# Save BlockNote document
document.blocknote_document = blocknote_document
document.last_edited_at = datetime.now(UTC)
document.content_needs_reindexing = True
await session.commit()
# Queue reindex task
reindex_document_task.delay(document_id, str(user.id))
return {
"status": "saved",
"document_id": document_id,
"message": "Document saved and will be reindexed in the background",
"last_edited_at": document.last_edited_at.isoformat()
"last_edited_at": document.last_edited_at.isoformat(),
}
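
For reference, the save handler above expects a JSON body carrying a `blocknote_document` key and returns the four fields in its return statement. A minimal client sketch — the route path, auth scheme, and payload shape are all assumptions, since neither the route decorator nor the request model is visible in this hunk:

import httpx

blocknote_json = {"blocks": []}  # placeholder; the real shape comes from the BlockNote editor export

resp = httpx.post(
    "http://localhost:8000/api/documents/42/save",  # hypothetical path
    json={"blocknote_document": blocknote_json},
    headers={"Authorization": "Bearer <token>"},  # auth scheme assumed
)
resp.raise_for_status()
body = resp.json()
# Fields per the handler: status, document_id, message, last_edited_at
print(body["status"], body["last_edited_at"])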

View file

@@ -36,7 +36,7 @@ def populate_blocknote_for_documents_task(
):
"""
Celery task to populate blocknote_document for existing documents.
Args:
document_ids: Optional list of specific document IDs to process.
If None, processes all documents with blocknote_document IS NULL.
@@ -60,7 +60,7 @@ async def _populate_blocknote_for_documents(
):
"""
Async function to populate blocknote_document for documents.
Args:
document_ids: Optional list of specific document IDs to process
batch_size: Number of documents to process per batch
@@ -69,75 +69,83 @@ async def _populate_blocknote_for_documents(
try:
# Build query for documents that need blocknote_document populated
query = select(Document).where(Document.blocknote_document.is_(None))
# If specific document IDs provided, filter by them
if document_ids:
query = query.where(Document.id.in_(document_ids))
# Load chunks relationship to avoid N+1 queries
query = query.options(selectinload(Document.chunks))
# Execute query
result = await session.execute(query)
documents = result.scalars().all()
total_documents = len(documents)
logger.info(f"Found {total_documents} documents to process")
if total_documents == 0:
logger.info("No documents to process")
return
# Process documents in batches
processed = 0
failed = 0
for i in range(0, total_documents, batch_size):
batch = documents[i : i + batch_size]
logger.info(f"Processing batch {i // batch_size + 1}: documents {i+1}-{min(i+batch_size, total_documents)}")
logger.info(
f"Processing batch {i // batch_size + 1}: documents {i + 1}-{min(i + batch_size, total_documents)}"
)
for document in batch:
try:
# Use preloaded chunks from selectinload - no need to query again
chunks = sorted(document.chunks, key=lambda c: c.id)
if not chunks:
logger.warning(
f"Document {document.id} ({document.title}) has no chunks, skipping"
)
failed += 1
continue
# Reconstruct markdown by concatenating chunk contents
markdown_content = "\n\n".join(chunk.content for chunk in chunks)
markdown_content = "\n\n".join(
chunk.content for chunk in chunks
)
if not markdown_content or not markdown_content.strip():
logger.warning(
f"Document {document.id} ({document.title}) has empty markdown content, skipping"
)
failed += 1
continue
# Convert markdown to BlockNote JSON
-blocknote_json = await convert_markdown_to_blocknote(markdown_content)
+blocknote_json = await convert_markdown_to_blocknote(
+    markdown_content
+)
if not blocknote_json:
logger.warning(
f"Failed to convert markdown to BlockNote for document {document.id} ({document.title})"
)
failed += 1
continue
# Update document with blocknote_document (other fields already have correct defaults)
document.blocknote_document = blocknote_json
processed += 1
# Commit every batch_size documents to avoid long transactions
if processed % batch_size == 0:
await session.commit()
logger.info(f"Committed batch: {processed} documents processed so far")
logger.info(
f"Committed batch: {processed} documents processed so far"
)
except Exception as e:
logger.error(
f"Error processing document {document.id} ({document.title}): {e}",
@@ -146,15 +154,15 @@ async def _populate_blocknote_for_documents(
failed += 1
# Continue with next document instead of failing entire batch
continue
# Commit remaining changes in the batch
await session.commit()
logger.info(f"Completed batch {i // batch_size + 1}")
logger.info(
f"Migration complete: {processed} documents processed, {failed} failed"
)
except Exception as e:
await session.rollback()
logger.error(f"Error in blocknote migration task: {e}", exc_info=True)

View file

@@ -35,7 +35,7 @@ def get_celery_session_maker():
def reindex_document_task(self, document_id: int, user_id: str):
"""
Celery task to reindex a document after editing.
Args:
document_id: ID of document to reindex
user_id: ID of user who edited the document
@@ -62,66 +62,65 @@ async def _reindex_document(document_id: int, user_id: str):
.where(Document.id == document_id)
)
document = result.scalars().first()
if not document:
logger.error(f"Document {document_id} not found")
return
if not document.blocknote_document:
logger.warning(f"Document {document_id} has no BlockNote content")
return
logger.info(f"Reindexing document {document_id} ({document.title})")
# 1. Convert BlockNote → Markdown
markdown_content = await convert_blocknote_to_markdown(
document.blocknote_document
)
if not markdown_content:
logger.error(f"Failed to convert document {document_id} to markdown")
return
# 2. Delete old chunks explicitly
from app.db import Chunk
-await session.execute(
-    delete(Chunk).where(Chunk.document_id == document_id)
-)
+await session.execute(delete(Chunk).where(Chunk.document_id == document_id))
await session.flush() # Ensure old chunks are deleted
# 3. Create new chunks
new_chunks = await create_document_chunks(markdown_content)
# 4. Add new chunks to session
for chunk in new_chunks:
chunk.document_id = document_id
session.add(chunk)
logger.info(f"Created {len(new_chunks)} chunks for document {document_id}")
# 5. Regenerate summary
user_llm = await get_user_long_context_llm(
session, user_id, document.search_space_id
)
document_metadata = {
"title": document.title,
"document_type": document.document_type.value,
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
# 6. Update document
document.content = summary_content
document.embedding = summary_embedding
document.content_needs_reindexing = False
await session.commit()
logger.info(f"Successfully reindexed document {document_id}")
except Exception as e:
await session.rollback()
logger.error(f"Error reindexing document {document_id}: {e}", exc_info=True)