# eigent/backend/camel/services/agent_openapi_server.py
# 2026-03-31 17:20:08 +08:00
#
# 380 lines
# 14 KiB
# Python

# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
from typing import Any, Dict, List, Optional, Type, Union
from fastapi import APIRouter, FastAPI, HTTPException
from pydantic import BaseModel
from camel.agents.chat_agent import ChatAgent
from camel.messages import BaseMessage
from camel.models import ModelFactory
from camel.toolkits import FunctionTool
from camel.types import RoleType
class InitRequest(BaseModel):
    r"""Payload accepted by the ``/init`` route to create a new ChatAgent.

    Bundles everything the server needs to build one agent: which model to
    instantiate, the system prompt, which registered tools to attach, and
    the memory / generation settings that are forwarded to the agent.

    Args:
        model_type (Optional[str]): Name of the model handed to the model
            factory, e.g., "gpt-4o-mini".
            (default: :obj:`"gpt-4o-mini"`)
        model_platform (Optional[str]): Name of the model platform handed
            to the model factory. (default: :obj:`"openai"`)
        tools_names (Optional[List[str]]): Keys into the server's tool
            registry. Every tool registered under each key is attached to
            the agent. (default: :obj:`None`)
        external_tools (Optional[List[Dict[str, Any]]]): Tool definitions
            supplied inline as dictionaries instead of through the
            registry. The server forwards them to the agent unchanged.
            (default: :obj:`None`)
        agent_id (str): Unique identifier under which the agent is stored
            and later addressed. Required, so that multi-agent routing and
            control stay explicit.
        system_message (Optional[str]): System prompt describing the
            agent's behavior or role. (default: :obj:`None`)
        message_window_size (Optional[int]): Number of recent messages
            kept in memory for context. (default: :obj:`None`)
        token_limit (Optional[int]): Token budget for contextual memory.
            (default: :obj:`None`)
        output_language (Optional[str]): Preferred language for the
            agent's replies. (default: :obj:`None`)
        max_iteration (Optional[int]): Upper bound on model calls per
            step. `None` means no explicit limit, `1` means a single
            model call, and `N > 1` allows up to N model calls.
            (default: :obj:`None`)
    """

    # --- model selection ---
    model_type: Optional[str] = "gpt-4o-mini"
    model_platform: Optional[str] = "openai"

    # --- tooling ---
    tools_names: Optional[List[str]] = None
    external_tools: Optional[List[Dict[str, Any]]] = None

    # Required: callers must name the agent explicitly, which keeps the
    # door open for multi-agent routing and permission control.
    agent_id: str

    # --- prompt, memory and generation settings ---
    system_message: Optional[str] = None
    message_window_size: Optional[int] = None
    token_limit: Optional[int] = None
    output_language: Optional[str] = None
    # An integer budget; an earlier revision modelled this as a bool flag.
    max_iteration: Optional[int] = None
class StepRequest(BaseModel):
    r"""Payload accepted by the ``/step`` and ``/astep`` routes.

    Carries one user turn for an existing agent, either as plain text or
    as a dictionary of message fields, plus an optional name selecting a
    structured output schema.

    Args:
        input_message (Union[str, Dict[str, Any]]): The user turn. A plain
            string is wrapped into a user message; a dictionary is
            expanded into the message constructor (role, content, etc.).
        response_format (Optional[str]): Name of a schema in the server's
            response format registry. When given, the matching schema is
            passed to the agent's step call. (default: :obj:`None`)
    """

    input_message: Union[str, Dict[str, Any]]
    # Resolved by name against the server's response format registry.
    response_format: Optional[str] = None
class ChatAgentOpenAPIServer:
    r"""A FastAPI server wrapper for managing ChatAgents via OpenAPI routes.

    The server exposes a versioned REST API (``/v1/agents/...``) for
    creating agents, sending them messages, resetting them, deleting them
    and reading back their chat history. Agents live in an in-memory
    mapping keyed by ``agent_id``, which is what enables multi-agent use.

    Tool lists and structured response schemas are looked up by name in
    two pluggable registries supplied at construction time.
    """

    def __init__(
        self,
        tool_registry: Optional[Dict[str, List[FunctionTool]]] = None,
        response_format_registry: Optional[Dict[str, Type[BaseModel]]] = None,
    ):
        r"""Initializes the OpenAPI server for managing ChatAgents.

        Creates the FastAPI app, the empty agent store, stores both
        registries and registers the API routes.

        Args:
            tool_registry (Optional[Dict[str, List[FunctionTool]]]): A mapping
                from tool names to lists of FunctionTool instances available
                to agents via the "tools_names" field. If not provided, an
                empty registry is used. (default: :obj:`None`)
            response_format_registry (Optional[Dict[str, Type[BaseModel]]]):
                A mapping from format names to Pydantic output schemas,
                selected per request through "response_format". If not
                provided, an empty registry is used.
                (default: :obj:`None`)
        """
        self.app = FastAPI(title="CAMEL OpenAPI-compatible Server")
        self.agents: Dict[str, ChatAgent] = {}
        # Compare against None rather than truthiness: an empty dict passed
        # by the caller must be kept by reference so that entries the caller
        # adds later are visible to the server.
        self.tool_registry: Dict[str, List[FunctionTool]] = (
            tool_registry if tool_registry is not None else {}
        )
        self.response_format_registry: Dict[str, Type[BaseModel]] = (
            response_format_registry
            if response_format_registry is not None
            else {}
        )
        self._setup_routes()

    def _get_agent_or_404(
        self, agent_id: str, detail: str = "Agent not found."
    ) -> ChatAgent:
        r"""Looks up a registered agent or fails with HTTP 404.

        Args:
            agent_id (str): The ID of the agent to fetch.
            detail (str): Error detail used when the agent is missing.
                (default: :obj:`"Agent not found."`)

        Returns:
            ChatAgent: The agent stored under `agent_id`.

        Raises:
            HTTPException: With status 404 if no such agent exists.
        """
        try:
            return self.agents[agent_id]
        except KeyError:
            raise HTTPException(status_code=404, detail=detail) from None

    def _parse_input_message_for_step(
        self, raw: Union[str, Dict[str, Any]]
    ) -> BaseMessage:
        r"""Parses raw input into a BaseMessage object.

        A string becomes a user message with role name "User". A dict is
        expanded into the BaseMessage constructor; a string "role_type"
        entry is lower-cased and converted to a RoleType first.

        Args:
            raw (str or dict): User input as plain text or dict.

        Returns:
            BaseMessage: Parsed input message.

        Raises:
            HTTPException: With status 400 if the input is neither a string
                nor a dict, or if the dict cannot be turned into a message
                (unknown role type, missing or unexpected fields).
        """
        if isinstance(raw, str):
            return BaseMessage.make_user_message(role_name="User", content=raw)

        if isinstance(raw, dict):
            # Work on a shallow copy so the caller's payload is not modified.
            fields = dict(raw)
            try:
                role_type = fields.get("role_type")
                if isinstance(role_type, str):
                    fields["role_type"] = RoleType(role_type.lower())
                return BaseMessage(**fields)
            except (TypeError, ValueError) as e:
                # A malformed payload is a client error, not a server fault.
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid input message: {e!s}",
                ) from e

        raise HTTPException(
            status_code=400, detail="Unsupported input format."
        )

    def _resolve_response_format_for_step(
        self, name: Optional[str]
    ) -> Optional[Type[BaseModel]]:
        r"""Resolves the response format by name.

        Args:
            name (str or None): Optional format name.

        Returns:
            Optional[Type[BaseModel]]: The registered schema class, or
                :obj:`None` when no name was given.

        Raises:
            HTTPException: With status 400 if the name is not registered.
        """
        if name is None:
            return None
        try:
            return self.response_format_registry[name]
        except KeyError:
            raise HTTPException(
                status_code=400, detail=f"Unknown response_format: {name}"
            ) from None

    def _setup_routes(self) -> None:
        r"""Registers OpenAPI endpoints for agent creation and interaction.

        This includes routes for initializing agents (/init), sending
        messages (/step and /astep), listing agent IDs (/list_agent_ids),
        deleting agents (/delete), resetting agent memory (/reset), and
        retrieving conversation history (/history). All routes are added
        under the /v1/agents namespace.
        """
        router = APIRouter(prefix="/v1/agents")

        @router.post("/init")
        def init_agent(request: InitRequest):
            r"""Initializes a ChatAgent instance with a model,
            system message, and optional tools.

            If an agent with the same ID already exists it is left
            untouched and a message saying so is returned.

            Args:
                request (InitRequest): The agent config including
                    model, tools, system message, and agent ID.

            Returns:
                dict: A message with the agent ID and status.
            """
            agent_id = request.agent_id
            if agent_id in self.agents:
                return {
                    "agent_id": agent_id,
                    "message": "Agent already exists.",
                }

            # Validate the requested tools before building the model so a
            # bad request is rejected without creating a model first.
            tools: List[FunctionTool] = []
            for name in request.tools_names or []:
                if name not in self.tool_registry:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Tool '{name}' not found in registry",
                    )
                tools.extend(self.tool_registry[name])

            model = ModelFactory.create(
                model_platform=request.model_platform,  # type: ignore[arg-type]
                model_type=request.model_type,  # type: ignore[arg-type]
            )

            agent = ChatAgent(
                model=model,
                tools=tools,  # type: ignore[arg-type]
                external_tools=request.external_tools,  # type: ignore[arg-type]
                system_message=request.system_message,
                message_window_size=request.message_window_size,
                token_limit=request.token_limit,
                output_language=request.output_language,
                max_iteration=request.max_iteration,
                agent_id=agent_id,
            )
            self.agents[agent_id] = agent
            return {"agent_id": agent_id, "message": "Agent initialized."}

        @router.post("/astep/{agent_id}")
        async def astep_agent(agent_id: str, request: StepRequest):
            r"""Runs one async step of agent response.

            Args:
                agent_id (str): The ID of the target agent.
                request (StepRequest): The input message.

            Returns:
                dict: The model response in serialized form.
            """
            agent = self._get_agent_or_404(agent_id)
            input_message = self._parse_input_message_for_step(
                request.input_message
            )
            format_cls = self._resolve_response_format_for_step(
                request.response_format
            )
            try:
                response = await agent.astep(
                    input_message=input_message, response_format=format_cls
                )
                return response.model_dump()
            except Exception as e:
                raise HTTPException(
                    status_code=500,
                    detail=f"Unexpected error during async step: {e!s}",
                ) from e

        @router.get("/list_agent_ids")
        def list_agent_ids():
            r"""Returns a list of all active agent IDs.

            Returns:
                dict: A dictionary containing all registered agent IDs.
            """
            return {"agent_ids": list(self.agents)}

        @router.post("/delete/{agent_id}")
        def delete_agent(agent_id: str):
            r"""Deletes an agent from the server.

            Args:
                agent_id (str): The ID of the agent to delete.

            Returns:
                dict: A confirmation message upon successful deletion.
            """
            # Remove in a single step: if the ID is already gone (for
            # example deleted by another request), report 404 instead of
            # letting a KeyError escape.
            try:
                del self.agents[agent_id]
            except KeyError:
                raise HTTPException(
                    status_code=404, detail="Agent not found."
                ) from None
            return {"message": f"Agent {agent_id} deleted."}

        @router.post("/step/{agent_id}")
        def step_agent(agent_id: str, request: StepRequest):
            r"""Runs one step of synchronous agent response.

            Args:
                agent_id (str): The ID of the target agent.
                request (StepRequest): The input message.

            Returns:
                dict: The model response in serialized form.
            """
            agent = self._get_agent_or_404(agent_id)
            input_message = self._parse_input_message_for_step(
                request.input_message
            )
            format_cls = self._resolve_response_format_for_step(
                request.response_format
            )
            try:
                response = agent.step(
                    input_message=input_message, response_format=format_cls
                )
                return response.model_dump()
            except Exception as e:
                raise HTTPException(
                    status_code=500,
                    detail=f"Unexpected error during step: {e!s}",
                ) from e

        @router.post("/reset/{agent_id}")
        def reset_agent(agent_id: str):
            r"""Resets a specific agent by calling its ``reset`` method.

            Args:
                agent_id (str): The ID of the agent to reset.

            Returns:
                dict: A message confirming reset success.
            """
            self._get_agent_or_404(agent_id).reset()
            return {"message": f"Agent {agent_id} reset."}

        @router.get("/history/{agent_id}")
        def get_agent_chat_history(agent_id: str):
            r"""Returns the chat history of an agent.

            Args:
                agent_id (str): The ID of the agent to query.

            Returns:
                list: The list of conversation messages.
            """
            # This route has always included the ID in its 404 detail;
            # keep that wording for existing clients.
            agent = self._get_agent_or_404(
                agent_id, detail=f"Agent {agent_id} not found."
            )
            return agent.chat_history

        # Register all routes to the main FastAPI app
        self.app.include_router(router)

    def get_app(self) -> FastAPI:
        r"""Returns the FastAPI app instance.

        Returns:
            FastAPI: The wrapped application object.
        """
        return self.app