diff --git a/backend/app/controller/chat_controller.py b/backend/app/controller/chat_controller.py
index a983acfd2..b7fc78262 100644
--- a/backend/app/controller/chat_controller.py
+++ b/backend/app/controller/chat_controller.py
@@ -204,14 +204,19 @@ def supplement(id: str, data: SupplementChat):
@traceroot.trace()
def stop(id: str):
"""stop the task"""
- chat_logger.warning("Stopping chat session", extra={"task_id": id})
+ chat_logger.info("=" * 80)
+ chat_logger.info("π [STOP-BUTTON] DELETE /chat/{id} request received from frontend")
+ chat_logger.info(f"[STOP-BUTTON] project_id/task_id: {id}")
+ chat_logger.info("=" * 80)
try:
task_lock = get_task_lock(id)
+ chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}")
+ chat_logger.info(f"[STOP-BUTTON] Queueing ActionStopData(Action.stop) to task_lock queue")
asyncio.run(task_lock.put_queue(ActionStopData(action=Action.stop)))
- chat_logger.info("Chat stop signal sent", extra={"task_id": id})
+ chat_logger.info(f"[STOP-BUTTON] β
ActionStopData queued successfully, this will trigger workforce.stop_gracefully()")
except Exception as e:
# Task lock may not exist if task is already finished or never started
- chat_logger.info("Task lock not found or already stopped", extra={"task_id": id, "error": str(e)})
+ chat_logger.warning(f"[STOP-BUTTON] β οΈ Task lock not found or already stopped, task_id: {id}, error: {str(e)}")
return Response(status_code=204)
@@ -282,18 +287,33 @@ def remove_task(project_id: str, task_id: str):
@router.post("/chat/{project_id}/skip-task", name="skip task in workforce")
@traceroot.trace()
def skip_task(project_id: str):
- """Skip a task in the workforce"""
- chat_logger.info(f"Skipping task in workforce for project_id: {project_id}")
+ """
+ Skip/Stop current task execution while preserving context.
+ This endpoint is called when user clicks the Stop button.
+
+ Behavior:
+ - Stops workforce gracefully
+ - Marks task as done
+ - Preserves conversation_history and last_task_result in task_lock
+ - Sends 'end' event to frontend
+ - Keeps SSE connection alive for multi-turn conversation
+ """
+ chat_logger.info("=" * 80)
+ chat_logger.info(f"π [STOP-BUTTON] SKIP-TASK request received from frontend (User clicked Stop)")
+ chat_logger.info(f"[STOP-BUTTON] project_id: {project_id}")
+ chat_logger.info("=" * 80)
task_lock = get_task_lock(project_id)
+ chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}")
try:
- # Queue the skip task action
+ # Queue the skip task action - this will preserve context for multi-turn
skip_task_action = ActionSkipTaskData(project_id=project_id)
+ chat_logger.info(f"[STOP-BUTTON] Queueing ActionSkipTaskData (preserves context, marks as done)")
asyncio.run(task_lock.put_queue(skip_task_action))
- chat_logger.info(f"Task skip request queued for project_id: {project_id}")
+ chat_logger.info(f"[STOP-BUTTON] β
Skip request queued - task will stop gracefully and preserve context")
return Response(status_code=201)
except Exception as e:
- chat_logger.error(f"Error skipping task for project_id: {project_id}: {e}")
+ chat_logger.error(f"[STOP-BUTTON] β Error skipping task for project_id: {project_id}: {e}")
raise UserException(code.error, f"Failed to skip task: {str(e)}")
diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py
index 2c2b028e6..0ac7969d2 100644
--- a/backend/app/service/chat_service.py
+++ b/backend/app/service/chat_service.py
@@ -268,23 +268,34 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
summary_task_content = "" # Track task summary
loop_iteration = 0
- logger.info("Starting step_solve", extra={"project_id": options.project_id, "task_id": options.task_id})
+ logger.info("=" * 80)
+ logger.info("π [LIFECYCLE] step_solve STARTED", extra={"project_id": options.project_id, "task_id": options.task_id})
+ logger.info("=" * 80)
logger.debug("Step solve options", extra={"task_id": options.task_id, "model_platform": options.model_platform})
while True:
loop_iteration += 1
+ logger.debug(f"[LIFECYCLE] step_solve loop iteration #{loop_iteration}", extra={"project_id": options.project_id, "task_id": options.task_id})
if await request.is_disconnected():
- logger.warning(f"Client disconnected for project {options.project_id}")
+ logger.warning("=" * 80)
+ logger.warning(f"β οΈ [LIFECYCLE] CLIENT DISCONNECTED for project {options.project_id}")
+ logger.warning("=" * 80)
if workforce is not None:
+ logger.info(f"[LIFECYCLE] Stopping workforce due to client disconnect, workforce._running={workforce._running}")
if workforce._running:
workforce.stop()
workforce.stop_gracefully()
+ logger.info(f"[LIFECYCLE] Workforce stopped after client disconnect")
+ else:
+ logger.info(f"[LIFECYCLE] Workforce is None, no need to stop")
task_lock.status = Status.done
try:
await delete_task_lock(task_lock.id)
+ logger.info(f"[LIFECYCLE] Task lock deleted after client disconnect")
except Exception as e:
logger.error(f"Error deleting task lock on disconnect: {e}")
+ logger.info(f"[LIFECYCLE] Breaking out of step_solve loop due to client disconnect")
break
try:
item = await task_lock.get_queue()
@@ -295,16 +306,23 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
try:
if item.action == Action.improve or start_event_loop:
+ logger.info("=" * 80)
+ logger.info(f"π¬ [NEW-QUESTION] Action.improve received or start_event_loop", extra={"project_id": options.project_id, "start_event_loop": start_event_loop})
+ logger.info(f"[NEW-QUESTION] Current workforce state: workforce={'None' if workforce is None else f'exists(id={id(workforce)})'}")
+ logger.info(f"[NEW-QUESTION] Current camel_task state: camel_task={'None' if camel_task is None else f'exists(id={camel_task.id})'}")
+ logger.info("=" * 80)
# from viztracer import VizTracer
# tracer = VizTracer()
# tracer.start()
if start_event_loop is True:
question = options.question
+ logger.info(f"[NEW-QUESTION] Initial question from options.question: '{question[:100]}...'")
start_event_loop = False
else:
assert isinstance(item, ActionImproveData)
question = item.data
+ logger.info(f"[NEW-QUESTION] Follow-up question from ActionImproveData: '{question[:100]}...'")
is_exceeded, total_length = check_conversation_history_length(task_lock)
if is_exceeded:
@@ -321,10 +339,14 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
if len(options.attaches) > 0:
# Questions with attachments always need workforce
is_complex_task = True
+ logger.info(f"[NEW-QUESTION] Has attachments, treating as complex task")
else:
+ logger.info(f"[NEW-QUESTION] Calling question_confirm to determine complexity")
is_complex_task = await question_confirm(question_agent, question, task_lock)
+ logger.info(f"[NEW-QUESTION] question_confirm result: is_complex={is_complex_task}")
if not is_complex_task:
+ logger.info(f"[NEW-QUESTION] β
Simple question, providing direct answer without workforce")
simple_answer_prompt = f"{build_conversation_context(task_lock, header='=== Previous Conversation ===')}User Query: {question}\n\nProvide a direct, helpful answer to this simple question."
try:
@@ -359,42 +381,70 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
except Exception as e:
logger.error(f"Error cleaning up folder: {e}")
else:
+ logger.info(f"[NEW-QUESTION] π§ Complex task, creating workforce and decomposing")
# Update the sync_step with new task_id
if hasattr(item, 'new_task_id') and item.new_task_id:
set_current_task_id(options.project_id, item.new_task_id)
# Reset summary generation flag for new tasks to ensure proper summaries
task_lock.summary_generated = False
- logger.info("Reset summary_generated flag for new task", extra={"project_id": options.project_id, "new_task_id": item.new_task_id})
+ logger.info("[NEW-QUESTION] Reset summary_generated flag for new task", extra={"project_id": options.project_id, "new_task_id": item.new_task_id})
+ logger.info(f"[NEW-QUESTION] Sending 'confirmed' SSE to frontend")
yield sse_json("confirmed", {"question": question})
-
+
+ logger.info(f"[NEW-QUESTION] Building context for coordinator")
context_for_coordinator = build_context_for_workforce(task_lock, options)
- (workforce, mcp) = await construct_workforce(options)
- for new_agent in options.new_agents:
- workforce.add_single_agent_worker(
- format_agent_description(new_agent), await new_agent_model(new_agent, options)
- )
+ # Check if workforce exists - if so, reuse it (agents are preserved)
+ # Otherwise create new workforce
+ if workforce is not None:
+ logger.info(f"[NEW-QUESTION] π Workforce exists (id={id(workforce)}), state={workforce._state.name}, _running={workforce._running}")
+ logger.info(f"[NEW-QUESTION] β
Reusing existing workforce with preserved agents")
+ # Workforce is already stopped from skip_task, ready for new decomposition
+ else:
+ logger.info(f"[NEW-QUESTION] π Creating NEW workforce instance (workforce=None)")
+ (workforce, mcp) = await construct_workforce(options)
+ logger.info(f"[NEW-QUESTION] β
NEW Workforce instance created, id={id(workforce)}")
+ for new_agent in options.new_agents:
+ workforce.add_single_agent_worker(
+ format_agent_description(new_agent), await new_agent_model(new_agent, options)
+ )
task_lock.status = Status.confirmed
- clean_task_content = question + options.summary_prompt
- camel_task = Task(content=clean_task_content, id=options.task_id)
- if len(options.attaches) > 0:
- camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches}
+ # If camel_task already exists (from previous paused task), add new question as subtask
+ # Otherwise, create a new camel_task
+ if camel_task is not None:
+ logger.info(f"[NEW-QUESTION] π camel_task exists (id={camel_task.id}), adding new question as context")
+ # Update the task content with new question
+ clean_task_content = question + options.summary_prompt
+ logger.info(f"[NEW-QUESTION] Updating existing camel_task content with new question")
+ # We keep the existing task structure but update content for new decomposition
+ camel_task = Task(content=clean_task_content, id=options.task_id)
+ if len(options.attaches) > 0:
+ camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches}
+ else:
+ clean_task_content = question + options.summary_prompt
+ logger.info(f"[NEW-QUESTION] Creating NEW camel_task with id={options.task_id}")
+ camel_task = Task(content=clean_task_content, id=options.task_id)
+ if len(options.attaches) > 0:
+ camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches}
+ logger.info(f"[NEW-QUESTION] π§© Starting task decomposition via workforce.eigent_make_sub_tasks")
sub_tasks = await asyncio.to_thread(
workforce.eigent_make_sub_tasks,
camel_task,
context_for_coordinator
)
+ logger.info(f"[NEW-QUESTION] β
Task decomposed into {len(sub_tasks)} subtasks")
+ logger.info(f"[NEW-QUESTION] Generating task summary")
summary_task_agent = task_summary_agent(options)
try:
summary_task_content = await asyncio.wait_for(
summary_task(summary_task_agent, camel_task), timeout=10
)
task_lock.summary_generated = True
- logger.info("Generated summary for task", extra={"project_id": options.project_id})
+ logger.info("[NEW-QUESTION] β
Summary generated successfully", extra={"project_id": options.project_id})
except asyncio.TimeoutError:
logger.warning("summary_task timeout", extra={"project_id": options.project_id, "task_id": options.task_id})
# Fallback to a minimal summary to unblock UI
@@ -408,7 +458,10 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
summary_task_content = f"{fallback_name}|{fallback_summary}"
task_lock.summary_generated = True
+ logger.info(f"[NEW-QUESTION] π€ Sending to_sub_tasks SSE to frontend (task card)")
+ logger.info(f"[NEW-QUESTION] to_sub_tasks data: task_id={camel_task.id}, summary={summary_task_content[:50]}..., subtasks_count={len(camel_task.subtasks)}")
yield to_sub_tasks(camel_task, summary_task_content)
+ logger.info(f"[NEW-QUESTION] β
to_sub_tasks SSE sent")
# tracer.stop()
# tracer.save("trace.json")
@@ -464,11 +517,64 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
}
yield sse_json("remove_task", returnData)
elif item.action == Action.skip_task:
+ logger.info("=" * 80)
+ logger.info(f"π [LIFECYCLE] SKIP_TASK action received (User clicked Stop button)", extra={"project_id": options.project_id, "item_project_id": item.project_id})
+ logger.info("=" * 80)
+
+ # Prevent duplicate skip processing
+ if task_lock.status == Status.done:
+ logger.warning(f"β οΈ [LIFECYCLE] SKIP_TASK received but task already marked as done. Ignoring.")
+ continue
+
if workforce is not None and item.project_id == options.project_id:
- if workforce._state.name == 'PAUSED':
- # Resume paused workforce to skip the task
- workforce.resume()
- workforce.skip_gracefully()
+ logger.info(f"[LIFECYCLE] Workforce exists (id={id(workforce)}), state={workforce._state.name}, _running={workforce._running}")
+
+ # Stop workforce completely
+ logger.info(f"[LIFECYCLE] π Stopping workforce")
+ if workforce._running:
+ # Import correct BaseWorkforce from camel
+ from camel.societies.workforce.workforce import Workforce as BaseWorkforce
+ BaseWorkforce.stop(workforce)
+ logger.info(f"[LIFECYCLE] β
BaseWorkforce.stop() completed, state={workforce._state.name}, _running={workforce._running}")
+
+ workforce.stop_gracefully()
+ logger.info(f"[LIFECYCLE] β
Workforce stopped gracefully")
+
+ # Clear workforce to avoid state issues
+ # Next question will create fresh workforce
+ workforce = None
+ logger.info(f"[LIFECYCLE] Workforce set to None, will be recreated on next question")
+ else:
+ logger.warning(f"[LIFECYCLE] Cannot skip: workforce is None or project_id mismatch")
+
+ # Mark task as done and preserve context (like Action.end does)
+ task_lock.status = Status.done
+ end_message = "Task stoppedTask stopped by user"
+ task_lock.last_task_result = end_message
+
+ # Add to conversation history (like normal end does)
+ if camel_task is not None:
+ task_content: str = camel_task.content
+ if "=== CURRENT TASK ===" in task_content:
+ task_content = task_content.split("=== CURRENT TASK ===")[-1].strip()
+ else:
+ task_content: str = f"Task {options.task_id}"
+
+ task_lock.add_conversation('task_result', {
+ 'task_content': task_content,
+ 'task_result': end_message,
+ 'working_directory': get_working_directory(options, task_lock)
+ })
+
+ # Clear camel_task as well (workforce is cleared, so camel_task should be too)
+ camel_task = None
+ logger.info(f"[LIFECYCLE] β
Task marked as done, workforce and camel_task cleared, ready for multi-turn")
+
+ # Send end event to frontend with string format (matching normal end event format)
+ yield sse_json("end", end_message)
+ logger.info(f"[LIFECYCLE] Sent 'end' SSE event to frontend")
+
+ # Continue loop to accept new questions (don't break, don't delete task_lock)
elif item.action == Action.start:
# Check conversation history length before starting task
is_exceeded, total_length = check_conversation_history_length(task_lock)
@@ -504,11 +610,15 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
yield sse_json("task_state", item.data)
elif item.action == Action.new_task_state:
+ logger.info("=" * 80)
+ logger.info(f"π [LIFECYCLE] NEW_TASK_STATE action received (Multi-turn)", extra={"project_id": options.project_id})
+ logger.info("=" * 80)
# Log new task state details
new_task_id = item.data.get('task_id', 'unknown')
new_task_state = item.data.get('state', 'unknown')
new_task_result = item.data.get('result', '')
+ logger.info(f"[LIFECYCLE] New task details: task_id={new_task_id}, state={new_task_state}")
if camel_task is None:
logger.error(f"NEW_TASK_STATE action received but camel_task is None for project {options.project_id}, task {new_task_id}")
@@ -548,13 +658,18 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
# Then handle multi-turn processing
if workforce is not None and new_task_content:
+ logger.info(f"[LIFECYCLE] Multi-turn: workforce exists (id={id(workforce)}), pausing for question confirmation")
task_lock.status = Status.confirming
workforce.pause()
+ logger.info(f"[LIFECYCLE] Multi-turn: workforce paused, state={workforce._state.name}")
try:
+ logger.info(f"[LIFECYCLE] Multi-turn: calling question_confirm for new task")
is_multi_turn_complex = await question_confirm(question_agent, new_task_content, task_lock)
+ logger.info(f"[LIFECYCLE] Multi-turn: question_confirm result: is_complex={is_multi_turn_complex}")
if not is_multi_turn_complex:
+ logger.info(f"[LIFECYCLE] Multi-turn: task is simple, providing direct answer without workforce")
simple_answer_prompt = f"{build_conversation_context(task_lock, header='=== Previous Conversation ===')}User Query: {new_task_content}\n\nProvide a direct, helpful answer to this simple question."
try:
@@ -569,22 +684,28 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
logger.error(f"Error generating simple answer in multi-turn: {e}")
yield sse_json("wait_confirm", {"content": "I encountered an error while processing your question.", "question": new_task_content})
+ logger.info(f"[LIFECYCLE] Multi-turn: simple answer provided, resuming workforce")
workforce.resume()
+ logger.info(f"[LIFECYCLE] Multi-turn: workforce resumed, continuing to next iteration")
continue # This continues the main while loop, waiting for next action
# Update the sync_step with new task_id before sending new task sse events
+ logger.info(f"[LIFECYCLE] Multi-turn: task is complex, setting new task_id={task_id}")
set_current_task_id(options.project_id, task_id)
-
+
yield sse_json("confirmed", {"question": new_task_content})
task_lock.status = Status.confirmed
+ logger.info(f"[LIFECYCLE] Multi-turn: building context for workforce")
context_for_multi_turn = build_context_for_workforce(task_lock, options)
+ logger.info(f"[LIFECYCLE] Multi-turn: calling workforce.handle_decompose_append_task for new task decomposition")
new_sub_tasks = await workforce.handle_decompose_append_task(
camel_task,
reset=False,
coordinator_context=context_for_multi_turn
)
+ logger.info(f"[LIFECYCLE] Multi-turn: task decomposed into {len(new_sub_tasks)} subtasks")
# Generate proper LLM summary for multi-turn tasks instead of hardcoded fallback
try:
@@ -682,11 +803,16 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
)
workforce.resume()
elif item.action == Action.end:
- logger.info(f"Processing END action for project {options.project_id}, task {options.task_id}, camel_task exists: {camel_task is not None}, current status: {task_lock.status}")
-
+ logger.info("=" * 80)
+ logger.info(f"π [LIFECYCLE] END action received for project {options.project_id}, task {options.task_id}")
+ logger.info(f"[LIFECYCLE] camel_task exists: {camel_task is not None}, current status: {task_lock.status}, workforce exists: {workforce is not None}")
+ if workforce is not None:
+ logger.info(f"[LIFECYCLE] Workforce state at END: _state={workforce._state.name}, _running={workforce._running}")
+ logger.info("=" * 80)
+
# Prevent duplicate end processing
if task_lock.status == Status.done:
- logger.warning(f"END action received but task already marked as done for project {options.project_id}, task {options.task_id}. Ignoring duplicate END action.")
+ logger.warning(f"β οΈ [LIFECYCLE] END action received but task already marked as done. Ignoring duplicate END action.")
continue
if camel_task is None:
@@ -718,17 +844,20 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
yield sse_json("end", final_result)
if workforce is not None:
+ logger.info(f"[LIFECYCLE] π Calling workforce.stop_gracefully() for project {options.project_id}, workforce id={id(workforce)}")
workforce.stop_gracefully()
- logger.info(f"Workforce stopped gracefully for project {options.project_id}")
+ logger.info(f"[LIFECYCLE] β
Workforce stopped gracefully for project {options.project_id}")
workforce = None
+ logger.info(f"[LIFECYCLE] Workforce set to None")
else:
- logger.warning(f"Workforce already None at end action for project {options.project_id}")
+ logger.warning(f"[LIFECYCLE] β οΈ Workforce already None at end action for project {options.project_id}")
camel_task = None
+ logger.info(f"[LIFECYCLE] camel_task set to None")
if question_agent is not None:
question_agent.reset()
- logger.info(f"Reset question_agent for project {options.project_id}")
+ logger.info(f"[LIFECYCLE] question_agent reset for project {options.project_id}")
elif item.action == Action.supplement:
# Check if this might be a misrouted second question
@@ -752,14 +881,23 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
workforce.pause()
yield sse_json(Action.budget_not_enough, {"message": "budget not enouth"})
elif item.action == Action.stop:
+ logger.info("=" * 80)
+ logger.info(f"βΉοΈ [LIFECYCLE] STOP action received for project {options.project_id}")
+ logger.info("=" * 80)
if workforce is not None:
+ logger.info(f"[LIFECYCLE] Workforce exists (id={id(workforce)}), _running={workforce._running}, _state={workforce._state.name}")
if workforce._running:
+ logger.info(f"[LIFECYCLE] Calling workforce.stop() because _running=True")
workforce.stop()
+ logger.info(f"[LIFECYCLE] workforce.stop() completed")
+ logger.info(f"[LIFECYCLE] Calling workforce.stop_gracefully()")
workforce.stop_gracefully()
- logger.info(f"Workforce stopped for project {options.project_id}")
+ logger.info(f"[LIFECYCLE] β
Workforce stopped for project {options.project_id}")
else:
- logger.warning(f"Workforce is None at stop action for project {options.project_id}")
+ logger.warning(f"[LIFECYCLE] β οΈ Workforce is None at stop action for project {options.project_id}")
+ logger.info(f"[LIFECYCLE] Deleting task lock")
await delete_task_lock(task_lock.id)
+ logger.info(f"[LIFECYCLE] Task lock deleted, breaking out of loop")
break
else:
logger.warning(f"Unknown action: {item.action}")
@@ -796,13 +934,17 @@ async def install_mcp(
def to_sub_tasks(task: Task, summary_task_content: str):
- return sse_json(
+ logger.info(f"[TO-SUB-TASKS] π Creating to_sub_tasks SSE event")
+ logger.info(f"[TO-SUB-TASKS] task.id={task.id}, summary={summary_task_content[:50]}..., subtasks_count={len(task.subtasks)}")
+ result = sse_json(
"to_sub_tasks",
{
"summary_task": summary_task_content,
"sub_tasks": tree_sub_tasks(task.subtasks),
},
)
+ logger.info(f"[TO-SUB-TASKS] β
to_sub_tasks SSE event created")
+ return result
def tree_sub_tasks(sub_tasks: list[Task], depth: int = 0):
diff --git a/backend/app/utils/workforce.py b/backend/app/utils/workforce.py
index 52cda36d4..c9793ec10 100644
--- a/backend/app/utils/workforce.py
+++ b/backend/app/utils/workforce.py
@@ -44,14 +44,11 @@ class Workforce(BaseWorkforce):
use_structured_output_handler: bool = True,
) -> None:
self.api_task_id = api_task_id
- logger.info("Initializing workforce", extra={
- "api_task_id": api_task_id,
- "description": description[:100] + "..." if len(description) > 100 else description,
- "children_count": len(children) if children else 0,
- "graceful_shutdown_timeout": graceful_shutdown_timeout,
- "share_memory": share_memory,
- "use_structured_output_handler": use_structured_output_handler
- })
+ logger.info("=" * 80)
+ logger.info("π [WF-LIFECYCLE] Workforce.__init__ STARTED", extra={"api_task_id": api_task_id})
+ logger.info(f"[WF-LIFECYCLE] Workforce id will be: {id(self)}")
+ logger.info(f"[WF-LIFECYCLE] Init params: graceful_shutdown_timeout={graceful_shutdown_timeout}, share_memory={share_memory}")
+ logger.info("=" * 80)
super().__init__(
description=description,
children=children,
@@ -62,6 +59,7 @@ class Workforce(BaseWorkforce):
share_memory=share_memory,
use_structured_output_handler=use_structured_output_handler,
)
+ logger.info(f"[WF-LIFECYCLE] β
Workforce.__init__ COMPLETED, id={id(self)}")
def eigent_make_sub_tasks(self, task: Task, coordinator_context: str = ""):
"""
@@ -72,58 +70,73 @@ class Workforce(BaseWorkforce):
coordinator_context: Optional context ONLY for coordinator agent during decomposition.
This context will NOT be passed to subtasks or worker agents.
"""
- logger.info("Starting task decomposition", extra={
+ logger.info("=" * 80)
+ logger.info("π§© [DECOMPOSE] eigent_make_sub_tasks CALLED", extra={
"api_task_id": self.api_task_id,
- "task_id": task.id,
- "task_content": task.content[:200] + "..." if len(task.content) > 200 else task.content,
- "has_coordinator_context": bool(coordinator_context)
+ "workforce_id": id(self),
+ "task_id": task.id
})
+ logger.info(f"[DECOMPOSE] Task content preview: '{task.content[:200]}...'")
+ logger.info(f"[DECOMPOSE] Has coordinator context: {bool(coordinator_context)}")
+ logger.info(f"[DECOMPOSE] Current workforce state: {self._state.name}, _running: {self._running}")
+ logger.info("=" * 80)
if not validate_task_content(task.content, task.id):
task.state = TaskState.FAILED
task.result = "Task failed: Invalid or empty content provided"
- logger.warning("Task rejected: Invalid or empty content", extra={
+ logger.warning("β [DECOMPOSE] Task rejected: Invalid or empty content", extra={
"task_id": task.id,
"content_preview": task.content[:50] + "..." if len(task.content) > 50 else task.content
})
raise UserException(code.error, task.result)
+ logger.info(f"[DECOMPOSE] Resetting workforce state")
self.reset()
self._task = task
self.set_channel(TaskChannel())
self._state = WorkforceState.RUNNING
task.state = TaskState.OPEN
+ logger.info(f"[DECOMPOSE] Workforce reset complete, state: {self._state.name}")
+ logger.info(f"[DECOMPOSE] Calling handle_decompose_append_task")
subtasks = asyncio.run(self.handle_decompose_append_task(task, reset=False, coordinator_context=coordinator_context))
- logger.info("Task decomposition completed", extra={
+ logger.info("=" * 80)
+ logger.info(f"β
[DECOMPOSE] Task decomposition COMPLETED", extra={
"api_task_id": self.api_task_id,
"task_id": task.id,
"subtasks_count": len(subtasks)
})
+ logger.info("=" * 80)
return subtasks
async def eigent_start(self, subtasks: list[Task]):
"""start the workforce"""
- logger.info("Starting workforce execution", extra={
- "api_task_id": self.api_task_id,
- "subtasks_count": len(subtasks)
- })
+ logger.info("=" * 80)
+ logger.info("βΆοΈ [WF-LIFECYCLE] eigent_start CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)})
+ logger.info(f"[WF-LIFECYCLE] Starting workforce execution with {len(subtasks)} subtasks")
+ logger.info(f"[WF-LIFECYCLE] Current workforce state: {self._state.name}, _running: {self._running}")
+ logger.info("=" * 80)
self._pending_tasks.extendleft(reversed(subtasks))
# Save initial snapshot
self.save_snapshot("Initial task decomposition")
try:
+ logger.info(f"[WF-LIFECYCLE] Calling base class start() method")
await self.start()
+ logger.info(f"[WF-LIFECYCLE] β
Base class start() method completed")
except Exception as e:
- logger.error("Error in workforce execution", extra={
+ logger.error(f"[WF-LIFECYCLE] β Error in workforce execution: {e}", extra={
"api_task_id": self.api_task_id,
"error": str(e)
}, exc_info=True)
self._state = WorkforceState.STOPPED
+ logger.info(f"[WF-LIFECYCLE] Workforce state set to STOPPED after error")
raise
finally:
+ logger.info(f"[WF-LIFECYCLE] eigent_start finally block, current state: {self._state.name}")
if self._state != WorkforceState.STOPPED:
self._state = WorkforceState.IDLE
+ logger.info(f"[WF-LIFECYCLE] Workforce state set to IDLE")
async def handle_decompose_append_task(
self, task: Task, reset: bool = True, coordinator_context: str = ""
@@ -140,23 +153,27 @@ class Workforce(BaseWorkforce):
Returns:
List[Task]: The decomposed subtasks or the original task
"""
+ logger.info(f"[DECOMPOSE] handle_decompose_append_task CALLED, task_id={task.id}, reset={reset}")
+
if not validate_task_content(task.content, task.id):
task.state = TaskState.FAILED
task.result = "Task failed: Invalid or empty content provided"
logger.warning(
- f"Task {task.id} rejected: Invalid or empty content. "
+ f"[DECOMPOSE] Task {task.id} rejected: Invalid or empty content. "
f"Content preview: '{task.content}'"
)
return [task]
if reset and self._state != WorkforceState.RUNNING:
+ logger.info(f"[DECOMPOSE] Resetting workforce (reset={reset}, state={self._state.name})")
self.reset()
- logger.info("Workforce reset before handling task.")
+ logger.info("[DECOMPOSE] Workforce reset complete")
self._task = task
task.state = TaskState.FAILED
if coordinator_context:
+ logger.info(f"[DECOMPOSE] Adding coordinator context to task")
original_content = task.content
task_with_context = coordinator_context
if coordinator_context:
@@ -164,24 +181,30 @@ class Workforce(BaseWorkforce):
task_with_context += original_content
task.content = task_with_context
+ logger.info(f"[DECOMPOSE] Calling _decompose_task with context")
subtasks_result = self._decompose_task(task)
task.content = original_content
else:
+ logger.info(f"[DECOMPOSE] Calling _decompose_task without context")
subtasks_result = self._decompose_task(task)
+ logger.info(f"[DECOMPOSE] _decompose_task returned, processing results")
if isinstance(subtasks_result, Generator):
subtasks = []
for new_tasks in subtasks_result:
subtasks.extend(new_tasks)
+ logger.info(f"[DECOMPOSE] Collected {len(subtasks)} subtasks from generator")
else:
subtasks = subtasks_result
+ logger.info(f"[DECOMPOSE] Got {len(subtasks) if subtasks else 0} subtasks directly")
if subtasks:
self._pending_tasks.extendleft(reversed(subtasks))
- logger.info(f"Appended {len(subtasks)} subtasks to pending tasks")
+ logger.info(f"[DECOMPOSE] β
Appended {len(subtasks)} subtasks to pending tasks")
if not subtasks:
+ logger.warning(f"[DECOMPOSE] No subtasks returned, creating fallback task")
fallback_task = Task(
content=task.content,
id=f"{task.id}.1",
@@ -189,6 +212,7 @@ class Workforce(BaseWorkforce):
)
task.subtasks = [fallback_task]
subtasks = [fallback_task]
+ logger.info(f"[DECOMPOSE] Created fallback task: {fallback_task.id}")
return subtasks
@@ -358,10 +382,48 @@ class Workforce(BaseWorkforce):
return result
def stop(self) -> None:
+ logger.info("=" * 80)
+ logger.info(f"βΉοΈ [WF-LIFECYCLE] stop() CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)})
+ logger.info(f"[WF-LIFECYCLE] Current state before stop: {self._state.name}, _running: {self._running}")
+ logger.info("=" * 80)
super().stop()
+ logger.info(f"[WF-LIFECYCLE] super().stop() completed, new state: {self._state.name}")
task_lock = get_task_lock(self.api_task_id)
task = asyncio.create_task(task_lock.put_queue(ActionEndData()))
task_lock.add_background_task(task)
+ logger.info(f"[WF-LIFECYCLE] β
ActionEndData queued")
+
+ def stop_gracefully(self) -> None:
+ logger.info("=" * 80)
+ logger.info(f"π [WF-LIFECYCLE] stop_gracefully() CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)})
+ logger.info(f"[WF-LIFECYCLE] Current state before stop_gracefully: {self._state.name}, _running: {self._running}")
+ logger.info("=" * 80)
+ super().stop_gracefully()
+ logger.info(f"[WF-LIFECYCLE] β
super().stop_gracefully() completed, new state: {self._state.name}, _running: {self._running}")
+
+ def skip_gracefully(self) -> None:
+ logger.info("=" * 80)
+ logger.info(f"βοΈ [WF-LIFECYCLE] skip_gracefully() CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)})
+ logger.info(f"[WF-LIFECYCLE] Current state before skip_gracefully: {self._state.name}, _running: {self._running}")
+ logger.info("=" * 80)
+ super().skip_gracefully()
+ logger.info(f"[WF-LIFECYCLE] β
super().skip_gracefully() completed, new state: {self._state.name}, _running: {self._running}")
+
+ def pause(self) -> None:
+ logger.info("=" * 80)
+ logger.info(f"βΈοΈ [WF-LIFECYCLE] pause() CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)})
+ logger.info(f"[WF-LIFECYCLE] Current state before pause: {self._state.name}, _running: {self._running}")
+ logger.info("=" * 80)
+ super().pause()
+ logger.info(f"[WF-LIFECYCLE] β
super().pause() completed, new state: {self._state.name}, _running: {self._running}")
+
+ def resume(self) -> None:
+ logger.info("=" * 80)
+ logger.info(f"βΆοΈ [WF-LIFECYCLE] resume() CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)})
+ logger.info(f"[WF-LIFECYCLE] Current state before resume: {self._state.name}, _running: {self._running}")
+ logger.info("=" * 80)
+ super().resume()
+ logger.info(f"[WF-LIFECYCLE] β
super().resume() completed, new state: {self._state.name}, _running: {self._running}")
async def cleanup(self) -> None:
r"""Clean up resources when workforce is done"""
diff --git a/src/components/ChatBox/index.tsx b/src/components/ChatBox/index.tsx
index 5cf5e72e4..44a69e48c 100644
--- a/src/components/ChatBox/index.tsx
+++ b/src/components/ChatBox/index.tsx
@@ -208,6 +208,8 @@ export default function ChatBox(): JSX.Element {
const nextTaskId = generateUniqueId()
chatStore.setNextTaskId(nextTaskId);
+ // Use improve endpoint (POST /chat/{id}) - {id} is project_id
+ // This reuses the existing SSE connection and step_solve loop
fetchPost(`/chat/${projectStore.activeProjectId}`, {
question: tempMessageContent,
task_id: nextTaskId
@@ -417,43 +419,57 @@ export default function ChatBox(): JSX.Element {
setIsPauseResumeLoading(false);
};
- // Skip to next task handler
+ // Stop task handler - triggers Action.skip_task which preserves context
const handleSkip = async () => {
const taskId = chatStore.activeTaskId as string;
+ console.log("=" .repeat(80));
+ console.log("π [STOP-BUTTON] handleSkip CALLED from frontend");
+ console.log(`[STOP-BUTTON] taskId: ${taskId}, projectId: ${projectStore.activeProjectId}`);
+ console.log("=" .repeat(80));
setIsPauseResumeLoading(true);
try {
- // First, try to notify backend to skip the task
+ // Call skip-task endpoint to trigger Action.skip_task
+ // This will stop the task gracefully while preserving context for multi-turn
+ console.log(`[STOP-BUTTON] Sending POST request to /chat/${projectStore.activeProjectId}/skip-task`);
await fetchPost(`/chat/${projectStore.activeProjectId}/skip-task`, {
project_id: projectStore.activeProjectId
});
+ console.log("[STOP-BUTTON] β
Backend skip-task request successful");
- // Only stop local task if backend call succeeds
- chatStore.stopTask(taskId);
+ // DO NOT call chatStore.stopTask here!
+ // Keep SSE connection alive to receive "end" event from backend
+ // The "end" event will set status to 'finished' and allow multi-turn conversation
+ console.log("[STOP-BUTTON] β οΈ SSE connection kept alive, waiting for backend 'end' event");
+
+ // Only set isPending to false so UI shows task is stopped
chatStore.setIsPending(taskId, false);
+ console.log("[STOP-BUTTON] β
Task marked as not pending, SSE connection remains open");
toast.success("Task stopped successfully", {
closeButton: true,
});
} catch (error) {
- console.error("Failed to skip task:", error);
+ console.error("[STOP-BUTTON] β Failed to stop task:", error);
- // If backend call failed, still try to stop local task as fallback
- // but with different messaging to user
+ // If backend call failed, close SSE connection as fallback
+ console.log("[STOP-BUTTON] Backend call failed, closing SSE connection as fallback");
try {
chatStore.stopTask(taskId);
chatStore.setIsPending(taskId, false);
+ console.log("[STOP-BUTTON] β οΈ SSE connection closed due to backend failure");
toast.warning("Task stopped locally, but backend notification failed. Backend task may continue running.", {
closeButton: true,
duration: 5000,
});
} catch (localError) {
- console.error("Failed to stop task locally:", localError);
+ console.error("[STOP-BUTTON] β Failed to stop task locally:", localError);
toast.error("Failed to stop task completely. Please refresh the page.", {
closeButton: true,
});
}
} finally {
+ console.log("[STOP-BUTTON] handleSkip completed");
setIsPauseResumeLoading(false);
}
};
diff --git a/src/store/chatStore.ts b/src/store/chatStore.ts
index a26bc3a78..20882b8f2 100644
--- a/src/store/chatStore.ts
+++ b/src/store/chatStore.ts
@@ -572,23 +572,25 @@ const chatStore = (initial?: Partial) => createStore()(
const lockedTaskId = getCurrentTaskId();
const currentTask = getCurrentChatStore().tasks[lockedTaskId];
- // Only ignore messages if:
- // 1. The task doesn't exist, OR
- // 2. The task is finished AND it's not a task-switching event
+ // Only ignore messages if task is finished and not a valid post-completion event
+ // Valid events after task completion:
+ // - Task switching: confirmed, new_task_state, end
+ // - Multi-turn simple answer: wait_confirm
const isTaskSwitchingEvent = agentMessages.step === "confirmed" ||
agentMessages.step === "new_task_state" ||
agentMessages.step === "end";
- // More robust check - only ignore if task doesn't exist OR
- // task is finished and it's not a legitimate flow-control event
+ const isMultiTurnSimpleAnswer = agentMessages.step === "wait_confirm";
+
if (!currentTask) {
console.log(`Task ${lockedTaskId} not found, ignoring SSE message for step: ${agentMessages.step}`);
return;
}
- if (currentTask.status === 'finished' && !isTaskSwitchingEvent) {
- // Only ignore non-essential messages for finished tasks
- // Allow flow control messages through even for finished tasks
+ if (currentTask.status === 'finished' && !isTaskSwitchingEvent && !isMultiTurnSimpleAnswer) {
+ // Ignore messages for finished tasks except:
+ // 1. Task switching events (create new chatStore)
+ // 2. Simple answer events (direct response without new chatStore)
console.log(`Ignoring SSE message for finished task ${lockedTaskId}, step: ${agentMessages.step}`);
return;
}