chore: add time out for question_confirm (#526)

This commit is contained in:
Wendong-Fan 2025-10-20 04:47:01 +08:00 committed by GitHub
commit e4aa00cf00
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -88,7 +88,11 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
assert isinstance(item, ActionImproveData)
question = item.data
if len(question) < 12 and len(options.attaches) == 0:
confirm = await question_confirm(question_agent, question)
messages, confirm = await question_confirm(
question_agent, question, timeout=8.0, task_id=options.task_id
)
for msg in messages:
yield msg
else:
confirm = True
@ -109,7 +113,21 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches}
sub_tasks = await asyncio.to_thread(workforce.eigent_make_sub_tasks, camel_task)
summary_task_content = await summary_task(summary_task_agent, camel_task)
try:
summary_task_content = await asyncio.wait_for(
summary_task(summary_task_agent, camel_task), timeout=10
)
except asyncio.TimeoutError:
logger.warning(f"summary_task timeout for task {options.task_id}")
# Fallback to a minimal summary to unblock UI
fallback_name = "Task"
content_preview = camel_task.content if hasattr(camel_task, "content") else ""
if content_preview is None:
content_preview = ""
fallback_summary = (
(content_preview[:80] + "...") if len(content_preview) > 80 else content_preview
)
summary_task_content = f"{fallback_name}|{fallback_summary}"
yield to_sub_tasks(camel_task, summary_task_content)
# tracer.stop()
# tracer.save("trace.json")
@ -287,8 +305,27 @@ def add_sub_tasks(camel_task: Task, update_tasks: list[TaskContent]):
)
async def question_confirm(agent: ListenChatAgent, prompt: str) -> str | Literal[True]:
prompt = f"""
async def question_confirm(
agent: ListenChatAgent,
prompt: str,
timeout: float = 8.0,
task_id: str = ""
) -> tuple[list, str | Literal[True]]:
"""
Confirm whether a question requires workforce processing.
Args:
agent: The agent to use for classification
prompt: The user's question
timeout: Timeout in seconds (default: 8.0)
task_id: Task ID for logging
Returns:
Tuple of (messages_to_yield, confirm_result)
- messages_to_yield: List of SSE messages to yield (empty if no timeout)
- confirm_result: True to proceed with workforce, or sse_json for user confirmation
"""
analysis_prompt = f"""
> **Your Role:** You are a highly capable agent. Your primary function is to analyze a user's request and determine the appropriate course of action.
>
> **Your Process:**
@ -303,12 +340,26 @@ async def question_confirm(agent: ListenChatAgent, prompt: str) -> str | Literal
> * **For a Simple Query:** Provide a direct and helpful response.
> * **For a Complex Task:** Your *only* response should be "yes". This will trigger a specialized workforce to handle the task. Do not include any other text, punctuation, or pleasantries.
"""
resp = agent.step(prompt)
logger.info(f"resp: {agent.chat_history}")
if resp.msgs[0].content.lower() != "yes":
return sse_json("wait_confirm", {"content": resp.msgs[0].content})
else:
return True
try:
resp = await asyncio.wait_for(
asyncio.to_thread(agent.step, analysis_prompt),
timeout=timeout
)
logger.info(f"resp: {agent.chat_history}")
if resp.msgs[0].content.lower() != "yes":
return ([], sse_json("wait_confirm", {"content": resp.msgs[0].content}))
else:
return ([], True)
except asyncio.TimeoutError:
logger.warning(f"question_confirm timeout for task {task_id}")
notice = sse_json(
"notice",
{"notice": "Quick classification timed out. Responding directly."}
)
return ([notice], True)
async def summary_task(agent: ListenChatAgent, task: Task) -> str: