diff --git a/surfsense_backend/app/tasks/background_tasks.py b/surfsense_backend/app/tasks/background_tasks.py
index 099391f..1d278f7 100644
--- a/surfsense_backend/app/tasks/background_tasks.py
+++ b/surfsense_backend/app/tasks/background_tasks.py
@@ -11,17 +11,19 @@ from langchain_core.documents import Document as LangChainDocument
 from langchain_community.document_loaders import FireCrawlLoader, AsyncChromiumLoader
 from langchain_community.document_transformers import MarkdownifyTransformer
 import validators
+from youtube_transcript_api import YouTubeTranscriptApi
+from urllib.parse import urlparse, parse_qs
+import aiohttp
+from app.db import Document as DB_Document, DocumentType as DB_DocumentType
+import logging
 
 md = MarkdownifyTransformer()
 
 
 async def add_crawled_url_document(
-    session: AsyncSession,
-    url: str,
-    search_space_id: int
+    session: AsyncSession, url: str, search_space_id: int
 ) -> Optional[Document]:
     try:
-
         if not validators.url(url):
             raise ValueError(f"Url {url} is not a valid URL address")
 
@@ -33,7 +35,7 @@ async def add_crawled_url_document(
                 params={
                     "formats": ["markdown"],
                     "excludeTags": ["a"],
-                }
+                },
             )
         else:
             crawl_loader = AsyncChromiumLoader(urls=[url], headless=True)
@@ -43,20 +45,21 @@ async def add_crawled_url_document(
         if type(crawl_loader) == FireCrawlLoader:
             content_in_markdown = url_crawled[0].page_content
         elif type(crawl_loader) == AsyncChromiumLoader:
-            content_in_markdown = md.transform_documents(url_crawled)[
-                0].page_content
+            content_in_markdown = md.transform_documents(url_crawled)[0].page_content
 
         # Format document metadata in a more maintainable way
         metadata_sections = [
-            ("METADATA", [
-                f"{key.upper()}: {value}" for key, value in url_crawled[0].metadata.items()
-            ]),
-            ("CONTENT", [
-                "FORMAT: markdown",
-                "TEXT_START",
-                content_in_markdown,
-                "TEXT_END"
-            ])
+            (
+                "METADATA",
+                [
+                    f"{key.upper()}: {value}"
+                    for key, value in url_crawled[0].metadata.items()
+                ],
+            ),
+            (
+                "CONTENT",
+                ["FORMAT: markdown", "TEXT_START", content_in_markdown, "TEXT_END"],
+            ),
         ]
 
         # Build the document string more efficiently
@@ -69,31 +72,36 @@ async def add_crawled_url_document(
             document_parts.append(f"</{section_title}>")
 
         document_parts.append("</DOCUMENT>")
-        combined_document_string = '\n'.join(document_parts)
+        combined_document_string = "\n".join(document_parts)
 
         # Generate summary
         summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
-        summary_result = await summary_chain.ainvoke({"document": combined_document_string})
+        summary_result = await summary_chain.ainvoke(
+            {"document": combined_document_string}
+        )
         summary_content = summary_result.content
-        summary_embedding = config.embedding_model_instance.embed(
-            summary_content)
+        summary_embedding = config.embedding_model_instance.embed(summary_content)
 
         # Process chunks
         chunks = [
-            Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
+            Chunk(
+                content=chunk.text,
+                embedding=config.embedding_model_instance.embed(chunk.text),
+            )
             for chunk in config.chunker_instance.chunk(content_in_markdown)
         ]
 
         # Create and store document
         document = Document(
             search_space_id=search_space_id,
-            title=url_crawled[0].metadata['title'] if type(
-                crawl_loader) == FireCrawlLoader else url_crawled[0].metadata['source'],
+            title=url_crawled[0].metadata["title"]
+            if type(crawl_loader) == FireCrawlLoader
+            else url_crawled[0].metadata["source"],
             document_type=DocumentType.CRAWLED_URL,
             document_metadata=url_crawled[0].metadata,
             content=summary_content,
             embedding=summary_embedding,
-            chunks=chunks
+            chunks=chunks,
         )
 
         session.add(document)
@@ -111,9 +119,7 @@ async def add_crawled_url_document(
 
 
 async def add_extension_received_document(
-    session: AsyncSession,
-    content: ExtensionDocumentContent,
-    search_space_id: int
+    session: AsyncSession, content: ExtensionDocumentContent, search_space_id: int
 ) -> Optional[Document]:
     """
     Process and store document content received from the SurfSense Extension.
@@ -129,20 +135,21 @@ async def add_extension_received_document(
     try:
         # Format document metadata in a more maintainable way
         metadata_sections = [
-            ("METADATA", [
-                f"SESSION_ID: {content.metadata.BrowsingSessionId}",
-                f"URL: {content.metadata.VisitedWebPageURL}",
-                f"TITLE: {content.metadata.VisitedWebPageTitle}",
-                f"REFERRER: {content.metadata.VisitedWebPageReffererURL}",
-                f"TIMESTAMP: {content.metadata.VisitedWebPageDateWithTimeInISOString}",
-                f"DURATION_MS: {content.metadata.VisitedWebPageVisitDurationInMilliseconds}"
-            ]),
-            ("CONTENT", [
-                "FORMAT: markdown",
-                "TEXT_START",
-                content.pageContent,
-                "TEXT_END"
-            ])
+            (
+                "METADATA",
+                [
+                    f"SESSION_ID: {content.metadata.BrowsingSessionId}",
+                    f"URL: {content.metadata.VisitedWebPageURL}",
+                    f"TITLE: {content.metadata.VisitedWebPageTitle}",
+                    f"REFERRER: {content.metadata.VisitedWebPageReffererURL}",
+                    f"TIMESTAMP: {content.metadata.VisitedWebPageDateWithTimeInISOString}",
+                    f"DURATION_MS: {content.metadata.VisitedWebPageVisitDurationInMilliseconds}",
+                ],
+            ),
+            (
+                "CONTENT",
+                ["FORMAT: markdown", "TEXT_START", content.pageContent, "TEXT_END"],
+            ),
         ]
 
         # Build the document string more efficiently
@@ -155,18 +162,22 @@ async def add_extension_received_document(
             document_parts.append(f"</{section_title}>")
 
         document_parts.append("</DOCUMENT>")
-        combined_document_string = '\n'.join(document_parts)
+        combined_document_string = "\n".join(document_parts)
 
         # Generate summary
         summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
-        summary_result = await summary_chain.ainvoke({"document": combined_document_string})
+        summary_result = await summary_chain.ainvoke(
+            {"document": combined_document_string}
+        )
         summary_content = summary_result.content
-        summary_embedding = config.embedding_model_instance.embed(
-            summary_content)
+        summary_embedding = config.embedding_model_instance.embed(summary_content)
 
         # Process chunks
         chunks = [
-            Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
+            Chunk(
+                content=chunk.text,
+                embedding=config.embedding_model_instance.embed(chunk.text),
+            )
             for chunk in config.chunker_instance.chunk(content.pageContent)
         ]
 
@@ -178,7 +189,7 @@ async def add_extension_received_document(
             document_metadata=content.metadata.model_dump(),
             content=summary_content,
             embedding=summary_embedding,
-            chunks=chunks
+            chunks=chunks,
         )
 
         session.add(document)
@@ -194,24 +205,23 @@ async def add_extension_received_document(
         await session.rollback()
         raise RuntimeError(f"Failed to process extension document: {str(e)}")
 
+
 async def add_received_markdown_file_document(
-    session: AsyncSession,
-    file_name: str,
-    file_in_markdown: str,
-    search_space_id: int
+    session: AsyncSession, file_name: str, file_in_markdown: str, search_space_id: int
 ) -> Optional[Document]:
     try:
-
         # Generate summary
         summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
         summary_result = await summary_chain.ainvoke({"document": file_in_markdown})
        summary_content = summary_result.content
-        summary_embedding = config.embedding_model_instance.embed(
-            summary_content)
+        summary_embedding = config.embedding_model_instance.embed(summary_content)
 
-        # Process chunks 
+        # Process chunks
         chunks = [
-            Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
+            Chunk(
+                content=chunk.text,
+                embedding=config.embedding_model_instance.embed(chunk.text),
+            )
             for chunk in config.chunker_instance.chunk(file_in_markdown)
         ]
 
@@ -222,11 +232,11 @@ async def add_received_markdown_file_document(
             document_type=DocumentType.FILE,
             document_metadata={
                 "FILE_NAME": file_name,
-                "SAVED_AT": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                "SAVED_AT": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
             },
             content=summary_content,
             embedding=summary_embedding,
-            chunks=chunks
+            chunks=chunks,
         )
 
         session.add(document)
@@ -241,14 +251,17 @@ async def add_received_markdown_file_document(
         await session.rollback()
         raise RuntimeError(f"Failed to process file document: {str(e)}")
 
+
 async def add_received_file_document(
     session: AsyncSession,
     file_name: str,
     unstructured_processed_elements: List[LangChainDocument],
-    search_space_id: int
+    search_space_id: int,
 ) -> Optional[Document]:
     try:
-        file_in_markdown = await convert_document_to_markdown(unstructured_processed_elements)
+        file_in_markdown = await convert_document_to_markdown(
+            unstructured_processed_elements
+        )
 
         # TODO: Check if file_markdown exceeds token limit of embedding model
 
@@ -256,12 +269,14 @@ async def add_received_file_document(
         summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
         summary_result = await summary_chain.ainvoke({"document": file_in_markdown})
         summary_content = summary_result.content
-        summary_embedding = config.embedding_model_instance.embed(
-            summary_content)
+        summary_embedding = config.embedding_model_instance.embed(summary_content)
 
-        # Process chunks 
+        # Process chunks
         chunks = [
-            Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
+            Chunk(
+                content=chunk.text,
+                embedding=config.embedding_model_instance.embed(chunk.text),
+            )
             for chunk in config.chunker_instance.chunk(file_in_markdown)
         ]
 
@@ -272,11 +287,11 @@ async def add_received_file_document(
             document_type=DocumentType.FILE,
             document_metadata={
                 "FILE_NAME": file_name,
-                "SAVED_AT": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                "SAVED_AT": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
             },
             content=summary_content,
             embedding=summary_embedding,
-            chunks=chunks
+            chunks=chunks,
         )
 
         session.add(document)
@@ -293,20 +308,27 @@ async def add_received_file_document(
 
 
 async def add_youtube_video_document(
-    session: AsyncSession,
-    url: str,
-    search_space_id: int
+    session: AsyncSession, url: str, search_space_id: int
 ):
     """
-    Process a YouTube video URL, extract transcripts, and add as document.
+    Process a YouTube video URL, extract transcripts, and store as a document.
+
+    Args:
+        session: Database session for storing the document
+        url: YouTube video URL (supports standard, shortened, and embed formats)
+        search_space_id: ID of the search space to add the document to
+
+    Returns:
+        Document: The created document object
+
+    Raises:
+        ValueError: If the YouTube video ID cannot be extracted from the URL
+        SQLAlchemyError: If there's a database error
+        RuntimeError: If the video processing fails
    """
     try:
-        from youtube_transcript_api import YouTubeTranscriptApi
-
         # Extract video ID from URL
         def get_youtube_video_id(url: str):
-            from urllib.parse import urlparse, parse_qs
-
             parsed_url = urlparse(url)
             hostname = parsed_url.hostname
 
@@ -327,20 +349,16 @@ async def add_youtube_video_document(
         if not video_id:
             raise ValueError(f"Could not extract video ID from URL: {url}")
 
-        # Get video metadata
-        import json
-        from urllib.parse import urlencode
-        from urllib.request import urlopen
-
-        params = {"format": "json",
-                  "url": f"https://www.youtube.com/watch?v={video_id}"}
+        # Get video metadata using async HTTP client
+        params = {
+            "format": "json",
+            "url": f"https://www.youtube.com/watch?v={video_id}",
+        }
         oembed_url = "https://www.youtube.com/oembed"
-        query_string = urlencode(params)
-        full_url = oembed_url + "?" + query_string
 
-        with urlopen(full_url) as response:
-            response_text = response.read()
-            video_data = json.loads(response_text.decode())
+        async with aiohttp.ClientSession() as http_session:
+            async with http_session.get(oembed_url, params=params) as response:
+                video_data = await response.json()
 
         # Get video transcript
         try:
@@ -359,19 +377,20 @@ async def add_youtube_video_document(
 
         # Format document metadata in a more maintainable way
         metadata_sections = [
-            ("METADATA", [
-                f"TITLE: {video_data.get('title', 'YouTube Video')}",
-                f"URL: {url}",
-                f"VIDEO_ID: {video_id}",
-                f"AUTHOR: {video_data.get('author_name', 'Unknown')}",
-                f"THUMBNAIL: {video_data.get('thumbnail_url', '')}"
-            ]),
-            ("CONTENT", [
-                "FORMAT: transcript",
-                "TEXT_START",
-                transcript_text,
-                "TEXT_END"
-            ])
+            (
+                "METADATA",
+                [
+                    f"TITLE: {video_data.get('title', 'YouTube Video')}",
+                    f"URL: {url}",
+                    f"VIDEO_ID: {video_id}",
+                    f"AUTHOR: {video_data.get('author_name', 'Unknown')}",
+                    f"THUMBNAIL: {video_data.get('thumbnail_url', '')}",
+                ],
+            ),
+            (
+                "CONTENT",
+                ["FORMAT: transcript", "TEXT_START", transcript_text, "TEXT_END"],
+            ),
         ]
 
         # Build the document string more efficiently
@@ -384,38 +403,41 @@ async def add_youtube_video_document(
             document_parts.append(f"</{section_title}>")
 
         document_parts.append("</DOCUMENT>")
-        combined_document_string = '\n'.join(document_parts)
+        combined_document_string = "\n".join(document_parts)
 
         # Generate summary
         summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
-        summary_result = await summary_chain.ainvoke({"document": combined_document_string})
+        summary_result = await summary_chain.ainvoke(
+            {"document": combined_document_string}
+        )
         summary_content = summary_result.content
-        summary_embedding = config.embedding_model_instance.embed(
-            summary_content)
+        summary_embedding = config.embedding_model_instance.embed(summary_content)
 
         # Process chunks
         chunks = [
-            Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text))
+            Chunk(
+                content=chunk.text,
+                embedding=config.embedding_model_instance.embed(chunk.text),
+            )
             for chunk in config.chunker_instance.chunk(transcript_text)
         ]
 
         # Create document
-        from app.db import Document, DocumentType
-        document = Document(
+        document = DB_Document(
             title=video_data.get("title", "YouTube Video"),
-            document_type=DocumentType.YOUTUBE_VIDEO,
+            document_type=DB_DocumentType.YOUTUBE_VIDEO,
             document_metadata={
                 "url": url,
                 "video_id": video_id,
                 "video_title": video_data.get("title", "YouTube Video"),
                 "author": video_data.get("author_name", "Unknown"),
-                "thumbnail": video_data.get("thumbnail_url", "")
+                "thumbnail": video_data.get("thumbnail_url", ""),
            },
             content=summary_content,
             embedding=summary_embedding,
             chunks=chunks,
-            search_space_id=search_space_id
+            search_space_id=search_space_id,
         )
 
         session.add(document)
@@ -428,6 +450,5 @@ async def add_youtube_video_document(
         raise db_error
     except Exception as e:
         await session.rollback()
-        import logging
         logging.error(f"Failed to process YouTube video: {str(e)}")
         raise