eigent/backend/camel/models/gemini_model.py
2026-03-31 17:20:08 +08:00

705 lines
29 KiB
Python

# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
import os
from typing import (
Any,
AsyncGenerator,
Dict,
Generator,
List,
Optional,
Type,
Union,
)
from openai import AsyncStream, Stream
from pydantic import BaseModel
from camel.configs import GeminiConfig
from camel.messages import OpenAIMessage
from camel.models.openai_compatible_model import OpenAICompatibleModel
from camel.types import (
ChatCompletion,
ChatCompletionChunk,
ModelType,
)
from camel.utils import (
BaseTokenCounter,
api_keys_required,
)
# Select the tracing decorator: Langfuse when LANGFUSE_ENABLED is "true",
# otherwise TraceRoot when TRACEROOT_ENABLED is "true", otherwise CAMEL's
# own ``observe``. If the selected backend cannot be imported, CAMEL's
# ``observe`` is used instead.
try:
    if os.environ.get("LANGFUSE_ENABLED", "False").lower() == "true":
        from langfuse.decorators import observe
    elif os.environ.get("TRACEROOT_ENABLED", "False").lower() == "true":
        from traceroot import trace as observe  # type: ignore[import]
    else:
        from camel.utils import observe
except ImportError:
    from camel.utils import observe
class GeminiModel(OpenAICompatibleModel):
    r"""Gemini API in a unified OpenAICompatibleModel interface.

    Args:
        model_type (Union[ModelType, str]): Model for which a backend is
            created, one of Gemini series.
        model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
            that will be fed into :obj:`openai.ChatCompletion.create()`. If
            :obj:`None`, :obj:`GeminiConfig().as_dict()` will be used.
            (default: :obj:`None`)
        api_key (Optional[str], optional): The API key for authenticating with
            the Gemini service. If not provided, the ``GEMINI_API_KEY``
            environment variable is used. (default: :obj:`None`)
        url (Optional[str], optional): The url to the Gemini service. If not
            provided, the ``GEMINI_API_BASE_URL`` environment variable is
            used, falling back to :obj:`https://generativelanguage.
            googleapis.com/v1beta/openai/`. (default: :obj:`None`)
        token_counter (Optional[BaseTokenCounter], optional): Token counter to
            use for the model. If not provided, :obj:`OpenAITokenCounter(
            ModelType.GPT_4O_MINI)` will be used.
            (default: :obj:`None`)
        timeout (Optional[float], optional): The timeout value in seconds for
            API calls. If not provided, will fall back to the MODEL_TIMEOUT
            environment variable or default to 180 seconds.
            (default: :obj:`None`)
        max_retries (int, optional): Maximum number of retries for API calls.
            (default: :obj:`3`)
        **kwargs (Any): Additional arguments to pass to the client
            initialization.
    """

    @api_keys_required(
        [
            ("api_key", 'GEMINI_API_KEY'),
        ]
    )
    def __init__(
        self,
        model_type: Union[ModelType, str],
        model_config_dict: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
        token_counter: Optional[BaseTokenCounter] = None,
        timeout: Optional[float] = None,
        max_retries: int = 3,
        **kwargs: Any,
    ) -> None:
        if model_config_dict is None:
            model_config_dict = GeminiConfig().as_dict()
        api_key = api_key or os.environ.get("GEMINI_API_KEY")
        url = url or os.environ.get(
            "GEMINI_API_BASE_URL",
            "https://generativelanguage.googleapis.com/v1beta/openai/",
        )
        timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180))
        super().__init__(
            model_type=model_type,
            model_config_dict=model_config_dict,
            api_key=api_key,
            url=url,
            token_counter=token_counter,
            timeout=timeout,
            max_retries=max_retries,
            **kwargs,
        )

    @staticmethod
    def _is_single_tool_call_assistant(msg: Any) -> bool:
        r"""Return whether ``msg`` is an assistant message whose
        ``tool_calls`` is a list holding exactly one tool call."""
        return (
            msg.get('role') == 'assistant'
            and 'tool_calls' in msg
            and isinstance(msg['tool_calls'], list)
            and len(msg['tool_calls']) == 1
        )

    def _process_messages(self, messages) -> List[OpenAIMessage]:
        r"""Adapt an OpenAI-format message list to Gemini's requirements.

        Gemini rejects empty content, so a message whose ``content`` is
        ``''`` is sent with the literal string ``'null'`` instead.

        A run of assistant messages that each carry a single tool call,
        optionally interleaved with the tool results answering those calls,
        is merged into ONE assistant message holding all of the tool calls,
        followed by the tool results. Gemini's OpenAI-compatible API expects
        this shape for parallel function calling. Only the first merged
        tool call keeps its ``extra_content`` (where Gemini's thought
        signature lives); it is stripped from the others. The merged
        message's other fields, including ``content``, come from the first
        assistant message of the run.

        Args:
            messages: The chat history in OpenAI message format.

        Returns:
            List[OpenAIMessage]: The adapted messages. The input messages
            are not modified.
        """
        import copy

        processed_messages: List[OpenAIMessage] = []
        i = 0
        n = len(messages)
        while i < n:
            msg = messages[i]
            if self._is_single_tool_call_assistant(msg):
                # Scan ahead for: tool_result, assistant, tool_result, ...
                # belonging to the same batch of single tool calls.
                j = i + 1
                has_more_tool_calls = False
                first_tool_call_id = msg['tool_calls'][0].get('id')
                seen_tool_call_ids = (
                    {first_tool_call_id} if first_tool_call_id else set()
                )
                while j < n:
                    next_msg = messages[j]
                    if next_msg.get('role') == 'tool':
                        # A result for a call outside this batch ends it.
                        if (
                            next_msg.get('tool_call_id')
                            not in seen_tool_call_ids
                        ):
                            break
                    elif self._is_single_tool_call_assistant(next_msg):
                        has_more_tool_calls = True
                        tc_id = next_msg['tool_calls'][0].get('id')
                        if tc_id:
                            seen_tool_call_ids.add(tc_id)
                    else:
                        break
                    j += 1

                if has_more_tool_calls:
                    merged_tool_calls = []
                    tool_results = []
                    for m in messages[i:j]:
                        if m.get('role') == 'assistant' and 'tool_calls' in m:
                            tool_call = m['tool_calls'][0]
                            if not merged_tool_calls:
                                # Keep extra_content (thought signature)
                                # only on the first tool call.
                                merged_tool_calls.append(
                                    copy.deepcopy(tool_call)
                                )
                            else:
                                merged_tool_calls.append(
                                    {
                                        key: value
                                        for key, value in tool_call.items()
                                        if key != 'extra_content'
                                    }
                                )
                        elif m.get('role') == 'tool':
                            tool_results.append(copy.deepcopy(m))

                    merged_msg = copy.deepcopy(msg)
                    merged_msg['tool_calls'] = merged_tool_calls
                    if 'content' in merged_msg and merged_msg['content'] == '':
                        merged_msg['content'] = 'null'
                    processed_messages.append(merged_msg)
                    processed_messages.extend(tool_results)
                    i = j
                    continue

            # Regular message processing (no merging needed).
            msg_copy = copy.deepcopy(msg)
            if 'content' in msg_copy and msg_copy['content'] == '':
                msg_copy['content'] = 'null'
            processed_messages.append(msg_copy)
            i += 1
        return processed_messages

    @staticmethod
    def _normalize_response_content(content: Any) -> Any:
        r"""Normalize Gemini's literal ``"null"`` response content to empty.

        Gemini rejects empty assistant content in some tool-calling request
        payloads, so request-side adaptation still uses ``"null"`` as a
        sentinel. This helper is only for response-side normalization before
        higher layers consume the content.
        """
        if isinstance(content, str) and content.strip().lower() == 'null':
            return ''
        return content

    @classmethod
    def _normalize_completion_chunk(
        cls, chunk: ChatCompletionChunk
    ) -> ChatCompletionChunk:
        r"""Normalize literal ``"null"`` text in streaming deltas (in
        place); the same chunk object is returned."""
        if not hasattr(chunk, 'choices') or not chunk.choices:
            return chunk
        for choice in chunk.choices:
            delta = getattr(choice, 'delta', None)
            if delta is None or not hasattr(delta, 'content'):
                continue
            normalized = cls._normalize_response_content(delta.content)
            if normalized != delta.content:
                delta.content = normalized
        return chunk

    @classmethod
    def _normalize_completion_response(
        cls, response: ChatCompletion
    ) -> ChatCompletion:
        r"""Normalize literal ``"null"`` text in non-stream responses (in
        place); the same response object is returned."""
        if not hasattr(response, 'choices') or not response.choices:
            return response
        for choice in response.choices:
            message = getattr(choice, 'message', None)
            if message is None or not hasattr(message, 'content'):
                continue
            normalized = cls._normalize_response_content(message.content)
            if normalized != message.content:
                message.content = normalized
        return response

    def _preserve_thought_signatures(
        self,
        response: Union[
            ChatCompletion,
            Stream[ChatCompletionChunk],
            AsyncStream[ChatCompletionChunk],
        ],
    ) -> Union[
        ChatCompletion,
        Generator[ChatCompletionChunk, None, None],
        AsyncGenerator[ChatCompletionChunk, None],
    ]:
        r"""Preserve thought signatures from Gemini responses for future
        requests.

        According to the Gemini documentation, when a response contains tool
        calls with thought signatures, these signatures must be preserved
        exactly as received when the response is added to conversation history
        for subsequent requests.

        Args:
            response: The response from Gemini API

        Returns:
            The response with thought signatures properly preserved and
            literal ``"null"`` content normalized to ``''``. Streaming
            responses are wrapped in generators that do this per chunk.
        """
        # Streams are wrapped so thought signatures in tool calls are
        # preserved as chunks arrive.
        if isinstance(response, Stream):
            return self._wrap_stream_with_thought_preservation(response)
        elif isinstance(response, AsyncStream):
            return self._wrap_async_stream_with_thought_preservation(response)
        # For non-streaming responses, thought signatures are already preserved
        # in _process_messages when the response becomes part of conversation
        # history
        return self._normalize_completion_response(response)

    @staticmethod
    def _extract_thought_signature(tool_call_delta: Any) -> Optional[Any]:
        r"""Return the Gemini ``extra_content`` payload carried by a streamed
        tool-call delta, or :obj:`None` if it carries none.

        The tool call's own ``extra_content`` is used when it is a dict with
        ``google.thought_signature``. Otherwise the ``function`` object's
        ``extra_content`` is used when it is a dict with a ``google`` key.
        """
        extra_content = getattr(tool_call_delta, 'extra_content', None)
        if isinstance(extra_content, dict):
            google_content = extra_content.get('google')
            if (
                isinstance(google_content, dict)
                and 'thought_signature' in google_content
            ):
                return extra_content
        function = getattr(tool_call_delta, 'function', None)
        func_extra = getattr(function, 'extra_content', None)
        if isinstance(func_extra, dict) and 'google' in func_extra:
            return func_extra
        return None

    @classmethod
    def _process_stream_chunk(
        cls,
        chunk: ChatCompletionChunk,
        accumulated_signatures: Dict[int, Any],
    ) -> ChatCompletionChunk:
        r"""Normalize one streamed chunk and keep its tool-call thought
        signatures attached.

        Signatures seen for a tool-call index are remembered in
        ``accumulated_signatures`` (shared across the chunks of one stream).
        They are copied onto later deltas for the same index whose
        ``extra_content`` is missing or :obj:`None`. Only the first choice
        is inspected.

        Args:
            chunk: The raw chunk received from the stream.
            accumulated_signatures: Per-stream map from tool-call index to
                the ``extra_content`` payload seen for it (mutated).

        Returns:
            ChatCompletionChunk: The same chunk, updated in place.
        """
        processed_chunk = cls._normalize_completion_chunk(chunk)
        choices = getattr(processed_chunk, 'choices', None)
        if not choices:
            return processed_chunk
        delta = getattr(choices[0], 'delta', None)
        delta_tool_calls = getattr(delta, 'tool_calls', None)
        if not delta_tool_calls:
            return processed_chunk
        for tool_call_delta in delta_tool_calls:
            index = tool_call_delta.index
            signature = cls._extract_thought_signature(tool_call_delta)
            if signature is not None:
                accumulated_signatures[index] = signature
            if (
                index in accumulated_signatures
                and getattr(tool_call_delta, 'extra_content', None) is None
            ):
                tool_call_delta.extra_content = accumulated_signatures[index]
        return processed_chunk

    def _wrap_stream_with_thought_preservation(
        self, stream: Stream[ChatCompletionChunk]
    ) -> Generator[ChatCompletionChunk, None, None]:
        r"""Wrap a streaming response to preserve thought signatures in tool
        calls.

        Args:
            stream: The original streaming response from Gemini

        Returns:
            A generator yielding each chunk after
            :meth:`_process_stream_chunk`.
        """

        def thought_preserving_generator():
            # Signatures by tool call index, scoped to this stream.
            accumulated_signatures: Dict[int, Any] = {}
            for chunk in stream:
                yield self._process_stream_chunk(
                    chunk, accumulated_signatures
                )

        return thought_preserving_generator()

    def _wrap_async_stream_with_thought_preservation(
        self, stream: AsyncStream[ChatCompletionChunk]
    ) -> AsyncGenerator[ChatCompletionChunk, None]:
        r"""Wrap an async streaming response to preserve thought signatures in
        tool calls.

        Args:
            stream: The original async streaming response from Gemini

        Returns:
            An async generator yielding each chunk after
            :meth:`_process_stream_chunk`.
        """

        async def async_thought_preserving_generator():
            # Signatures by tool call index, scoped to this stream.
            accumulated_signatures: Dict[int, Any] = {}
            async for chunk in stream:
                yield self._process_stream_chunk(
                    chunk, accumulated_signatures
                )

        return async_thought_preserving_generator()

    @observe()
    def _run(
        self,
        messages: List[OpenAIMessage],
        response_format: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        r"""Runs inference of Gemini chat completion.

        Messages are adapted with :meth:`_process_messages` first. A
        response format (given directly or via ``model_config_dict``) routes
        the request through structured parsing; otherwise a regular chat
        completion is requested.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.
            response_format (Optional[Type[BaseModel]]): The format of the
                response.
            tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
                use for the request.

        Returns:
            Union[ChatCompletion, Stream[ChatCompletionChunk]]:
                `ChatCompletion` in the non-stream mode, or
                `Stream[ChatCompletionChunk]` in the stream mode.

        Raises:
            ValueError: If both a response format and tools are supplied.
        """
        self._log_and_trace()
        response_format = response_format or self.model_config_dict.get(
            "response_format", None
        )
        messages = self._process_messages(messages)
        if response_format:
            if tools:
                raise ValueError(
                    "Gemini does not support function calling with "
                    "response format."
                )
            result: Union[ChatCompletion, Stream[ChatCompletionChunk]] = (
                self._request_parse(messages, response_format)
            )
        else:
            result = self._request_chat_completion(messages, tools)
        return result

    @observe()
    async def _arun(
        self,
        messages: List[OpenAIMessage],
        response_format: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
        r"""Runs inference of Gemini chat completion in async mode.

        Messages are adapted with :meth:`_process_messages` first. A
        response format (given directly or via ``model_config_dict``) routes
        the request through structured parsing; otherwise a regular chat
        completion is requested.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.
            response_format (Optional[Type[BaseModel]]): The format of the
                response.
            tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
                use for the request.

        Returns:
            Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
                `ChatCompletion` in the non-stream mode, or
                `AsyncStream[ChatCompletionChunk]` in the stream mode.

        Raises:
            ValueError: If both a response format and tools are supplied.
        """
        self._log_and_trace()
        response_format = response_format or self.model_config_dict.get(
            "response_format", None
        )
        messages = self._process_messages(messages)
        if response_format:
            if tools:
                raise ValueError(
                    "Gemini does not support function calling with "
                    "response format."
                )
            result: Union[
                ChatCompletion, AsyncStream[ChatCompletionChunk]
            ] = await self._arequest_parse(messages, response_format)
        else:
            result = await self._arequest_chat_completion(messages, tools)
        return result

    @staticmethod
    def _collapse_any_of(prop_value: Dict[str, Any]) -> Dict[str, Any]:
        r"""Replace an ``anyOf`` property schema with a single alternative.

        The first dict alternative whose ``type`` is not ``"null"`` is
        chosen, falling back to the first dict alternative. The property's
        own ``description`` is carried over. If there is no dict
        alternative, the property is returned without its ``anyOf`` key.
        The input dict is not modified.
        """
        options = [
            option
            for option in prop_value.get('anyOf') or []
            if isinstance(option, dict)
        ]
        if not options:
            return {
                key: value
                for key, value in prop_value.items()
                if key != 'anyOf'
            }
        chosen = next(
            (option for option in options if option.get('type') != 'null'),
            options[0],
        )
        collapsed = dict(chosen)
        if 'description' in prop_value:
            collapsed['description'] = prop_value['description']
        return collapsed

    @classmethod
    def _sanitize_tools(
        cls, tools: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        r"""Return a copy of ``tools`` restricted to what Gemini accepts.

        For each tool's function, ``strict`` is removed and ``anyOf``
        properties are collapsed with :meth:`_collapse_any_of`. ``enum`` is
        then removed from non-string properties, and ``format`` from
        properties that are not string, integer or number. The cleanup is
        applied to the collapsed schema. The caller's tool dicts are never
        modified.
        """
        import copy

        sanitized_tools = copy.deepcopy(tools)
        for tool in sanitized_tools:
            function_dict = tool.get('function', {})
            function_dict.pop("strict", None)
            params = function_dict.get('parameters') or {}
            properties = params.get('properties') or {}
            for prop_name, prop_value in list(properties.items()):
                if not isinstance(prop_value, dict):
                    continue
                if 'anyOf' in prop_value:
                    prop_value = cls._collapse_any_of(prop_value)
                    properties[prop_name] = prop_value
                # enum: only allowed for string type.
                if prop_value.get('type') != 'string':
                    prop_value.pop('enum', None)
                # format: only allowed for string, integer and number types.
                if prop_value.get('type') not in (
                    'string',
                    'integer',
                    'number',
                ):
                    prop_value.pop('format', None)
        return sanitized_tools

    def _prepare_request_config(
        self, tools: Optional[List[Dict[str, Any]]]
    ) -> Dict[str, Any]:
        r"""Build the request kwargs from a copy of ``model_config_dict``.
        When ``tools`` is non-empty, a Gemini-sanitized copy is added under
        ``"tools"``."""
        import copy

        request_config = copy.deepcopy(self.model_config_dict)
        if tools:
            request_config["tools"] = self._sanitize_tools(tools)
        return request_config

    def _request_chat_completion(
        self,
        messages: List[OpenAIMessage],
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        request_config = self._prepare_request_config(tools)
        response = self._call_client(
            self._client.chat.completions.create,
            messages=messages,
            model=self.model_type,
            **request_config,
        )
        # Preserve thought signatures from the response for future requests
        return self._preserve_thought_signatures(response)  # type: ignore[return-value]

    async def _arequest_chat_completion(
        self,
        messages: List[OpenAIMessage],
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
        request_config = self._prepare_request_config(tools)
        response = await self._acall_client(
            self._async_client.chat.completions.create,
            messages=messages,
            model=self.model_type,
            **request_config,
        )
        # Preserve thought signatures from the response for future requests
        return self._preserve_thought_signatures(response)  # type: ignore[return-value]