From 0bf43571ba390c45ef73bd99e8d3b5bd9351da45 Mon Sep 17 00:00:00 2001 From: "DESKTOP-RTLN3BA\\$punk" Date: Fri, 16 May 2025 01:51:03 -0700 Subject: [PATCH] feat: Basic Streaming --- .../app/agents/researcher/nodes.py | 97 +++++++++++++------ surfsense_backend/app/utils/query_service.py | 2 + 2 files changed, 71 insertions(+), 28 deletions(-) diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py index 644ddd9..fcec440 100644 --- a/surfsense_backend/app/agents/researcher/nodes.py +++ b/surfsense_backend/app/agents/researcher/nodes.py @@ -520,6 +520,10 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW answer_outline = state.answer_outline streaming_service = state.streaming_service + # Initialize a dictionary to track content for all sections + # This is used to maintain section content while streaming multiple sections + section_contents = {} + streaming_service.only_update_terminal(f"🚀 Starting to process research sections...") writer({"yeild_value": streaming_service._format_annotations()}) @@ -578,7 +582,6 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW # Log the error and continue with an empty list of documents # This allows the process to continue, but the report might lack information relevant_documents = [] - # Consider adding more robust error handling or reporting if needed print(f"Fetched {len(relevant_documents)} relevant documents for all sections") streaming_service.only_update_terminal(f"✨ Starting to draft {len(answer_outline.answer_outline)} sections using {len(relevant_documents)} relevant document chunks") @@ -597,8 +600,16 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW else: sub_section_type = SubSectionType.MIDDLE + # Initialize the section_contents entry for this section + section_contents[i] = { + "title": section.section_title, + "content": "", + "index": i + } + section_tasks.append( process_section_with_documents( + section_id=i, section_title=section.section_title, section_questions=section.questions, user_query=configuration.user_query, @@ -607,7 +618,8 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW relevant_documents=relevant_documents, state=state, writer=writer, - sub_section_type=sub_section_type + sub_section_type=sub_section_type, + section_contents=section_contents ) ) @@ -649,28 +661,15 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW streaming_service.only_update_terminal("🎉 Final research report generated successfully!") writer({"yeild_value": streaming_service._format_annotations()}) - if hasattr(state, 'streaming_service') and state.streaming_service: - # Convert the final report to the expected format for UI: - # A list of strings where empty strings represent line breaks - formatted_report = [] - for section in final_report: - if section == "\n": - # Add an empty string for line breaks - formatted_report.append("") - else: - # Split any multiline content by newlines and add each line - section_lines = section.split("\n") - formatted_report.extend(section_lines) - - state.streaming_service.only_update_answer(formatted_report) - writer({"yeild_value": state.streaming_service._format_annotations()}) - + # Skip the final update since we've been streaming incremental updates + # The final answer from each section is already shown in the UI return { "final_written_report": final_written_report } async def process_section_with_documents( + section_id: int, section_title: str, section_questions: List[str], user_id: str, @@ -679,12 +678,14 @@ async def process_section_with_documents( user_query: str, state: State = None, writer: StreamWriter = None, - sub_section_type: SubSectionType = SubSectionType.MIDDLE + sub_section_type: SubSectionType = SubSectionType.MIDDLE, + section_contents: Dict[int, Dict[str, Any]] = None ) -> str: """ Process a single section using pre-fetched documents. Args: + section_id: The ID of the section section_title: The title of the section section_questions: List of research questions for this section user_id: The user ID @@ -692,6 +693,8 @@ async def process_section_with_documents( relevant_documents: Pre-fetched documents to use for this section state: The current state writer: StreamWriter for sending progress updates + sub_section_type: The type of section (start, middle, end) + section_contents: Dictionary to track content across multiple sections Returns: The written section content @@ -738,23 +741,61 @@ async def process_section_with_documents( "chat_history": state.chat_history } - # Invoke the sub-section writer graph + # Invoke the sub-section writer graph with streaming print(f"Invoking sub_section_writer for: {section_title}") if state and state.streaming_service and writer: state.streaming_service.only_update_terminal(f"🧠 Analyzing information and drafting content for section: \"{section_title}\"") writer({"yeild_value": state.streaming_service._format_annotations()}) - - result = await sub_section_writer_graph.ainvoke(sub_state, config) - # Return the final answer from the sub_section_writer - final_answer = result.get("final_answer", "No content was generated for this section.") + # Variables to track streaming state + complete_content = "" # Tracks the complete content received so far - # Send section content update via streaming if available + async for chunk_type, chunk in sub_section_writer_graph.astream(sub_state, config, stream_mode=["values"]): + if "final_answer" in chunk: + new_content = chunk["final_answer"] + if new_content and new_content != complete_content: + # Extract only the new content (delta) + delta = new_content[len(complete_content):] + + # Update what we've processed so far + complete_content = new_content + + # Only stream if there's actual new content + if delta and state and state.streaming_service and writer: + # Update terminal with real-time progress indicator + state.streaming_service.only_update_terminal(f"✍️ Writing section {section_id+1}... ({len(complete_content.split())} words)") + + # Update section_contents with just the new delta + section_contents[section_id]["content"] += delta + + # Build UI-friendly content for all sections + complete_answer = [] + for i in range(len(section_contents)): + if i in section_contents and section_contents[i]["content"]: + # Add section header + complete_answer.append(f"# {section_contents[i]['title']}") + complete_answer.append("") # Empty line after title + + # Add section content + content_lines = section_contents[i]["content"].split("\n") + complete_answer.extend(content_lines) + complete_answer.append("") # Empty line after content + + # Update answer in UI in real-time + state.streaming_service.only_update_answer(complete_answer) + writer({"yeild_value": state.streaming_service._format_annotations()}) + + # Set default if no content was received + if not complete_content: + complete_content = "No content was generated for this section." + section_contents[section_id]["content"] = complete_content + + # Final terminal update if state and state.streaming_service and writer: - state.streaming_service.only_update_terminal(f"✅ Completed writing section: \"{section_title}\"") + state.streaming_service.only_update_terminal(f"✅ Completed section: \"{section_title}\"") writer({"yeild_value": state.streaming_service._format_annotations()}) - - return final_answer + + return complete_content except Exception as e: print(f"Error processing section '{section_title}': {str(e)}") diff --git a/surfsense_backend/app/utils/query_service.py b/surfsense_backend/app/utils/query_service.py index 588a869..4442c8f 100644 --- a/surfsense_backend/app/utils/query_service.py +++ b/surfsense_backend/app/utils/query_service.py @@ -1,3 +1,4 @@ +import datetime from langchain.schema import HumanMessage, SystemMessage, AIMessage from app.config import config from typing import Any, List, Optional @@ -31,6 +32,7 @@ class QueryService: # Create system message with instructions system_message = SystemMessage( content=f""" + Today's date: {datetime.datetime.now().strftime("%Y-%m-%d")} You are a highly skilled AI assistant specializing in query optimization for advanced research. Your primary objective is to transform a user's initial query into a highly effective search query. This reformulated query will be used to retrieve information from diverse data sources.