# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
import re
from enum import Enum
from types import GeneratorType
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
Iterable,
List,
Literal,
Optional,
Union,
)
from PIL import Image
from pydantic import BaseModel, ConfigDict, Field
if TYPE_CHECKING:
from camel.agents import ChatAgent
from camel.responses import ChatAgentResponse
import uuid
from camel.logger import get_logger
from camel.messages import BaseMessage
from camel.prompts import TextPrompt
# Note: validate_task_content moved here to avoid circular imports
from .task_prompt import (
TASK_COMPOSE_PROMPT,
TASK_DECOMPOSE_PROMPT,
TASK_EVOLVE_PROMPT,
)
logger = get_logger(__name__)
class TaskValidationMode(Enum):
    r"""Selects which rule set :func:`validate_task_content` applies.

    ``INPUT`` is meant for task content checked before processing and
    ``OUTPUT`` for task results checked after completion.
    """

    # For validating task content before processing
    INPUT = "input"
    # For validating task results after completion
    OUTPUT = "output"
def validate_task_content(
    content: str,
    task_id: str = "unknown",
    min_length: int = 1,
    mode: TaskValidationMode = TaskValidationMode.INPUT,
    check_failure_patterns: bool = True,
) -> bool:
    r"""Validate task content or a task result and log why it was rejected.

    The checks run in order and the first failing one ends validation:
    ``None`` content, empty/whitespace-only content, content shorter than
    ``min_length`` and, for OUTPUT mode with ``check_failure_patterns``
    enabled, failure or refusal phrases in the text.

    Args:
        content (str): The task content or result to validate. ``None`` is
            tolerated and rejected.
        task_id (str): Task ID used in log messages.
            (default: :obj:`"unknown"`)
        min_length (int): Minimum length of the content once surrounding
            whitespace has been stripped. (default: :obj:`1`)
        mode (TaskValidationMode): INPUT for task content, OUTPUT for task
            results. (default: :obj:`TaskValidationMode.INPUT`)
        check_failure_patterns (bool): Whether to scan for failure
            indicators. Only has an effect in OUTPUT mode.
            (default: :obj:`True`)

    Returns:
        bool: True if every applicable check passed, False otherwise.
    """
    # Check 1: missing content.
    if content is None:
        logger.warning(f"Task {task_id}: None content rejected")
        return False

    # Check 2: nothing left once surrounding whitespace is removed.
    text = content.strip()
    if not text:
        logger.warning(
            f"Task {task_id}: Empty or whitespace-only content rejected."
        )
        return False

    # Check 3: minimum meaningful length.
    text_length = len(text)
    if text_length < min_length:
        logger.warning(
            f"Task {task_id}: Content too short ({text_length} "
            f"chars < {min_length} minimum). Content preview: "
            f"'{text}'"
        )
        return False

    # Check 4: failure/refusal wording, only relevant for results.
    scan_for_failures = (
        mode == TaskValidationMode.OUTPUT and check_failure_patterns
    )
    if scan_for_failures:
        lowered = text.lower()

        # Phrases that mark the result as a failure wherever they occur.
        failure_markers = (
            "i cannot complete",
            "i cannot do",
            "task failed",
            "unable to complete",
            "cannot be completed",
            "failed to complete",
            "i cannot",
            "not possible",
            "impossible to",
            "cannot perform",
        )
        for marker in failure_markers:
            if marker in lowered:
                logger.warning(
                    f"Task {task_id}: Failure indicator detected in result. "
                    f"Content preview: '{text}'"
                )
                return False

        # Results that open with an error or refusal word.
        refusal_prefixes = ("error", "failed", "cannot", "unable")
        if lowered.startswith(refusal_prefixes):
            logger.warning(
                f"Task {task_id}: Error/refusal pattern detected at start. "
                f"Content preview: '{text}'"
            )
            return False

    logger.debug(
        f"Task {task_id}: {mode.value} validation passed "
        f"({text_length} chars)"
    )
    return True
def is_task_result_insufficient(task: "Task") -> bool:
    r"""Tell whether a task's result should be treated as a failure.

    A missing ``result`` attribute or a ``None`` result is insufficient.
    Any other result is insufficient when it fails
    :func:`validate_task_content` in OUTPUT mode with failure-pattern
    checking enabled.

    Args:
        task (Task): The task whose result is inspected.

    Returns:
        bool: True if the result is insufficient, False otherwise.
    """
    # A sentinel separates "attribute absent" from any real value; an
    # absent attribute and a None result are treated the same way.
    _absent = object()
    result = getattr(task, 'result', _absent)
    if result is _absent or result is None:
        return True

    passed = validate_task_content(
        content=result,
        task_id=task.id,
        mode=TaskValidationMode.OUTPUT,
        check_failure_patterns=True,
    )
    return not passed
def parse_response(
    response: str, task_id: Optional[str] = None
) -> List["Task"]:
    r"""Parse Tasks from a response.

    Every ``<task>...</task>`` element found in ``response`` becomes one
    sub-task whose id is ``"{task_id}.{n}"``, where ``n`` is the 1-based
    position of the element in the response. Elements whose stripped
    content fails :func:`validate_task_content` are skipped with a warning
    (their position number is still consumed).

    Args:
        response (str): The model response.
        task_id (str, optional): The parent task id used as the prefix of
            the generated sub-task ids. ``None`` is treated as ``"0"``.
            (default: :obj:`None`)

    Returns:
        List[Task]: A list of tasks which is :obj:`Task` instance.
    """
    # The pattern needs explicit delimiters: a bare lazy group "(.*?)"
    # matches only empty strings, so no sub-task would ever be extracted.
    # NOTE(review): the ``<task>`` tag is assumed to be what the prompts in
    # ``task_prompt.py`` ask the model to emit -- confirm against them.
    pattern = r"<task>(.*?)</task>"
    tasks_content = re.findall(pattern, response, re.DOTALL)

    if task_id is None:
        task_id = "0"

    tasks = []
    for i, content in enumerate(tasks_content, 1):
        stripped_content = content.strip()
        subtask_id = f"{task_id}.{i}"
        # validate subtask content before creating the task
        if validate_task_content(stripped_content, subtask_id):
            tasks.append(Task(content=stripped_content, id=subtask_id))
        else:
            logger.warning(
                f"Skipping invalid subtask {task_id}.{i} "
                f"during decomposition: "
                f"Content '{stripped_content}' failed validation"
            )
    return tasks
class TaskState(str, Enum):
    r"""Lifecycle states a task can be in; members are also strings."""

    OPEN = "OPEN"
    RUNNING = "RUNNING"
    DONE = "DONE"
    FAILED = "FAILED"
    DELETED = "DELETED"

    @classmethod
    def states(cls):
        r"""Return the string values of all states in definition order."""
        values = []
        for member in cls:
            values.append(member.value)
        return values
class Task(BaseModel):
    r"""Task is a specific assignment that can be passed to an agent.

    Attributes:
        content (str): string content for task.
        id (str): A unique string identifier for the task. This should
            ideally be provided by the provider/model which created the task.
            (default: :obj:`uuid.uuid4()`)
        state (TaskState): The state of the task, one of OPEN, RUNNING,
            DONE, FAILED or DELETED. (default: :obj:`TaskState.FAILED`)
        type (Optional[str]): task type. (default: :obj:`None`)
        parent (Optional[Task]): The parent task, None for root task.
            (default: :obj:`None`)
        subtasks (List[Task]): The children sub-tasks for the task.
            (default: :obj:`[]`)
        result (Optional[str]): The answer for the task.
            (default: :obj:`""`)
        failure_count (int): The failure count for the task.
            (default: :obj:`0`)
        assigned_worker_id (Optional[str]): The ID of the worker assigned to
            this task. (default: :obj:`None`)
        dependencies (List[Task]): The dependencies for the task.
            (default: :obj:`[]`)
        additional_info (Optional[Dict[str, Any]]): Additional information for
            the task. (default: :obj:`None`)
        image_list (Optional[List[Union[Image.Image, str]]]): Optional list
            of PIL Image objects or image URLs (strings) associated with the
            task. (default: :obj:`None`)
        image_detail (Literal["auto", "low", "high"]): Detail level of the
            images associated with the task. (default: :obj:`auto`)
        video_bytes (Optional[bytes]): Optional bytes of a video associated
            with the task. (default: :obj:`None`)
        video_detail (Literal["auto", "low", "high"]): Detail level of the
            videos associated with the task. (default: :obj:`auto`)
    """

    content: str

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))

    state: TaskState = (
        TaskState.FAILED
    )  # TODO: Add logic for OPEN in workforce.py

    type: Optional[str] = None

    parent: Optional["Task"] = None

    subtasks: List["Task"] = []

    result: Optional[str] = ""

    failure_count: int = 0

    assigned_worker_id: Optional[str] = None

    dependencies: List["Task"] = []

    additional_info: Optional[Dict[str, Any]] = None

    image_list: Optional[List[Union[Image.Image, str]]] = None

    image_detail: Literal["auto", "low", "high"] = "auto"

    video_bytes: Optional[bytes] = None

    video_detail: Literal["auto", "low", "high"] = "auto"

    # PIL images are not pydantic-native types, so arbitrary types are
    # allowed on the model.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    def __repr__(self) -> str:
        r"""Return a string representation of the task."""
        content_preview = self.content
        return (
            f"Task(id='{self.id}', content='{content_preview}', "
            f"state='{self.state.value}')"
        )

    @classmethod
    def from_message(cls, message: BaseMessage) -> "Task":
        r"""Create a task from a message.

        The task content is the message content and the id is fixed to
        ``"0"``.

        Args:
            message (BaseMessage): The message to the task.

        Returns:
            Task
        """
        return cls(content=message.content, id="0")

    @staticmethod
    def to_message():
        r"""Convert a Task to a Message.

        Not implemented yet; currently returns ``None``.
        """
        # TODO
        pass

    def reset(self):
        r"""Reset Task to initial state.

        Only ``state`` (back to FAILED) and ``result`` (back to ``""``) are
        reset; every other field is left untouched.
        """
        self.state = (
            TaskState.FAILED
        )  # TODO: Add logic for OPEN in workforce.py
        self.result = ""

    def update_result(self, result: str):
        r"""Set task result and mark the task as DONE.

        Marking the task DONE goes through :meth:`set_state`, so the state
        is propagated to the sub-tasks as well.

        Args:
            result (str): The task result.
        """
        self.result = result
        self.set_state(TaskState.DONE)

    def set_id(self, id: str):
        r"""Set the id of the task.

        Args:
            id (str): The id of the task.
        """
        self.id = id

    def set_state(self, state: TaskState):
        r"""Recursively set the state of the task and its subtasks.

        DONE is pushed down to every sub-task that is not DELETED, while
        RUNNING is pushed up to the parent chain. Any other state only
        affects this task.

        Args:
            state (TaskState): The giving state.
        """
        self.state = state
        if state == TaskState.DONE:
            for subtask in self.subtasks:
                if subtask.state != TaskState.DELETED:
                    subtask.set_state(state)
        elif state == TaskState.RUNNING and self.parent:
            self.parent.set_state(state)

    def add_subtask(self, task: "Task"):
        r"""Add a subtask to the current task.

        The subtask's ``parent`` is pointed at this task.

        Args:
            task (Task): The subtask to be added.
        """
        task.parent = self
        self.subtasks.append(task)

    def remove_subtask(self, id: str):
        r"""Remove a subtask from the current task.

        Every direct subtask whose id equals ``id`` is dropped; the parent
        link of the removed subtask is not changed.

        Args:
            id (str): The id of the subtask to be removed.
        """
        self.subtasks = [task for task in self.subtasks if task.id != id]

    def get_running_task(self) -> Optional["Task"]:
        r"""Get RUNNING task.

        Descends into the first RUNNING subtask (if any) and returns what
        that subtask reports; otherwise returns this task when it is
        RUNNING itself.

        Returns:
            Optional[Task]: The located RUNNING task, or None.
        """
        for sub in self.subtasks:
            if sub.state == TaskState.RUNNING:
                return sub.get_running_task()
        if self.state == TaskState.RUNNING:
            return self
        return None

    def to_string(self, indent: str = "", state: bool = False) -> str:
        r"""Convert task to a string.

        Args:
            indent (str): The indent for hierarchical tasks.
            state (bool): Include or not task state.

        Returns:
            str: The printable task string.
        """
        if state:
            # NOTE: the enum member is interpolated directly, so the
            # rendered text follows the member's format() behaviour rather
            # than being pinned to ``self.state.value``.
            _str = f"{indent}[{self.state}] Task {self.id}: {self.content}\n"
        else:
            _str = f"{indent}Task {self.id}: {self.content}\n"
        for subtask in self.subtasks:
            _str += subtask.to_string(indent + " ", state)
        return _str

    def get_result(self, indent: str = "") -> str:
        r"""Get task result to a string.

        Args:
            indent (str): The indent for hierarchical tasks.

        Returns:
            str: The printable task string.
        """
        _str = f"{indent}Task {self.id} result: {self.result}\n"
        for subtask in self.subtasks:
            _str += subtask.get_result(indent + " ")
        return _str

    def decompose(
        self,
        agent: "ChatAgent",
        prompt: Optional[str] = None,
        task_parser: Callable[[str, str], List["Task"]] = parse_response,
        stream_callback: Optional[
            Callable[["ChatAgentResponse"], None]
        ] = None,
    ) -> Union[List["Task"], Generator[List["Task"], None, None]]:
        r"""Decompose a task to a list of sub-tasks. Automatically detects
        streaming or non-streaming based on agent configuration.

        Args:
            agent (ChatAgent): An agent that used to decompose the task.
            prompt (str, optional): A prompt to decompose the task. If not
                provided, the default prompt will be used.
            task_parser (Callable[[str, str], List[Task]], optional): A
                function to extract Task from response. If not provided,
                the default parse_response will be used.
            stream_callback (Callable[[ChatAgentResponse], None], optional): A
                callback function that receives each chunk (ChatAgentResponse)
                during streaming. This allows tracking the decomposition
                progress in real-time.

        Returns:
            Union[List[Task], Generator[List[Task], None, None]]: If agent is
                configured for streaming, returns a generator that yields lists
                of new tasks as they are parsed. Otherwise returns a list of
                all tasks.
        """
        role_name = agent.role_name
        content = prompt or TASK_DECOMPOSE_PROMPT.format(
            role_name=role_name,
            content=self.content,
        )
        msg = BaseMessage.make_user_message(
            role_name=role_name, content=content
        )
        response = agent.step(msg)

        # Auto-detect streaming based on response type
        from camel.agents.chat_agent import StreamingChatAgentResponse

        # The duck-typing fallback (iterable without a ``msg`` attribute)
        # is only evaluated when the isinstance check fails, so ``msg`` is
        # never probed on a known streaming response.
        is_streaming = isinstance(
            response, (StreamingChatAgentResponse, GeneratorType)
        ) or (hasattr(response, "__iter__") and not hasattr(response, "msg"))

        if is_streaming:
            return self._decompose_streaming(
                response, task_parser, stream_callback=stream_callback
            )
        return self._decompose_non_streaming(response, task_parser)

    def _decompose_streaming(
        self,
        response: Iterable,
        task_parser: Callable[[str, str], List["Task"]],
        stream_callback: Optional[
            Callable[["ChatAgentResponse"], None]
        ] = None,
    ) -> Generator[List["Task"], None, None]:
        r"""Handle streaming response for task decomposition.

        Once the stream is exhausted, the last received content is parsed
        with ``task_parser`` and the result is stored in ``self.subtasks``.

        Args:
            response: Streaming response from agent
            task_parser: Function to parse tasks from response
            stream_callback (Callable[[ChatAgentResponse], None], optional): A
                callback function that receives each chunk (ChatAgentResponse)
                during streaming.

        Yields:
            List[Task]: New tasks as they are parsed from streaming response
        """
        accumulated_content = ""
        yielded_count = 0

        # Process streaming response
        for chunk in response:
            # The chunk content replaces the buffer instead of extending it.
            # NOTE(review): this presumes each chunk carries the full text
            # received so far -- confirm against the agent's streaming API.
            accumulated_content = chunk.msg.content

            if stream_callback:
                # A faulty callback must not abort the decomposition.
                try:
                    stream_callback(chunk)
                except Exception:
                    logger.warning(
                        "stream_callback failed during decomposition",
                        exc_info=True,
                    )

            # Try to parse partial tasks from accumulated content. Only the
            # parse call is guarded, so a failure here is recorded and the
            # loop keeps accumulating.
            try:
                current_tasks = self._parse_partial_tasks(accumulated_content)
            except Exception:
                logger.debug(
                    "Partial task parsing failed during streaming "
                    "decomposition; waiting for more content",
                    exc_info=True,
                )
                continue

            # Yield new tasks if we have more than previously yielded
            if len(current_tasks) > yielded_count:
                new_tasks = current_tasks[yielded_count:]
                for task in new_tasks:
                    task.additional_info = self.additional_info
                    task.parent = self
                yield new_tasks
                yielded_count = len(current_tasks)

        # Final complete parsing
        final_tasks = task_parser(accumulated_content, self.id)
        for task in final_tasks:
            task.additional_info = self.additional_info
            task.parent = self
        self.subtasks = final_tasks

    def _decompose_non_streaming(
        self, response, task_parser: Callable[[str, str], List["Task"]]
    ) -> List["Task"]:
        r"""Handle non-streaming response for task decomposition.

        The parsed tasks inherit ``additional_info`` from this task, get
        this task as parent and replace ``self.subtasks``.

        Args:
            response: Regular response from agent
            task_parser: Function to parse tasks from response

        Returns:
            List[Task]: All parsed tasks
        """
        tasks = task_parser(response.msg.content, self.id)
        for task in tasks:
            task.additional_info = self.additional_info
            task.parent = self
        self.subtasks = tasks
        return tasks

    def _parse_partial_tasks(self, response: str) -> List["Task"]:
        r"""Parse tasks from potentially incomplete response.

        Only ``<task>...</task>`` elements that are already closed are
        considered, so a task that is still being streamed is ignored until
        its closing tag arrives.

        Args:
            response: Partial response content

        Returns:
            List[Task]: Tasks parsed from complete blocks
        """
        # The pattern needs explicit delimiters: a bare lazy group "(.*?)"
        # matches only empty strings, so nothing would ever be extracted.
        # NOTE(review): the ``<task>`` tag is assumed to be what the prompts
        # in ``task_prompt.py`` ask the model to emit -- confirm.
        pattern = r"<task>(.*?)</task>"
        tasks_content = re.findall(pattern, response, re.DOTALL)

        tasks = []
        task_id = self.id or "0"

        for i, content in enumerate(tasks_content, 1):
            stripped_content = content.strip()
            if validate_task_content(stripped_content, f"{task_id}.{i}"):
                tasks.append(
                    Task(content=stripped_content, id=f"{task_id}.{i}")
                )
            else:
                logger.warning(
                    f"Skipping invalid subtask {task_id}.{i} "
                    f"during streaming decomposition: "
                    f"Content '{stripped_content}' failed validation"
                )
        return tasks

    def compose(
        self,
        agent: "ChatAgent",
        template: TextPrompt = TASK_COMPOSE_PROMPT,
        result_parser: Optional[Callable[[str], str]] = None,
    ):
        r"""compose task result by the sub-tasks.

        Does nothing when the task has no sub-tasks. Otherwise the agent's
        answer (optionally post-processed by ``result_parser``) is stored
        via :meth:`update_result`, which also marks the task DONE.

        Args:
            agent (ChatAgent): An agent that used to compose the task result.
            template (TextPrompt, optional): The prompt template to compose
                task. If not provided, the default template will be used.
            result_parser (Callable[[str], str], optional): A function that
                receives the agent's response content and returns the
                result string to store. (default: :obj:`None`)
        """
        if not self.subtasks:
            return

        sub_tasks_result = self.get_result()

        role_name = agent.role_name
        content = template.format(
            role_name=role_name,
            content=self.content,
            additional_info=self.additional_info,
            image_list=self.image_list,
            image_detail=self.image_detail,
            video_bytes=self.video_bytes,
            video_detail=self.video_detail,
            other_results=sub_tasks_result,
        )
        msg = BaseMessage.make_user_message(
            role_name=role_name, content=content
        )
        response = agent.step(msg)
        result = response.msg.content
        if result_parser:
            result = result_parser(result)
        self.update_result(result)

    def get_depth(self) -> int:
        r"""Get current task depth.

        A task without parent has depth 1; each parent link adds 1.
        """
        if self.parent is None:
            return 1
        return 1 + self.parent.get_depth()
class TaskManager:
    r"""TaskManager keeps track of a root task and the tasks added to it.

    Attributes:
        root_task: The root task.
        tasks: The ordered tasks.
        task_map: A map for task.id to Task.
        current_task_id: The id used by :attr:`current_task`; initialised
            to the root task's id.

    Args:
        task (Task): The root Task.
    """

    def __init__(self, task: Task):
        self.root_task: Task = task
        self.current_task_id: str = task.id
        self.tasks: List[Task] = [task]
        self.task_map: Dict[str, Task] = {task.id: task}

    def gen_task_id(self) -> str:
        r"""Return a new task id: the number of managed tasks as a string."""
        return str(len(self.tasks))

    def exist(self, task_id: str) -> bool:
        r"""Tell whether ``task_id`` is a key of the task map."""
        return task_id in self.task_map

    @property
    def current_task(self) -> Optional[Task]:
        r"""The task registered under ``current_task_id``, or None."""
        return self.task_map.get(self.current_task_id)

    @staticmethod
    def topological_sort(tasks: List[Task]) -> List[Task]:
        r"""Order tasks so that sub-tasks come before the task owning them.

        Each task is visited depth-first through its ``subtasks`` and is
        emitted after them. A task id is emitted at most once.

        Args:
            tasks (List[Task]): The giving list of tasks.

        Returns:
            The sorted list of tasks.
        """
        ordered = []
        seen_ids = set()

        def _walk(node: Task):
            # Skip ids that were already handled.
            if node.id in seen_ids:
                return
            seen_ids.add(node.id)
            # Children first ...
            for child in node.subtasks:
                _walk(child)
            # ... then the node itself.
            ordered.append(node)

        for entry in tasks:
            _walk(entry)

        return ordered

    @staticmethod
    def set_tasks_dependence(
        root: Task,
        others: List[Task],
        type: Literal["serial", "parallel"] = "parallel",
    ):
        r"""Attach ``others`` below ``root`` as sub-tasks.

        Two relationships are currently supported: serial and parallel.
        `serial` :  root -> other1 -> other2
        `parallel`: root -> other1
                         -> other2

        Args:
            root (Task): A root task.
            others (List[Task]): A list of tasks.
            type (Literal["serial", "parallel"]): How the tasks are linked;
                any value other than ``"parallel"`` takes the serial path.
                (default: :obj:`"parallel"`)
        """
        # Drop the root itself from the candidates to avoid a self-loop.
        candidates = [other for other in others if other != root]
        if not candidates:
            return

        if type == "parallel":
            # Every candidate hangs directly below the root.
            for candidate in candidates:
                root.add_subtask(candidate)
            return

        # Serial: each candidate hangs below the previous one.
        tail = root
        for candidate in candidates:
            tail.add_subtask(candidate)
            tail = candidate

    def add_tasks(self, tasks: Union[Task, List[Task]]) -> None:
        r"""Register one task or a list of tasks.

        ``self.tasks`` is rebuilt through :meth:`topological_sort` over the
        existing and the new tasks, and ``self.task_map`` is rebuilt from
        the result. An id that is already managed trips an assertion.
        """
        if not tasks:
            return
        if not isinstance(tasks, list):
            tasks = [tasks]
        for incoming in tasks:
            assert not self.exist(
                incoming.id
            ), f"`{incoming.id}` already existed."
        self.tasks = self.topological_sort(self.tasks + tasks)
        self.task_map = {managed.id: managed for managed in self.tasks}

    def evolve(
        self,
        task: Task,
        agent: "ChatAgent",
        template: Optional[TextPrompt] = None,
        task_parser: Optional[Callable[[str, str], List[Task]]] = None,
    ) -> Optional[Task]:
        r"""Evolve a task to a new task.
            Evolve is only used for data generation.

        Args:
            task (Task): A given task.
            agent (ChatAgent): An agent that used to evolve the task.
            template (TextPrompt, optional): A prompt template to evolve task.
                If not provided, the default template will be used.
            task_parser (Callable, optional): A function to extract Task from
                response. If not provided, the default parser will be used.

        Returns:
            Task: The first task parsed from the agent's response, or None
                when nothing was parsed.
        """
        prompt_template = TASK_EVOLVE_PROMPT if template is None else template

        role_name = agent.role_name
        prompt_text = prompt_template.format(
            role_name=role_name, content=task.content
        )
        # The task's media is forwarded together with the prompt.
        user_message = BaseMessage.make_user_message(
            role_name=role_name,
            content=prompt_text,
            image_list=task.image_list,
            image_detail=task.image_detail,
            video_bytes=task.video_bytes,
            video_detail=task.video_detail,
        )
        response = agent.step(user_message)

        parser = parse_response if task_parser is None else task_parser
        evolved = parser(response.msg.content, task.id)
        return evolved[0] if evolved else None