Merge pull request #209 from Utkarsh-Patel-13/fix/chat-optimization

Updated Streaming Service to efficiently stream content
Rohan Verma, 2025-07-20 16:57:51 +05:30, committed by GitHub
commit b4737008e4
5 changed files with 649 additions and 270 deletions
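The core of the optimization: instead of re-serializing the entire annotation state on every update (the old _format_annotations() path), each node now writes a single delta part in the Vercel AI data-stream format. A minimal sketch of the difference on the wire (the payloads are illustrative, not taken from this PR):

import json

# Old path: the full annotation array is re-serialized on every update,
# so each new terminal message makes every subsequent payload larger.
full_state = [
    {
        "type": "TERMINAL_INFO",
        "content": [{"id": i, "text": "step"} for i in range(50)],
    }
]
old_part = f"8:{json.dumps(full_state)}\n"  # grows with every write

# New path: only the newest message goes out, as a one-element delta.
delta = {"type": "TERMINAL_INFO", "content": [{"id": 50, "text": "step"}]}
new_part = f"8:[{json.dumps(delta)}]\n"  # roughly constant size per write

print(len(old_part), len(new_part))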


@@ -267,8 +267,13 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
     streaming_service = state.streaming_service
-    streaming_service.only_update_terminal("🔍 Generating answer outline...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🔍 Generating answer outline..."
+            )
+        }
+    )

     # Get configuration from runnable config
     configuration = Configuration.from_runnable_config(config)
     reformulated_query = state.reformulated_query
@@ -276,15 +281,19 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
     num_sections = configuration.num_sections
     user_id = configuration.user_id

-    streaming_service.only_update_terminal(f"🤔 Planning research approach for: \"{user_query[:100]}...\"")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f'🤔 Planning research approach for: "{user_query[:100]}..."'
+            )
+        }
+    )

     # Get user's strategic LLM
     llm = await get_user_strategic_llm(state.db_session, user_id)
     if not llm:
         error_message = f"No strategic LLM configured for user {user_id}"
-        streaming_service.only_update_terminal(f"{error_message}", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_error(error_message)})
         raise RuntimeError(error_message)

     # Create the human message content
@@ -311,8 +320,13 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
     Your output MUST be valid JSON in exactly this format. Do not include any other text or explanation.
     """

-    streaming_service.only_update_terminal("📝 Designing structured outline with AI...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "📝 Designing structured outline with AI..."
+            )
+        }
+    )

     # Create messages for the LLM
     messages = [
@@ -321,8 +335,13 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
     ]

     # Call the LLM directly without using structured output
-    streaming_service.only_update_terminal("⚙️ Processing answer structure...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "⚙️ Processing answer structure..."
+            )
+        }
+    )
     response = await llm.ainvoke(messages)
@@ -344,25 +363,33 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
             answer_outline = AnswerOutline(**parsed_data)

             total_questions = sum(len(section.questions) for section in answer_outline.answer_outline)
-            streaming_service.only_update_terminal(f"✅ Successfully generated outline with {len(answer_outline.answer_outline)} sections and {total_questions} research questions!")
-            writer({"yeild_value": streaming_service._format_annotations()})
-            print(f"Successfully generated answer outline with {len(answer_outline.answer_outline)} sections")
+            writer(
+                {
+                    "yield_value": streaming_service.format_terminal_info_delta(
+                        f"✅ Successfully generated outline with {len(answer_outline.answer_outline)} sections and {total_questions} research questions!"
+                    )
+                }
+            )
+            print(
+                f"Successfully generated answer outline with {len(answer_outline.answer_outline)} sections"
+            )

             # Return state update
             return {"answer_outline": answer_outline}
         else:
             # If JSON structure not found, raise a clear error
-            error_message = f"Could not find valid JSON in LLM response. Raw response: {content}"
-            streaming_service.only_update_terminal(f"{error_message}", "error")
-            writer({"yeild_value": streaming_service._format_annotations()})
+            error_message = (
+                f"Could not find valid JSON in LLM response. Raw response: {content}"
+            )
+            writer({"yield_value": streaming_service.format_error(error_message)})
             raise ValueError(error_message)

     except (json.JSONDecodeError, ValueError) as e:
         # Log the error and re-raise it
         error_message = f"Error parsing LLM response: {str(e)}"
-        streaming_service.only_update_terminal(f"{error_message}", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_error(error_message)})

         print(f"Error parsing LLM response: {str(e)}")
         print(f"Raw response: {response.content}")
@@ -414,8 +441,13 @@ async def fetch_relevant_documents(
     if streaming_service and writer:
         connector_names = [get_connector_friendly_name(connector) for connector in connectors_to_search]
         connector_names_str = ", ".join(connector_names)
-        streaming_service.only_update_terminal(f"🔎 Starting research on {len(research_questions)} questions using {connector_names_str} data sources")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": streaming_service.format_terminal_info_delta(
+                    f"🔎 Starting research on {len(research_questions)} questions using {connector_names_str} data sources"
+                )
+            }
+        )

     all_raw_documents = []  # Store all raw documents
     all_sources = []  # Store all sources
@@ -423,8 +455,13 @@ async def fetch_relevant_documents(
     for i, user_query in enumerate(research_questions):
         # Stream question being researched
         if streaming_service and writer:
-            streaming_service.only_update_terminal(f"🧠 Researching question {i+1}/{len(research_questions)}: \"{user_query[:100]}...\"")
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": streaming_service.format_terminal_info_delta(
+                        f'🧠 Researching question {i + 1}/{len(research_questions)}: "{user_query[:100]}..."'
+                    )
+                }
+            )

         # Use original research question as the query
         reformulated_query = user_query
@@ -435,8 +472,13 @@ async def fetch_relevant_documents(
             if streaming_service and writer:
                 connector_emoji = get_connector_emoji(connector)
                 friendly_name = get_connector_friendly_name(connector)
-                streaming_service.only_update_terminal(f"{connector_emoji} Searching {friendly_name} for relevant information...")
-                writer({"yeild_value": streaming_service._format_annotations()})
+                writer(
+                    {
+                        "yield_value": streaming_service.format_terminal_info_delta(
+                            f"{connector_emoji} Searching {friendly_name} for relevant information..."
+                        )
+                    }
+                )

             try:
                 if connector == "YOUTUBE_VIDEO":
@@ -455,8 +497,13 @@ async def fetch_relevant_documents(
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"📹 Found {len(youtube_chunks)} YouTube chunks related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"📹 Found {len(youtube_chunks)} YouTube chunks related to your query"
+                                )
+                            }
+                        )

                 elif connector == "EXTENSION":
                     source_object, extension_chunks = await connector_service.search_extension(
@@ -474,8 +521,13 @@ async def fetch_relevant_documents(
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🧩 Found {len(extension_chunks)} Browser Extension chunks related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🧩 Found {len(extension_chunks)} Browser Extension chunks related to your query"
+                                )
+                            }
+                        )

                 elif connector == "CRAWLED_URL":
                     source_object, crawled_urls_chunks = await connector_service.search_crawled_urls(
@@ -493,8 +545,13 @@ async def fetch_relevant_documents(
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🌐 Found {len(crawled_urls_chunks)} Web Pages chunks related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🌐 Found {len(crawled_urls_chunks)} Web Pages chunks related to your query"
+                                )
+                            }
+                        )

                 elif connector == "FILE":
                     source_object, files_chunks = await connector_service.search_files(
@@ -512,9 +569,13 @@ async def fetch_relevant_documents(
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"📄 Found {len(files_chunks)} Files chunks related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"📄 Found {len(files_chunks)} Files chunks related to your query"
+                                )
+                            }
+                        )

                 elif connector == "SLACK_CONNECTOR":
                     source_object, slack_chunks = await connector_service.search_slack(
@@ -532,8 +593,13 @@ async def fetch_relevant_documents(
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"💬 Found {len(slack_chunks)} Slack messages related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"💬 Found {len(slack_chunks)} Slack messages related to your query"
+                                )
+                            }
+                        )

                 elif connector == "NOTION_CONNECTOR":
                     source_object, notion_chunks = await connector_service.search_notion(
@@ -551,8 +617,13 @@ async def fetch_relevant_documents(
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"📘 Found {len(notion_chunks)} Notion pages/blocks related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"📘 Found {len(notion_chunks)} Notion pages/blocks related to your query"
+                                )
+                            }
+                        )

                 elif connector == "GITHUB_CONNECTOR":
                     source_object, github_chunks = await connector_service.search_github(
@@ -570,8 +641,13 @@ async def fetch_relevant_documents(
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🐙 Found {len(github_chunks)} GitHub files/issues related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🐙 Found {len(github_chunks)} GitHub files/issues related to your query"
+                                )
+                            }
+                        )

                 elif connector == "LINEAR_CONNECTOR":
                     source_object, linear_chunks = await connector_service.search_linear(
@@ -589,8 +665,13 @@ async def fetch_relevant_documents(
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"📊 Found {len(linear_chunks)} Linear issues related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"📊 Found {len(linear_chunks)} Linear issues related to your query"
+                                )
+                            }
+                        )

                 elif connector == "TAVILY_API":
                     source_object, tavily_chunks = await connector_service.search_tavily(
@@ -606,8 +687,13 @@ async def fetch_relevant_documents(
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🔍 Found {len(tavily_chunks)} Web Search results related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🔍 Found {len(tavily_chunks)} Web Search results related to your query"
+                                )
+                            }
+                        )

                 elif connector == "LINKUP_API":
                     if top_k > 10:
@@ -628,8 +714,13 @@ async def fetch_relevant_documents(
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🔗 Found {len(linkup_chunks)} Linkup results related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🔗 Found {len(linkup_chunks)} Linkup results related to your query"
+                                )
+                            }
+                        )

                 elif connector == "DISCORD_CONNECTOR":
                     source_object, discord_chunks = await connector_service.search_discord(
@@ -645,9 +736,13 @@ async def fetch_relevant_documents(
                     all_raw_documents.extend(discord_chunks)

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🗨️ Found {len(discord_chunks)} Discord messages related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🗨️ Found {len(discord_chunks)} Discord messages related to your query"
+                                )
+                            }
+                        )

             except Exception as e:
                 error_message = f"Error searching connector {connector}: {str(e)}"
@@ -656,8 +751,13 @@ async def fetch_relevant_documents(
                 # Stream error message
                 if streaming_service and writer:
                     friendly_name = get_connector_friendly_name(connector)
-                    streaming_service.only_update_terminal(f"⚠️ Error searching {friendly_name}: {str(e)}", "error")
-                    writer({"yeild_value": streaming_service._format_annotations()})
+                    writer(
+                        {
+                            "yield_value": streaming_service.format_error(
+                                f"Error searching {friendly_name}: {str(e)}"
+                            )
+                        }
+                    )

                 # Continue with other connectors on error
                 continue
@@ -700,13 +800,18 @@ async def fetch_relevant_documents(
     if streaming_service and writer:
         user_source_count = len(user_selected_sources) if user_selected_sources else 0
         connector_source_count = len(deduplicated_sources) - user_source_count
-        streaming_service.only_update_terminal(f"📚 Collected {len(deduplicated_sources)} total sources ({user_source_count} user-selected + {connector_source_count} from connectors)")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": streaming_service.format_terminal_info_delta(
+                    f"📚 Collected {len(deduplicated_sources)} total sources ({user_source_count} user-selected + {connector_source_count} from connectors)"
+                )
+            }
+        )

     # After all sources are collected and deduplicated, stream them
     if streaming_service and writer:
         streaming_service.only_update_sources(deduplicated_sources)
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service._format_annotations()})

     # Deduplicate raw documents based on chunk_id or content
     seen_chunk_ids = set()
@@ -730,8 +835,13 @@ async def fetch_relevant_documents(
     # Stream info about deduplicated documents
     if streaming_service and writer:
-        streaming_service.only_update_terminal(f"🧹 Found {len(deduplicated_docs)} unique document chunks after removing duplicates")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": streaming_service.format_terminal_info_delta(
+                    f"🧹 Found {len(deduplicated_docs)} unique document chunks after removing duplicates"
+                )
+            }
+        )

     # Return deduplicated documents
     return deduplicated_docs
@@ -757,14 +867,19 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     # This is used to maintain section content while streaming multiple sections
     section_contents = {}

-    streaming_service.only_update_terminal(f"🚀 Starting to process research sections...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🚀 Starting to process research sections..."
+            )
+        }
+    )

     print(f"Processing sections from outline: {answer_outline is not None}")
     if not answer_outline:
-        streaming_service.only_update_terminal("❌ Error: No answer outline was provided. Cannot generate report.", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        error_message = "No answer outline was provided. Cannot generate report."
+        writer({"yield_value": streaming_service.format_error(error_message)})
         return {
             "final_written_report": "No answer outline was provided. Cannot generate final report."
         }
@@ -775,12 +890,22 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
         all_questions.extend(section.questions)

     print(f"Collected {len(all_questions)} questions from all sections")
-    streaming_service.only_update_terminal(f"🧩 Found {len(all_questions)} research questions across {len(answer_outline.answer_outline)} sections")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f"🧩 Found {len(all_questions)} research questions across {len(answer_outline.answer_outline)} sections"
+            )
+        }
+    )

     # Fetch relevant documents once for all questions
-    streaming_service.only_update_terminal("🔍 Searching for relevant information across all connectors...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🔍 Searching for relevant information across all connectors..."
+            )
+        }
+    )

     if configuration.num_sections == 1:
         TOP_K = 10
@@ -798,8 +923,13 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     try:
         # First, fetch user-selected documents if any
         if configuration.document_ids_to_add_in_context:
-            streaming_service.only_update_terminal(f"📋 Including {len(configuration.document_ids_to_add_in_context)} user-selected documents...")
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": streaming_service.format_terminal_info_delta(
+                        f"📋 Including {len(configuration.document_ids_to_add_in_context)} user-selected documents..."
+                    )
+                }
+            )

             user_selected_sources, user_selected_documents = await fetch_documents_by_ids(
                 document_ids=configuration.document_ids_to_add_in_context,
@@ -808,8 +938,13 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
             )

             if user_selected_documents:
-                streaming_service.only_update_terminal(f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context")
-                writer({"yeild_value": streaming_service._format_annotations()})
+                writer(
+                    {
+                        "yield_value": streaming_service.format_terminal_info_delta(
+                            f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context"
+                        )
+                    }
+                )

         # Create connector service using state db_session
         connector_service = ConnectorService(state.db_session, user_id=configuration.user_id)
@@ -831,8 +966,7 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     except Exception as e:
         error_message = f"Error fetching relevant documents: {str(e)}"
         print(error_message)
-        streaming_service.only_update_terminal(f"{error_message}", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_error(error_message)})
         # Log the error and continue with an empty list of documents
         # This allows the process to continue, but the report might lack information
         relevant_documents = []
@@ -844,13 +978,23 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     print(f"Added {len(user_selected_documents)} user-selected documents for all sections")
     print(f"Total documents for sections: {len(all_documents)}")

-    streaming_service.only_update_terminal(f"✨ Starting to draft {len(answer_outline.answer_outline)} sections using {len(all_documents)} total document chunks ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f"✨ Starting to draft {len(answer_outline.answer_outline)} sections using {len(all_documents)} total document chunks ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)"
+            )
+        }
+    )

     # Create tasks to process each section in parallel with the same document set
     section_tasks = []
-    streaming_service.only_update_terminal("⚙️ Creating processing tasks for each section...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "⚙️ Creating processing tasks for each section..."
+            )
+        }
+    )

     for i, section in enumerate(answer_outline.answer_outline):
         if i == 0:
@@ -885,14 +1029,24 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     # Run all section processing tasks in parallel
     print(f"Running {len(section_tasks)} section processing tasks in parallel")
-    streaming_service.only_update_terminal(f"⏳ Processing {len(section_tasks)} sections simultaneously...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f"⏳ Processing {len(section_tasks)} sections simultaneously..."
+            )
+        }
+    )
     section_results = await asyncio.gather(*section_tasks, return_exceptions=True)

     # Handle any exceptions in the results
-    streaming_service.only_update_terminal("🧵 Combining section results into final report...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🧵 Combining section results into final report..."
+            )
+        }
+    )
     processed_results = []
     for i, result in enumerate(section_results):
@@ -900,8 +1054,7 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
             section_title = answer_outline.answer_outline[i].section_title
             error_message = f"Error processing section '{section_title}': {str(result)}"
             print(error_message)
-            streaming_service.only_update_terminal(f"⚠️ {error_message}", "error")
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer({"yield_value": streaming_service.format_error(error_message)})
             processed_results.append(error_message)
         else:
             processed_results.append(result)
@@ -913,16 +1066,25 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
             final_report.append(content)
             final_report.append("\n")

+            # Stream each section with its title
+            writer(
+                {
+                    "yield_value": state.streaming_service.format_text_chunk(f"# {section.section_title}\n\n{content}")
+                }
+            )
+
     # Join all sections with newlines
     final_written_report = "\n".join(final_report)
     print(f"Generated final report with {len(final_report)} parts")
-    streaming_service.only_update_terminal("🎉 Final research report generated successfully!")
-    writer({"yeild_value": streaming_service._format_annotations()})
-
-    # Skip the final update since we've been streaming incremental updates
-    # The final answer from each section is already shown in the UI
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🎉 Final research report generated successfully!"
+            )
+        }
+    )

     # Use the shared documents for further question generation
     # Since all sections used the same document pool, we can use it directly
@@ -969,15 +1131,25 @@ async def process_section_with_documents(
     # Send status update via streaming if available
     if state and state.streaming_service and writer:
-        state.streaming_service.only_update_terminal(f"📝 Writing section: \"{section_title}\" with {len(section_questions)} research questions")
-        writer({"yeild_value": state.streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": state.streaming_service.format_terminal_info_delta(
+                    f'📝 Writing section: "{section_title}" with {len(section_questions)} research questions'
+                )
+            }
+        )

     # Fallback if no documents found
     if not documents_to_use:
         print(f"No relevant documents found for section: {section_title}")
         if state and state.streaming_service and writer:
-            state.streaming_service.only_update_terminal(f"⚠️ Warning: No relevant documents found for section: \"{section_title}\"", "warning")
-            writer({"yeild_value": state.streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": state.streaming_service.format_error(
+                        f'Warning: No relevant documents found for section: "{section_title}"'
+                    )
+                }
+            )
         documents_to_use = [
             {"content": f"No specific information was found for: {question}"}
@@ -993,7 +1165,7 @@ async def process_section_with_documents(
             "user_query": user_query,
             "relevant_documents": documents_to_use,
             "user_id": user_id,
-            "search_space_id": search_space_id
+            "search_space_id": search_space_id,
         }
     }
@@ -1006,8 +1178,13 @@ async def process_section_with_documents(
     # Invoke the sub-section writer graph with streaming
     print(f"Invoking sub_section_writer for: {section_title}")
     if state and state.streaming_service and writer:
-        state.streaming_service.only_update_terminal(f"🧠 Analyzing information and drafting content for section: \"{section_title}\"")
-        writer({"yeild_value": state.streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": state.streaming_service.format_terminal_info_delta(
+                    f'🧠 Analyzing information and drafting content for section: "{section_title}"'
+                )
+            }
+        )

     # Variables to track streaming state
     complete_content = ""  # Tracks the complete content received so far
@@ -1025,7 +1202,13 @@ async def process_section_with_documents(
                 # Only stream if there's actual new content
                 if delta and state and state.streaming_service and writer:
                     # Update terminal with real-time progress indicator
-                    state.streaming_service.only_update_terminal(f"✍️ Writing section {section_id+1}... ({len(complete_content.split())} words)")
+                    writer(
+                        {
+                            "yield_value": state.streaming_service.format_terminal_info_delta(
+                                f"✍️ Writing section {section_id + 1}... ({len(complete_content.split())} words)"
+                            )
+                        }
+                    )

                     # Update section_contents with just the new delta
                     section_contents[section_id]["content"] += delta
@@ -1043,9 +1226,6 @@ async def process_section_with_documents(
                     complete_answer.extend(content_lines)
                     complete_answer.append("")  # Empty line after content

-                    # Update answer in UI in real-time
-                    state.streaming_service.only_update_answer(complete_answer)
-                    writer({"yeild_value": state.streaming_service._format_annotations()})

         # Set default if no content was received
         if not complete_content:
@@ -1054,8 +1234,13 @@ async def process_section_with_documents(
         # Final terminal update
         if state and state.streaming_service and writer:
-            state.streaming_service.only_update_terminal(f"✅ Completed section: \"{section_title}\"")
-            writer({"yeild_value": state.streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": state.streaming_service.format_terminal_info_delta(
+                        f'✅ Completed section: "{section_title}"'
+                    )
+                }
+            )

         return complete_content
     except Exception as e:
@@ -1063,8 +1248,13 @@ async def process_section_with_documents(
         # Send error update via streaming if available
         if state and state.streaming_service and writer:
-            state.streaming_service.only_update_terminal(f"❌ Error processing section \"{section_title}\": {str(e)}", "error")
-            writer({"yeild_value": state.streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": state.streaming_service.format_error(
+                        f'Error processing section "{section_title}": {str(e)}'
+                    )
+                }
+            )

         return f"Error processing section: {section_title}. Details: {str(e)}"
@@ -1103,15 +1293,30 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     reformulated_query = state.reformulated_query
     user_query = configuration.user_query

-    streaming_service.only_update_terminal("🤔 Starting Q&A research workflow...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🤔 Starting Q&A research workflow..."
+            )
+        }
+    )

-    streaming_service.only_update_terminal(f"🔍 Researching: \"{user_query[:100]}...\"")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f'🔍 Researching: "{user_query[:100]}..."'
+            )
+        }
+    )

     # Fetch relevant documents for the QNA query
-    streaming_service.only_update_terminal("🔍 Searching for relevant information across all connectors...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🔍 Searching for relevant information across all connectors..."
+            )
+        }
+    )

     # Use a reasonable top_k for QNA - not too many documents to avoid overwhelming the LLM
     TOP_K = 15
@@ -1123,8 +1328,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     try:
         # First, fetch user-selected documents if any
         if configuration.document_ids_to_add_in_context:
-            streaming_service.only_update_terminal(f"📋 Including {len(configuration.document_ids_to_add_in_context)} user-selected documents...")
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": streaming_service.format_terminal_info_delta(
+                        f"📋 Including {len(configuration.document_ids_to_add_in_context)} user-selected documents..."
+                    )
+                }
+            )

             user_selected_sources, user_selected_documents = await fetch_documents_by_ids(
                 document_ids=configuration.document_ids_to_add_in_context,
@@ -1133,8 +1343,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
             )

             if user_selected_documents:
-                streaming_service.only_update_terminal(f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context")
-                writer({"yeild_value": streaming_service._format_annotations()})
+                writer(
+                    {
+                        "yield_value": streaming_service.format_terminal_info_delta(
+                            f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context"
+                        )
+                    }
+                )

         # Create connector service using state db_session
         connector_service = ConnectorService(state.db_session, user_id=configuration.user_id)
@@ -1159,8 +1374,7 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     except Exception as e:
         error_message = f"Error fetching relevant documents for QNA: {str(e)}"
         print(error_message)
-        streaming_service.only_update_terminal(f"{error_message}", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_error(error_message)})
         # Continue with empty documents - the QNA agent will handle this gracefully
         relevant_documents = []
@@ -1171,8 +1385,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     print(f"Added {len(user_selected_documents)} user-selected documents for QNA")
     print(f"Total documents for QNA: {len(all_documents)}")

-    streaming_service.only_update_terminal(f"🧠 Generating comprehensive answer using {len(all_documents)} total sources ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f"🧠 Generating comprehensive answer using {len(all_documents)} total sources ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)..."
+            )
+        }
+    )

     # Prepare configuration for the QNA agent
     qna_config = {
@@ -1192,8 +1411,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     }

     try:
-        streaming_service.only_update_terminal("✍️ Writing comprehensive answer with citations...")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": streaming_service.format_terminal_info_delta(
+                    "✍️ Writing comprehensive answer with citations..."
+                )
+            }
+        )

         # Track streaming content for real-time updates
         complete_content = ""
@@ -1212,12 +1436,17 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
                 if delta:
                     # Update terminal with progress
                     word_count = len(complete_content.split())
-                    streaming_service.only_update_terminal(f"✍️ Writing answer... ({word_count} words)")
+                    writer(
+                        {
+                            "yield_value": streaming_service.format_terminal_info_delta(
+                                f"✍️ Writing answer... ({word_count} words)"
+                            )
+                        }
+                    )

-                    # Update the answer in real-time
-                    answer_lines = complete_content.split("\n")
-                    streaming_service.only_update_answer(answer_lines)
-                    writer({"yeild_value": streaming_service._format_annotations()})
+                    writer(
+                        {"yield_value": streaming_service.format_text_chunk(delta)}
+                    )

             # Capture reranked documents from QNA agent for further question generation
             if "reranked_documents" in chunk:
@@ -1227,8 +1456,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
         if not complete_content:
             complete_content = "I couldn't find relevant information in your knowledge base to answer this question."

-        streaming_service.only_update_terminal("🎉 Q&A answer generated successfully!")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": streaming_service.format_terminal_info_delta(
+                    "🎉 Q&A answer generated successfully!"
+                )
+            }
+        )

         # Return the final answer and captured reranked documents for further question generation
         return {
@@ -1239,12 +1473,9 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     except Exception as e:
         error_message = f"Error generating QNA answer: {str(e)}"
         print(error_message)
-        streaming_service.only_update_terminal(f"{error_message}", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_error(error_message)})

-        return {
-            "final_written_report": f"Error generating answer: {str(e)}"
-        }
+        return {"final_written_report": f"Error generating answer: {str(e)}"}


 async def generate_further_questions(state: State, config: RunnableConfig, writer: StreamWriter) -> Dict[str, Any]:
@@ -1269,19 +1500,23 @@ async def generate_further_questions(state: State, config: RunnableConfig, write
     # Get reranked documents from the state (will be populated by sub-agents)
     reranked_documents = getattr(state, 'reranked_documents', None) or []

-    streaming_service.only_update_terminal("🤔 Generating follow-up questions...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🤔 Generating follow-up questions..."
+            )
+        }
+    )

     # Get user's fast LLM
     llm = await get_user_fast_llm(state.db_session, user_id)
     if not llm:
         error_message = f"No fast LLM configured for user {user_id}"
         print(error_message)
-        streaming_service.only_update_terminal(f"{error_message}", "error")
+        writer({"yield_value": streaming_service.format_error(error_message)})
         # Stream empty further questions to UI
-        streaming_service.only_update_further_questions([])
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_further_questions_delta([])})
         return {"further_questions": []}

     # Format chat history for the prompt
@@ -1339,8 +1574,13 @@ async def generate_further_questions(state: State, config: RunnableConfig, write
     Do not include any other text or explanation. Only return the JSON.
     """

-    streaming_service.only_update_terminal("🧠 Analyzing conversation context to suggest relevant questions...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🧠 Analyzing conversation context to suggest relevant questions..."
+            )
+        }
+    )

     # Create messages for the LLM
     messages = [
@@ -1367,46 +1607,66 @@ async def generate_further_questions(state: State, config: RunnableConfig, write
             # Extract the further_questions array
             further_questions = parsed_data.get("further_questions", [])

-            streaming_service.only_update_terminal(f"✅ Generated {len(further_questions)} contextual follow-up questions!")
+            writer(
+                {
+                    "yield_value": streaming_service.format_terminal_info_delta(
+                        f"✅ Generated {len(further_questions)} contextual follow-up questions!"
+                    )
+                }
+            )

             # Stream the further questions to the UI
-            streaming_service.only_update_further_questions(further_questions)
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": streaming_service.format_further_questions_delta(
+                        further_questions
+                    )
+                }
+            )

             print(f"Successfully generated {len(further_questions)} further questions")
             return {"further_questions": further_questions}
         else:
             # If JSON structure not found, return empty list
-            error_message = "Could not find valid JSON in LLM response for further questions"
+            error_message = (
+                "Could not find valid JSON in LLM response for further questions"
+            )
             print(error_message)
-            streaming_service.only_update_terminal(f"⚠️ {error_message}", "warning")
+            writer(
+                {
+                    "yield_value": streaming_service.format_error(
+                        f"Warning: {error_message}"
+                    )
+                }
+            )
             # Stream empty further questions to UI
-            streaming_service.only_update_further_questions([])
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer(
+                {"yield_value": streaming_service.format_further_questions_delta([])}
+            )
             return {"further_questions": []}

     except (json.JSONDecodeError, ValueError) as e:
         # Log the error and return empty list
         error_message = f"Error parsing further questions response: {str(e)}"
         print(error_message)
-        streaming_service.only_update_terminal(f"⚠️ {error_message}", "warning")
+        writer(
+            {"yield_value": streaming_service.format_error(f"Warning: {error_message}")}
+        )
         # Stream empty further questions to UI
-        streaming_service.only_update_further_questions([])
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_further_questions_delta([])})
         return {"further_questions": []}

     except Exception as e:
         # Handle any other errors
         error_message = f"Error generating further questions: {str(e)}"
         print(error_message)
-        streaming_service.only_update_terminal(f"⚠️ {error_message}", "warning")
+        writer(
+            {"yield_value": streaming_service.format_error(f"Warning: {error_message}")}
+        )
         # Stream empty further questions to UI
-        streaming_service.only_update_further_questions([])
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_further_questions_delta([])})
         return {"further_questions": []}


@@ -54,32 +54,23 @@ async def handle_chat_data(
             if message['role'] == "user":
                 langchain_chat_history.append(HumanMessage(content=message['content']))
             elif message['role'] == "assistant":
-                # Find the last "ANSWER" annotation specifically
-                answer_annotation = None
-                for annotation in reversed(message['annotations']):
-                    if annotation['type'] == "ANSWER":
-                        answer_annotation = annotation
-                        break
-                if answer_annotation:
-                    answer_text = answer_annotation['content']
-                    # If content is a list, join it into a single string
-                    if isinstance(answer_text, list):
-                        answer_text = "\n".join(answer_text)
-                    langchain_chat_history.append(AIMessage(content=answer_text))
+                langchain_chat_history.append(AIMessage(content=message['content']))

-    response = StreamingResponse(stream_connector_search_results(
-        user_query,
-        user.id,
-        search_space_id, # Already converted to int in lines 32-37
-        session,
-        research_mode,
-        selected_connectors,
-        langchain_chat_history,
-        search_mode_str,
-        document_ids_to_add_in_context
-    ))
-    response.headers['x-vercel-ai-data-stream'] = 'v1'
+    response = StreamingResponse(
+        stream_connector_search_results(
+            user_query,
+            user.id,
+            search_space_id,
+            session,
+            research_mode,
+            selected_connectors,
+            langchain_chat_history,
+            search_mode_str,
+            document_ids_to_add_in_context,
+        )
+    )
+    response.headers["x-vercel-ai-data-stream"] = "v1"
     return response
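Since answers now stream as plain text parts rather than ANSWER annotations, an assistant turn's content field already holds the final answer text, which is what makes the simpler history conversion above safe. A self-contained sketch of that conversion (the sample payload is hypothetical):

from langchain_core.messages import AIMessage, HumanMessage

# Hypothetical request payload; field names mirror the handler above.
messages = [
    {"role": "user", "content": "What changed in the streaming service?"},
    {"role": "assistant", "content": "It now emits per-update deltas."},
]

langchain_chat_history = []
for message in messages:
    if message["role"] == "user":
        langchain_chat_history.append(HumanMessage(content=message["content"]))
    elif message["role"] == "assistant":
        # No annotation walk needed: content already holds the streamed answer.
        langchain_chat_history.append(AIMessage(content=message["content"]))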


@@ -23,17 +23,138 @@ class StreamingService:
                 "content": []
             }
         ]

-    # It is used to send annotations to the frontend
+    # DEPRECATED: This sends the full annotation array every time (inefficient)
     def _format_annotations(self) -> str:
         """
         Format the annotations as a string
+        DEPRECATED: This method sends the full annotation state every time.
+        Use the delta formatters instead for optimal streaming.

         Returns:
             str: The formatted annotations string
         """
         return f'8:{json.dumps(self.message_annotations)}\n'

-    # It is used to end Streaming
+    def format_terminal_info_delta(self, text: str, message_type: str = "info") -> str:
+        """
+        Format a single terminal info message as a delta annotation
+
+        Args:
+            text: The terminal message text
+            message_type: The message type (info, error, success, etc.)
+
+        Returns:
+            str: The formatted annotation delta string
+        """
+        message = {"id": self.terminal_idx, "text": text, "type": message_type}
+        self.terminal_idx += 1
+
+        # Update internal state for reference
+        self.message_annotations[0]["content"].append(message)
+
+        # Return only the delta annotation
+        annotation = {"type": "TERMINAL_INFO", "content": [message]}
+        return f"8:[{json.dumps(annotation)}]\n"
+
+    def format_sources_delta(self, sources: List[Dict[str, Any]]) -> str:
+        """
+        Format sources as a delta annotation
+
+        Args:
+            sources: List of source objects
+
+        Returns:
+            str: The formatted annotation delta string
+        """
+        # Update internal state
+        self.message_annotations[1]["content"] = sources
+
+        # Return only the delta annotation
+        annotation = {"type": "SOURCES", "content": sources}
+        return f"8:[{json.dumps(annotation)}]\n"
+
+    def format_answer_delta(self, answer_chunk: str) -> str:
+        """
+        Format a single answer chunk as a delta annotation
+
+        Args:
+            answer_chunk: The new answer chunk to add
+
+        Returns:
+            str: The formatted annotation delta string
+        """
+        # Update internal state by appending the chunk
+        if isinstance(self.message_annotations[2]["content"], list):
+            self.message_annotations[2]["content"].append(answer_chunk)
+        else:
+            self.message_annotations[2]["content"] = [answer_chunk]
+
+        # Return only the delta annotation with the new chunk
+        annotation = {"type": "ANSWER", "content": [answer_chunk]}
+        return f"8:[{json.dumps(annotation)}]\n"
+
+    def format_answer_annotation(self, answer_lines: List[str]) -> str:
+        """
+        Format the complete answer as a replacement annotation
+
+        Args:
+            answer_lines: Complete list of answer lines
+
+        Returns:
+            str: The formatted annotation string
+        """
+        # Update internal state
+        self.message_annotations[2]["content"] = answer_lines
+
+        # Return the full answer annotation
+        annotation = {"type": "ANSWER", "content": answer_lines}
+        return f"8:[{json.dumps(annotation)}]\n"
+
+    def format_further_questions_delta(
+        self, further_questions: List[Dict[str, Any]]
+    ) -> str:
+        """
+        Format further questions as a delta annotation
+
+        Args:
+            further_questions: List of further question objects
+
+        Returns:
+            str: The formatted annotation delta string
+        """
+        # Update internal state
+        self.message_annotations[3]["content"] = further_questions
+
+        # Return only the delta annotation
+        annotation = {"type": "FURTHER_QUESTIONS", "content": further_questions}
+        return f"8:[{json.dumps(annotation)}]\n"
+
+    def format_text_chunk(self, text: str) -> str:
+        """
+        Format a text chunk using the text stream part
+
+        Args:
+            text: The text chunk to stream
+
+        Returns:
+            str: The formatted text part string
+        """
+        return f"0:{json.dumps(text)}\n"
+
+    def format_error(self, error_message: str) -> str:
+        """
+        Format an error using the error stream part
+
+        Args:
+            error_message: The error message
+
+        Returns:
+            str: The formatted error part string
+        """
+        return f"3:{json.dumps(error_message)}\n"
+
     def format_completion(self, prompt_tokens: int = 156, completion_tokens: int = 204) -> str:
         """
         Format a completion message
@@ -56,7 +177,12 @@ class StreamingService:
         }
         return f'd:{json.dumps(completion_data)}\n'

+    # DEPRECATED METHODS: Keep for backward compatibility but mark as deprecated
     def only_update_terminal(self, text: str, message_type: str = "info") -> str:
+        """
+        DEPRECATED: Use format_terminal_info_delta() instead for optimal streaming
+        """
         self.message_annotations[0]["content"].append({
             "id": self.terminal_idx,
             "text": text,
@@ -66,16 +192,22 @@ class StreamingService:
         return self.message_annotations

     def only_update_sources(self, sources: List[Dict[str, Any]]) -> str:
+        """
+        DEPRECATED: Use format_sources_delta() instead for optimal streaming
+        """
         self.message_annotations[1]["content"] = sources
         return self.message_annotations

     def only_update_answer(self, answer: List[str]) -> str:
+        """
+        DEPRECATED: Use format_answer_delta() or format_answer_annotation() instead for optimal streaming
+        """
         self.message_annotations[2]["content"] = answer
         return self.message_annotations

     def only_update_further_questions(self, further_questions: List[Dict[str, Any]]) -> str:
         """
-        Update the further questions annotation
+        DEPRECATED: Use format_further_questions_delta() instead for optimal streaming

         Args:
             further_questions: List of further question objects with id and question fields
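The new formatters map onto the data-stream part types consumed by the frontend: 0: text chunks, 3: errors, 8: message annotations, d: completion metadata. A minimal consumer sketch, assuming the one-part-per-line framing shown above (the parser itself is not part of this PR):

import json

def parse_stream_part(line: str):
    # One line = one stream part: a type code, a colon, then a JSON payload.
    # Codes used by StreamingService: "0" text, "3" error, "8" annotation,
    # "d" completion metadata.
    code, _, payload = line.rstrip("\n").partition(":")
    return code, json.loads(payload)

# An annotation delta as emitted by format_terminal_info_delta():
line = '8:[{"type": "TERMINAL_INFO", "content": [{"id": 0, "text": "hi", "type": "info"}]}]\n'
code, payload = parse_stream_part(line)
assert code == "8" and payload[0]["content"][0]["text"] == "hi"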


@@ -83,9 +83,8 @@ async def stream_connector_search_results(
         config=config,
         stream_mode="custom",
     ):
-        # If the chunk contains a 'yeild_value' key, print its value
-        # Note: there's a typo in 'yeild_value' in the code, but we need to match it
-        if isinstance(chunk, dict) and 'yeild_value' in chunk:
-            yield chunk['yeild_value']
+        if isinstance(chunk, dict):
+            if "yield_value" in chunk:
+                yield chunk["yield_value"]

     yield streaming_service.format_completion()
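End to end, each graph node calls writer({"yield_value": <pre-formatted part>}), LangGraph's custom stream mode surfaces those dicts, and the generator above forwards the strings untouched before closing with a d: completion part. A toy stand-in for that loop (the fake stream below is hypothetical):

import asyncio

async def fake_graph_stream():
    # Stand-in for graph.astream(..., stream_mode="custom"): each dict is one
    # writer({"yield_value": ...}) call made by a node.
    yield {"yield_value": '0:"Hello"\n'}  # text part
    yield {"yield_value": '3:"example error"\n'}  # error part

async def consume():
    async for chunk in fake_graph_stream():
        if isinstance(chunk, dict) and "yield_value" in chunk:
            print(chunk["yield_value"], end="")

asyncio.run(consume())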


@@ -981,19 +981,16 @@ const ChatPage = () => {
   const renderTerminalContent = (message: any) => {
     if (!message.annotations) return null;

-    // Get all TERMINAL_INFO annotations
-    const terminalInfoAnnotations = (message.annotations as any[]).filter(
-      (a) => a.type === "TERMINAL_INFO",
-    );
-
-    // Get the latest TERMINAL_INFO annotation
-    const latestTerminalInfo =
-      terminalInfoAnnotations.length > 0
-        ? terminalInfoAnnotations[terminalInfoAnnotations.length - 1]
-        : null;
+    // Get all TERMINAL_INFO annotations content
+    const terminalInfoAnnotations = (message.annotations as any[]).map(item => {
+      if(item.type === "TERMINAL_INFO") {
+        return item.content.map((a: any) => a.text)
+      }
+    }).flat().filter(Boolean)

     // Render the content of the latest TERMINAL_INFO annotation
-    return latestTerminalInfo?.content.map((item: any, idx: number) => (
+    return terminalInfoAnnotations.map((item: any, idx: number) => (
       <div key={idx} className="py-0.5 flex items-start text-gray-300">
         <span className="text-gray-500 text-xs mr-2 w-10 flex-shrink-0">
           [{String(idx).padStart(2, "0")}:
@@ -1008,7 +1005,7 @@ const ChatPage = () => {
             ${item.type === "warning" ? "text-yellow-300" : ""}
           `}
         >
-          {item.text}
+          {item}
         </span>
       </div>
     ));