eigent/backend/camel/utils/langfuse.py
2026-03-31 17:20:08 +08:00

266 lines
8.7 KiB
Python

# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
import os
from contextvars import ContextVar
from typing import Any, Dict, List, Optional
from camel.logger import get_logger
from camel.utils import dependencies_required
logger = get_logger(__name__)
_agent_session_id_var: ContextVar[Optional[str]] = ContextVar(
'agent_session_id', default=None
)
# Global flag to track if Langfuse has been configured
_langfuse_configured = False
try:
from langfuse.decorators import langfuse_context
LANGFUSE_AVAILABLE = True
except ImportError:
LANGFUSE_AVAILABLE = False
@dependencies_required('langfuse')
def configure_langfuse(
public_key: Optional[str] = None,
secret_key: Optional[str] = None,
host: Optional[str] = None,
debug: Optional[bool] = None,
enabled: Optional[bool] = None,
):
r"""Configure Langfuse for CAMEL models.
Args:
public_key(Optional[str]): Langfuse public key. Can be set via LANGFUSE_PUBLIC_KEY.
(default: :obj:`None`)
secret_key(Optional[str]): Langfuse secret key. Can be set via LANGFUSE_SECRET_KEY.
(default: :obj:`None`)
host(Optional[str]): Langfuse host URL. Can be set via LANGFUSE_HOST.
(default: :obj:`https://cloud.langfuse.com`)
debug(Optional[bool]): Enable debug mode. Can be set via LANGFUSE_DEBUG.
(default: :obj:`None`)
enabled(Optional[bool]): Enable/disable tracing. Can be set via LANGFUSE_ENABLED.
(default: :obj:`None`)
Note:
This function configures the native langfuse_context which works with
@observe() decorators. Set enabled=False to disable all tracing.
""" # noqa: E501
global _langfuse_configured
# Get configuration from environment or parameters
public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
host = host or os.environ.get(
"LANGFUSE_HOST", "https://cloud.langfuse.com"
)
debug = (
debug
if debug is not None
else os.environ.get("LANGFUSE_DEBUG", "False").lower() == "true"
)
# Handle enabled parameter
if enabled is None:
env_enabled_str = os.environ.get("LANGFUSE_ENABLED")
if env_enabled_str is not None:
enabled = env_enabled_str.lower() == "true"
else:
enabled = False # Default to disabled
# If not enabled, don't configure anything and don't call langfuse function
if not enabled:
_langfuse_configured = False
logger.info("Langfuse tracing disabled for CAMEL models")
return
logger.debug(
f"Configuring Langfuse - enabled: {enabled}, "
f"public_key: {'***' + public_key[-4:] if public_key else None}, "
f"host: {host}, debug: {debug}"
)
if enabled and public_key and secret_key and LANGFUSE_AVAILABLE:
_langfuse_configured = True
else:
_langfuse_configured = False
try:
# Configure langfuse_context with native method
langfuse_context.configure(
public_key=public_key,
secret_key=secret_key,
host=host,
debug=debug,
enabled=True, # Always True here since we checked enabled above
)
logger.info("Langfuse tracing enabled for CAMEL models")
except Exception as e:
logger.error(f"Failed to configure Langfuse: {e}")
def is_langfuse_available() -> bool:
r"""Check if Langfuse is configured."""
return _langfuse_configured
def set_current_agent_session_id(session_id: str) -> None:
r"""Set the session ID for the current agent in context-local storage.
This is safe to use in both sync and async contexts.
In async contexts, each coroutine maintains its own value.
Args:
session_id(str): The session ID to set for the current agent.
"""
_agent_session_id_var.set(session_id)
def get_current_agent_session_id() -> Optional[str]:
r"""Get the session ID for the current agent from context-local storage.
This is safe to use in both sync and async contexts.
In async contexts, returns the value for the current coroutine.
Returns:
Optional[str]: The session ID for the current agent.
"""
if is_langfuse_available():
return _agent_session_id_var.get()
return None
def update_langfuse_trace(
session_id: Optional[str] = None,
user_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
tags: Optional[List[str]] = None,
) -> bool:
r"""Update the current Langfuse trace with session ID and metadata.
Args:
session_id(Optional[str]): Optional session ID to use. If :obj:`None`
uses the current agent's session ID. (default: :obj:`None`)
user_id(Optional[str]): Optional user ID for the trace.
(default: :obj:`None`)
metadata(Optional[Dict[str, Any]]): Optional metadata dictionary.
(default: :obj:`None`)
tags(Optional[List[str]]): Optional list of tags.
(default: :obj:`None`)
Returns:
bool: True if update was successful, False otherwise.
"""
if not is_langfuse_available():
return False
# Use provided session_id or get from thread-local storage
final_session_id = session_id or get_current_agent_session_id()
update_data: Dict[str, Any] = {}
if final_session_id:
update_data["session_id"] = final_session_id
if user_id:
update_data["user_id"] = user_id
if metadata:
update_data["metadata"] = metadata
if tags:
update_data["tags"] = tags
if update_data:
langfuse_context.update_current_trace(**update_data)
return True
return False
def update_current_observation(
input: Optional[Dict[str, Any]] = None,
output: Optional[Dict[str, Any]] = None,
model: Optional[str] = None,
model_parameters: Optional[Dict[str, Any]] = None,
usage_details: Optional[Dict[str, Any]] = None,
**kwargs,
) -> None:
r"""Update the current Langfuse observation with input, output,
model, model_parameters, and usage_details.
Args:
input(Optional[Dict[str, Any]]): Optional input dictionary.
(default: :obj:`None`)
output(Optional[Dict[str, Any]]): Optional output dictionary.
(default: :obj:`None`)
model(Optional[str]): Optional model name. (default: :obj:`None`)
model_parameters(Optional[Dict[str, Any]]): Optional model parameters
dictionary. (default: :obj:`None`)
usage_details(Optional[Dict[str, Any]]): Optional usage details
dictionary. (default: :obj:`None`)
Returns:
None
"""
if not is_langfuse_available():
return
langfuse_context.update_current_observation(
input=input,
output=output,
model=model,
model_parameters=model_parameters,
usage_details=usage_details,
**kwargs,
)
def get_langfuse_status() -> Dict[str, Any]:
r"""Get detailed Langfuse configuration status for debugging.
Returns:
Dict[str, Any]: Status information including configuration state.
"""
env_enabled_str = os.environ.get("LANGFUSE_ENABLED")
env_enabled = (
env_enabled_str.lower() == "true" if env_enabled_str else None
)
status = {
"configured": _langfuse_configured,
"has_public_key": bool(os.environ.get("LANGFUSE_PUBLIC_KEY")),
"has_secret_key": bool(os.environ.get("LANGFUSE_SECRET_KEY")),
"env_enabled": env_enabled,
"host": os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
"debug": os.environ.get("LANGFUSE_DEBUG", "false").lower() == "true",
"current_session_id": get_current_agent_session_id(),
}
if _langfuse_configured:
try:
# Try to get some context information
status["langfuse_context_available"] = True
except Exception as e:
status["langfuse_context_error"] = str(e)
return status
def observe(*args, **kwargs):
def decorator(func):
return func
return decorator