feat: Removed Hard Dependency on Unstructured.io

- Added Llamaparse Support :)
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2025-05-30 19:17:19 -07:00
parent 5737ea80c0
commit 73751c0eb1
11 changed files with 402 additions and 84 deletions

View file

@ -7,7 +7,7 @@ from app.db import get_async_session, User, SearchSpace, Document, DocumentType
from app.schemas import DocumentsCreate, DocumentUpdate, DocumentRead
from app.users import current_active_user
from app.utils.check_ownership import check_ownership
from app.tasks.background_tasks import add_received_markdown_file_document, add_extension_received_document, add_received_file_document, add_crawled_url_document, add_youtube_video_document
from app.tasks.background_tasks import add_received_markdown_file_document, add_extension_received_document, add_received_file_document_using_unstructured, add_crawled_url_document, add_youtube_video_document, add_received_file_document_using_llamacloud
from app.config import config as app_config
# Force asyncio to use standard event loop before unstructured imports
import asyncio
@ -101,8 +101,7 @@ async def create_documents(
content = await file.read()
with open(temp_path, "wb") as f:
f.write(content)
# Process in background to avoid uvloop conflicts
fastapi_background_tasks.add_task(
process_file_in_background_with_new_session,
temp_path,
@ -191,36 +190,74 @@ async def process_file_in_background(
search_space_id
)
else:
# Use synchronous unstructured API to avoid event loop issues
from langchain_unstructured import UnstructuredLoader
if app_config.ETL_SERVICE == "UNSTRUCTURED":
from langchain_unstructured import UnstructuredLoader
# Process the file
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
# Process the file
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
docs = await loader.aload()
# Clean up the temp file
import os
try:
os.unlink(file_path)
except:
pass
# Clean up the temp file
import os
try:
os.unlink(file_path)
except:
pass
# Pass the documents to the existing background task
await add_received_file_document_using_unstructured(
session,
filename,
docs,
search_space_id
)
elif app_config.ETL_SERVICE == "LLAMACLOUD":
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
# Pass the documents to the existing background task
await add_received_file_document(
session,
filename,
docs,
search_space_id
)
# Create LlamaParse parser instance
parser = LlamaParse(
api_key=app_config.LLAMA_CLOUD_API_KEY,
num_workers=1, # Use single worker for file processing
verbose=True,
language="en",
result_type=ResultType.MD
)
# Parse the file asynchronously
result = await parser.aparse(file_path)
# Clean up the temp file
import os
try:
os.unlink(file_path)
except:
pass
# Get markdown documents from the result
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
for doc in markdown_documents:
# Extract text content from the markdown documents
markdown_content = doc.text
# Process the documents using our LlamaCloud background task
await add_received_file_document_using_llamacloud(
session,
filename,
llamacloud_markdown_document=markdown_content,
search_space_id=search_space_id
)
except Exception as e:
import logging
logging.error(f"Error processing file in background: {str(e)}")
@ -442,3 +479,5 @@ async def process_youtube_video_with_new_session(
except Exception as e:
import logging
logging.error(f"Error processing YouTube video: {str(e)}")