mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-24 22:04:09 +00:00
1004 lines
40 KiB
Python
1004 lines
40 KiB
Python
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
|
|
import json
|
|
import os
|
|
import time
|
|
import warnings
|
|
from typing import Any, Dict, List, Optional, Type, Union, cast
|
|
|
|
from openai import AsyncStream, Stream
|
|
from pydantic import BaseModel
|
|
|
|
from camel.configs import AnthropicConfig
|
|
from camel.messages import OpenAIMessage
|
|
from camel.models.base_model import BaseModelBackend
|
|
from camel.types import ChatCompletion, ChatCompletionChunk, ModelType
|
|
from camel.utils import (
|
|
AnthropicTokenCounter,
|
|
BaseTokenCounter,
|
|
api_keys_required,
|
|
dependencies_required,
|
|
get_current_agent_session_id,
|
|
update_langfuse_trace,
|
|
)
|
|
|
|
ANTHROPIC_BETA_FOR_STRUCTURED_OUTPUTS = "structured-outputs-2025-11-13"
|
|
|
|
if os.environ.get("LANGFUSE_ENABLED", "False").lower() == "true":
|
|
try:
|
|
from langfuse.decorators import observe
|
|
except ImportError:
|
|
from camel.utils import observe
|
|
elif os.environ.get("TRACEROOT_ENABLED", "False").lower() == "true":
|
|
try:
|
|
from traceroot import trace as observe # type: ignore[import]
|
|
except ImportError:
|
|
from camel.utils import observe
|
|
else:
|
|
from camel.utils import observe
|
|
|
|
|
|
def strip_trailing_whitespace_from_messages(
|
|
messages: List[OpenAIMessage],
|
|
) -> List[OpenAIMessage]:
|
|
r"""Strip trailing whitespace from all message contents in a list of
|
|
messages. This is necessary because the Anthropic API doesn't allow
|
|
trailing whitespace in message content.
|
|
|
|
Args:
|
|
messages (List[OpenAIMessage]): List of messages to process
|
|
|
|
Returns:
|
|
List[OpenAIMessage]: The processed messages with trailing whitespace
|
|
removed
|
|
"""
|
|
if not messages:
|
|
return messages
|
|
|
|
# Create a deep copy to avoid modifying the original messages
|
|
processed_messages = [dict(msg) for msg in messages]
|
|
|
|
# Process each message
|
|
for msg in processed_messages:
|
|
if "content" in msg and msg["content"] is not None:
|
|
if isinstance(msg["content"], str):
|
|
msg["content"] = msg["content"].rstrip()
|
|
elif isinstance(msg["content"], list):
|
|
# Handle content that's a list of content parts (e.g., for
|
|
# multimodal content)
|
|
for i, part in enumerate(msg["content"]):
|
|
if (
|
|
isinstance(part, dict)
|
|
and "text" in part
|
|
and isinstance(part["text"], str)
|
|
):
|
|
part["text"] = part["text"].rstrip()
|
|
elif isinstance(part, str):
|
|
msg["content"][i] = part.rstrip()
|
|
|
|
return processed_messages # type: ignore[return-value]
|
|
|
|
|
|
class AnthropicModel(BaseModelBackend):
|
|
r"""Anthropic API in a unified BaseModelBackend interface.
|
|
|
|
Args:
|
|
model_type (Union[ModelType, str]): Model for which a backend is
|
|
created, one of CLAUDE_* series.
|
|
model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
|
|
that will be fed into Anthropic API. If :obj:`None`,
|
|
:obj:`AnthropicConfig().as_dict()` will be used.
|
|
(default: :obj:`None`)
|
|
api_key (Optional[str], optional): The API key for authenticating with
|
|
the Anthropic service. (default: :obj:`None`)
|
|
url (Optional[str], optional): The url to the Anthropic service.
|
|
(default: :obj:`None`)
|
|
token_counter (Optional[BaseTokenCounter], optional): Token counter to
|
|
use for the model. If not provided, :obj:`AnthropicTokenCounter`
|
|
will be used. (default: :obj:`None`)
|
|
timeout (Optional[float], optional): The timeout value in seconds for
|
|
API calls. If not provided, will fall back to the MODEL_TIMEOUT
|
|
environment variable or default to 180 seconds.
|
|
(default: :obj:`None`)
|
|
max_retries (int, optional): Maximum number of retries for API calls.
|
|
(default: :obj:`3`)
|
|
client (Optional[Any], optional): A custom synchronous Anthropic client
|
|
instance. If provided, this client will be used instead of
|
|
creating a new one. (default: :obj:`None`)
|
|
async_client (Optional[Any], optional): A custom asynchronous Anthropic
|
|
client instance. If provided, this client will be used instead of
|
|
creating a new one. (default: :obj:`None`)
|
|
use_beta_for_structured_outputs (bool, optional): Whether to use the
|
|
beta API for structured outputs. (default: :obj:`False`)
|
|
**kwargs (Any): Additional arguments to pass to the client
|
|
initialization.
|
|
"""
|
|
|
|
@api_keys_required(
|
|
[
|
|
("api_key", "ANTHROPIC_API_KEY"),
|
|
]
|
|
)
|
|
@dependencies_required('anthropic')
|
|
def __init__(
|
|
self,
|
|
model_type: Union[ModelType, str],
|
|
model_config_dict: Optional[Dict[str, Any]] = None,
|
|
api_key: Optional[str] = None,
|
|
url: Optional[str] = None,
|
|
token_counter: Optional[BaseTokenCounter] = None,
|
|
timeout: Optional[float] = None,
|
|
max_retries: int = 3,
|
|
client: Optional[Any] = None,
|
|
async_client: Optional[Any] = None,
|
|
use_beta_for_structured_outputs: bool = False,
|
|
**kwargs: Any,
|
|
) -> None:
|
|
if model_config_dict is None:
|
|
model_config_dict = AnthropicConfig().as_dict()
|
|
api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
|
|
url = url or os.environ.get("ANTHROPIC_API_BASE_URL")
|
|
timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180))
|
|
|
|
cache_control = model_config_dict.get("cache_control")
|
|
|
|
super().__init__(
|
|
model_type=model_type,
|
|
model_config_dict=model_config_dict,
|
|
api_key=api_key,
|
|
url=url,
|
|
token_counter=token_counter,
|
|
timeout=timeout,
|
|
max_retries=max_retries,
|
|
)
|
|
|
|
# Initialize Anthropic clients
|
|
from anthropic import Anthropic, AsyncAnthropic
|
|
|
|
if client is not None:
|
|
self._client = client
|
|
else:
|
|
self._client = Anthropic(
|
|
api_key=self._api_key,
|
|
base_url=self._url,
|
|
timeout=self._timeout,
|
|
max_retries=max_retries,
|
|
**kwargs,
|
|
)
|
|
|
|
if async_client is not None:
|
|
self._async_client = async_client
|
|
else:
|
|
self._async_client = AsyncAnthropic(
|
|
api_key=self._api_key,
|
|
base_url=self._url,
|
|
timeout=self._timeout,
|
|
max_retries=max_retries,
|
|
**kwargs,
|
|
)
|
|
|
|
if cache_control is not None and cache_control not in ("5m", "1h"):
|
|
raise ValueError(
|
|
f"Invalid cache_control value: {cache_control!r}. "
|
|
f"Must be either '5m' or '1h'."
|
|
)
|
|
|
|
self._cache_control_config = None
|
|
if cache_control:
|
|
self._cache_control_config = {
|
|
"type": "ephemeral",
|
|
"ttl": cache_control,
|
|
}
|
|
|
|
self._use_beta_for_structured_outputs = use_beta_for_structured_outputs
|
|
|
|
@property
|
|
def token_counter(self) -> BaseTokenCounter:
|
|
r"""Initialize the token counter for the model backend.
|
|
|
|
Returns:
|
|
AnthropicTokenCounter: The token counter following the model's
|
|
tokenization style.
|
|
"""
|
|
if not self._token_counter:
|
|
self._token_counter = AnthropicTokenCounter(
|
|
model=str(self.model_type),
|
|
api_key=self._api_key,
|
|
base_url=self._url,
|
|
)
|
|
return self._token_counter
|
|
|
|
def _convert_openai_to_anthropic_messages(
|
|
self,
|
|
messages: List[OpenAIMessage],
|
|
) -> tuple[Optional[str], List[Dict[str, Any]]]:
|
|
r"""Convert OpenAI format messages to Anthropic format.
|
|
|
|
Args:
|
|
messages (List[OpenAIMessage]): Messages in OpenAI format.
|
|
|
|
Returns:
|
|
tuple[Optional[str], List[Dict[str, Any]]]: A tuple containing
|
|
the system message (if any) and the list of messages in
|
|
Anthropic format.
|
|
"""
|
|
from anthropic.types import MessageParam
|
|
|
|
system_message = None
|
|
anthropic_messages: List[MessageParam] = []
|
|
|
|
for msg in messages:
|
|
role = msg.get("role")
|
|
content = msg.get("content")
|
|
|
|
if role == "system":
|
|
# Anthropic uses a separate system parameter
|
|
if isinstance(content, str):
|
|
system_message = content
|
|
elif isinstance(content, list):
|
|
# Extract text from content blocks
|
|
text_parts = []
|
|
for part in content:
|
|
if isinstance(part, dict) and "text" in part:
|
|
text_parts.append(part["text"])
|
|
elif isinstance(part, str):
|
|
text_parts.append(part)
|
|
system_message = "\n".join(text_parts)
|
|
elif role == "user":
|
|
# Convert user message
|
|
if isinstance(content, str):
|
|
anthropic_messages.append(
|
|
MessageParam(role="user", content=content)
|
|
)
|
|
elif isinstance(content, list):
|
|
# Handle multimodal content
|
|
anthropic_messages.append(
|
|
MessageParam(role="user", content=content)
|
|
)
|
|
elif role == "assistant":
|
|
# Convert assistant message
|
|
assistant_content: Union[str, List[Dict[str, Any]]] = ""
|
|
|
|
if msg.get("tool_calls"):
|
|
# Handle tool calls - Anthropic uses content blocks
|
|
content_blocks = []
|
|
if content:
|
|
content_blocks.append(
|
|
{"type": "text", "text": str(content)}
|
|
)
|
|
|
|
for tool_call in msg.get("tool_calls"): # type: ignore[attr-defined]
|
|
tool_use_block = {
|
|
"type": "tool_use",
|
|
"id": tool_call.get("id", ""),
|
|
"name": tool_call.get("function", {}).get(
|
|
"name", ""
|
|
),
|
|
"input": {},
|
|
}
|
|
# Parse arguments if it's a string
|
|
arguments = tool_call.get("function", {}).get(
|
|
"arguments", "{}"
|
|
)
|
|
if isinstance(arguments, str):
|
|
try:
|
|
tool_use_block["input"] = json.loads(arguments)
|
|
except json.JSONDecodeError:
|
|
tool_use_block["input"] = {}
|
|
else:
|
|
tool_use_block["input"] = arguments
|
|
content_blocks.append(tool_use_block)
|
|
|
|
anthropic_messages.append(
|
|
MessageParam(role="assistant", content=content_blocks) # type: ignore[typeddict-item]
|
|
)
|
|
else:
|
|
if isinstance(content, str):
|
|
assistant_content = content
|
|
elif isinstance(content, list):
|
|
assistant_content = content
|
|
else:
|
|
assistant_content = str(content) if content else ""
|
|
|
|
anthropic_messages.append(
|
|
MessageParam(
|
|
role="assistant",
|
|
content=assistant_content, # type: ignore[typeddict-item]
|
|
)
|
|
)
|
|
elif role == "tool":
|
|
# Convert tool response message
|
|
tool_call_id = msg.get("tool_call_id", "")
|
|
tool_content = (
|
|
content if isinstance(content, str) else str(content)
|
|
)
|
|
anthropic_messages.append(
|
|
MessageParam(
|
|
role="user",
|
|
content=[
|
|
{ # type: ignore[list-item]
|
|
"type": "tool_result",
|
|
"tool_use_id": tool_call_id,
|
|
"content": tool_content,
|
|
}
|
|
],
|
|
)
|
|
)
|
|
|
|
return system_message, anthropic_messages # type: ignore[return-value]
|
|
|
|
def _convert_anthropic_to_openai_response(
|
|
self, response: Any, model: str
|
|
) -> ChatCompletion:
|
|
r"""Convert Anthropic API response to OpenAI ChatCompletion format.
|
|
|
|
Args:
|
|
response: The response object from Anthropic API.
|
|
model (str): The model name.
|
|
|
|
Returns:
|
|
ChatCompletion: Response in OpenAI format.
|
|
"""
|
|
# Extract message content
|
|
content = ""
|
|
tool_calls = None
|
|
|
|
if hasattr(response, "content"):
|
|
content_blocks = response.content
|
|
if content_blocks:
|
|
# Extract text content and tool calls
|
|
text_parts = []
|
|
tool_calls_list = []
|
|
for block in content_blocks:
|
|
if hasattr(block, "type"):
|
|
if block.type == "text":
|
|
if hasattr(block, "text"):
|
|
text_parts.append(block.text)
|
|
elif block.type == "tool_use":
|
|
tool_input = (
|
|
block.input if hasattr(block, "input") else {}
|
|
)
|
|
tool_calls_list.append(
|
|
{
|
|
"id": block.id
|
|
if hasattr(block, "id")
|
|
else "",
|
|
"type": "function",
|
|
"function": {
|
|
"name": block.name
|
|
if hasattr(block, "name")
|
|
else "",
|
|
"arguments": json.dumps(tool_input)
|
|
if isinstance(tool_input, dict)
|
|
else str(tool_input),
|
|
},
|
|
}
|
|
)
|
|
elif isinstance(block, dict):
|
|
if block.get("type") == "text":
|
|
text_parts.append(block.get("text", ""))
|
|
elif block.get("type") == "tool_use":
|
|
tool_input = block.get("input", {})
|
|
tool_calls_list.append(
|
|
{
|
|
"id": block.get("id", ""),
|
|
"type": "function",
|
|
"function": {
|
|
"name": block.get("name", ""),
|
|
"arguments": json.dumps(tool_input)
|
|
if isinstance(tool_input, dict)
|
|
else str(tool_input),
|
|
},
|
|
}
|
|
)
|
|
content = "".join(text_parts)
|
|
if tool_calls_list:
|
|
tool_calls = tool_calls_list
|
|
else:
|
|
content = ""
|
|
elif isinstance(response.content, str):
|
|
content = response.content
|
|
|
|
# Determine finish reason
|
|
finish_reason = None
|
|
if hasattr(response, "stop_reason"):
|
|
stop_reason = response.stop_reason
|
|
if stop_reason == "end_turn":
|
|
finish_reason = "stop"
|
|
elif stop_reason == "max_tokens":
|
|
finish_reason = "length"
|
|
elif stop_reason == "stop_sequence":
|
|
finish_reason = "stop"
|
|
elif stop_reason == "tool_use":
|
|
finish_reason = "tool_calls"
|
|
else:
|
|
finish_reason = stop_reason
|
|
|
|
# Build message dict
|
|
message_dict: Dict[str, Any] = {
|
|
"role": "assistant",
|
|
"content": content,
|
|
}
|
|
if tool_calls:
|
|
message_dict["tool_calls"] = tool_calls
|
|
|
|
# Extract usage information
|
|
usage = None
|
|
if hasattr(response, "usage"):
|
|
usage = {
|
|
"prompt_tokens": getattr(response.usage, "input_tokens", 0),
|
|
"completion_tokens": getattr(
|
|
response.usage, "output_tokens", 0
|
|
),
|
|
"total_tokens": (
|
|
getattr(response.usage, "input_tokens", 0)
|
|
+ getattr(response.usage, "output_tokens", 0)
|
|
),
|
|
}
|
|
|
|
# Create ChatCompletion
|
|
return ChatCompletion.construct(
|
|
id=getattr(response, "id", f"chatcmpl-{int(time.time())}"),
|
|
choices=[
|
|
{
|
|
"index": 0,
|
|
"message": message_dict,
|
|
"finish_reason": finish_reason,
|
|
}
|
|
],
|
|
created=int(time.time()),
|
|
model=model,
|
|
object="chat.completion",
|
|
usage=usage,
|
|
)
|
|
|
|
def _convert_anthropic_stream_to_openai_chunk(
|
|
self,
|
|
chunk: Any,
|
|
model: str,
|
|
tool_call_index: Dict[str, int],
|
|
finish_reason_sent: bool = False,
|
|
) -> ChatCompletionChunk:
|
|
r"""Convert Anthropic streaming chunk to OpenAI ChatCompletionChunk.
|
|
|
|
Args:
|
|
chunk: The streaming chunk from Anthropic API.
|
|
model (str): The model name.
|
|
tool_call_index (Dict[str, int]): A mutable dict tracking tool call
|
|
indices by their IDs, used to maintain consistent indexing
|
|
across streaming chunks.
|
|
|
|
Returns:
|
|
ChatCompletionChunk: Chunk in OpenAI format.
|
|
"""
|
|
delta_content = ""
|
|
tool_calls = None
|
|
finish_reason = None
|
|
chunk_id = ""
|
|
usage = None
|
|
|
|
if hasattr(chunk, "type"):
|
|
chunk_type = chunk.type
|
|
if chunk_type == "message_start":
|
|
# Initialize message
|
|
if hasattr(chunk, "message") and hasattr(chunk.message, "id"):
|
|
chunk_id = chunk.message.id
|
|
return ChatCompletionChunk.construct(
|
|
id=chunk_id,
|
|
choices=[{"index": 0, "delta": {}, "finish_reason": None}],
|
|
created=int(time.time()),
|
|
model=model,
|
|
object="chat.completion.chunk",
|
|
)
|
|
elif chunk_type == "content_block_start":
|
|
# Content block starting
|
|
if hasattr(chunk, "content_block"):
|
|
block = chunk.content_block
|
|
if hasattr(block, "type") and block.type == "tool_use":
|
|
# Tool use block starting
|
|
tool_id = getattr(block, "id", "")
|
|
tool_name = getattr(block, "name", "")
|
|
# Assign index for this tool call
|
|
idx = len(
|
|
[
|
|
k
|
|
for k in tool_call_index
|
|
if not k.startswith("_")
|
|
]
|
|
)
|
|
tool_call_index[tool_id] = idx
|
|
# Also map by chunk.index for content_block_delta
|
|
chunk_idx = getattr(chunk, "index", 0)
|
|
tool_call_index[f"_chunk_{chunk_idx}"] = idx
|
|
tool_calls = [
|
|
{
|
|
"index": idx,
|
|
"id": tool_id,
|
|
"type": "function",
|
|
"function": {
|
|
"name": tool_name,
|
|
"arguments": "",
|
|
},
|
|
}
|
|
]
|
|
if tool_calls is None:
|
|
return ChatCompletionChunk.construct(
|
|
id=chunk_id,
|
|
choices=[
|
|
{"index": 0, "delta": {}, "finish_reason": None}
|
|
],
|
|
created=int(time.time()),
|
|
model=model,
|
|
object="chat.completion.chunk",
|
|
)
|
|
elif chunk_type == "content_block_delta":
|
|
# Content delta
|
|
if hasattr(chunk, "delta"):
|
|
delta_obj = chunk.delta
|
|
delta_type = getattr(delta_obj, "type", "")
|
|
if delta_type == "text_delta" or hasattr(
|
|
delta_obj, "text"
|
|
):
|
|
delta_content = getattr(delta_obj, "text", "")
|
|
elif delta_type == "input_json_delta":
|
|
# Tool input arguments delta
|
|
partial_json = getattr(delta_obj, "partial_json", "")
|
|
# Get the current tool call index from chunk.index
|
|
# Map Anthropic's chunk.index to our tool call index
|
|
chunk_idx = getattr(chunk, "index", 0)
|
|
mapped_idx = tool_call_index.get(
|
|
f"_chunk_{chunk_idx}", chunk_idx
|
|
)
|
|
tool_calls = [
|
|
{
|
|
"index": mapped_idx,
|
|
"function": {
|
|
"arguments": partial_json,
|
|
},
|
|
}
|
|
]
|
|
elif chunk_type == "content_block_stop":
|
|
# Content block finished - skip
|
|
return ChatCompletionChunk.construct(
|
|
id=chunk_id,
|
|
choices=[{"index": 0, "delta": {}, "finish_reason": None}],
|
|
created=int(time.time()),
|
|
model=model,
|
|
object="chat.completion.chunk",
|
|
)
|
|
elif chunk_type == "message_delta":
|
|
# Message delta (usage info, etc.)
|
|
if hasattr(chunk, "delta") and hasattr(
|
|
chunk.delta, "stop_reason"
|
|
):
|
|
stop_reason = chunk.delta.stop_reason
|
|
if stop_reason == "end_turn":
|
|
finish_reason = "stop"
|
|
elif stop_reason == "max_tokens":
|
|
finish_reason = "length"
|
|
elif stop_reason == "stop_sequence":
|
|
finish_reason = "stop"
|
|
elif stop_reason == "tool_use":
|
|
finish_reason = "tool_calls"
|
|
# Extract usage info from message_delta
|
|
if hasattr(chunk, "usage") and chunk.usage:
|
|
usage_obj = chunk.usage
|
|
input_tokens = getattr(usage_obj, "input_tokens", 0)
|
|
output_tokens = getattr(usage_obj, "output_tokens", 0)
|
|
usage = {
|
|
"prompt_tokens": input_tokens,
|
|
"completion_tokens": output_tokens,
|
|
"total_tokens": input_tokens + output_tokens,
|
|
}
|
|
elif chunk_type == "message_stop":
|
|
# Message finished - only set finish_reason if not already sent
|
|
# This prevents duplicate finish_reason triggers in chat_agent
|
|
final_finish_reason = None if finish_reason_sent else "stop"
|
|
return ChatCompletionChunk.construct(
|
|
id=chunk_id,
|
|
choices=[
|
|
{
|
|
"index": 0,
|
|
"delta": {},
|
|
"finish_reason": final_finish_reason,
|
|
}
|
|
],
|
|
created=int(time.time()),
|
|
model=model,
|
|
object="chat.completion.chunk",
|
|
)
|
|
|
|
delta: Dict[str, Any] = {}
|
|
if delta_content:
|
|
delta["content"] = delta_content
|
|
if tool_calls:
|
|
delta["tool_calls"] = tool_calls
|
|
|
|
return ChatCompletionChunk.construct(
|
|
id=chunk_id,
|
|
choices=[
|
|
{"index": 0, "delta": delta, "finish_reason": finish_reason}
|
|
],
|
|
created=int(time.time()),
|
|
model=model,
|
|
object="chat.completion.chunk",
|
|
usage=usage,
|
|
)
|
|
|
|
def _convert_openai_tools_to_anthropic(
|
|
self, tools: Optional[List[Dict[str, Any]]]
|
|
) -> Optional[List[Dict[str, Any]]]:
|
|
r"""Convert OpenAI tools format to Anthropic tools format.
|
|
|
|
Args:
|
|
tools (Optional[List[Dict[str, Any]]]): Tools in OpenAI format.
|
|
|
|
Returns:
|
|
Optional[List[Dict[str, Any]]]: Tools in Anthropic format.
|
|
"""
|
|
if not tools:
|
|
return None
|
|
|
|
anthropic_tools = []
|
|
for tool in tools:
|
|
if "function" in tool:
|
|
func = tool["function"]
|
|
anthropic_tool = {
|
|
"name": func.get("name", ""),
|
|
"description": func.get("description", ""),
|
|
"input_schema": func.get("parameters", {}),
|
|
}
|
|
if self._use_beta_for_structured_outputs:
|
|
anthropic_tool["strict"] = func.get("strict", True)
|
|
anthropic_tools.append(anthropic_tool)
|
|
|
|
return anthropic_tools
|
|
|
|
@observe()
|
|
def _run(
|
|
self,
|
|
messages: List[OpenAIMessage],
|
|
response_format: Optional[Type[BaseModel]] = None,
|
|
tools: Optional[List[Dict[str, Any]]] = None,
|
|
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
|
|
r"""Runs inference of Anthropic chat completion.
|
|
|
|
Args:
|
|
messages (List[OpenAIMessage]): Message list with the chat history
|
|
in OpenAI API format.
|
|
response_format (Optional[Type[BaseModel]]): The format of the
|
|
response. (Not supported by Anthropic API directly)
|
|
tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
|
|
use for the request.
|
|
|
|
Returns:
|
|
Union[ChatCompletion, Stream[ChatCompletionChunk]]:
|
|
`ChatCompletion` in the non-stream mode, or
|
|
`Stream[ChatCompletionChunk]` in the stream mode.
|
|
"""
|
|
if response_format is not None:
|
|
warnings.warn(
|
|
"The 'response_format' parameter is not supported by the "
|
|
"Anthropic API and will be ignored. Consider using tools "
|
|
"for structured output instead.",
|
|
UserWarning,
|
|
stacklevel=2,
|
|
)
|
|
|
|
# Update Langfuse trace with current agent session and metadata
|
|
agent_session_id = get_current_agent_session_id()
|
|
if agent_session_id:
|
|
update_langfuse_trace(
|
|
session_id=agent_session_id,
|
|
metadata={
|
|
"agent_id": agent_session_id,
|
|
"model_type": str(self.model_type),
|
|
},
|
|
tags=["CAMEL-AI", str(self.model_type)],
|
|
)
|
|
|
|
# Strip trailing whitespace from messages
|
|
processed_messages = strip_trailing_whitespace_from_messages(messages)
|
|
|
|
# Convert messages to Anthropic format
|
|
system_message, anthropic_messages = (
|
|
self._convert_openai_to_anthropic_messages(processed_messages)
|
|
)
|
|
|
|
# Prepare request parameters
|
|
request_params: Dict[str, Any] = {
|
|
"model": str(self.model_type),
|
|
"messages": anthropic_messages,
|
|
"max_tokens": self.model_config_dict.get("max_tokens", None),
|
|
}
|
|
|
|
if system_message:
|
|
# if cache_control is configured, add it to the system message
|
|
if self._cache_control_config:
|
|
request_params["system"] = [
|
|
{
|
|
"type": "text",
|
|
"text": system_message,
|
|
"cache_control": self._cache_control_config,
|
|
}
|
|
]
|
|
else:
|
|
request_params["system"] = system_message
|
|
|
|
# if cache_control is configured, add it to the last message
|
|
if self._cache_control_config and request_params["messages"]:
|
|
if isinstance(request_params["messages"], list):
|
|
if isinstance(request_params["messages"][-1]["content"], str):
|
|
request_params["messages"][-1]["content"] = [
|
|
{
|
|
"type": "text",
|
|
"text": request_params["messages"][-1]["content"],
|
|
"cache_control": self._cache_control_config,
|
|
}
|
|
]
|
|
elif isinstance(
|
|
request_params["messages"][-1]["content"], list
|
|
):
|
|
if request_params["messages"][-1][
|
|
"content"
|
|
] and isinstance(
|
|
request_params["messages"][-1]["content"][-1], dict
|
|
):
|
|
request_params["messages"][-1]["content"][-1][
|
|
"cache_control"
|
|
] = self._cache_control_config
|
|
|
|
# Add config parameters
|
|
for key in ["temperature", "top_p", "top_k", "stop_sequences"]:
|
|
if key in self.model_config_dict:
|
|
request_params[key] = self.model_config_dict[key]
|
|
|
|
# Convert tools
|
|
anthropic_tools = self._convert_openai_tools_to_anthropic(tools)
|
|
if anthropic_tools:
|
|
request_params["tools"] = anthropic_tools
|
|
|
|
# Add beta for structured outputs if configured
|
|
if self._use_beta_for_structured_outputs:
|
|
request_params["betas"] = [ANTHROPIC_BETA_FOR_STRUCTURED_OUTPUTS]
|
|
create_func = self._client.beta.messages.create
|
|
else:
|
|
create_func = self._client.messages.create
|
|
|
|
# Check if streaming
|
|
is_streaming = self.model_config_dict.get("stream", False)
|
|
|
|
if is_streaming:
|
|
# Return streaming response
|
|
stream = self._call_client(
|
|
create_func,
|
|
**request_params,
|
|
stream=True,
|
|
)
|
|
return self._wrap_anthropic_stream(stream, str(self.model_type))
|
|
else:
|
|
# Return non-streaming response
|
|
response = self._call_client(create_func, **request_params)
|
|
return self._convert_anthropic_to_openai_response(
|
|
response, str(self.model_type)
|
|
)
|
|
|
|
@observe()
|
|
async def _arun(
|
|
self,
|
|
messages: List[OpenAIMessage],
|
|
response_format: Optional[Type[BaseModel]] = None,
|
|
tools: Optional[List[Dict[str, Any]]] = None,
|
|
) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
|
|
r"""Runs inference of Anthropic chat completion in async mode.
|
|
|
|
Args:
|
|
messages (List[OpenAIMessage]): Message list with the chat history
|
|
in OpenAI API format.
|
|
response_format (Optional[Type[BaseModel]]): The format of the
|
|
response. (Not supported by Anthropic API directly)
|
|
tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
|
|
use for the request.
|
|
|
|
Returns:
|
|
Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
|
|
`ChatCompletion` in the non-stream mode, or
|
|
`AsyncStream[ChatCompletionChunk]` in the stream mode.
|
|
"""
|
|
if response_format is not None:
|
|
warnings.warn(
|
|
"The 'response_format' parameter is not supported by the "
|
|
"Anthropic API and will be ignored. Consider using tools "
|
|
"for structured output instead.",
|
|
UserWarning,
|
|
stacklevel=2,
|
|
)
|
|
|
|
# Update Langfuse trace with current agent session and metadata
|
|
agent_session_id = get_current_agent_session_id()
|
|
if agent_session_id:
|
|
update_langfuse_trace(
|
|
session_id=agent_session_id,
|
|
metadata={
|
|
"agent_id": agent_session_id,
|
|
"model_type": str(self.model_type),
|
|
},
|
|
tags=["CAMEL-AI", str(self.model_type)],
|
|
)
|
|
|
|
# Strip trailing whitespace from messages
|
|
processed_messages = strip_trailing_whitespace_from_messages(messages)
|
|
|
|
# Convert messages to Anthropic format
|
|
system_message, anthropic_messages = (
|
|
self._convert_openai_to_anthropic_messages(processed_messages)
|
|
)
|
|
|
|
# Prepare request parameters
|
|
request_params: Dict[str, Any] = {
|
|
"model": str(self.model_type),
|
|
"messages": anthropic_messages,
|
|
"max_tokens": self.model_config_dict.get("max_tokens", None),
|
|
}
|
|
|
|
if system_message:
|
|
# if cache_control is configured, add it to the system message
|
|
if self._cache_control_config:
|
|
request_params["system"] = [
|
|
{
|
|
"type": "text",
|
|
"text": system_message,
|
|
"cache_control": self._cache_control_config,
|
|
}
|
|
]
|
|
else:
|
|
request_params["system"] = system_message
|
|
|
|
# if cache_control is configured, add it to the last message
|
|
if self._cache_control_config and request_params["messages"]:
|
|
if isinstance(request_params["messages"], list):
|
|
if isinstance(request_params["messages"][-1]["content"], str):
|
|
request_params["messages"][-1]["content"] = [
|
|
{
|
|
"type": "text",
|
|
"text": request_params["messages"][-1]["content"],
|
|
"cache_control": self._cache_control_config,
|
|
}
|
|
]
|
|
elif isinstance(
|
|
request_params["messages"][-1]["content"], list
|
|
):
|
|
if request_params["messages"][-1][
|
|
"content"
|
|
] and isinstance(
|
|
request_params["messages"][-1]["content"][-1], dict
|
|
):
|
|
request_params["messages"][-1]["content"][-1][
|
|
"cache_control"
|
|
] = self._cache_control_config
|
|
|
|
# Add config parameters
|
|
for key in ["temperature", "top_p", "top_k", "stop_sequences"]:
|
|
if key in self.model_config_dict:
|
|
request_params[key] = self.model_config_dict[key]
|
|
|
|
# Convert tools
|
|
anthropic_tools = self._convert_openai_tools_to_anthropic(tools)
|
|
if anthropic_tools:
|
|
request_params["tools"] = anthropic_tools
|
|
|
|
# Add beta for structured outputs if configured
|
|
if self._use_beta_for_structured_outputs:
|
|
request_params["betas"] = [ANTHROPIC_BETA_FOR_STRUCTURED_OUTPUTS]
|
|
create_func = self._async_client.beta.messages.create
|
|
else:
|
|
create_func = self._async_client.messages.create
|
|
|
|
# Check if streaming
|
|
is_streaming = self.model_config_dict.get("stream", False)
|
|
|
|
if is_streaming:
|
|
# Return streaming response
|
|
stream = await self._acall_client(
|
|
create_func,
|
|
**request_params,
|
|
stream=True,
|
|
)
|
|
return self._wrap_anthropic_async_stream(
|
|
stream, str(self.model_type)
|
|
)
|
|
else:
|
|
# Return non-streaming response
|
|
response = await self._acall_client(create_func, **request_params)
|
|
return self._convert_anthropic_to_openai_response(
|
|
response, str(self.model_type)
|
|
)
|
|
|
|
def _wrap_anthropic_stream(
|
|
self, stream: Any, model: str
|
|
) -> Stream[ChatCompletionChunk]:
|
|
r"""Wrap Anthropic streaming response to OpenAI Stream format.
|
|
|
|
Args:
|
|
stream: The streaming response from Anthropic API.
|
|
model (str): The model name.
|
|
|
|
Returns:
|
|
Stream[ChatCompletionChunk]: Stream in OpenAI format.
|
|
"""
|
|
|
|
def _generate_chunks():
|
|
tool_call_index: Dict[str, int] = {}
|
|
finish_reason_sent = False
|
|
for chunk in stream:
|
|
converted = self._convert_anthropic_stream_to_openai_chunk(
|
|
chunk, model, tool_call_index, finish_reason_sent
|
|
)
|
|
# Track if we've sent a finish_reason to avoid duplicates
|
|
if converted.choices:
|
|
choice = converted.choices[0]
|
|
fr = (
|
|
choice.get("finish_reason")
|
|
if isinstance(choice, dict)
|
|
else getattr(choice, "finish_reason", None)
|
|
)
|
|
if fr is not None:
|
|
finish_reason_sent = True
|
|
yield converted
|
|
|
|
return cast(Stream[ChatCompletionChunk], _generate_chunks())
|
|
|
|
def _wrap_anthropic_async_stream(
|
|
self, stream: Any, model: str
|
|
) -> AsyncStream[ChatCompletionChunk]:
|
|
r"""Wrap Anthropic async streaming response to OpenAI AsyncStream.
|
|
|
|
Args:
|
|
stream: The async streaming response from Anthropic API.
|
|
model (str): The model name.
|
|
|
|
Returns:
|
|
AsyncStream[ChatCompletionChunk]: AsyncStream in OpenAI format.
|
|
"""
|
|
|
|
async def _generate_chunks():
|
|
tool_call_index: Dict[str, int] = {}
|
|
finish_reason_sent = False
|
|
async for chunk in stream:
|
|
converted = self._convert_anthropic_stream_to_openai_chunk(
|
|
chunk, model, tool_call_index, finish_reason_sent
|
|
)
|
|
# Track if we've sent a finish_reason to avoid duplicates
|
|
if converted.choices:
|
|
choice = converted.choices[0]
|
|
fr = (
|
|
choice.get("finish_reason")
|
|
if isinstance(choice, dict)
|
|
else getattr(choice, "finish_reason", None)
|
|
)
|
|
if fr is not None:
|
|
finish_reason_sent = True
|
|
yield converted
|
|
|
|
return cast(AsyncStream[ChatCompletionChunk], _generate_chunks())
|
|
|
|
@property
|
|
def stream(self) -> bool:
|
|
r"""Returns whether the model is in stream mode, which sends partial
|
|
results each time.
|
|
|
|
Returns:
|
|
bool: Whether the model is in stream mode.
|
|
"""
|
|
return self.model_config_dict.get("stream", False)
|