From 90858a850014e7e2fbf39a6c8ee2e979e06f2a32 Mon Sep 17 00:00:00 2001 From: puzhen <1303385763@qq.com> Date: Fri, 21 Nov 2025 23:47:46 +0800 Subject: [PATCH 1/3] update --- backend/app/controller/chat_controller.py | 22 +++-- backend/app/service/chat_service.py | 109 +++++++++++++++++++--- backend/app/utils/workforce.py | 106 ++++++++++++++++----- src/components/ChatBox/index.tsx | 23 +++-- 4 files changed, 210 insertions(+), 50 deletions(-) diff --git a/backend/app/controller/chat_controller.py b/backend/app/controller/chat_controller.py index a983acfd2..197ef2e90 100644 --- a/backend/app/controller/chat_controller.py +++ b/backend/app/controller/chat_controller.py @@ -204,14 +204,19 @@ def supplement(id: str, data: SupplementChat): @traceroot.trace() def stop(id: str): """stop the task""" - chat_logger.warning("Stopping chat session", extra={"task_id": id}) + chat_logger.info("=" * 80) + chat_logger.info("πŸ›‘ [STOP-BUTTON] DELETE /chat/{id} request received from frontend") + chat_logger.info(f"[STOP-BUTTON] project_id/task_id: {id}") + chat_logger.info("=" * 80) try: task_lock = get_task_lock(id) + chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}") + chat_logger.info(f"[STOP-BUTTON] Queueing ActionStopData(Action.stop) to task_lock queue") asyncio.run(task_lock.put_queue(ActionStopData(action=Action.stop))) - chat_logger.info("Chat stop signal sent", extra={"task_id": id}) + chat_logger.info(f"[STOP-BUTTON] βœ… ActionStopData queued successfully, this will trigger workforce.stop_gracefully()") except Exception as e: # Task lock may not exist if task is already finished or never started - chat_logger.info("Task lock not found or already stopped", extra={"task_id": id, "error": str(e)}) + chat_logger.warning(f"[STOP-BUTTON] ⚠️ Task lock not found or already stopped, task_id: {id}, error: {str(e)}") return Response(status_code=204) @@ -283,17 +288,22 @@ def remove_task(project_id: str, task_id: str): @traceroot.trace() def skip_task(project_id: str): """Skip a task in the workforce""" - chat_logger.info(f"Skipping task in workforce for project_id: {project_id}") + chat_logger.info("=" * 80) + chat_logger.info(f"πŸ›‘ [STOP-BUTTON] SKIP-TASK request received from frontend") + chat_logger.info(f"[STOP-BUTTON] project_id: {project_id}") + chat_logger.info("=" * 80) task_lock = get_task_lock(project_id) + chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}") try: # Queue the skip task action skip_task_action = ActionSkipTaskData(project_id=project_id) + chat_logger.info(f"[STOP-BUTTON] Queueing ActionSkipTaskData to task_lock queue") asyncio.run(task_lock.put_queue(skip_task_action)) - chat_logger.info(f"Task skip request queued for project_id: {project_id}") + chat_logger.info(f"[STOP-BUTTON] βœ… Task skip request queued successfully for project_id: {project_id}") return Response(status_code=201) except Exception as e: - chat_logger.error(f"Error skipping task for project_id: {project_id}: {e}") + chat_logger.error(f"[STOP-BUTTON] ❌ Error skipping task for project_id: {project_id}: {e}") raise UserException(code.error, f"Failed to skip task: {str(e)}") diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py index 2c2b028e6..ce0ba277d 100644 --- a/backend/app/service/chat_service.py +++ b/backend/app/service/chat_service.py @@ -268,23 +268,34 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): summary_task_content = "" # Track task summary loop_iteration = 0 - logger.info("Starting step_solve", extra={"project_id": options.project_id, "task_id": options.task_id}) + logger.info("=" * 80) + logger.info("πŸš€ [LIFECYCLE] step_solve STARTED", extra={"project_id": options.project_id, "task_id": options.task_id}) + logger.info("=" * 80) logger.debug("Step solve options", extra={"task_id": options.task_id, "model_platform": options.model_platform}) while True: loop_iteration += 1 + logger.debug(f"[LIFECYCLE] step_solve loop iteration #{loop_iteration}", extra={"project_id": options.project_id, "task_id": options.task_id}) if await request.is_disconnected(): - logger.warning(f"Client disconnected for project {options.project_id}") + logger.warning("=" * 80) + logger.warning(f"⚠️ [LIFECYCLE] CLIENT DISCONNECTED for project {options.project_id}") + logger.warning("=" * 80) if workforce is not None: + logger.info(f"[LIFECYCLE] Stopping workforce due to client disconnect, workforce._running={workforce._running}") if workforce._running: workforce.stop() workforce.stop_gracefully() + logger.info(f"[LIFECYCLE] Workforce stopped after client disconnect") + else: + logger.info(f"[LIFECYCLE] Workforce is None, no need to stop") task_lock.status = Status.done try: await delete_task_lock(task_lock.id) + logger.info(f"[LIFECYCLE] Task lock deleted after client disconnect") except Exception as e: logger.error(f"Error deleting task lock on disconnect: {e}") + logger.info(f"[LIFECYCLE] Breaking out of step_solve loop due to client disconnect") break try: item = await task_lock.get_queue() @@ -295,16 +306,23 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): try: if item.action == Action.improve or start_event_loop: + logger.info("=" * 80) + logger.info(f"πŸ’¬ [NEW-QUESTION] Action.improve received or start_event_loop", extra={"project_id": options.project_id, "start_event_loop": start_event_loop}) + logger.info(f"[NEW-QUESTION] Current workforce state: workforce={'None' if workforce is None else f'exists(id={id(workforce)})'}") + logger.info(f"[NEW-QUESTION] Current camel_task state: camel_task={'None' if camel_task is None else f'exists(id={camel_task.id})'}") + logger.info("=" * 80) # from viztracer import VizTracer # tracer = VizTracer() # tracer.start() if start_event_loop is True: question = options.question + logger.info(f"[NEW-QUESTION] Initial question from options.question: '{question[:100]}...'") start_event_loop = False else: assert isinstance(item, ActionImproveData) question = item.data + logger.info(f"[NEW-QUESTION] Follow-up question from ActionImproveData: '{question[:100]}...'") is_exceeded, total_length = check_conversation_history_length(task_lock) if is_exceeded: @@ -321,10 +339,14 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): if len(options.attaches) > 0: # Questions with attachments always need workforce is_complex_task = True + logger.info(f"[NEW-QUESTION] Has attachments, treating as complex task") else: + logger.info(f"[NEW-QUESTION] Calling question_confirm to determine complexity") is_complex_task = await question_confirm(question_agent, question, task_lock) + logger.info(f"[NEW-QUESTION] question_confirm result: is_complex={is_complex_task}") if not is_complex_task: + logger.info(f"[NEW-QUESTION] βœ… Simple question, providing direct answer without workforce") simple_answer_prompt = f"{build_conversation_context(task_lock, header='=== Previous Conversation ===')}User Query: {question}\n\nProvide a direct, helpful answer to this simple question." try: @@ -359,18 +381,23 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): except Exception as e: logger.error(f"Error cleaning up folder: {e}") else: + logger.info(f"[NEW-QUESTION] πŸ”§ Complex task, creating workforce and decomposing") # Update the sync_step with new task_id if hasattr(item, 'new_task_id') and item.new_task_id: set_current_task_id(options.project_id, item.new_task_id) # Reset summary generation flag for new tasks to ensure proper summaries task_lock.summary_generated = False - logger.info("Reset summary_generated flag for new task", extra={"project_id": options.project_id, "new_task_id": item.new_task_id}) + logger.info("[NEW-QUESTION] Reset summary_generated flag for new task", extra={"project_id": options.project_id, "new_task_id": item.new_task_id}) + logger.info(f"[NEW-QUESTION] Sending 'confirmed' SSE to frontend") yield sse_json("confirmed", {"question": question}) - + + logger.info(f"[NEW-QUESTION] Building context for coordinator") context_for_coordinator = build_context_for_workforce(task_lock, options) + logger.info(f"[NEW-QUESTION] 🏭 Creating NEW workforce instance (previous={'None' if workforce is None else f'id={id(workforce)}'})") (workforce, mcp) = await construct_workforce(options) + logger.info(f"[NEW-QUESTION] βœ… NEW Workforce instance created, id={id(workforce)}") for new_agent in options.new_agents: workforce.add_single_agent_worker( format_agent_description(new_agent), await new_agent_model(new_agent, options) @@ -378,23 +405,27 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): task_lock.status = Status.confirmed clean_task_content = question + options.summary_prompt + logger.info(f"[NEW-QUESTION] Creating camel_task with id={options.task_id}") camel_task = Task(content=clean_task_content, id=options.task_id) if len(options.attaches) > 0: camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches} + logger.info(f"[NEW-QUESTION] 🧩 Starting task decomposition via workforce.eigent_make_sub_tasks") sub_tasks = await asyncio.to_thread( workforce.eigent_make_sub_tasks, camel_task, context_for_coordinator ) + logger.info(f"[NEW-QUESTION] βœ… Task decomposed into {len(sub_tasks)} subtasks") + logger.info(f"[NEW-QUESTION] Generating task summary") summary_task_agent = task_summary_agent(options) try: summary_task_content = await asyncio.wait_for( summary_task(summary_task_agent, camel_task), timeout=10 ) task_lock.summary_generated = True - logger.info("Generated summary for task", extra={"project_id": options.project_id}) + logger.info("[NEW-QUESTION] βœ… Summary generated successfully", extra={"project_id": options.project_id}) except asyncio.TimeoutError: logger.warning("summary_task timeout", extra={"project_id": options.project_id, "task_id": options.task_id}) # Fallback to a minimal summary to unblock UI @@ -408,7 +439,10 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): summary_task_content = f"{fallback_name}|{fallback_summary}" task_lock.summary_generated = True + logger.info(f"[NEW-QUESTION] πŸ“€ Sending to_sub_tasks SSE to frontend (task card)") + logger.info(f"[NEW-QUESTION] to_sub_tasks data: task_id={camel_task.id}, summary={summary_task_content[:50]}..., subtasks_count={len(camel_task.subtasks)}") yield to_sub_tasks(camel_task, summary_task_content) + logger.info(f"[NEW-QUESTION] βœ… to_sub_tasks SSE sent") # tracer.stop() # tracer.save("trace.json") @@ -464,11 +498,20 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): } yield sse_json("remove_task", returnData) elif item.action == Action.skip_task: + logger.info("=" * 80) + logger.info(f"πŸ›‘ [LIFECYCLE] SKIP_TASK action received", extra={"project_id": options.project_id, "item_project_id": item.project_id}) + logger.info("=" * 80) if workforce is not None and item.project_id == options.project_id: + logger.info(f"[LIFECYCLE] Workforce exists (id={id(workforce)}), state={workforce._state.name}, _running={workforce._running}") if workforce._state.name == 'PAUSED': + logger.info(f"[LIFECYCLE] Workforce is PAUSED, resuming before skip") # Resume paused workforce to skip the task workforce.resume() + logger.info(f"[LIFECYCLE] Calling workforce.skip_gracefully()") workforce.skip_gracefully() + logger.info(f"[LIFECYCLE] workforce.skip_gracefully() called successfully") + else: + logger.warning(f"[LIFECYCLE] Cannot skip: workforce is None or project_id mismatch (workforce={'not None' if workforce else 'None'}, expected_project_id={options.project_id}, item_project_id={item.project_id})") elif item.action == Action.start: # Check conversation history length before starting task is_exceeded, total_length = check_conversation_history_length(task_lock) @@ -504,11 +547,15 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): yield sse_json("task_state", item.data) elif item.action == Action.new_task_state: + logger.info("=" * 80) + logger.info(f"πŸ”„ [LIFECYCLE] NEW_TASK_STATE action received (Multi-turn)", extra={"project_id": options.project_id}) + logger.info("=" * 80) # Log new task state details new_task_id = item.data.get('task_id', 'unknown') new_task_state = item.data.get('state', 'unknown') new_task_result = item.data.get('result', '') + logger.info(f"[LIFECYCLE] New task details: task_id={new_task_id}, state={new_task_state}") if camel_task is None: logger.error(f"NEW_TASK_STATE action received but camel_task is None for project {options.project_id}, task {new_task_id}") @@ -548,13 +595,18 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): # Then handle multi-turn processing if workforce is not None and new_task_content: + logger.info(f"[LIFECYCLE] Multi-turn: workforce exists (id={id(workforce)}), pausing for question confirmation") task_lock.status = Status.confirming workforce.pause() + logger.info(f"[LIFECYCLE] Multi-turn: workforce paused, state={workforce._state.name}") try: + logger.info(f"[LIFECYCLE] Multi-turn: calling question_confirm for new task") is_multi_turn_complex = await question_confirm(question_agent, new_task_content, task_lock) + logger.info(f"[LIFECYCLE] Multi-turn: question_confirm result: is_complex={is_multi_turn_complex}") if not is_multi_turn_complex: + logger.info(f"[LIFECYCLE] Multi-turn: task is simple, providing direct answer without workforce") simple_answer_prompt = f"{build_conversation_context(task_lock, header='=== Previous Conversation ===')}User Query: {new_task_content}\n\nProvide a direct, helpful answer to this simple question." try: @@ -569,22 +621,28 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): logger.error(f"Error generating simple answer in multi-turn: {e}") yield sse_json("wait_confirm", {"content": "I encountered an error while processing your question.", "question": new_task_content}) + logger.info(f"[LIFECYCLE] Multi-turn: simple answer provided, resuming workforce") workforce.resume() + logger.info(f"[LIFECYCLE] Multi-turn: workforce resumed, continuing to next iteration") continue # This continues the main while loop, waiting for next action # Update the sync_step with new task_id before sending new task sse events + logger.info(f"[LIFECYCLE] Multi-turn: task is complex, setting new task_id={task_id}") set_current_task_id(options.project_id, task_id) - + yield sse_json("confirmed", {"question": new_task_content}) task_lock.status = Status.confirmed + logger.info(f"[LIFECYCLE] Multi-turn: building context for workforce") context_for_multi_turn = build_context_for_workforce(task_lock, options) + logger.info(f"[LIFECYCLE] Multi-turn: calling workforce.handle_decompose_append_task for new task decomposition") new_sub_tasks = await workforce.handle_decompose_append_task( camel_task, reset=False, coordinator_context=context_for_multi_turn ) + logger.info(f"[LIFECYCLE] Multi-turn: task decomposed into {len(new_sub_tasks)} subtasks") # Generate proper LLM summary for multi-turn tasks instead of hardcoded fallback try: @@ -682,11 +740,16 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): ) workforce.resume() elif item.action == Action.end: - logger.info(f"Processing END action for project {options.project_id}, task {options.task_id}, camel_task exists: {camel_task is not None}, current status: {task_lock.status}") - + logger.info("=" * 80) + logger.info(f"🏁 [LIFECYCLE] END action received for project {options.project_id}, task {options.task_id}") + logger.info(f"[LIFECYCLE] camel_task exists: {camel_task is not None}, current status: {task_lock.status}, workforce exists: {workforce is not None}") + if workforce is not None: + logger.info(f"[LIFECYCLE] Workforce state at END: _state={workforce._state.name}, _running={workforce._running}") + logger.info("=" * 80) + # Prevent duplicate end processing if task_lock.status == Status.done: - logger.warning(f"END action received but task already marked as done for project {options.project_id}, task {options.task_id}. Ignoring duplicate END action.") + logger.warning(f"⚠️ [LIFECYCLE] END action received but task already marked as done. Ignoring duplicate END action.") continue if camel_task is None: @@ -718,17 +781,20 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): yield sse_json("end", final_result) if workforce is not None: + logger.info(f"[LIFECYCLE] πŸ›‘ Calling workforce.stop_gracefully() for project {options.project_id}, workforce id={id(workforce)}") workforce.stop_gracefully() - logger.info(f"Workforce stopped gracefully for project {options.project_id}") + logger.info(f"[LIFECYCLE] βœ… Workforce stopped gracefully for project {options.project_id}") workforce = None + logger.info(f"[LIFECYCLE] Workforce set to None") else: - logger.warning(f"Workforce already None at end action for project {options.project_id}") + logger.warning(f"[LIFECYCLE] ⚠️ Workforce already None at end action for project {options.project_id}") camel_task = None + logger.info(f"[LIFECYCLE] camel_task set to None") if question_agent is not None: question_agent.reset() - logger.info(f"Reset question_agent for project {options.project_id}") + logger.info(f"[LIFECYCLE] question_agent reset for project {options.project_id}") elif item.action == Action.supplement: # Check if this might be a misrouted second question @@ -752,14 +818,23 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): workforce.pause() yield sse_json(Action.budget_not_enough, {"message": "budget not enouth"}) elif item.action == Action.stop: + logger.info("=" * 80) + logger.info(f"⏹️ [LIFECYCLE] STOP action received for project {options.project_id}") + logger.info("=" * 80) if workforce is not None: + logger.info(f"[LIFECYCLE] Workforce exists (id={id(workforce)}), _running={workforce._running}, _state={workforce._state.name}") if workforce._running: + logger.info(f"[LIFECYCLE] Calling workforce.stop() because _running=True") workforce.stop() + logger.info(f"[LIFECYCLE] workforce.stop() completed") + logger.info(f"[LIFECYCLE] Calling workforce.stop_gracefully()") workforce.stop_gracefully() - logger.info(f"Workforce stopped for project {options.project_id}") + logger.info(f"[LIFECYCLE] βœ… Workforce stopped for project {options.project_id}") else: - logger.warning(f"Workforce is None at stop action for project {options.project_id}") + logger.warning(f"[LIFECYCLE] ⚠️ Workforce is None at stop action for project {options.project_id}") + logger.info(f"[LIFECYCLE] Deleting task lock") await delete_task_lock(task_lock.id) + logger.info(f"[LIFECYCLE] Task lock deleted, breaking out of loop") break else: logger.warning(f"Unknown action: {item.action}") @@ -796,13 +871,17 @@ async def install_mcp( def to_sub_tasks(task: Task, summary_task_content: str): - return sse_json( + logger.info(f"[TO-SUB-TASKS] πŸ“‹ Creating to_sub_tasks SSE event") + logger.info(f"[TO-SUB-TASKS] task.id={task.id}, summary={summary_task_content[:50]}..., subtasks_count={len(task.subtasks)}") + result = sse_json( "to_sub_tasks", { "summary_task": summary_task_content, "sub_tasks": tree_sub_tasks(task.subtasks), }, ) + logger.info(f"[TO-SUB-TASKS] βœ… to_sub_tasks SSE event created") + return result def tree_sub_tasks(sub_tasks: list[Task], depth: int = 0): diff --git a/backend/app/utils/workforce.py b/backend/app/utils/workforce.py index 52cda36d4..c9793ec10 100644 --- a/backend/app/utils/workforce.py +++ b/backend/app/utils/workforce.py @@ -44,14 +44,11 @@ class Workforce(BaseWorkforce): use_structured_output_handler: bool = True, ) -> None: self.api_task_id = api_task_id - logger.info("Initializing workforce", extra={ - "api_task_id": api_task_id, - "description": description[:100] + "..." if len(description) > 100 else description, - "children_count": len(children) if children else 0, - "graceful_shutdown_timeout": graceful_shutdown_timeout, - "share_memory": share_memory, - "use_structured_output_handler": use_structured_output_handler - }) + logger.info("=" * 80) + logger.info("🏭 [WF-LIFECYCLE] Workforce.__init__ STARTED", extra={"api_task_id": api_task_id}) + logger.info(f"[WF-LIFECYCLE] Workforce id will be: {id(self)}") + logger.info(f"[WF-LIFECYCLE] Init params: graceful_shutdown_timeout={graceful_shutdown_timeout}, share_memory={share_memory}") + logger.info("=" * 80) super().__init__( description=description, children=children, @@ -62,6 +59,7 @@ class Workforce(BaseWorkforce): share_memory=share_memory, use_structured_output_handler=use_structured_output_handler, ) + logger.info(f"[WF-LIFECYCLE] βœ… Workforce.__init__ COMPLETED, id={id(self)}") def eigent_make_sub_tasks(self, task: Task, coordinator_context: str = ""): """ @@ -72,58 +70,73 @@ class Workforce(BaseWorkforce): coordinator_context: Optional context ONLY for coordinator agent during decomposition. This context will NOT be passed to subtasks or worker agents. """ - logger.info("Starting task decomposition", extra={ + logger.info("=" * 80) + logger.info("🧩 [DECOMPOSE] eigent_make_sub_tasks CALLED", extra={ "api_task_id": self.api_task_id, - "task_id": task.id, - "task_content": task.content[:200] + "..." if len(task.content) > 200 else task.content, - "has_coordinator_context": bool(coordinator_context) + "workforce_id": id(self), + "task_id": task.id }) + logger.info(f"[DECOMPOSE] Task content preview: '{task.content[:200]}...'") + logger.info(f"[DECOMPOSE] Has coordinator context: {bool(coordinator_context)}") + logger.info(f"[DECOMPOSE] Current workforce state: {self._state.name}, _running: {self._running}") + logger.info("=" * 80) if not validate_task_content(task.content, task.id): task.state = TaskState.FAILED task.result = "Task failed: Invalid or empty content provided" - logger.warning("Task rejected: Invalid or empty content", extra={ + logger.warning("❌ [DECOMPOSE] Task rejected: Invalid or empty content", extra={ "task_id": task.id, "content_preview": task.content[:50] + "..." if len(task.content) > 50 else task.content }) raise UserException(code.error, task.result) + logger.info(f"[DECOMPOSE] Resetting workforce state") self.reset() self._task = task self.set_channel(TaskChannel()) self._state = WorkforceState.RUNNING task.state = TaskState.OPEN + logger.info(f"[DECOMPOSE] Workforce reset complete, state: {self._state.name}") + logger.info(f"[DECOMPOSE] Calling handle_decompose_append_task") subtasks = asyncio.run(self.handle_decompose_append_task(task, reset=False, coordinator_context=coordinator_context)) - logger.info("Task decomposition completed", extra={ + logger.info("=" * 80) + logger.info(f"βœ… [DECOMPOSE] Task decomposition COMPLETED", extra={ "api_task_id": self.api_task_id, "task_id": task.id, "subtasks_count": len(subtasks) }) + logger.info("=" * 80) return subtasks async def eigent_start(self, subtasks: list[Task]): """start the workforce""" - logger.info("Starting workforce execution", extra={ - "api_task_id": self.api_task_id, - "subtasks_count": len(subtasks) - }) + logger.info("=" * 80) + logger.info("▢️ [WF-LIFECYCLE] eigent_start CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)}) + logger.info(f"[WF-LIFECYCLE] Starting workforce execution with {len(subtasks)} subtasks") + logger.info(f"[WF-LIFECYCLE] Current workforce state: {self._state.name}, _running: {self._running}") + logger.info("=" * 80) self._pending_tasks.extendleft(reversed(subtasks)) # Save initial snapshot self.save_snapshot("Initial task decomposition") try: + logger.info(f"[WF-LIFECYCLE] Calling base class start() method") await self.start() + logger.info(f"[WF-LIFECYCLE] βœ… Base class start() method completed") except Exception as e: - logger.error("Error in workforce execution", extra={ + logger.error(f"[WF-LIFECYCLE] ❌ Error in workforce execution: {e}", extra={ "api_task_id": self.api_task_id, "error": str(e) }, exc_info=True) self._state = WorkforceState.STOPPED + logger.info(f"[WF-LIFECYCLE] Workforce state set to STOPPED after error") raise finally: + logger.info(f"[WF-LIFECYCLE] eigent_start finally block, current state: {self._state.name}") if self._state != WorkforceState.STOPPED: self._state = WorkforceState.IDLE + logger.info(f"[WF-LIFECYCLE] Workforce state set to IDLE") async def handle_decompose_append_task( self, task: Task, reset: bool = True, coordinator_context: str = "" @@ -140,23 +153,27 @@ class Workforce(BaseWorkforce): Returns: List[Task]: The decomposed subtasks or the original task """ + logger.info(f"[DECOMPOSE] handle_decompose_append_task CALLED, task_id={task.id}, reset={reset}") + if not validate_task_content(task.content, task.id): task.state = TaskState.FAILED task.result = "Task failed: Invalid or empty content provided" logger.warning( - f"Task {task.id} rejected: Invalid or empty content. " + f"[DECOMPOSE] Task {task.id} rejected: Invalid or empty content. " f"Content preview: '{task.content}'" ) return [task] if reset and self._state != WorkforceState.RUNNING: + logger.info(f"[DECOMPOSE] Resetting workforce (reset={reset}, state={self._state.name})") self.reset() - logger.info("Workforce reset before handling task.") + logger.info("[DECOMPOSE] Workforce reset complete") self._task = task task.state = TaskState.FAILED if coordinator_context: + logger.info(f"[DECOMPOSE] Adding coordinator context to task") original_content = task.content task_with_context = coordinator_context if coordinator_context: @@ -164,24 +181,30 @@ class Workforce(BaseWorkforce): task_with_context += original_content task.content = task_with_context + logger.info(f"[DECOMPOSE] Calling _decompose_task with context") subtasks_result = self._decompose_task(task) task.content = original_content else: + logger.info(f"[DECOMPOSE] Calling _decompose_task without context") subtasks_result = self._decompose_task(task) + logger.info(f"[DECOMPOSE] _decompose_task returned, processing results") if isinstance(subtasks_result, Generator): subtasks = [] for new_tasks in subtasks_result: subtasks.extend(new_tasks) + logger.info(f"[DECOMPOSE] Collected {len(subtasks)} subtasks from generator") else: subtasks = subtasks_result + logger.info(f"[DECOMPOSE] Got {len(subtasks) if subtasks else 0} subtasks directly") if subtasks: self._pending_tasks.extendleft(reversed(subtasks)) - logger.info(f"Appended {len(subtasks)} subtasks to pending tasks") + logger.info(f"[DECOMPOSE] βœ… Appended {len(subtasks)} subtasks to pending tasks") if not subtasks: + logger.warning(f"[DECOMPOSE] No subtasks returned, creating fallback task") fallback_task = Task( content=task.content, id=f"{task.id}.1", @@ -189,6 +212,7 @@ class Workforce(BaseWorkforce): ) task.subtasks = [fallback_task] subtasks = [fallback_task] + logger.info(f"[DECOMPOSE] Created fallback task: {fallback_task.id}") return subtasks @@ -358,10 +382,48 @@ class Workforce(BaseWorkforce): return result def stop(self) -> None: + logger.info("=" * 80) + logger.info(f"⏹️ [WF-LIFECYCLE] stop() CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)}) + logger.info(f"[WF-LIFECYCLE] Current state before stop: {self._state.name}, _running: {self._running}") + logger.info("=" * 80) super().stop() + logger.info(f"[WF-LIFECYCLE] super().stop() completed, new state: {self._state.name}") task_lock = get_task_lock(self.api_task_id) task = asyncio.create_task(task_lock.put_queue(ActionEndData())) task_lock.add_background_task(task) + logger.info(f"[WF-LIFECYCLE] βœ… ActionEndData queued") + + def stop_gracefully(self) -> None: + logger.info("=" * 80) + logger.info(f"πŸ›‘ [WF-LIFECYCLE] stop_gracefully() CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)}) + logger.info(f"[WF-LIFECYCLE] Current state before stop_gracefully: {self._state.name}, _running: {self._running}") + logger.info("=" * 80) + super().stop_gracefully() + logger.info(f"[WF-LIFECYCLE] βœ… super().stop_gracefully() completed, new state: {self._state.name}, _running: {self._running}") + + def skip_gracefully(self) -> None: + logger.info("=" * 80) + logger.info(f"⏭️ [WF-LIFECYCLE] skip_gracefully() CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)}) + logger.info(f"[WF-LIFECYCLE] Current state before skip_gracefully: {self._state.name}, _running: {self._running}") + logger.info("=" * 80) + super().skip_gracefully() + logger.info(f"[WF-LIFECYCLE] βœ… super().skip_gracefully() completed, new state: {self._state.name}, _running: {self._running}") + + def pause(self) -> None: + logger.info("=" * 80) + logger.info(f"⏸️ [WF-LIFECYCLE] pause() CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)}) + logger.info(f"[WF-LIFECYCLE] Current state before pause: {self._state.name}, _running: {self._running}") + logger.info("=" * 80) + super().pause() + logger.info(f"[WF-LIFECYCLE] βœ… super().pause() completed, new state: {self._state.name}, _running: {self._running}") + + def resume(self) -> None: + logger.info("=" * 80) + logger.info(f"▢️ [WF-LIFECYCLE] resume() CALLED", extra={"api_task_id": self.api_task_id, "workforce_id": id(self)}) + logger.info(f"[WF-LIFECYCLE] Current state before resume: {self._state.name}, _running: {self._running}") + logger.info("=" * 80) + super().resume() + logger.info(f"[WF-LIFECYCLE] βœ… super().resume() completed, new state: {self._state.name}, _running: {self._running}") async def cleanup(self) -> None: r"""Clean up resources when workforce is done""" diff --git a/src/components/ChatBox/index.tsx b/src/components/ChatBox/index.tsx index 5cf5e72e4..a0ce0e8de 100644 --- a/src/components/ChatBox/index.tsx +++ b/src/components/ChatBox/index.tsx @@ -417,43 +417,52 @@ export default function ChatBox(): JSX.Element { setIsPauseResumeLoading(false); }; - // Skip to next task handler + // Stop task handler (真正停歒workforce) const handleSkip = async () => { const taskId = chatStore.activeTaskId as string; + console.log("=" .repeat(80)); + console.log("πŸ›‘ [STOP-BUTTON] handleSkip CALLED from frontend"); + console.log(`[STOP-BUTTON] taskId: ${taskId}, projectId: ${projectStore.activeProjectId}`); + console.log("=" .repeat(80)); setIsPauseResumeLoading(true); try { - // First, try to notify backend to skip the task - await fetchPost(`/chat/${projectStore.activeProjectId}/skip-task`, { - project_id: projectStore.activeProjectId - }); + // Call DELETE /chat/{id} to trigger Action.stop and stop_gracefully + console.log(`[STOP-BUTTON] Sending DELETE request to /chat/${projectStore.activeProjectId}`); + await fetchDelete(`/chat/${projectStore.activeProjectId}`); + console.log("[STOP-BUTTON] βœ… Backend stop request successful"); // Only stop local task if backend call succeeds + console.log("[STOP-BUTTON] Calling chatStore.stopTask locally"); chatStore.stopTask(taskId); chatStore.setIsPending(taskId, false); + console.log("[STOP-BUTTON] βœ… Local task stopped successfully"); toast.success("Task stopped successfully", { closeButton: true, }); } catch (error) { - console.error("Failed to skip task:", error); + console.error("[STOP-BUTTON] ❌ Failed to stop task:", error); // If backend call failed, still try to stop local task as fallback // but with different messaging to user + console.log("[STOP-BUTTON] Attempting to stop task locally as fallback"); try { chatStore.stopTask(taskId); chatStore.setIsPending(taskId, false); + console.log("[STOP-BUTTON] ⚠️ Local task stopped, but backend may still be running"); toast.warning("Task stopped locally, but backend notification failed. Backend task may continue running.", { closeButton: true, duration: 5000, }); } catch (localError) { - console.error("Failed to stop task locally:", localError); + console.error("[STOP-BUTTON] ❌ Failed to stop task locally:", localError); toast.error("Failed to stop task completely. Please refresh the page.", { closeButton: true, }); } } finally { + console.log("[STOP-BUTTON] handleSkip completed"); setIsPauseResumeLoading(false); } }; From f888859afa862e20dd11db45376be193229326d4 Mon Sep 17 00:00:00 2001 From: puzhen <1303385763@qq.com> Date: Sat, 22 Nov 2025 11:38:42 +0800 Subject: [PATCH 2/3] update --- src/components/ChatBox/index.tsx | 2 ++ src/store/chatStore.ts | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/components/ChatBox/index.tsx b/src/components/ChatBox/index.tsx index a0ce0e8de..a880e5dcd 100644 --- a/src/components/ChatBox/index.tsx +++ b/src/components/ChatBox/index.tsx @@ -208,6 +208,8 @@ export default function ChatBox(): JSX.Element { const nextTaskId = generateUniqueId() chatStore.setNextTaskId(nextTaskId); + // Use improve endpoint (POST /chat/{id}) - {id} is project_id + // This reuses the existing SSE connection and step_solve loop fetchPost(`/chat/${projectStore.activeProjectId}`, { question: tempMessageContent, task_id: nextTaskId diff --git a/src/store/chatStore.ts b/src/store/chatStore.ts index a26bc3a78..20882b8f2 100644 --- a/src/store/chatStore.ts +++ b/src/store/chatStore.ts @@ -572,23 +572,25 @@ const chatStore = (initial?: Partial) => createStore()( const lockedTaskId = getCurrentTaskId(); const currentTask = getCurrentChatStore().tasks[lockedTaskId]; - // Only ignore messages if: - // 1. The task doesn't exist, OR - // 2. The task is finished AND it's not a task-switching event + // Only ignore messages if task is finished and not a valid post-completion event + // Valid events after task completion: + // - Task switching: confirmed, new_task_state, end + // - Multi-turn simple answer: wait_confirm const isTaskSwitchingEvent = agentMessages.step === "confirmed" || agentMessages.step === "new_task_state" || agentMessages.step === "end"; - // More robust check - only ignore if task doesn't exist OR - // task is finished and it's not a legitimate flow-control event + const isMultiTurnSimpleAnswer = agentMessages.step === "wait_confirm"; + if (!currentTask) { console.log(`Task ${lockedTaskId} not found, ignoring SSE message for step: ${agentMessages.step}`); return; } - if (currentTask.status === 'finished' && !isTaskSwitchingEvent) { - // Only ignore non-essential messages for finished tasks - // Allow flow control messages through even for finished tasks + if (currentTask.status === 'finished' && !isTaskSwitchingEvent && !isMultiTurnSimpleAnswer) { + // Ignore messages for finished tasks except: + // 1. Task switching events (create new chatStore) + // 2. Simple answer events (direct response without new chatStore) console.log(`Ignoring SSE message for finished task ${lockedTaskId}, step: ${agentMessages.step}`); return; } From fd04703c77165ae16c953d5c7c63fac6b6ecfeaa Mon Sep 17 00:00:00 2001 From: puzhen <1303385763@qq.com> Date: Sat, 22 Nov 2025 17:29:44 +0800 Subject: [PATCH 3/3] preserve task.lock --- backend/app/controller/chat_controller.py | 20 +++-- backend/app/service/chat_service.py | 105 +++++++++++++++++----- src/components/ChatBox/index.tsx | 31 ++++--- 3 files changed, 117 insertions(+), 39 deletions(-) diff --git a/backend/app/controller/chat_controller.py b/backend/app/controller/chat_controller.py index 197ef2e90..b7fc78262 100644 --- a/backend/app/controller/chat_controller.py +++ b/backend/app/controller/chat_controller.py @@ -287,21 +287,31 @@ def remove_task(project_id: str, task_id: str): @router.post("/chat/{project_id}/skip-task", name="skip task in workforce") @traceroot.trace() def skip_task(project_id: str): - """Skip a task in the workforce""" + """ + Skip/Stop current task execution while preserving context. + This endpoint is called when user clicks the Stop button. + + Behavior: + - Stops workforce gracefully + - Marks task as done + - Preserves conversation_history and last_task_result in task_lock + - Sends 'end' event to frontend + - Keeps SSE connection alive for multi-turn conversation + """ chat_logger.info("=" * 80) - chat_logger.info(f"πŸ›‘ [STOP-BUTTON] SKIP-TASK request received from frontend") + chat_logger.info(f"πŸ›‘ [STOP-BUTTON] SKIP-TASK request received from frontend (User clicked Stop)") chat_logger.info(f"[STOP-BUTTON] project_id: {project_id}") chat_logger.info("=" * 80) task_lock = get_task_lock(project_id) chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}") try: - # Queue the skip task action + # Queue the skip task action - this will preserve context for multi-turn skip_task_action = ActionSkipTaskData(project_id=project_id) - chat_logger.info(f"[STOP-BUTTON] Queueing ActionSkipTaskData to task_lock queue") + chat_logger.info(f"[STOP-BUTTON] Queueing ActionSkipTaskData (preserves context, marks as done)") asyncio.run(task_lock.put_queue(skip_task_action)) - chat_logger.info(f"[STOP-BUTTON] βœ… Task skip request queued successfully for project_id: {project_id}") + chat_logger.info(f"[STOP-BUTTON] βœ… Skip request queued - task will stop gracefully and preserve context") return Response(status_code=201) except Exception as e: diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py index ce0ba277d..0ac7969d2 100644 --- a/backend/app/service/chat_service.py +++ b/backend/app/service/chat_service.py @@ -395,20 +395,39 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): logger.info(f"[NEW-QUESTION] Building context for coordinator") context_for_coordinator = build_context_for_workforce(task_lock, options) - logger.info(f"[NEW-QUESTION] 🏭 Creating NEW workforce instance (previous={'None' if workforce is None else f'id={id(workforce)}'})") - (workforce, mcp) = await construct_workforce(options) - logger.info(f"[NEW-QUESTION] βœ… NEW Workforce instance created, id={id(workforce)}") - for new_agent in options.new_agents: - workforce.add_single_agent_worker( - format_agent_description(new_agent), await new_agent_model(new_agent, options) - ) + # Check if workforce exists - if so, reuse it (agents are preserved) + # Otherwise create new workforce + if workforce is not None: + logger.info(f"[NEW-QUESTION] πŸ”„ Workforce exists (id={id(workforce)}), state={workforce._state.name}, _running={workforce._running}") + logger.info(f"[NEW-QUESTION] βœ… Reusing existing workforce with preserved agents") + # Workforce is already stopped from skip_task, ready for new decomposition + else: + logger.info(f"[NEW-QUESTION] 🏭 Creating NEW workforce instance (workforce=None)") + (workforce, mcp) = await construct_workforce(options) + logger.info(f"[NEW-QUESTION] βœ… NEW Workforce instance created, id={id(workforce)}") + for new_agent in options.new_agents: + workforce.add_single_agent_worker( + format_agent_description(new_agent), await new_agent_model(new_agent, options) + ) task_lock.status = Status.confirmed - clean_task_content = question + options.summary_prompt - logger.info(f"[NEW-QUESTION] Creating camel_task with id={options.task_id}") - camel_task = Task(content=clean_task_content, id=options.task_id) - if len(options.attaches) > 0: - camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches} + # If camel_task already exists (from previous paused task), add new question as subtask + # Otherwise, create a new camel_task + if camel_task is not None: + logger.info(f"[NEW-QUESTION] πŸ”„ camel_task exists (id={camel_task.id}), adding new question as context") + # Update the task content with new question + clean_task_content = question + options.summary_prompt + logger.info(f"[NEW-QUESTION] Updating existing camel_task content with new question") + # We keep the existing task structure but update content for new decomposition + camel_task = Task(content=clean_task_content, id=options.task_id) + if len(options.attaches) > 0: + camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches} + else: + clean_task_content = question + options.summary_prompt + logger.info(f"[NEW-QUESTION] Creating NEW camel_task with id={options.task_id}") + camel_task = Task(content=clean_task_content, id=options.task_id) + if len(options.attaches) > 0: + camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches} logger.info(f"[NEW-QUESTION] 🧩 Starting task decomposition via workforce.eigent_make_sub_tasks") sub_tasks = await asyncio.to_thread( @@ -499,19 +518,63 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): yield sse_json("remove_task", returnData) elif item.action == Action.skip_task: logger.info("=" * 80) - logger.info(f"πŸ›‘ [LIFECYCLE] SKIP_TASK action received", extra={"project_id": options.project_id, "item_project_id": item.project_id}) + logger.info(f"πŸ›‘ [LIFECYCLE] SKIP_TASK action received (User clicked Stop button)", extra={"project_id": options.project_id, "item_project_id": item.project_id}) logger.info("=" * 80) + + # Prevent duplicate skip processing + if task_lock.status == Status.done: + logger.warning(f"⚠️ [LIFECYCLE] SKIP_TASK received but task already marked as done. Ignoring.") + continue + if workforce is not None and item.project_id == options.project_id: logger.info(f"[LIFECYCLE] Workforce exists (id={id(workforce)}), state={workforce._state.name}, _running={workforce._running}") - if workforce._state.name == 'PAUSED': - logger.info(f"[LIFECYCLE] Workforce is PAUSED, resuming before skip") - # Resume paused workforce to skip the task - workforce.resume() - logger.info(f"[LIFECYCLE] Calling workforce.skip_gracefully()") - workforce.skip_gracefully() - logger.info(f"[LIFECYCLE] workforce.skip_gracefully() called successfully") + + # Stop workforce completely + logger.info(f"[LIFECYCLE] πŸ›‘ Stopping workforce") + if workforce._running: + # Import correct BaseWorkforce from camel + from camel.societies.workforce.workforce import Workforce as BaseWorkforce + BaseWorkforce.stop(workforce) + logger.info(f"[LIFECYCLE] βœ… BaseWorkforce.stop() completed, state={workforce._state.name}, _running={workforce._running}") + + workforce.stop_gracefully() + logger.info(f"[LIFECYCLE] βœ… Workforce stopped gracefully") + + # Clear workforce to avoid state issues + # Next question will create fresh workforce + workforce = None + logger.info(f"[LIFECYCLE] Workforce set to None, will be recreated on next question") else: - logger.warning(f"[LIFECYCLE] Cannot skip: workforce is None or project_id mismatch (workforce={'not None' if workforce else 'None'}, expected_project_id={options.project_id}, item_project_id={item.project_id})") + logger.warning(f"[LIFECYCLE] Cannot skip: workforce is None or project_id mismatch") + + # Mark task as done and preserve context (like Action.end does) + task_lock.status = Status.done + end_message = "Task stoppedTask stopped by user" + task_lock.last_task_result = end_message + + # Add to conversation history (like normal end does) + if camel_task is not None: + task_content: str = camel_task.content + if "=== CURRENT TASK ===" in task_content: + task_content = task_content.split("=== CURRENT TASK ===")[-1].strip() + else: + task_content: str = f"Task {options.task_id}" + + task_lock.add_conversation('task_result', { + 'task_content': task_content, + 'task_result': end_message, + 'working_directory': get_working_directory(options, task_lock) + }) + + # Clear camel_task as well (workforce is cleared, so camel_task should be too) + camel_task = None + logger.info(f"[LIFECYCLE] βœ… Task marked as done, workforce and camel_task cleared, ready for multi-turn") + + # Send end event to frontend with string format (matching normal end event format) + yield sse_json("end", end_message) + logger.info(f"[LIFECYCLE] Sent 'end' SSE event to frontend") + + # Continue loop to accept new questions (don't break, don't delete task_lock) elif item.action == Action.start: # Check conversation history length before starting task is_exceeded, total_length = check_conversation_history_length(task_lock) diff --git a/src/components/ChatBox/index.tsx b/src/components/ChatBox/index.tsx index a880e5dcd..44a69e48c 100644 --- a/src/components/ChatBox/index.tsx +++ b/src/components/ChatBox/index.tsx @@ -419,7 +419,7 @@ export default function ChatBox(): JSX.Element { setIsPauseResumeLoading(false); }; - // Stop task handler (真正停歒workforce) + // Stop task handler - triggers Action.skip_task which preserves context const handleSkip = async () => { const taskId = chatStore.activeTaskId as string; console.log("=" .repeat(80)); @@ -429,16 +429,22 @@ export default function ChatBox(): JSX.Element { setIsPauseResumeLoading(true); try { - // Call DELETE /chat/{id} to trigger Action.stop and stop_gracefully - console.log(`[STOP-BUTTON] Sending DELETE request to /chat/${projectStore.activeProjectId}`); - await fetchDelete(`/chat/${projectStore.activeProjectId}`); - console.log("[STOP-BUTTON] βœ… Backend stop request successful"); + // Call skip-task endpoint to trigger Action.skip_task + // This will stop the task gracefully while preserving context for multi-turn + console.log(`[STOP-BUTTON] Sending POST request to /chat/${projectStore.activeProjectId}/skip-task`); + await fetchPost(`/chat/${projectStore.activeProjectId}/skip-task`, { + project_id: projectStore.activeProjectId + }); + console.log("[STOP-BUTTON] βœ… Backend skip-task request successful"); - // Only stop local task if backend call succeeds - console.log("[STOP-BUTTON] Calling chatStore.stopTask locally"); - chatStore.stopTask(taskId); + // DO NOT call chatStore.stopTask here! + // Keep SSE connection alive to receive "end" event from backend + // The "end" event will set status to 'finished' and allow multi-turn conversation + console.log("[STOP-BUTTON] ⚠️ SSE connection kept alive, waiting for backend 'end' event"); + + // Only set isPending to false so UI shows task is stopped chatStore.setIsPending(taskId, false); - console.log("[STOP-BUTTON] βœ… Local task stopped successfully"); + console.log("[STOP-BUTTON] βœ… Task marked as not pending, SSE connection remains open"); toast.success("Task stopped successfully", { closeButton: true, @@ -446,13 +452,12 @@ export default function ChatBox(): JSX.Element { } catch (error) { console.error("[STOP-BUTTON] ❌ Failed to stop task:", error); - // If backend call failed, still try to stop local task as fallback - // but with different messaging to user - console.log("[STOP-BUTTON] Attempting to stop task locally as fallback"); + // If backend call failed, close SSE connection as fallback + console.log("[STOP-BUTTON] Backend call failed, closing SSE connection as fallback"); try { chatStore.stopTask(taskId); chatStore.setIsPending(taskId, false); - console.log("[STOP-BUTTON] ⚠️ Local task stopped, but backend may still be running"); + console.log("[STOP-BUTTON] ⚠️ SSE connection closed due to backend failure"); toast.warning("Task stopped locally, but backend notification failed. Backend task may continue running.", { closeButton: true, duration: 5000,