preserve task.lock (#764)

This commit is contained in:
Wendong-Fan 2025-11-23 02:05:23 +08:00 committed by GitHub
commit 300ccc3af5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 316 additions and 74 deletions

View file

@ -204,14 +204,19 @@ def supplement(id: str, data: SupplementChat):
@traceroot.trace()
def stop(id: str):
"""stop the task"""
chat_logger.warning("Stopping chat session", extra={"task_id": id})
chat_logger.info("=" * 80)
chat_logger.info("🛑 [STOP-BUTTON] DELETE /chat/{id} request received from frontend")
chat_logger.info(f"[STOP-BUTTON] project_id/task_id: {id}")
chat_logger.info("=" * 80)
try:
task_lock = get_task_lock(id)
chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}")
chat_logger.info(f"[STOP-BUTTON] Queueing ActionStopData(Action.stop) to task_lock queue")
asyncio.run(task_lock.put_queue(ActionStopData(action=Action.stop)))
chat_logger.info("Chat stop signal sent", extra={"task_id": id})
chat_logger.info(f"[STOP-BUTTON] ✅ ActionStopData queued successfully, this will trigger workforce.stop_gracefully()")
except Exception as e:
# Task lock may not exist if task is already finished or never started
chat_logger.info("Task lock not found or already stopped", extra={"task_id": id, "error": str(e)})
chat_logger.warning(f"[STOP-BUTTON] ⚠️ Task lock not found or already stopped, task_id: {id}, error: {str(e)}")
return Response(status_code=204)
@ -282,18 +287,33 @@ def remove_task(project_id: str, task_id: str):
@router.post("/chat/{project_id}/skip-task", name="skip task in workforce")
@traceroot.trace()
def skip_task(project_id: str):
"""Skip a task in the workforce"""
chat_logger.info(f"Skipping task in workforce for project_id: {project_id}")
"""
Skip/Stop current task execution while preserving context.
This endpoint is called when user clicks the Stop button.
Behavior:
- Stops workforce gracefully
- Marks task as done
- Preserves conversation_history and last_task_result in task_lock
- Sends 'end' event to frontend
- Keeps SSE connection alive for multi-turn conversation
"""
chat_logger.info("=" * 80)
chat_logger.info(f"🛑 [STOP-BUTTON] SKIP-TASK request received from frontend (User clicked Stop)")
chat_logger.info(f"[STOP-BUTTON] project_id: {project_id}")
chat_logger.info("=" * 80)
task_lock = get_task_lock(project_id)
chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}")
try:
# Queue the skip task action
# Queue the skip task action - this will preserve context for multi-turn
skip_task_action = ActionSkipTaskData(project_id=project_id)
chat_logger.info(f"[STOP-BUTTON] Queueing ActionSkipTaskData (preserves context, marks as done)")
asyncio.run(task_lock.put_queue(skip_task_action))
chat_logger.info(f"Task skip request queued for project_id: {project_id}")
chat_logger.info(f"[STOP-BUTTON] ✅ Skip request queued - task will stop gracefully and preserve context")
return Response(status_code=201)
except Exception as e:
chat_logger.error(f"Error skipping task for project_id: {project_id}: {e}")
chat_logger.error(f"[STOP-BUTTON] ❌ Error skipping task for project_id: {project_id}: {e}")
raise UserException(code.error, f"Failed to skip task: {str(e)}")

View file

@ -268,23 +268,34 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
summary_task_content = "" # Track task summary
loop_iteration = 0
logger.info("Starting step_solve", extra={"project_id": options.project_id, "task_id": options.task_id})
logger.info("=" * 80)
logger.info("🚀 [LIFECYCLE] step_solve STARTED", extra={"project_id": options.project_id, "task_id": options.task_id})
logger.info("=" * 80)
logger.debug("Step solve options", extra={"task_id": options.task_id, "model_platform": options.model_platform})
while True:
loop_iteration += 1
logger.debug(f"[LIFECYCLE] step_solve loop iteration #{loop_iteration}", extra={"project_id": options.project_id, "task_id": options.task_id})
if await request.is_disconnected():
logger.warning(f"Client disconnected for project {options.project_id}")
logger.warning("=" * 80)
logger.warning(f"⚠️ [LIFECYCLE] CLIENT DISCONNECTED for project {options.project_id}")
logger.warning("=" * 80)
if workforce is not None:
logger.info(f"[LIFECYCLE] Stopping workforce due to client disconnect, workforce._running={workforce._running}")
if workforce._running:
workforce.stop()
workforce.stop_gracefully()
logger.info(f"[LIFECYCLE] Workforce stopped after client disconnect")
else:
logger.info(f"[LIFECYCLE] Workforce is None, no need to stop")
task_lock.status = Status.done
try:
await delete_task_lock(task_lock.id)
logger.info(f"[LIFECYCLE] Task lock deleted after client disconnect")
except Exception as e:
logger.error(f"Error deleting task lock on disconnect: {e}")
logger.info(f"[LIFECYCLE] Breaking out of step_solve loop due to client disconnect")
break
try:
item = await task_lock.get_queue()
@ -295,16 +306,23 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
try:
if item.action == Action.improve or start_event_loop:
logger.info("=" * 80)
logger.info(f"💬 [NEW-QUESTION] Action.improve received or start_event_loop", extra={"project_id": options.project_id, "start_event_loop": start_event_loop})
logger.info(f"[NEW-QUESTION] Current workforce state: workforce={'None' if workforce is None else f'exists(id={id(workforce)})'}")
logger.info(f"[NEW-QUESTION] Current camel_task state: camel_task={'None' if camel_task is None else f'exists(id={camel_task.id})'}")
logger.info("=" * 80)
# from viztracer import VizTracer
# tracer = VizTracer()
# tracer.start()
if start_event_loop is True:
question = options.question
logger.info(f"[NEW-QUESTION] Initial question from options.question: '{question[:100]}...'")
start_event_loop = False
else:
assert isinstance(item, ActionImproveData)
question = item.data
logger.info(f"[NEW-QUESTION] Follow-up question from ActionImproveData: '{question[:100]}...'")
is_exceeded, total_length = check_conversation_history_length(task_lock)
if is_exceeded:
@ -321,10 +339,14 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
if len(options.attaches) > 0:
# Questions with attachments always need workforce
is_complex_task = True
logger.info(f"[NEW-QUESTION] Has attachments, treating as complex task")
else:
logger.info(f"[NEW-QUESTION] Calling question_confirm to determine complexity")
is_complex_task = await question_confirm(question_agent, question, task_lock)
logger.info(f"[NEW-QUESTION] question_confirm result: is_complex={is_complex_task}")
if not is_complex_task:
logger.info(f"[NEW-QUESTION] ✅ Simple question, providing direct answer without workforce")
simple_answer_prompt = f"{build_conversation_context(task_lock, header='=== Previous Conversation ===')}User Query: {question}\n\nProvide a direct, helpful answer to this simple question."
try:
@ -359,42 +381,70 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
except Exception as e:
logger.error(f"Error cleaning up folder: {e}")
else:
logger.info(f"[NEW-QUESTION] 🔧 Complex task, creating workforce and decomposing")
# Update the sync_step with new task_id
if hasattr(item, 'new_task_id') and item.new_task_id:
set_current_task_id(options.project_id, item.new_task_id)
# Reset summary generation flag for new tasks to ensure proper summaries
task_lock.summary_generated = False
logger.info("Reset summary_generated flag for new task", extra={"project_id": options.project_id, "new_task_id": item.new_task_id})
logger.info("[NEW-QUESTION] Reset summary_generated flag for new task", extra={"project_id": options.project_id, "new_task_id": item.new_task_id})
logger.info(f"[NEW-QUESTION] Sending 'confirmed' SSE to frontend")
yield sse_json("confirmed", {"question": question})
logger.info(f"[NEW-QUESTION] Building context for coordinator")
context_for_coordinator = build_context_for_workforce(task_lock, options)
(workforce, mcp) = await construct_workforce(options)
for new_agent in options.new_agents:
workforce.add_single_agent_worker(
format_agent_description(new_agent), await new_agent_model(new_agent, options)
)
# Check if workforce exists - if so, reuse it (agents are preserved)
# Otherwise create new workforce
if workforce is not None:
logger.info(f"[NEW-QUESTION] 🔄 Workforce exists (id={id(workforce)}), state={workforce._state.name}, _running={workforce._running}")
logger.info(f"[NEW-QUESTION] ✅ Reusing existing workforce with preserved agents")
# Workforce is already stopped from skip_task, ready for new decomposition
else:
logger.info(f"[NEW-QUESTION] 🏭 Creating NEW workforce instance (workforce=None)")
(workforce, mcp) = await construct_workforce(options)
logger.info(f"[NEW-QUESTION] ✅ NEW Workforce instance created, id={id(workforce)}")
for new_agent in options.new_agents:
workforce.add_single_agent_worker(
format_agent_description(new_agent), await new_agent_model(new_agent, options)
)
task_lock.status = Status.confirmed
clean_task_content = question + options.summary_prompt
camel_task = Task(content=clean_task_content, id=options.task_id)
if len(options.attaches) > 0:
camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches}
# If camel_task already exists (from previous paused task), add new question as subtask
# Otherwise, create a new camel_task
if camel_task is not None:
logger.info(f"[NEW-QUESTION] 🔄 camel_task exists (id={camel_task.id}), adding new question as context")
# Update the task content with new question
clean_task_content = question + options.summary_prompt
logger.info(f"[NEW-QUESTION] Updating existing camel_task content with new question")
# We keep the existing task structure but update content for new decomposition
camel_task = Task(content=clean_task_content, id=options.task_id)
if len(options.attaches) > 0:
camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches}
else:
clean_task_content = question + options.summary_prompt
logger.info(f"[NEW-QUESTION] Creating NEW camel_task with id={options.task_id}")
camel_task = Task(content=clean_task_content, id=options.task_id)
if len(options.attaches) > 0:
camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches}
logger.info(f"[NEW-QUESTION] 🧩 Starting task decomposition via workforce.eigent_make_sub_tasks")
sub_tasks = await asyncio.to_thread(
workforce.eigent_make_sub_tasks,
camel_task,
context_for_coordinator
)
logger.info(f"[NEW-QUESTION] ✅ Task decomposed into {len(sub_tasks)} subtasks")
logger.info(f"[NEW-QUESTION] Generating task summary")
summary_task_agent = task_summary_agent(options)
try:
summary_task_content = await asyncio.wait_for(
summary_task(summary_task_agent, camel_task), timeout=10
)
task_lock.summary_generated = True
logger.info("Generated summary for task", extra={"project_id": options.project_id})
logger.info("[NEW-QUESTION] ✅ Summary generated successfully", extra={"project_id": options.project_id})
except asyncio.TimeoutError:
logger.warning("summary_task timeout", extra={"project_id": options.project_id, "task_id": options.task_id})
# Fallback to a minimal summary to unblock UI
@ -408,7 +458,10 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
summary_task_content = f"{fallback_name}|{fallback_summary}"
task_lock.summary_generated = True
logger.info(f"[NEW-QUESTION] 📤 Sending to_sub_tasks SSE to frontend (task card)")
logger.info(f"[NEW-QUESTION] to_sub_tasks data: task_id={camel_task.id}, summary={summary_task_content[:50]}..., subtasks_count={len(camel_task.subtasks)}")
yield to_sub_tasks(camel_task, summary_task_content)
logger.info(f"[NEW-QUESTION] ✅ to_sub_tasks SSE sent")
# tracer.stop()
# tracer.save("trace.json")
@ -464,11 +517,64 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
}
yield sse_json("remove_task", returnData)
elif item.action == Action.skip_task:
logger.info("=" * 80)
logger.info(f"🛑 [LIFECYCLE] SKIP_TASK action received (User clicked Stop button)", extra={"project_id": options.project_id, "item_project_id": item.project_id})
logger.info("=" * 80)
# Prevent duplicate skip processing
if task_lock.status == Status.done:
logger.warning(f"⚠️ [LIFECYCLE] SKIP_TASK received but task already marked as done. Ignoring.")
continue
if workforce is not None and item.project_id == options.project_id:
if workforce._state.name == 'PAUSED':
# Resume paused workforce to skip the task
workforce.resume()
workforce.skip_gracefully()
logger.info(f"[LIFECYCLE] Workforce exists (id={id(workforce)}), state={workforce._state.name}, _running={workforce._running}")
# Stop workforce completely
logger.info(f"[LIFECYCLE] 🛑 Stopping workforce")
if workforce._running:
# Import correct BaseWorkforce from camel
from camel.societies.workforce.workforce import Workforce as BaseWorkforce
BaseWorkforce.stop(workforce)
logger.info(f"[LIFECYCLE] ✅ BaseWorkforce.stop() completed, state={workforce._state.name}, _running={workforce._running}")
workforce.stop_gracefully()
logger.info(f"[LIFECYCLE] ✅ Workforce stopped gracefully")
# Clear workforce to avoid state issues
# Next question will create fresh workforce
workforce = None
logger.info(f"[LIFECYCLE] Workforce set to None, will be recreated on next question")
else:
logger.warning(f"[LIFECYCLE] Cannot skip: workforce is None or project_id mismatch")
# Mark task as done and preserve context (like Action.end does)
task_lock.status = Status.done
end_message = "<summary>Task stopped</summary>Task stopped by user"
task_lock.last_task_result = end_message
# Add to conversation history (like normal end does)
if camel_task is not None:
task_content: str = camel_task.content
if "=== CURRENT TASK ===" in task_content:
task_content = task_content.split("=== CURRENT TASK ===")[-1].strip()
else:
task_content: str = f"Task {options.task_id}"
task_lock.add_conversation('task_result', {
'task_content': task_content,
'task_result': end_message,
'working_directory': get_working_directory(options, task_lock)
})
# Clear camel_task as well (workforce is cleared, so camel_task should be too)
camel_task = None
logger.info(f"[LIFECYCLE] ✅ Task marked as done, workforce and camel_task cleared, ready for multi-turn")
# Send end event to frontend with string format (matching normal end event format)
yield sse_json("end", end_message)
logger.info(f"[LIFECYCLE] Sent 'end' SSE event to frontend")
# Continue loop to accept new questions (don't break, don't delete task_lock)
elif item.action == Action.start:
# Check conversation history length before starting task
is_exceeded, total_length = check_conversation_history_length(task_lock)
@ -504,11 +610,15 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
yield sse_json("task_state", item.data)
elif item.action == Action.new_task_state:
logger.info("=" * 80)
logger.info(f"🔄 [LIFECYCLE] NEW_TASK_STATE action received (Multi-turn)", extra={"project_id": options.project_id})
logger.info("=" * 80)
# Log new task state details
new_task_id = item.data.get('task_id', 'unknown')
new_task_state = item.data.get('state', 'unknown')
new_task_result = item.data.get('result', '')
logger.info(f"[LIFECYCLE] New task details: task_id={new_task_id}, state={new_task_state}")
if camel_task is None:
logger.error(f"NEW_TASK_STATE action received but camel_task is None for project {options.project_id}, task {new_task_id}")
@ -548,13 +658,18 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
# Then handle multi-turn processing
if workforce is not None and new_task_content:
logger.info(f"[LIFECYCLE] Multi-turn: workforce exists (id={id(workforce)}), pausing for question confirmation")
task_lock.status = Status.confirming
workforce.pause()
logger.info(f"[LIFECYCLE] Multi-turn: workforce paused, state={workforce._state.name}")
try:
logger.info(f"[LIFECYCLE] Multi-turn: calling question_confirm for new task")
is_multi_turn_complex = await question_confirm(question_agent, new_task_content, task_lock)
logger.info(f"[LIFECYCLE] Multi-turn: question_confirm result: is_complex={is_multi_turn_complex}")
if not is_multi_turn_complex:
logger.info(f"[LIFECYCLE] Multi-turn: task is simple, providing direct answer without workforce")
simple_answer_prompt = f"{build_conversation_context(task_lock, header='=== Previous Conversation ===')}User Query: {new_task_content}\n\nProvide a direct, helpful answer to this simple question."
try:
@ -569,22 +684,28 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
logger.error(f"Error generating simple answer in multi-turn: {e}")
yield sse_json("wait_confirm", {"content": "I encountered an error while processing your question.", "question": new_task_content})
logger.info(f"[LIFECYCLE] Multi-turn: simple answer provided, resuming workforce")
workforce.resume()
logger.info(f"[LIFECYCLE] Multi-turn: workforce resumed, continuing to next iteration")
continue # This continues the main while loop, waiting for next action
# Update the sync_step with new task_id before sending new task sse events
logger.info(f"[LIFECYCLE] Multi-turn: task is complex, setting new task_id={task_id}")
set_current_task_id(options.project_id, task_id)
yield sse_json("confirmed", {"question": new_task_content})
task_lock.status = Status.confirmed
logger.info(f"[LIFECYCLE] Multi-turn: building context for workforce")
context_for_multi_turn = build_context_for_workforce(task_lock, options)
logger.info(f"[LIFECYCLE] Multi-turn: calling workforce.handle_decompose_append_task for new task decomposition")
new_sub_tasks = await workforce.handle_decompose_append_task(
camel_task,
reset=False,
coordinator_context=context_for_multi_turn
)
logger.info(f"[LIFECYCLE] Multi-turn: task decomposed into {len(new_sub_tasks)} subtasks")
# Generate proper LLM summary for multi-turn tasks instead of hardcoded fallback
try:
@ -682,11 +803,16 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
)
workforce.resume()
elif item.action == Action.end:
logger.info(f"Processing END action for project {options.project_id}, task {options.task_id}, camel_task exists: {camel_task is not None}, current status: {task_lock.status}")
logger.info("=" * 80)
logger.info(f"🏁 [LIFECYCLE] END action received for project {options.project_id}, task {options.task_id}")
logger.info(f"[LIFECYCLE] camel_task exists: {camel_task is not None}, current status: {task_lock.status}, workforce exists: {workforce is not None}")
if workforce is not None:
logger.info(f"[LIFECYCLE] Workforce state at END: _state={workforce._state.name}, _running={workforce._running}")
logger.info("=" * 80)
# Prevent duplicate end processing
if task_lock.status == Status.done:
logger.warning(f"END action received but task already marked as done for project {options.project_id}, task {options.task_id}. Ignoring duplicate END action.")
logger.warning(f"⚠️ [LIFECYCLE] END action received but task already marked as done. Ignoring duplicate END action.")
continue
if camel_task is None:
@ -718,17 +844,20 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
yield sse_json("end", final_result)
if workforce is not None:
logger.info(f"[LIFECYCLE] 🛑 Calling workforce.stop_gracefully() for project {options.project_id}, workforce id={id(workforce)}")
workforce.stop_gracefully()
logger.info(f"Workforce stopped gracefully for project {options.project_id}")
logger.info(f"[LIFECYCLE] ✅ Workforce stopped gracefully for project {options.project_id}")
workforce = None
logger.info(f"[LIFECYCLE] Workforce set to None")
else:
logger.warning(f"Workforce already None at end action for project {options.project_id}")
logger.warning(f"[LIFECYCLE] ⚠️ Workforce already None at end action for project {options.project_id}")
camel_task = None
logger.info(f"[LIFECYCLE] camel_task set to None")
if question_agent is not None:
question_agent.reset()
logger.info(f"Reset question_agent for project {options.project_id}")
logger.info(f"[LIFECYCLE] question_agent reset for project {options.project_id}")
elif item.action == Action.supplement:
# Check if this might be a misrouted second question
@ -752,14 +881,23 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
workforce.pause()
yield sse_json(Action.budget_not_enough, {"message": "budget not enouth"})
elif item.action == Action.stop:
logger.info("=" * 80)
logger.info(f"⏹️ [LIFECYCLE] STOP action received for project {options.project_id}")
logger.info("=" * 80)
if workforce is not None:
logger.info(f"[LIFECYCLE] Workforce exists (id={id(workforce)}), _running={workforce._running}, _state={workforce._state.name}")
if workforce._running:
logger.info(f"[LIFECYCLE] Calling workforce.stop() because _running=True")
workforce.stop()
logger.info(f"[LIFECYCLE] workforce.stop() completed")
logger.info(f"[LIFECYCLE] Calling workforce.stop_gracefully()")
workforce.stop_gracefully()
logger.info(f"Workforce stopped for project {options.project_id}")
logger.info(f"[LIFECYCLE] ✅ Workforce stopped for project {options.project_id}")
else:
logger.warning(f"Workforce is None at stop action for project {options.project_id}")
logger.warning(f"[LIFECYCLE] ⚠️ Workforce is None at stop action for project {options.project_id}")
logger.info(f"[LIFECYCLE] Deleting task lock")
await delete_task_lock(task_lock.id)
logger.info(f"[LIFECYCLE] Task lock deleted, breaking out of loop")
break
else:
logger.warning(f"Unknown action: {item.action}")
@ -796,13 +934,17 @@ async def install_mcp(
def to_sub_tasks(task: Task, summary_task_content: str):
return sse_json(
logger.info(f"[TO-SUB-TASKS] 📋 Creating to_sub_tasks SSE event")
logger.info(f"[TO-SUB-TASKS] task.id={task.id}, summary={summary_task_content[:50]}..., subtasks_count={len(task.subtasks)}")
result = sse_json(
"to_sub_tasks",
{
"summary_task": summary_task_content,
"sub_tasks": tree_sub_tasks(task.subtasks),
},
)
logger.info(f"[TO-SUB-TASKS] ✅ to_sub_tasks SSE event created")
return result
def tree_sub_tasks(sub_tasks: list[Task], depth: int = 0):

View file

@ -44,14 +44,11 @@ class Workforce(BaseWorkforce):
use_structured_output_handler: bool = True,
) -> None:
self.api_task_id = api_task_id
logger.info("Initializing workforce", extra={
"api_task_id": api_task_id,
"description": description[:100] + "..." if len(description) > 100 else description,
"children_count": len(children) if children else 0,
"graceful_shutdown_timeout": graceful_shutdown_timeout,
"share_memory": share_memory,
"use_structured_output_handler": use_structured_output_handler
})
logger.info("=" * 80)
logger.info("🏭 [WF-LIFECYCLE] Workforce.__init__ STARTED", extra={"api_task_id": api_task_id})
logger.info(f"[WF-LIFECYCLE] Workforce id will be: {id(self)}")
logger.info(f"[WF-LIFECYCLE] Init params: graceful_shutdown_timeout={graceful_shutdown_timeout}, share_memory={share_memory}")
logger.info("=" * 80)
super().__init__(
description=description,
children=children,
@ -62,6 +59,7 @@ class Workforce(BaseWorkforce):
share_memory=share_memory,
use_structured_output_handler=use_structured_output_handler,
)
logger.info(f"[WF-LIFECYCLE] ✅ Workforce.__init__ COMPLETED, id={id(self)}")
def eigent_make_sub_tasks(self, task: Task, coordinator_context: str = ""):
"""
@ -72,58 +70,73 @@ class Workforce(BaseWorkforce):
coordinator_context: Optional context ONLY for coordinator agent during decomposition.
This context will NOT be passed to subtasks or worker agents.
"""
logger.info("Starting task decomposition", extra={
logger.info("=" * 80)
logger.info("🧩 [DECOMPOSE] eigent_make_sub_tasks CALLED", extra={
"api_task_id": self.api_task_id,
"task_id": task.id,
"task_content": task.content[:200] + "..." if len(task.content) > 200 else task.content,
"has_coordinator_context": bool(coordinator_context)
"workforce_id": id(self),
"task_id": task.id
})
logger.info(f"[DECOMPOSE] Task content preview: '{task.content[:200]}...'")
logger.info(f"[DECOMPOSE] Has coordinator context: {bool(coordinator_context)}")
logger.info(f"[DECOMPOSE] Current workforce state: {self._state.name}, _running: {self._running}")
logger.info("=" * 80)
if not validate_task_content(task.content, task.id):
task.state = TaskState.FAILED
task.result = "Task failed: Invalid or empty content provided"
logger.warning("Task rejected: Invalid or empty content", extra={
logger.warning("❌ [DECOMPOSE] Task rejected: Invalid or empty content", extra={
"task_id": task.id,
"content_preview": task.content[:50] + "..." if len(task.content) > 50 else task.content
})
raise UserException(code.error, task.result)
logger.info(f"[DECOMPOSE] Resetting workforce state")
self.reset()
self._task = task
self.set_channel(TaskChannel())
self._state = WorkforceState.RUNNING
task.state = TaskState.OPEN
logger.info(f"[DECOMPOSE] Workforce reset complete, state: {self._state.name}")
logger.info(f"[DECOMPOSE] Calling handle_decompose_append_task")
subtasks = asyncio.run(self.handle_decompose_append_task(task, reset=False, coordinator_context=coordinator_context))
logger.info("Task decomposition completed", extra={
logger.info("=" * 80)
logger.info(f"✅ [DECOMPOSE] Task decomposition COMPLETED", extra={
"api_task_id": self.api_task_id,
"task_id": task.id,
"subtasks_count": len(subtasks)
})
logger.info("=" * 80)
return subtasks
async def eigent_start(self, subtasks: list[Task]):
"""start the workforce"""
logger.info("Starting workforce execution", extra={
"api_task_id": self.api_task_id,
"subtasks_count": len(subtasks)
})
logger.info("=" * 80)
logger.info("▶️ [WF-LIFECYCLE] eigent_start CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)})
logger.info(f"[WF-LIFECYCLE] Starting workforce execution with {len(subtasks)} subtasks")
logger.info(f"[WF-LIFECYCLE] Current workforce state: {self._state.name}, _running: {self._running}")
logger.info("=" * 80)
self._pending_tasks.extendleft(reversed(subtasks))
# Save initial snapshot
self.save_snapshot("Initial task decomposition")
try:
logger.info(f"[WF-LIFECYCLE] Calling base class start() method")
await self.start()
logger.info(f"[WF-LIFECYCLE] ✅ Base class start() method completed")
except Exception as e:
logger.error("Error in workforce execution", extra={
logger.error(f"[WF-LIFECYCLE] ❌ Error in workforce execution: {e}", extra={
"api_task_id": self.api_task_id,
"error": str(e)
}, exc_info=True)
self._state = WorkforceState.STOPPED
logger.info(f"[WF-LIFECYCLE] Workforce state set to STOPPED after error")
raise
finally:
logger.info(f"[WF-LIFECYCLE] eigent_start finally block, current state: {self._state.name}")
if self._state != WorkforceState.STOPPED:
self._state = WorkforceState.IDLE
logger.info(f"[WF-LIFECYCLE] Workforce state set to IDLE")
async def handle_decompose_append_task(
self, task: Task, reset: bool = True, coordinator_context: str = ""
@ -140,23 +153,27 @@ class Workforce(BaseWorkforce):
Returns:
List[Task]: The decomposed subtasks or the original task
"""
logger.info(f"[DECOMPOSE] handle_decompose_append_task CALLED, task_id={task.id}, reset={reset}")
if not validate_task_content(task.content, task.id):
task.state = TaskState.FAILED
task.result = "Task failed: Invalid or empty content provided"
logger.warning(
f"Task {task.id} rejected: Invalid or empty content. "
f"[DECOMPOSE] Task {task.id} rejected: Invalid or empty content. "
f"Content preview: '{task.content}'"
)
return [task]
if reset and self._state != WorkforceState.RUNNING:
logger.info(f"[DECOMPOSE] Resetting workforce (reset={reset}, state={self._state.name})")
self.reset()
logger.info("Workforce reset before handling task.")
logger.info("[DECOMPOSE] Workforce reset complete")
self._task = task
task.state = TaskState.FAILED
if coordinator_context:
logger.info(f"[DECOMPOSE] Adding coordinator context to task")
original_content = task.content
task_with_context = coordinator_context
if coordinator_context:
@ -164,24 +181,30 @@ class Workforce(BaseWorkforce):
task_with_context += original_content
task.content = task_with_context
logger.info(f"[DECOMPOSE] Calling _decompose_task with context")
subtasks_result = self._decompose_task(task)
task.content = original_content
else:
logger.info(f"[DECOMPOSE] Calling _decompose_task without context")
subtasks_result = self._decompose_task(task)
logger.info(f"[DECOMPOSE] _decompose_task returned, processing results")
if isinstance(subtasks_result, Generator):
subtasks = []
for new_tasks in subtasks_result:
subtasks.extend(new_tasks)
logger.info(f"[DECOMPOSE] Collected {len(subtasks)} subtasks from generator")
else:
subtasks = subtasks_result
logger.info(f"[DECOMPOSE] Got {len(subtasks) if subtasks else 0} subtasks directly")
if subtasks:
self._pending_tasks.extendleft(reversed(subtasks))
logger.info(f"Appended {len(subtasks)} subtasks to pending tasks")
logger.info(f"[DECOMPOSE] ✅ Appended {len(subtasks)} subtasks to pending tasks")
if not subtasks:
logger.warning(f"[DECOMPOSE] No subtasks returned, creating fallback task")
fallback_task = Task(
content=task.content,
id=f"{task.id}.1",
@ -189,6 +212,7 @@ class Workforce(BaseWorkforce):
)
task.subtasks = [fallback_task]
subtasks = [fallback_task]
logger.info(f"[DECOMPOSE] Created fallback task: {fallback_task.id}")
return subtasks
@ -358,10 +382,48 @@ class Workforce(BaseWorkforce):
return result
def stop(self) -> None:
    """Stop the workforce and queue an end-of-task signal.

    Logs the workforce state, delegates to the base-class ``stop()``, then
    schedules an ``ActionEndData`` item on the queue of the task lock that
    is looked up via ``self.api_task_id``.
    """
    separator = "=" * 80
    log_context = {"api_task_id": self.api_task_id, "workforce_id": id(self)}

    logger.info(separator)
    logger.info("⏹️ [WF-LIFECYCLE] stop() CALLED", extra=log_context)
    state_name = self._state.name
    running = self._running
    logger.info(f"[WF-LIFECYCLE] Current state before stop: {state_name}, _running: {running}")
    logger.info(separator)

    super().stop()
    logger.info(f"[WF-LIFECYCLE] super().stop() completed, new state: {self._state.name}")

    # asyncio.create_task requires a running event loop in the calling thread.
    lock = get_task_lock(self.api_task_id)
    end_signal = asyncio.create_task(lock.put_queue(ActionEndData()))
    # NOTE(review): presumably registered so the lock keeps a reference to the
    # pending task until it completes - confirm against TaskLock.
    lock.add_background_task(end_signal)
    logger.info("[WF-LIFECYCLE] ✅ ActionEndData queued")
def stop_gracefully(self) -> None:
    """Delegate to the base-class ``stop_gracefully()`` with lifecycle logging.

    The workforce state and ``_running`` flag are logged before and after
    the base-class call; nothing else is done here.
    """
    separator = "=" * 80
    log_context = {"api_task_id": self.api_task_id, "workforce_id": id(self)}

    logger.info(separator)
    logger.info("🛑 [WF-LIFECYCLE] stop_gracefully() CALLED", extra=log_context)
    state_name = self._state.name
    running = self._running
    logger.info(f"[WF-LIFECYCLE] Current state before stop_gracefully: {state_name}, _running: {running}")
    logger.info(separator)

    super().stop_gracefully()

    # Re-read both values: the base-class call may have changed them.
    state_name = self._state.name
    running = self._running
    logger.info(f"[WF-LIFECYCLE] ✅ super().stop_gracefully() completed, new state: {state_name}, _running: {running}")
def skip_gracefully(self) -> None:
    """Delegate to the base-class ``skip_gracefully()`` with lifecycle logging.

    The workforce state and ``_running`` flag are logged before and after
    the base-class call; nothing else is done here.
    """
    separator = "=" * 80
    log_context = {"api_task_id": self.api_task_id, "workforce_id": id(self)}

    logger.info(separator)
    logger.info("⏭️ [WF-LIFECYCLE] skip_gracefully() CALLED", extra=log_context)
    state_name = self._state.name
    running = self._running
    logger.info(f"[WF-LIFECYCLE] Current state before skip_gracefully: {state_name}, _running: {running}")
    logger.info(separator)

    super().skip_gracefully()

    # Re-read both values: the base-class call may have changed them.
    state_name = self._state.name
    running = self._running
    logger.info(f"[WF-LIFECYCLE] ✅ super().skip_gracefully() completed, new state: {state_name}, _running: {running}")
def pause(self) -> None:
    """Delegate to the base-class ``pause()`` with lifecycle logging.

    The workforce state and ``_running`` flag are logged before and after
    the base-class call; nothing else is done here.
    """
    separator = "=" * 80
    log_context = {"api_task_id": self.api_task_id, "workforce_id": id(self)}

    logger.info(separator)
    logger.info("⏸️ [WF-LIFECYCLE] pause() CALLED", extra=log_context)
    state_name = self._state.name
    running = self._running
    logger.info(f"[WF-LIFECYCLE] Current state before pause: {state_name}, _running: {running}")
    logger.info(separator)

    super().pause()

    # Re-read both values: the base-class call may have changed them.
    state_name = self._state.name
    running = self._running
    logger.info(f"[WF-LIFECYCLE] ✅ super().pause() completed, new state: {state_name}, _running: {running}")
def resume(self) -> None:
    """Delegate to the base-class ``resume()`` with lifecycle logging.

    The workforce state and ``_running`` flag are logged before and after
    the base-class call; nothing else is done here.
    """
    separator = "=" * 80
    log_context = {"api_task_id": self.api_task_id, "workforce_id": id(self)}

    logger.info(separator)
    logger.info("▶️ [WF-LIFECYCLE] resume() CALLED", extra=log_context)
    state_name = self._state.name
    running = self._running
    logger.info(f"[WF-LIFECYCLE] Current state before resume: {state_name}, _running: {running}")
    logger.info(separator)

    super().resume()

    # Re-read both values: the base-class call may have changed them.
    state_name = self._state.name
    running = self._running
    logger.info(f"[WF-LIFECYCLE] ✅ super().resume() completed, new state: {state_name}, _running: {running}")
async def cleanup(self) -> None:
r"""Clean up resources when workforce is done"""

View file

@ -208,6 +208,8 @@ export default function ChatBox(): JSX.Element {
const nextTaskId = generateUniqueId()
chatStore.setNextTaskId(nextTaskId);
// Use improve endpoint (POST /chat/{id}) - {id} is project_id
// This reuses the existing SSE connection and step_solve loop
fetchPost(`/chat/${projectStore.activeProjectId}`, {
question: tempMessageContent,
task_id: nextTaskId
@ -417,43 +419,57 @@ export default function ChatBox(): JSX.Element {
setIsPauseResumeLoading(false);
};
// Stop task handler (previously "skip to next task"): triggers Action.skip_task, which preserves context
/**
 * Stop-button handler.
 *
 * Asks the backend to stop the active task through the skip-task endpoint.
 * On success the SSE connection is deliberately left open and the task is
 * only marked as not pending. If the backend request fails, the task is
 * stopped locally (closing the SSE connection) as a fallback and the user
 * is warned that the backend task may still be running.
 *
 * The pause/resume loading flag is set for the duration of the call and
 * always cleared in `finally`.
 */
const handleSkip = async () => {
	const taskId = chatStore.activeTaskId as string;
	console.log("=".repeat(80));
	console.log("🛑 [STOP-BUTTON] handleSkip CALLED from frontend");
	console.log(`[STOP-BUTTON] taskId: ${taskId}, projectId: ${projectStore.activeProjectId}`);
	console.log("=".repeat(80));
	setIsPauseResumeLoading(true);
	try {
		// Call skip-task endpoint to trigger Action.skip_task.
		// This will stop the task gracefully while preserving context for multi-turn.
		console.log(`[STOP-BUTTON] Sending POST request to /chat/${projectStore.activeProjectId}/skip-task`);
		await fetchPost(`/chat/${projectStore.activeProjectId}/skip-task`, {
			project_id: projectStore.activeProjectId
		});
		console.log("[STOP-BUTTON] ✅ Backend skip-task request successful");

		// DO NOT call chatStore.stopTask here!
		// Keep the SSE connection alive to receive the "end" event from the backend.
		// The "end" event will set status to 'finished' and allow multi-turn conversation.
		console.log("[STOP-BUTTON] ⚠️ SSE connection kept alive, waiting for backend 'end' event");

		// Only set isPending to false so the UI shows the task as stopped.
		chatStore.setIsPending(taskId, false);
		console.log("[STOP-BUTTON] ✅ Task marked as not pending, SSE connection remains open");
		toast.success("Task stopped successfully", {
			closeButton: true,
		});
	} catch (error) {
		console.error("[STOP-BUTTON] ❌ Failed to stop task:", error);
		// If the backend call failed, close the SSE connection as a fallback,
		// with different messaging to the user.
		console.log("[STOP-BUTTON] Backend call failed, closing SSE connection as fallback");
		try {
			chatStore.stopTask(taskId);
			chatStore.setIsPending(taskId, false);
			console.log("[STOP-BUTTON] ⚠️ SSE connection closed due to backend failure");
			toast.warning("Task stopped locally, but backend notification failed. Backend task may continue running.", {
				closeButton: true,
				duration: 5000,
			});
		} catch (localError) {
			console.error("[STOP-BUTTON] ❌ Failed to stop task locally:", localError);
			toast.error("Failed to stop task completely. Please refresh the page.", {
				closeButton: true,
			});
		}
	} finally {
		console.log("[STOP-BUTTON] handleSkip completed");
		setIsPauseResumeLoading(false);
	}
};

View file

@ -572,23 +572,25 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
const lockedTaskId = getCurrentTaskId();
const currentTask = getCurrentChatStore().tasks[lockedTaskId];
// Only ignore messages if:
// 1. The task doesn't exist, OR
// 2. The task is finished AND it's not a task-switching event
// Only ignore messages if task is finished and not a valid post-completion event
// Valid events after task completion:
// - Task switching: confirmed, new_task_state, end
// - Multi-turn simple answer: wait_confirm
const isTaskSwitchingEvent = agentMessages.step === "confirmed" ||
agentMessages.step === "new_task_state" ||
agentMessages.step === "end";
// More robust check - only ignore if task doesn't exist OR
// task is finished and it's not a legitimate flow-control event
const isMultiTurnSimpleAnswer = agentMessages.step === "wait_confirm";
if (!currentTask) {
console.log(`Task ${lockedTaskId} not found, ignoring SSE message for step: ${agentMessages.step}`);
return;
}
if (currentTask.status === 'finished' && !isTaskSwitchingEvent) {
// Only ignore non-essential messages for finished tasks
// Allow flow control messages through even for finished tasks
if (currentTask.status === 'finished' && !isTaskSwitchingEvent && !isMultiTurnSimpleAnswer) {
// Ignore messages for finished tasks except:
// 1. Task switching events (create new chatStore)
// 2. Simple answer events (direct response without new chatStore)
console.log(`Ignoring SSE message for finished task ${lockedTaskId}, step: ${agentMessages.step}`);
return;
}