mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-24 13:43:45 +00:00
380 lines
14 KiB
Python
380 lines
14 KiB
Python
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
|
|
|
|
|
|
from typing import Any, Dict, List, Optional, Type, Union
|
|
|
|
from fastapi import APIRouter, FastAPI, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
from camel.agents.chat_agent import ChatAgent
|
|
from camel.messages import BaseMessage
|
|
from camel.models import ModelFactory
|
|
from camel.toolkits import FunctionTool
|
|
from camel.types import RoleType
|
|
|
|
|
|
class InitRequest(BaseModel):
|
|
r"""Request schema for initializing a ChatAgent via the OpenAPI server.
|
|
|
|
Defines the configuration used to create a new agent, including the model,
|
|
system message, tool names, and generation parameters.
|
|
|
|
Args:
|
|
model_type (Optional[str]): The model type to use. Should match a key
|
|
supported by the model manager, e.g., "gpt-4o-mini".
|
|
(default: :obj:`"gpt-4o-mini"`)
|
|
model_platform (Optional[str]): The model platform to use.
|
|
(default: :obj:`"openai"`)
|
|
tools_names (Optional[List[str]]): A list of tool names to load from
|
|
the tool registry. These tools will be available to the agent.
|
|
(default: :obj:`None`)
|
|
external_tools (Optional[List[Dict[str, Any]]]): Tool definitions
|
|
provided directly as dictionaries, bypassing the registry.
|
|
Currently not supported. (default: :obj:`None`)
|
|
agent_id (str): The unique identifier for the agent. Must be provided
|
|
explicitly to support multi-agent routing and control.
|
|
system_message (Optional[str]): The system prompt for the agent,
|
|
describing its behavior or role. (default: :obj:`None`)
|
|
message_window_size (Optional[int]): The number of recent messages to
|
|
retain in memory for context. (default: :obj:`None`)
|
|
token_limit (Optional[int]): The token budget for contextual memory.
|
|
(default: :obj:`None`)
|
|
output_language (Optional[str]): Preferred output language for the
|
|
agent's replies. (default: :obj:`None`)
|
|
max_iteration (Optional[int]): Maximum number of model
|
|
calling iterations allowed per step. If `None` (default), there's
|
|
no explicit limit. If `1`, it performs a single model call. If `N
|
|
> 1`, it allows up to N model calls. (default: :obj:`None`)
|
|
"""
|
|
|
|
model_type: Optional[str] = "gpt-4o-mini"
|
|
model_platform: Optional[str] = "openai"
|
|
|
|
tools_names: Optional[List[str]] = None
|
|
external_tools: Optional[List[Dict[str, Any]]] = None
|
|
|
|
agent_id: str # Required: explicitly set agent_id to
|
|
# support future multi-agent and permission control
|
|
|
|
system_message: Optional[str] = None
|
|
message_window_size: Optional[int] = None
|
|
token_limit: Optional[int] = None
|
|
output_language: Optional[str] = None
|
|
max_iteration: Optional[int] = None # Changed from Optional[bool] = False
|
|
|
|
|
|
class StepRequest(BaseModel):
|
|
r"""Request schema for sending a user message to a ChatAgent.
|
|
|
|
Supports plain text input or structured message dictionaries, with an
|
|
optional response format for controlling output structure.
|
|
|
|
Args:
|
|
input_message (Union[str, Dict[str, Any]]): The user message to send.
|
|
Can be a plain string or a message dict with role, content, etc.
|
|
response_format (Optional[str]): Optional format name that maps to a
|
|
registered response schema. Not currently in use.
|
|
(default: :obj:`None`)
|
|
"""
|
|
|
|
input_message: Union[str, Dict[str, Any]]
|
|
response_format: Optional[str] = None # reserved, not used yet
|
|
|
|
|
|
class ChatAgentOpenAPIServer:
|
|
r"""A FastAPI server wrapper for managing ChatAgents via OpenAPI routes.
|
|
|
|
This server exposes a versioned REST API for interacting with CAMEL
|
|
agents, supporting initialization, message passing, memory inspection,
|
|
and optional tool usage. It supports multi-agent use cases by mapping
|
|
unique agent IDs to active ChatAgent instances.
|
|
|
|
Typical usage includes initializing agents with system prompts and tools,
|
|
exchanging messages using /step or /astep endpoints, and inspecting agent
|
|
memory with /history.
|
|
|
|
Supports pluggable tool and response format registries for customizing
|
|
agent behavior or output schemas.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
tool_registry: Optional[Dict[str, List[FunctionTool]]] = None,
|
|
response_format_registry: Optional[Dict[str, Type[BaseModel]]] = None,
|
|
):
|
|
r"""Initializes the OpenAPI server for managing ChatAgents.
|
|
|
|
Sets up internal agent storage, tool and response format registries,
|
|
and prepares versioned API routes.
|
|
|
|
Args:
|
|
tool_registry (Optional[Dict[str, List[FunctionTool]]]): A mapping
|
|
from tool names to lists of FunctionTool instances available
|
|
to agents via the "tools_names" field. If not provided, an
|
|
empty registry is used. (default: :obj:`None`)
|
|
response_format_registry (Optional[Dict[str, Type[BaseModel]]]):
|
|
A mapping from format names to Pydantic output schemas for
|
|
structured response parsing. Used for controlling the format
|
|
of step results. (default: :obj:`None`)
|
|
"""
|
|
|
|
# Initialize FastAPI app and agent
|
|
self.app = FastAPI(title="CAMEL OpenAPI-compatible Server")
|
|
self.agents: Dict[str, ChatAgent] = {}
|
|
self.tool_registry = tool_registry or {}
|
|
self.response_format_registry = response_format_registry or {}
|
|
self._setup_routes()
|
|
|
|
def _parse_input_message_for_step(
|
|
self, raw: Union[str, dict]
|
|
) -> BaseMessage:
|
|
r"""Parses raw input into a BaseMessage object.
|
|
|
|
Args:
|
|
raw (str or dict): User input as plain text or dict.
|
|
|
|
Returns:
|
|
BaseMessage: Parsed input message.
|
|
"""
|
|
if isinstance(raw, str):
|
|
return BaseMessage.make_user_message(role_name="User", content=raw)
|
|
elif isinstance(raw, dict):
|
|
if isinstance(raw.get("role_type"), str):
|
|
raw["role_type"] = RoleType(raw["role_type"].lower())
|
|
return BaseMessage(**raw)
|
|
raise HTTPException(
|
|
status_code=400, detail="Unsupported input format."
|
|
)
|
|
|
|
def _resolve_response_format_for_step(
|
|
self, name: Optional[str]
|
|
) -> Optional[Type[BaseModel]]:
|
|
r"""Resolves the response format by name.
|
|
|
|
Args:
|
|
name (str or None): Optional format name.
|
|
|
|
Returns:
|
|
Optional[Type[BaseModel]]: Response schema class.
|
|
"""
|
|
if name is None:
|
|
return None
|
|
if name not in self.response_format_registry:
|
|
raise HTTPException(
|
|
status_code=400, detail=f"Unknown response_format: {name}"
|
|
)
|
|
return self.response_format_registry[name]
|
|
|
|
def _setup_routes(self):
|
|
r"""Registers OpenAPI endpoints for agent creation and interaction.
|
|
|
|
This includes routes for initializing agents (/init), sending
|
|
messages (/step and /astep), resetting agent memory (/reset), and
|
|
retrieving conversation history (/history). All routes are added
|
|
under the /v1/agents namespace.
|
|
"""
|
|
|
|
router = APIRouter(prefix="/v1/agents")
|
|
|
|
@router.post("/init")
|
|
def init_agent(request: InitRequest):
|
|
r"""Initializes a ChatAgent instance with a model,
|
|
system message, and optional tools.
|
|
|
|
Args:
|
|
request (InitRequest): The agent config including
|
|
model, tools, system message, and agent ID.
|
|
|
|
Returns:
|
|
dict: A message with the agent ID and status.
|
|
"""
|
|
|
|
agent_id = request.agent_id
|
|
if agent_id in self.agents:
|
|
return {
|
|
"agent_id": agent_id,
|
|
"message": "Agent already exists.",
|
|
}
|
|
|
|
model_type = request.model_type
|
|
model_platform = request.model_platform
|
|
|
|
model = ModelFactory.create(
|
|
model_platform=model_platform, # type: ignore[arg-type]
|
|
model_type=model_type, # type: ignore[arg-type]
|
|
)
|
|
|
|
# tools lookup
|
|
tools = []
|
|
if request.tools_names:
|
|
for name in request.tools_names:
|
|
if name in self.tool_registry:
|
|
tools.extend(self.tool_registry[name])
|
|
else:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Tool '{name}' " f"not found in registry",
|
|
)
|
|
|
|
# system message
|
|
system_message = request.system_message
|
|
|
|
agent = ChatAgent(
|
|
model=model,
|
|
tools=tools, # type: ignore[arg-type]
|
|
external_tools=request.external_tools, # type: ignore[arg-type]
|
|
system_message=system_message,
|
|
message_window_size=request.message_window_size,
|
|
token_limit=request.token_limit,
|
|
output_language=request.output_language,
|
|
max_iteration=request.max_iteration,
|
|
agent_id=agent_id,
|
|
)
|
|
|
|
self.agents[agent_id] = agent
|
|
return {"agent_id": agent_id, "message": "Agent initialized."}
|
|
|
|
@router.post("/astep/{agent_id}")
|
|
async def astep_agent(agent_id: str, request: StepRequest):
|
|
r"""Runs one async step of agent response.
|
|
|
|
Args:
|
|
agent_id (str): The ID of the target agent.
|
|
request (StepRequest): The input message.
|
|
|
|
Returns:
|
|
dict: The model response in serialized form.
|
|
"""
|
|
|
|
if agent_id not in self.agents:
|
|
raise HTTPException(status_code=404, detail="Agent not found.")
|
|
|
|
agent = self.agents[agent_id]
|
|
input_message = self._parse_input_message_for_step(
|
|
request.input_message
|
|
)
|
|
format_cls = self._resolve_response_format_for_step(
|
|
request.response_format
|
|
)
|
|
|
|
try:
|
|
response = await agent.astep(
|
|
input_message=input_message, response_format=format_cls
|
|
)
|
|
return response.model_dump()
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Unexpected error during async step: {e!s}",
|
|
)
|
|
|
|
@router.get("/list_agent_ids")
|
|
def list_agent_ids():
|
|
r"""Returns a list of all active agent IDs.
|
|
|
|
Returns:
|
|
dict: A dictionary containing all registered agent IDs.
|
|
"""
|
|
return {"agent_ids": list(self.agents.keys())}
|
|
|
|
@router.post("/delete/{agent_id}")
|
|
def delete_agent(agent_id: str):
|
|
r"""Deletes an agent from the server.
|
|
|
|
Args:
|
|
agent_id (str): The ID of the agent to delete.
|
|
|
|
Returns:
|
|
dict: A confirmation message upon successful deletion.
|
|
"""
|
|
if agent_id not in self.agents:
|
|
raise HTTPException(status_code=404, detail="Agent not found.")
|
|
|
|
del self.agents[agent_id]
|
|
return {"message": f"Agent {agent_id} deleted."}
|
|
|
|
@router.post("/step/{agent_id}")
|
|
def step_agent(agent_id: str, request: StepRequest):
|
|
r"""Runs one step of synchronous agent response.
|
|
|
|
Args:
|
|
agent_id (str): The ID of the target agent.
|
|
request (StepRequest): The input message.
|
|
|
|
Returns:
|
|
dict: The model response in serialized form.
|
|
"""
|
|
if agent_id not in self.agents:
|
|
raise HTTPException(status_code=404, detail="Agent not found.")
|
|
|
|
agent = self.agents[agent_id]
|
|
input_message = self._parse_input_message_for_step(
|
|
request.input_message
|
|
)
|
|
format_cls = self._resolve_response_format_for_step(
|
|
request.response_format
|
|
)
|
|
try:
|
|
response = agent.step(
|
|
input_message=input_message, response_format=format_cls
|
|
)
|
|
return response.model_dump()
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Unexpected error during step: {e!s}",
|
|
)
|
|
|
|
@router.post("/reset/{agent_id}")
|
|
def reset_agent(agent_id: str):
|
|
r"""Clears memory for a specific agent.
|
|
|
|
Args:
|
|
agent_id (str): The ID of the agent to reset.
|
|
|
|
Returns:
|
|
dict: A message confirming reset success.
|
|
"""
|
|
if agent_id not in self.agents:
|
|
raise HTTPException(status_code=404, detail="Agent not found.")
|
|
self.agents[agent_id].reset()
|
|
return {"message": f"Agent {agent_id} reset."}
|
|
|
|
@router.get("/history/{agent_id}")
|
|
def get_agent_chat_history(agent_id: str):
|
|
r"""Returns the chat history of an agent.
|
|
|
|
Args:
|
|
agent_id (str): The ID of the agent to query.
|
|
|
|
Returns:
|
|
list: The list of conversation messages.
|
|
"""
|
|
if agent_id not in self.agents:
|
|
raise HTTPException(
|
|
status_code=404, detail=f"Agent {agent_id} not found."
|
|
)
|
|
return self.agents[agent_id].chat_history
|
|
|
|
# Register all routes to the main FastAPI app
|
|
self.app.include_router(router)
|
|
|
|
def get_app(self) -> FastAPI:
|
|
r"""Returns the FastAPI app instance.
|
|
|
|
Returns:
|
|
FastAPI: The wrapped application object.
|
|
"""
|
|
return self.app
|