eigent/backend/camel/models/sglang_model.py
2026-03-31 17:20:08 +08:00

503 lines
18 KiB
Python

# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
import logging
import os
import subprocess
import threading
import time
from typing import Any, Dict, List, Optional, Type, Union
from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from pydantic import BaseModel
from camel.configs import SGLangConfig
from camel.messages import OpenAIMessage
from camel.models import BaseModelBackend
from camel.types import (
ChatCompletion,
ChatCompletionChunk,
ModelType,
)
from camel.utils import (
BaseTokenCounter,
OpenAITokenCounter,
update_current_observation,
)
if os.environ.get("LANGFUSE_ENABLED", "False").lower() == "true":
try:
from langfuse.decorators import observe
except ImportError:
from camel.utils import observe
else:
from camel.utils import observe
class SGLangModel(BaseModelBackend):
r"""SGLang service interface.
Args:
model_type (Union[ModelType, str]): Model for which a backend is
created.
model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
that will be fed into:obj:`openai.ChatCompletion.create()`. If
:obj:`None`, :obj:`SGLangConfig().as_dict()` will be used.
(default: :obj:`None`)
api_key (Optional[str], optional): The API key for authenticating with
the model service. SGLang doesn't need API key, it would be ignored
if set. (default: :obj:`None`)
url (Optional[str], optional): The url to the model service. If not
provided, :obj:`"http://127.0.0.1:30000/v1"` will be used.
(default: :obj:`None`)
token_counter (Optional[BaseTokenCounter], optional): Token counter to
use for the model. If not provided, :obj:`OpenAITokenCounter(
ModelType.GPT_4O_MINI)` will be used.
(default: :obj:`None`)
timeout (Optional[float], optional): The timeout value in seconds for
API calls. If not provided, will fall back to the MODEL_TIMEOUT
environment variable or default to 180 seconds.
(default: :obj:`None`)
max_retries (int, optional): Maximum number of retries for API calls.
(default: :obj:`3`)
client (Optional[Any], optional): A custom synchronous
OpenAI-compatible client instance. If provided, this client will
be used instead of creating a new one. Note: When using custom
clients with SGLang, server auto-start features will be disabled.
(default: :obj:`None`)
async_client (Optional[Any], optional): A custom asynchronous
OpenAI-compatible client instance. If provided, this client will
be used instead of creating a new one. (default: :obj:`None`)
**kwargs (Any): Additional arguments to pass to the client
initialization. Ignored if custom clients are provided.
Reference: https://sgl-project.github.io/backend/openai_api_completions.
html
"""
def __init__(
self,
model_type: Union[ModelType, str],
model_config_dict: Optional[Dict[str, Any]] = None,
api_key: Optional[str] = None,
url: Optional[str] = None,
token_counter: Optional[BaseTokenCounter] = None,
timeout: Optional[float] = None,
max_retries: int = 3,
client: Optional[Any] = None,
async_client: Optional[Any] = None,
**kwargs: Any,
) -> None:
if model_config_dict is None:
model_config_dict = SGLangConfig().as_dict()
self.server_process = None
self.last_run_time: Optional[float] = (
None # Will be set when the server starts
)
self._lock = threading.Lock()
self._inactivity_thread: Optional[threading.Thread] = None
timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180))
super().__init__(
model_type,
model_config_dict,
api_key,
url,
token_counter,
timeout,
max_retries,
)
# Use custom clients if provided, otherwise create new ones
if client is not None:
self._client = client
elif self._url:
# Initialize the client if an existing URL is provided
self._client = OpenAI(
timeout=self._timeout,
max_retries=self._max_retries,
api_key="Set-but-ignored", # required but ignored
base_url=self._url,
**kwargs,
)
else:
self._client = None
if async_client is not None:
self._async_client = async_client
elif self._url:
self._async_client = AsyncOpenAI(
timeout=self._timeout,
max_retries=self._max_retries,
api_key="Set-but-ignored", # required but ignored
base_url=self._url,
**kwargs,
)
else:
self._async_client = None
def _start_server(self) -> None:
try:
if not self._url:
tool_call_flag = self.model_config_dict.get("tools")
tool_call_arg = (
f"--tool-call-parser {self._api_key} "
if tool_call_flag
else ""
)
cmd = (
f"python -m sglang.launch_server "
f"--model-path {self.model_type} "
f"{tool_call_arg}"
f"--port 30000 "
f"--host 0.0.0.0"
)
server_process = _execute_shell_command(cmd)
_wait_for_server(
base_url="http://localhost:30000", timeout=self._timeout
)
self._url = "http://127.0.0.1:30000/v1"
self.server_process = server_process # type: ignore[assignment]
# Start the inactivity monitor in a background thread
self._inactivity_thread = threading.Thread(
target=self._monitor_inactivity, daemon=True
)
self._inactivity_thread.start()
self.last_run_time = time.time()
# Initialize client after server starts if not already set
if self._client is None:
self._client = OpenAI(
timeout=self._timeout,
max_retries=self._max_retries,
api_key="Set-but-ignored", # required but ignored
base_url=self._url,
)
if (
not hasattr(self, '_async_client')
or self._async_client is None
):
self._async_client = AsyncOpenAI(
timeout=self._timeout,
max_retries=self._max_retries,
api_key="Set-but-ignored", # required but ignored
base_url=self._url,
)
except Exception as e:
raise RuntimeError(f"Failed to start SGLang server: {e}") from e
def _ensure_server_running(self) -> None:
r"""Ensures that the server is running. If not, starts the server."""
with self._lock:
if self.server_process is None:
self._start_server()
def _monitor_inactivity(self):
r"""Monitor whether the server process has been inactive for over 10
minutes.
"""
while True:
# Check every 10 seconds
time.sleep(10)
# Over 10 minutes
with self._lock:
# Over 10 minutes
if self.last_run_time and (
time.time() - self.last_run_time > 600
):
if self.server_process:
_terminate_process(self.server_process)
self.server_process = None
self._client = None # Invalidate the client
logging.info(
"Server process terminated due to inactivity."
)
break
@property
def token_counter(self) -> BaseTokenCounter:
r"""Initialize the token counter for the model backend.
Returns:
BaseTokenCounter: The token counter following the model's
tokenization style.
"""
if not self._token_counter:
self._token_counter = OpenAITokenCounter(ModelType.GPT_4O_MINI)
return self._token_counter
@observe(as_type='generation')
async def _arun(
self,
messages: List[OpenAIMessage],
response_format: Optional[Type[BaseModel]] = None,
tools: Optional[List[Dict[str, Any]]] = None,
) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
r"""Runs inference of OpenAI chat completion.
Args:
messages (List[OpenAIMessage]): Message list with the chat history
in OpenAI API format.
Returns:
Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
`ChatCompletion` in the non-stream mode, or
`AsyncStream[ChatCompletionChunk]` in the stream mode.
"""
update_current_observation(
input={
"messages": messages,
"tools": tools,
},
model=str(self.model_type),
model_parameters=self.model_config_dict,
)
self._log_and_trace()
# Ensure server is running
self._ensure_server_running()
with self._lock:
# Update last run time
self.last_run_time = time.time()
async_client = self._async_client
if async_client is None:
raise RuntimeError(
"Client is not initialized. Ensure the server is running."
)
# Prepare additional parameters
extra_params: Dict[str, Any] = {}
if response_format is not None:
extra_params["response_format"] = response_format
if tools is not None:
extra_params["tools"] = tools
response = await self._acall_client(
async_client.chat.completions.create,
messages=messages,
model=self.model_type,
**extra_params,
**self.model_config_dict,
)
update_current_observation(
usage_details={
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens,
},
)
return response
@observe(as_type='generation')
def _run(
self,
messages: List[OpenAIMessage],
response_format: Optional[Type[BaseModel]] = None,
tools: Optional[List[Dict[str, Any]]] = None,
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
r"""Runs inference of OpenAI chat completion.
Args:
messages (List[OpenAIMessage]): Message list with the chat history
in OpenAI API format.
Returns:
Union[ChatCompletion, Stream[ChatCompletionChunk]]:
`ChatCompletion` in the non-stream mode, or
`Stream[ChatCompletionChunk]` in the stream mode.
"""
update_current_observation(
input={
"messages": messages,
"tools": tools,
},
model=str(self.model_type),
model_parameters=self.model_config_dict,
)
self._log_and_trace()
# Ensure server is running
self._ensure_server_running()
with self._lock:
# Update last run time
self.last_run_time = time.time()
client = self._client
if client is None:
raise RuntimeError(
"Client is not initialized. Ensure the server is running."
)
# Prepare additional parameters
extra_params: Dict[str, Any] = {}
if response_format is not None:
extra_params["response_format"] = response_format
if tools is not None:
extra_params["tools"] = tools
response = self._call_client(
client.chat.completions.create,
messages=messages,
model=self.model_type,
**extra_params,
**self.model_config_dict,
)
update_current_observation(
usage_details={
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens,
},
)
return response
@property
def stream(self) -> bool:
r"""Returns whether the model is in stream mode, which sends partial
results each time.
Returns:
bool: Whether the model is in stream mode.
"""
return self.model_config_dict.get('stream', False)
def __del__(self):
r"""Properly clean up resources when the model is destroyed."""
self.cleanup()
def cleanup(self):
r"""Terminate the server process and clean up resources."""
with self._lock:
if self.server_process:
_terminate_process(self.server_process)
self.server_process = None
self._client = None
logging.info("Server process terminated during cleanup.")
# Below are helper functions from sglang.utils
def _terminate_process(process):
_kill_process_tree(process.pid)
def _kill_process_tree(
parent_pid, include_parent: bool = True, skip_pid: Optional[int] = None
):
r"""Kill the process and all its child processes."""
import os
import signal
import psutil
if parent_pid is None:
parent_pid = os.getpid()
include_parent = False
try:
itself = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return
children = itself.children(recursive=True)
for child in children:
if child.pid == skip_pid:
continue
try:
child.kill()
except psutil.NoSuchProcess:
pass
if include_parent:
try:
itself.kill()
# Sometime processes cannot be killed with SIGKILL
# so we send an additional signal to kill them.
if hasattr(signal, "SIGQUIT"):
itself.send_signal(signal.SIGQUIT)
else:
itself.send_signal(signal.SIGTERM)
except psutil.NoSuchProcess:
pass
def _execute_shell_command(command: str) -> subprocess.Popen:
r"""Execute a shell command and return the process handle
Args:
command: Shell command as a string (can include \\ line continuations)
Returns:
subprocess.Popen: Process handle
"""
import subprocess
# Replace \ newline with space and split
command = command.replace("\\\n", " ").replace("\\", " ")
parts = command.split()
return subprocess.Popen(parts, text=True, stderr=subprocess.STDOUT)
def _wait_for_server(base_url: str, timeout: Optional[float] = 30) -> None:
r"""Wait for the server to be ready by polling the /v1/models endpoint.
Args:
base_url (str): The base URL of the server
timeout (Optional[float]): Maximum time to wait in seconds.
(default: :obj:`30`)
"""
import requests
# Set a default value if timeout is None
actual_timeout = 30 if timeout is None else timeout
start_time = time.time()
while True:
try:
response = requests.get(
f"{base_url}/v1/models",
headers={"Authorization": "Bearer None"},
timeout=5, # Add a timeout for the request itself
)
if response.status_code == 200:
time.sleep(5)
print(
"""\n
NOTE: Typically, the server runs in a separate terminal.
In this notebook, we run the server and notebook code
together, so their outputs are combined.
To improve clarity, the server logs are displayed in the
original black color, while the notebook outputs are
highlighted in blue.
"""
)
break
if time.time() - start_time > actual_timeout:
raise TimeoutError(
f"Server did not become ready within "
f"{actual_timeout} seconds"
)
except (requests.exceptions.RequestException, TimeoutError) as e:
if time.time() - start_time > actual_timeout:
raise TimeoutError(
f"Server did not become ready within "
f"{actual_timeout} seconds: {e}"
)
time.sleep(1)