# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import asyncio import datetime import logging from collections.abc import Awaitable, Callable from camel.agents.chat_agent import ( AsyncStreamingChatAgentResponse, ChatAgentResponse, ) from camel.societies.workforce.prompts import PROCESS_TASK_PROMPT from camel.societies.workforce.single_agent_worker import ( SingleAgentWorker as BaseSingleAgentWorker, ) from camel.societies.workforce.utils import TaskResult from camel.tasks.task import Task, TaskState, is_task_result_insufficient from camel.utils.context_utils import ContextUtility from colorama import Fore from app.agent.listen_chat_agent import ListenChatAgent logger = logging.getLogger("single_agent_worker") class SingleAgentWorker(BaseSingleAgentWorker): def __init__( self, description: str, worker: ListenChatAgent, use_agent_pool: bool = True, pool_initial_size: int = 0, # Changed from 1 to 0 to avoid pre-creating clones that waste CDP resources pool_max_size: int = 10, auto_scale_pool: bool = True, use_structured_output_handler: bool = True, context_utility: ContextUtility | None = None, enable_workflow_memory: bool = False, ) -> None: logger.info( "Initializing SingleAgentWorker", extra={ "description": description, "worker_agent_name": worker.agent_name, "use_agent_pool": use_agent_pool, "pool_max_size": pool_max_size, "enable_workflow_memory": enable_workflow_memory, }, ) super().__init__( description=description, worker=worker, use_agent_pool=use_agent_pool, pool_initial_size=pool_initial_size, pool_max_size=pool_max_size, auto_scale_pool=auto_scale_pool, use_structured_output_handler=use_structured_output_handler, context_utility=context_utility, enable_workflow_memory=enable_workflow_memory, ) self.worker = worker # change type hint async def _process_task( self, task: Task, dependencies: list[Task], stream_callback: Callable[ ["ChatAgentResponse"], Awaitable[None] | None ] | None = None, ) -> TaskState: r"""Processes a task with its dependencies using an efficient agent management system. This method asynchronously processes a given task, considering its dependencies, by sending a generated prompt to a worker agent. Uses an agent pool for efficiency when enabled, or falls back to cloning when pool is disabled. Args: task (Task): The task to process, which includes necessary details like content and type. dependencies (List[Task]): Tasks that the given task depends on. Returns: TaskState: `TaskState.DONE` if processed successfully, otherwise `TaskState.FAILED`. """ # Log task details before getting agent (for clone tracking) task_content_preview = ( task.content[:100] + "..." if len(task.content) > 100 else task.content ) logger.debug( f"[TASK REQUEST] Requesting agent for task_id={task.id}, content_preview='{task_content_preview}'" ) # Get agent efficiently (from pool or by cloning) worker_agent = await self._get_worker_agent() worker_agent.process_task_id = task.id # type: ignore rewrite line logger.info( "Starting task processing", extra={ "task_id": task.id, "worker_agent_id": worker_agent.agent_id, "dependencies_count": len(dependencies), }, ) response_content = "" final_response = None try: dependency_tasks_info = self._get_dep_tasks_info(dependencies) prompt = PROCESS_TASK_PROMPT.format( content=task.content, parent_task_content=task.parent.content if task.parent else "", dependency_tasks_info=dependency_tasks_info, additional_info=task.additional_info, ) if self.use_structured_output_handler and self.structured_handler: # Use structured output handler for prompt-based extraction enhanced_prompt = self.structured_handler.generate_structured_prompt( base_prompt=prompt, schema=TaskResult, examples=[ { "content": "I have successfully completed the task...", "failed": False, } ], additional_instructions="Ensure you provide a clear " "description of what was done and whether the task " "succeeded or failed.", ) response = await worker_agent.astep(enhanced_prompt) # Handle streaming response if isinstance(response, AsyncStreamingChatAgentResponse): # With stream_accumulate=False, we need to accumulate delta content accumulated_content = "" last_chunk = None chunk_count = 0 async for chunk in response: chunk_count += 1 last_chunk = chunk if stream_callback: maybe = stream_callback(chunk) if asyncio.iscoroutine(maybe): await maybe if chunk.msg and chunk.msg.content: accumulated_content += chunk.msg.content logger.info( f"Streaming complete: {chunk_count} chunks, content_length={len(accumulated_content)}" ) response_content = accumulated_content # Store usage info from last chunk for later use response._last_chunk_info = ( last_chunk.info if last_chunk else {} ) else: # Regular ChatAgentResponse response_content = ( response.msg.content if response.msg else "" ) task_result = ( self.structured_handler.parse_structured_response( response_text=response_content, schema=TaskResult, fallback_values={ "content": "Task processing failed", "failed": True, }, ) ) else: # Use native structured output if supported response = await worker_agent.astep( prompt, response_format=TaskResult ) # Handle streaming response for native output (shouldn't happen now but keep for safety) if isinstance(response, AsyncStreamingChatAgentResponse): task_result = None # With stream_accumulate=False, we need to accumulate delta content accumulated_content = "" last_chunk = None async for chunk in response: last_chunk = chunk if stream_callback: maybe = stream_callback(chunk) if asyncio.iscoroutine(maybe): await maybe if chunk.msg: if chunk.msg.content: accumulated_content += chunk.msg.content if chunk.msg.parsed: task_result = chunk.msg.parsed response_content = accumulated_content # Store usage info from last chunk for later use response._last_chunk_info = ( last_chunk.info if last_chunk else {} ) # If no parsed result found in streaming, create fallback if task_result is None: task_result = TaskResult( content="Failed to parse streaming response", failed=True, ) else: # Regular ChatAgentResponse task_result = response.msg.parsed response_content = ( response.msg.content if response.msg else "" ) # Get token usage from the response if isinstance(response, AsyncStreamingChatAgentResponse): # For streaming responses, get info from last chunk captured during iteration chunk_info = getattr(response, "_last_chunk_info", {}) usage_info = chunk_info.get("usage") or chunk_info.get( "token_usage" ) else: usage_info = response.info.get("usage") or response.info.get( "token_usage" ) total_tokens = ( usage_info.get("total_tokens", 0) if usage_info else 0 ) # collect conversation from working agent to # accumulator for workflow memory # Only transfer memory if workflow memory is enabled if self.enable_workflow_memory: accumulator = self._get_conversation_accumulator() # transfer all memory records from working agent to accumulator try: # retrieve all context records from the working agent work_records = worker_agent.memory.retrieve() # write these records to the accumulator's memory memory_records = [ record.memory_record for record in work_records ] accumulator.memory.write_records(memory_records) logger.debug( f"Transferred {len(memory_records)} memory records to accumulator" ) except Exception as e: logger.warning( f"Failed to transfer conversation to accumulator: {e}" ) except Exception as e: logger.error( f"Error processing task {task.id}: {type(e).__name__}: {e}" ) # Store error information in task result task.result = f"{type(e).__name__}: {e!s}" return TaskState.FAILED finally: # Return agent to pool or let it be garbage collected await self._return_worker_agent(worker_agent) # Populate additional_info with worker attempt details if task.additional_info is None: task.additional_info = {} # Create worker attempt details with descriptive keys # Use final_response if available (streaming), otherwise use response response_for_info = ( final_response if final_response is not None else response ) worker_attempt_details = { "agent_id": getattr( worker_agent, "agent_id", worker_agent.role_name ), "original_worker_id": getattr( self.worker, "agent_id", self.worker.role_name ), "timestamp": str(datetime.datetime.now()), "description": f"Attempt by " f"{getattr(worker_agent, 'agent_id', worker_agent.role_name)} " f"(from pool/clone of " f"{getattr(self.worker, 'agent_id', self.worker.role_name)}) " f"to process task: {task.content}", "response_content": response_content[:50], "tool_calls": str( response_for_info.info.get("tool_calls", []) if response_for_info and hasattr(response_for_info, "info") else [] )[:50], "total_tokens": total_tokens, } # Store the worker attempt in additional_info if "worker_attempts" not in task.additional_info: task.additional_info["worker_attempts"] = [] task.additional_info["worker_attempts"].append(worker_attempt_details) # Store the actual token usage for this specific task task.additional_info["token_usage"] = {"total_tokens": total_tokens} print(f"======\n{Fore.GREEN}Response from {self}:{Fore.RESET}") logger.info(f"Response from {self}:") if not self.use_structured_output_handler: # Handle native structured output parsing if task_result is None: logger.error( "Error in worker step execution: Invalid task result" ) print( f"{Fore.RED}Error in worker step execution: Invalid task result{Fore.RESET}" ) task_result = TaskResult( content="Failed to generate valid task result.", failed=True, ) color = Fore.RED if task_result.failed else Fore.GREEN # type: ignore[union-attr] print( f"\n{color}{task_result.content}{Fore.RESET}\n======", # type: ignore[union-attr] ) if task_result.failed: # type: ignore[union-attr] logger.error(f"{task_result.content}") # type: ignore[union-attr] else: logger.info(f"{task_result.content}") # type: ignore[union-attr] task.result = task_result.content # type: ignore[union-attr] if task_result.failed: # type: ignore[union-attr] return TaskState.FAILED if is_task_result_insufficient(task): logger.warning( f"Task {task.id}: Content validation failed - task marked as failed" ) return TaskState.FAILED return TaskState.DONE