diff --git a/surfsense_backend/app/tasks/background_tasks.py b/surfsense_backend/app/tasks/background_tasks.py
index f18890f..07e0f5f 100644
--- a/surfsense_backend/app/tasks/background_tasks.py
+++ b/surfsense_backend/app/tasks/background_tasks.py
@@ -30,7 +30,7 @@ async def add_crawled_url_document(
         task_name="crawl_url_document",
         source="background_task",
         message=f"Starting URL crawling process for: {url}",
-        metadata={"url": url, "user_id": user_id}
+        metadata={"url": url, "user_id": str(user_id)}
     )
 
     try:
@@ -259,7 +259,7 @@ async def add_extension_received_document(
         metadata={
             "url": content.metadata.VisitedWebPageURL,
             "title": content.metadata.VisitedWebPageTitle,
-            "user_id": user_id
+            "user_id": str(user_id)
         }
     )
 
@@ -392,7 +392,7 @@ async def add_received_markdown_file_document(
         task_name="markdown_file_document",
         source="background_task",
         message=f"Processing markdown file: {file_name}",
-        metadata={"filename": file_name, "user_id": user_id, "content_length": len(file_in_markdown)}
+        metadata={"filename": file_name, "user_id": str(user_id), "content_length": len(file_in_markdown)}
     )
 
     try:
@@ -667,7 +667,7 @@ async def add_youtube_video_document(
         task_name="youtube_video_document",
         source="background_task",
         message=f"Starting YouTube video processing for: {url}",
-        metadata={"url": url, "user_id": user_id}
+        metadata={"url": url, "user_id": str(user_id)}
    )
 
     try:
diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
index 6a972be..e0b3cd1 100644
--- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py
+++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
@@ -50,7 +50,7 @@ async def index_slack_messages(
         task_name="slack_messages_indexing",
         source="connector_indexing_task",
         message=f"Starting Slack messages indexing for connector {connector_id}",
-        metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
     )
 
     try:
@@ -412,7 +412,7 @@ async def index_notion_pages(
         task_name="notion_pages_indexing",
         source="connector_indexing_task",
         message=f"Starting Notion pages indexing for connector {connector_id}",
-        metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
     )
 
     try:
@@ -763,7 +763,7 @@ async def index_github_repos(
         task_name="github_repos_indexing",
         source="connector_indexing_task",
         message=f"Starting GitHub repositories indexing for connector {connector_id}",
-        metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
     )
 
     documents_processed = 0
@@ -1007,7 +1007,7 @@ async def index_linear_issues(
         task_name="linear_issues_indexing",
         source="connector_indexing_task",
         message=f"Starting Linear issues indexing for connector {connector_id}",
-        metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
     )
 
     try:
@@ -1315,7 +1315,7 @@ async def index_discord_messages(
         task_name="discord_messages_indexing",
         source="connector_indexing_task",
         message=f"Starting Discord messages indexing for connector {connector_id}",
-        metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
     )
 
     try:
diff --git a/surfsense_backend/app/tasks/podcast_tasks.py b/surfsense_backend/app/tasks/podcast_tasks.py
index a6be546..f4907af 100644
--- a/surfsense_backend/app/tasks/podcast_tasks.py
+++ b/surfsense_backend/app/tasks/podcast_tasks.py
@@ -2,8 +2,10 @@
 from app.agents.podcaster.graph import graph as podcaster_graph
 from app.agents.podcaster.state import State
 from app.db import Chat, Podcast
+from app.services.task_logging_service import TaskLoggingService
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.exc import SQLAlchemyError
 
 
 async def generate_document_podcast(
@@ -24,73 +26,177 @@ async def generate_chat_podcast(
     podcast_title: str,
     user_id: int
 ):
-    # Fetch the chat with the specified ID
-    query = select(Chat).filter(
-        Chat.id == chat_id,
-        Chat.search_space_id == search_space_id
-    )
+    task_logger = TaskLoggingService(session, search_space_id)
 
-    result = await session.execute(query)
-    chat = result.scalars().first()
-
-    if not chat:
-        raise ValueError(f"Chat with id {chat_id} not found in search space {search_space_id}")
-
-    # Create chat history structure
-    chat_history_str = ""
-
-    for message in chat.messages:
-        if message["role"] == "user":
-            chat_history_str += f"{message['content']}"
-        elif message["role"] == "assistant":
-            # Last annotation type will always be "ANSWER" here
-            answer_annotation = message["annotations"][-1]
-            answer_text = ""
-            if answer_annotation["type"] == "ANSWER":
-                answer_text = answer_annotation["content"]
-                # If content is a list, join it into a single string
-                if isinstance(answer_text, list):
-                    answer_text = "\n".join(answer_text)
-            chat_history_str += f"{answer_text}"
-
-    chat_history_str += ""
-
-    # Pass it to the SurfSense Podcaster
-    config = {
-        "configurable": {
-            "podcast_title": "SurfSense",
-            "user_id": str(user_id),
+    # Log task start
+    log_entry = await task_logger.log_task_start(
+        task_name="generate_chat_podcast",
+        source="podcast_task",
+        message=f"Starting podcast generation for chat {chat_id}",
+        metadata={
+            "chat_id": chat_id,
+            "search_space_id": search_space_id,
+            "podcast_title": podcast_title,
+            "user_id": str(user_id)
         }
-    }
-    # Initialize state with database session and streaming service
-    initial_state = State(
-        source_content=chat_history_str,
-        db_session=session
     )
 
-    # Run the graph directly
-    result = await podcaster_graph.ainvoke(initial_state, config=config)
-
-    # Convert podcast transcript entries to serializable format
-    serializable_transcript = []
-    for entry in result["podcast_transcript"]:
-        serializable_transcript.append({
-            "speaker_id": entry.speaker_id,
-            "dialog": entry.dialog
-        })
-
-    # Create a new podcast entry
-    podcast = Podcast(
-        title=f"{podcast_title}",
-        podcast_transcript=serializable_transcript,
-        file_location=result["final_podcast_file_path"],
-        search_space_id=search_space_id
-    )
-
-    # Add to session and commit
-    session.add(podcast)
-    await session.commit()
-    await session.refresh(podcast)
-
-    return podcast
+    try:
+        # Fetch the chat with the specified ID
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Fetching chat {chat_id} from database",
+            {"stage": "fetch_chat"}
+        )
+
+        query = select(Chat).filter(
+            Chat.id == chat_id,
+            Chat.search_space_id == search_space_id
+        )
+
+        result = await session.execute(query)
+        chat = result.scalars().first()
+
+        if not chat:
+            await task_logger.log_task_failure(
+                log_entry,
+                f"Chat with id {chat_id} not found in search space {search_space_id}",
+                "Chat not found",
+                {"error_type": "ChatNotFound"}
+            )
+            raise ValueError(f"Chat with id {chat_id} not found in search space {search_space_id}")
+
+        # Create chat history structure
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Processing chat history for chat {chat_id}",
+            {"stage": "process_chat_history", "message_count": len(chat.messages)}
+        )
+
+        chat_history_str = ""
+
+        processed_messages = 0
+        for message in chat.messages:
+            if message["role"] == "user":
+                chat_history_str += f"{message['content']}"
+                processed_messages += 1
+            elif message["role"] == "assistant":
+                # Last annotation type will always be "ANSWER" here
+                answer_annotation = message["annotations"][-1]
+                answer_text = ""
+                if answer_annotation["type"] == "ANSWER":
+                    answer_text = answer_annotation["content"]
+                    # If content is a list, join it into a single string
+                    if isinstance(answer_text, list):
+                        answer_text = "\n".join(answer_text)
+                chat_history_str += f"{answer_text}"
+                processed_messages += 1
+
+        chat_history_str += ""
+
+        # Pass it to the SurfSense Podcaster
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Initializing podcast generation for chat {chat_id}",
+            {"stage": "initialize_podcast_generation", "processed_messages": processed_messages, "content_length": len(chat_history_str)}
+        )
+
+        config = {
+            "configurable": {
+                "podcast_title": "SurfSense",
+                "user_id": str(user_id),
+            }
+        }
+        # Initialize state with database session and streaming service
+        initial_state = State(
+            source_content=chat_history_str,
+            db_session=session
+        )
+
+        # Run the graph directly
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Running podcast generation graph for chat {chat_id}",
+            {"stage": "run_podcast_graph"}
+        )
+
+        result = await podcaster_graph.ainvoke(initial_state, config=config)
+
+        # Convert podcast transcript entries to serializable format
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Processing podcast transcript for chat {chat_id}",
+            {"stage": "process_transcript", "transcript_entries": len(result["podcast_transcript"])}
+        )
+
+        serializable_transcript = []
+        for entry in result["podcast_transcript"]:
+            serializable_transcript.append({
+                "speaker_id": entry.speaker_id,
+                "dialog": entry.dialog
+            })
+
+        # Create a new podcast entry
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Creating podcast database entry for chat {chat_id}",
+            {"stage": "create_podcast_entry", "file_location": result.get("final_podcast_file_path")}
+        )
+
+        podcast = Podcast(
+            title=f"{podcast_title}",
+            podcast_transcript=serializable_transcript,
+            file_location=result["final_podcast_file_path"],
+            search_space_id=search_space_id
+        )
+
+        # Add to session and commit
+        session.add(podcast)
+        await session.commit()
+        await session.refresh(podcast)
+
+        # Log success
+        await task_logger.log_task_success(
+            log_entry,
+            f"Successfully generated podcast for chat {chat_id}",
+            {
+                "podcast_id": podcast.id,
+                "podcast_title": podcast_title,
+                "transcript_entries": len(serializable_transcript),
+                "file_location": result.get("final_podcast_file_path"),
+                "processed_messages": processed_messages,
+                "content_length": len(chat_history_str)
+            }
+        )
+
+        return podcast
+
+    except ValueError as ve:
+        # ValueError is already logged above for chat not found
+        if "not found" not in str(ve):
+            await task_logger.log_task_failure(
+                log_entry,
+                f"Value error during podcast generation for chat {chat_id}",
+                str(ve),
+                {"error_type": "ValueError"}
+            )
+        raise ve
+    except SQLAlchemyError as db_error:
+        await session.rollback()
+        await task_logger.log_task_failure(
+            log_entry,
+            f"Database error during podcast generation for chat {chat_id}",
+            str(db_error),
+            {"error_type": "SQLAlchemyError"}
+        )
+        raise db_error
+    except Exception as e:
+        await session.rollback()
+        await task_logger.log_task_failure(
+            log_entry,
+            f"Unexpected error during podcast generation for chat {chat_id}",
+            str(e),
+            {"error_type": type(e).__name__}
+        )
+        raise RuntimeError(f"Failed to generate podcast for chat {chat_id}: {str(e)}")
 
diff --git a/surfsense_web/app/dashboard/[search_space_id]/chats/chats-client.tsx b/surfsense_web/app/dashboard/[search_space_id]/chats/chats-client.tsx
index 7b14c7e..45f6d46 100644
--- a/surfsense_web/app/dashboard/[search_space_id]/chats/chats-client.tsx
+++ b/surfsense_web/app/dashboard/[search_space_id]/chats/chats-client.tsx
@@ -329,7 +329,7 @@ export default function ChatsPageClient({ searchSpaceId }: ChatsPageClientProps)
 
   // Helper to finish the podcast generation process
   const finishPodcastGeneration = () => {
-    toast.success("All podcasts are being generated! Check the podcasts tab to see them when ready.");
+    toast.success("All podcasts are being generated! Check the logs tab to see their status.");
     setPodcastDialogOpen(false);
     setSelectedChats([]);
     setSelectionMode(false);