# Mirror of https://github.com/eigent-ai/eigent.git
# (synced 2026-05-24 13:43:45 +00:00; 610 lines, 24 KiB, Python)
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========

import ast
import contextlib
import difflib
import importlib
import io
import os
import subprocess
import traceback
import typing
from typing import Any, ClassVar, Dict, List, Optional

from camel.interpreters.base import BaseInterpreter
from camel.interpreters.interpreter_error import InterpreterError


class InternalPythonInterpreter(BaseInterpreter):
    r"""A customized python interpreter to control the execution of
    LLM-generated codes. The interpreter makes sure the code can only execute
    functions given in action space and import white list. It also supports
    fuzzy variable matching to retrieve uncertain input variable name.

    .. highlight:: none

    This class is adapted from the hugging face implementation
    `python_interpreter.py <https://github.com/huggingface/transformers/blob/8f
    093fb799246f7dd9104ff44728da0c53a9f67a/src/transformers/tools/python_interp
    reter.py>`_. The original license applies::

        Copyright 2023 The HuggingFace Inc. team. All rights reserved.

        Licensed under the Apache License, Version 2.0 (the "License");
        you may not use this file except in compliance with the License.
        You may obtain a copy of the License at

            http://www.apache.org/licenses/LICENSE-2.0

        Unless required by applicable law or agreed to in writing, software
        distributed under the License is distributed on an "AS IS" BASIS,
        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
        implied. See the License for the specific language governing
        permissions and limitations under the License.

    We have modified the original code to suit our requirements. We have
    encapsulated the original functions within a class and saved the
    interpreter state after execution. We have added support for "import"
    statements, "for" statements, and several binary and unary operators. We
    have added import white list to keep `import` statement safe. Additionally,
    we have modified the variable matching logic and introduced the
    :obj:`fuzz_state` for fuzzy matching.

    Modifications copyright (C) 2023 CAMEL-AI.org

    Args:
        action_space (Dict[str, Any], optional): A dictionary that maps action
            names to their corresponding functions or objects. The interpreter
            can only execute functions that are either directly listed in this
            dictionary or are member functions of objects listed in this
            dictionary. The concept of :obj:`action_space` is derived from
            EmbodiedAgent, representing the actions that an agent is capable of
            performing. If `None`, set to empty dict. (default: :obj:`None`)
        import_white_list (List[str], optional): A list that stores
            the Python modules or functions that can be imported in the code.
            All submodules and functions of the modules listed in this list are
            importable. Any other import statements will be rejected. The
            module and its submodule or function name are separated by a period
            (:obj:`.`). (default: :obj:`None`)
        unsafe_mode (bool, optional): If `True`, the interpreter runs the code
            by `eval()` or `exec()` without any security check.
            (default: :obj:`False`)
        raise_error (bool, optional): Raise error if the interpreter fails.
            (default: :obj:`False`)
        allow_builtins (bool, optional): If `True`, safe built-in functions
            like print, len, str, etc. are added to the action space.
            (default: :obj:`True`)
    """

    _CODE_TYPES: ClassVar[List[str]] = ["python", "py", "python3", "python2"]

    def __init__(
        self,
        action_space: Optional[Dict[str, Any]] = None,
        import_white_list: Optional[List[str]] = None,
        unsafe_mode: bool = False,
        raise_error: bool = False,
        allow_builtins: bool = True,
    ) -> None:
        self.action_space = action_space or dict()
        # `state` starts as a shallow copy so that variables assigned by the
        # executed code never leak back into the action space.
        self.state = self.action_space.copy()
        self.fuzz_state: Dict[str, Any] = dict()
        self.import_white_list = import_white_list or list()
        self.raise_error = raise_error
        self.unsafe_mode = unsafe_mode

        # Add safe built-in functions if allowed
        if allow_builtins:
            self._add_safe_builtins()

    def _add_safe_builtins(self) -> None:
        r"""Add safe built-in functions to the action space.

        Names that are already present in :obj:`action_space` or
        :obj:`state` are left untouched, so an action supplied by the caller
        (for example a custom ``print``) is never silently replaced by the
        built-in of the same name.
        """
        # NOTE(review): `getattr`, `setattr` and `type` allow generated code
        # to reach arbitrary attributes (including dunder attributes) of any
        # object in the state, which weakens the sandbox. They are kept for
        # backward compatibility; confirm whether they should stay exposed.
        safe_builtins = {
            'print': print,
            'len': len,
            'str': str,
            'int': int,
            'float': float,
            'bool': bool,
            'list': list,
            'dict': dict,
            'tuple': tuple,
            'set': set,
            'abs': abs,
            'min': min,
            'max': max,
            'sum': sum,
            'sorted': sorted,
            'reversed': reversed,
            'enumerate': enumerate,
            'zip': zip,
            'range': range,
            'round': round,
            'type': type,
            'isinstance': isinstance,
            'hasattr': hasattr,
            'getattr': getattr,
            'setattr': setattr,
            'dir': dir,
            'help': help,
            'map': map,
            'filter': filter,
            'any': any,
            'all': all,
            'ord': ord,
            'chr': chr,
            'bin': bin,
            'oct': oct,
            'hex': hex,
        }
        for name, builtin in safe_builtins.items():
            self.action_space.setdefault(name, builtin)
            self.state.setdefault(name, builtin)

    def run(self, code: str, code_type: str = "python") -> str:
        r"""Executes the given code with specified code type in the
        interpreter.

        This method takes a string of code and its type, checks if the code
        type is supported, and then executes the code. If `unsafe_mode` is
        set to `False`, the code is executed in a controlled environment using
        the `execute` method. If `unsafe_mode` is `True`, the code is executed
        using `eval()` or `exec()` with the action space as the global context.

        In unsafe mode the code is executed exactly once: code that forms a
        single expression is evaluated with `eval()`, anything else is run
        with `exec()`. Captured standard output takes precedence over the
        value of the expression.

        Args:
            code (str): The python code to be executed.
            code_type (str): The type of the code, which should be one of the
                supported code types (`python`, `py`, `python3`, `python2`).
                (default: obj:`python`)

        Returns:
            str: The string representation of the output of the executed code.
                In unsafe mode this is the captured stdout if any; otherwise
                the string form of the expression value, or an empty string
                when the code is not a single expression.

        Raises:
            InterpreterError: If the `code_type` is not supported, or (in safe
                mode with `raise_error` enabled) if the execution fails.
        """
        if code_type not in self._CODE_TYPES:
            raise InterpreterError(
                f"Unsupported code type {code_type}. "
                f"`{self.__class__.__name__}` only supports "
                f"{', '.join(self._CODE_TYPES)}."
            )
        if not self.unsafe_mode:
            return str(self.execute(code))

        # WARNING: unsafe mode hands the code to `eval()`/`exec()` without
        # any check; it must only be used with trusted code.
        #
        # Decide up front whether the code is a single expression. Running
        # `exec()` first and then `eval()` on the same code (as a fallback
        # when nothing was printed) would execute side effects twice.
        try:
            expression_code = compile(code, "<string>", "eval")
        except SyntaxError:
            expression_code = None

        output_buffer = io.StringIO()
        value: Any = None
        with contextlib.redirect_stdout(output_buffer):
            if expression_code is None:
                exec(code, self.action_space)
            else:
                value = eval(expression_code, self.action_space)

        captured = output_buffer.getvalue()
        if captured:
            return captured
        # Statements have no value; expressions report theirs.
        return "" if expression_code is None else str(value)

    def update_action_space(self, action_space: Dict[str, Any]) -> None:
        r"""Updates action space for *python* interpreter."""
        self.action_space.update(action_space)

    def supported_code_types(self) -> List[str]:
        r"""Provides supported code types by the interpreter."""
        return self._CODE_TYPES

    def execute(
        self,
        code: str,
        state: Optional[Dict[str, Any]] = None,
        fuzz_state: Optional[Dict[str, Any]] = None,
        keep_state: bool = True,
    ) -> Any:
        r"""Execute the input python codes in a security environment.

        Args:
            code (str): Generated python code to be executed.
            state (Optional[Dict[str, Any]], optional): External variables that
                may be used in the generated code. (default: :obj:`None`)
            fuzz_state (Optional[Dict[str, Any]], optional): External variables
                that do not have certain variable names. The interpreter will
                use fuzzy matching to access these variables. For example, if
                :obj:`fuzz_state` has a variable :obj:`image`, the generated
                code can use :obj:`input_image` to access it. (default:
                :obj:`None`)
            keep_state (bool, optional): If :obj:`True`, :obj:`state` and
                :obj:`fuzz_state` will be kept for later execution. Otherwise,
                they will be cleared. (default: :obj:`True`)

        Returns:
            Any: The value of the last statement (excluding "import") in the
                code. For this interpreter, the value of an expression is its
                value, the value of an "assign" statement is the assigned
                value, and the value of an "if" and "for" block statement is
                the value of the last statement in the block. If the code
                fails and :obj:`raise_error` is :obj:`False`, the formatted
                traceback string is returned instead.

        Raises:
            InterpreterError: If :obj:`raise_error` is :obj:`True` and the
                code has a syntax error or a node fails to evaluate.
        """
        if state is not None:
            self.state.update(state)
        if fuzz_state is not None:
            self.fuzz_state.update(fuzz_state)

        try:
            expression = ast.parse(code)
        except SyntaxError as e:
            # Honour `keep_state` on this failure path as well, so the
            # variables passed in above do not linger after a failed call.
            if not keep_state:
                self.clear_state()
            if self.raise_error:
                raise InterpreterError(f"Syntax error in code: {e}") from e
            return traceback.format_exc()

        result = None
        for idx, node in enumerate(expression.body):
            try:
                line_result = self._execute_ast(node)
            except InterpreterError as e:
                if not keep_state:
                    self.clear_state()
                msg = (
                    f"Evaluation of the code stopped at node {idx}. "
                    f"See:\n{e}"
                )
                # More information can be provided by `ast.unparse()`,
                # which is new in python 3.9.
                if self.raise_error:
                    raise InterpreterError(msg) from e
                return traceback.format_exc()
            if line_result is not None:
                result = line_result

        if not keep_state:
            self.clear_state()

        return result

    def clear_state(self) -> None:
        r"""Initialize :obj:`state` and :obj:`fuzz_state`."""
        self.state = self.action_space.copy()
        self.fuzz_state = {}

    # ast.Index is deprecated after python 3.9, which cannot pass type check,
    # but is still necessary for older versions.
    @typing.no_type_check
    def _execute_ast(self, expression: ast.AST) -> Any:
        r"""Evaluate a single AST node and return its value.

        Dispatches on the node type to the matching ``_execute_*`` helper.
        Node types that are not listed here are rejected.

        Raises:
            InterpreterError: If the node type is not supported.
        """
        if isinstance(expression, ast.Assign):
            # Assignment -> evaluate the assignment which should
            # update the state. We return the variable assigned as it may
            # be used to determine the final result.
            return self._execute_assign(expression)
        elif isinstance(expression, ast.Attribute):
            # NOTE(review): attribute access is unrestricted, so dunder
            # attributes of any reachable object can be read.
            value = self._execute_ast(expression.value)
            return getattr(value, expression.attr)
        elif isinstance(expression, ast.BinOp):
            # Binary Operator -> return the result value
            return self._execute_binop(expression)
        elif isinstance(expression, ast.Call):
            # Function call -> return the value of the function call
            return self._execute_call(expression)
        elif isinstance(expression, ast.Compare):
            # Compare -> return True or False
            return self._execute_condition(expression)
        elif isinstance(expression, ast.Constant):
            # Constant -> just return the value
            return expression.value
        elif isinstance(expression, ast.Dict):
            # Dict -> evaluate all keys and values
            result: Dict = {}
            for k, v in zip(expression.keys, expression.values):
                if k is not None:
                    result[self._execute_ast(k)] = self._execute_ast(v)
                else:
                    # A `None` key marks a `**mapping` entry in the literal.
                    result.update(self._execute_ast(v))
            return result
        elif isinstance(expression, ast.Expr):
            # Expression -> evaluate the content
            return self._execute_ast(expression.value)
        elif isinstance(expression, ast.For):
            return self._execute_for(expression)
        elif isinstance(expression, ast.FormattedValue):
            # Formatted value (part of f-string) -> evaluate the content
            # and return. Conversion flags and format specs are ignored.
            return self._execute_ast(expression.value)
        elif isinstance(expression, ast.If):
            # If -> execute the right branch
            return self._execute_if(expression)
        elif isinstance(expression, ast.Import):
            # Import -> add imported names in self.state and return None.
            self._execute_import(expression)
            return None
        elif isinstance(expression, ast.ImportFrom):
            self._execute_import_from(expression)
            return None
        elif hasattr(ast, "Index") and isinstance(expression, ast.Index):
            # cannot pass type check
            return self._execute_ast(expression.value)
        elif isinstance(expression, ast.JoinedStr):
            return "".join(
                [str(self._execute_ast(v)) for v in expression.values]
            )
        elif isinstance(expression, ast.List):
            # List -> evaluate all elements
            return [self._execute_ast(elt) for elt in expression.elts]
        elif isinstance(expression, ast.Name):
            # Name -> pick up the value in the state
            return self._execute_name(expression)
        elif isinstance(expression, ast.Subscript):
            # Subscript -> return the value of the indexing
            return self._execute_subscript(expression)
        elif isinstance(expression, ast.Tuple):
            return tuple([self._execute_ast(elt) for elt in expression.elts])
        elif isinstance(expression, ast.UnaryOp):
            # Unary Operator -> return the result value
            return self._execute_unaryop(expression)
        else:
            # For now we refuse anything else. Let's add things as we need
            # them.
            raise InterpreterError(
                f"{expression.__class__.__name__} is not supported."
            )

    def _run_statements(self, statements: List[ast.stmt]) -> Any:
        r"""Execute statements in order and return the last non-``None``
        value produced, or ``None`` if no statement produced one."""
        result = None
        for statement in statements:
            line_result = self._execute_ast(statement)
            if line_result is not None:
                result = line_result
        return result

    def _execute_assign(self, assign: ast.Assign) -> Any:
        r"""Evaluate the right-hand side once, bind it to every target and
        return the assigned value."""
        result = self._execute_ast(assign.value)
        for target in assign.targets:
            self._assign(target, result)
        return result

    def _assign(self, target: ast.expr, value: Any):
        r"""Bind :obj:`value` to :obj:`target` in :obj:`state`.

        Args:
            target (ast.expr): An ``ast.Name`` or an ``ast.Tuple`` whose
                elements are themselves valid targets (nesting is allowed).
            value (Any): The value to bind. Must be a tuple of matching
                length when :obj:`target` is an ``ast.Tuple``.

        Raises:
            InterpreterError: If the target type is unsupported, or a tuple
                target is given a non-tuple value or a tuple of the wrong
                length.
        """
        if isinstance(target, ast.Name):
            self.state[target.id] = value
        elif isinstance(target, ast.Tuple):
            if not isinstance(value, tuple):
                raise InterpreterError(
                    f"Expected type tuple, but got "
                    f"{value.__class__.__name__} instead."
                )
            if len(target.elts) != len(value):
                raise InterpreterError(
                    f"Expected {len(target.elts)} values but got"
                    f" {len(value)}."
                )
            # Recurse so nested targets such as `(a, (b, c)) = ...` unpack
            # properly instead of being stored under a tuple-of-names key.
            for sub_target, sub_value in zip(target.elts, value):
                self._assign(sub_target, sub_value)
        else:
            raise InterpreterError(
                f"Unsupported variable type. Expected "
                f"ast.Name or ast.Tuple, got "
                f"{target.__class__.__name__} instead."
            )

    def _execute_call(self, call: ast.Call) -> Any:
        r"""Evaluate a call node: resolve the callable, evaluate positional
        and keyword arguments (including ``*iterable`` and ``**mapping``
        unpacking) and invoke it."""
        callable_func = self._execute_ast(call.func)

        args: List[Any] = []
        for arg in call.args:
            if isinstance(arg, ast.Starred):
                # `*iterable` -> splice the evaluated items in place.
                args.extend(self._execute_ast(arg.value))
            else:
                args.append(self._execute_ast(arg))

        kwargs: Dict[str, Any] = {}
        for keyword in call.keywords:
            evaluated = self._execute_ast(keyword.value)
            if keyword.arg is None:
                # `**mapping` -> the AST stores it as a keyword without name.
                kwargs.update(evaluated)
            else:
                kwargs[keyword.arg] = evaluated
        return callable_func(*args, **kwargs)

    def _execute_subscript(self, subscript: ast.Subscript):
        r"""Evaluate ``value[index]`` in load context.

        Lists and tuples are indexed with ``int(index)``. Other containers
        are indexed directly when they contain the index; string keys of a
        dict additionally fall back to the closest fuzzy match.

        Raises:
            InterpreterError: If the subscript is not in load context or the
                value cannot be indexed with the given index.
        """
        index = self._execute_ast(subscript.slice)
        value = self._execute_ast(subscript.value)
        if not isinstance(subscript.ctx, ast.Load):
            raise InterpreterError(
                f"{subscript.ctx.__class__.__name__} is not supported for "
                "subscript."
            )
        if isinstance(value, (list, tuple)):
            return value[int(index)]
        try:
            if index in value:
                return value[index]
        except TypeError:
            # The membership test or lookup is not meaningful for this
            # value/index pair (e.g. an int index into a str, an unhashable
            # key, a non-container value). Report it uniformly below rather
            # than leaking a raw TypeError past `execute`.
            pass
        if isinstance(index, str) and isinstance(value, dict):
            close_matches = difflib.get_close_matches(
                index,
                [key for key in value if isinstance(key, str)],
            )
            if close_matches:
                return value[close_matches[0]]

        raise InterpreterError(f"Could not index {value} with '{index}'.")

    def _execute_name(self, name: ast.Name):
        r"""Return the identifier for a store-context name, or its current
        value for a load-context name."""
        if isinstance(name.ctx, ast.Store):
            return name.id
        elif isinstance(name.ctx, ast.Load):
            return self._get_value_from_state(name.id)
        else:
            raise InterpreterError(f"{name.ctx} is not supported.")

    def _execute_condition(self, condition: ast.Compare):
        r"""Evaluate a comparison with exactly one operator.

        Raises:
            InterpreterError: For chained comparisons or an unsupported
                comparison operator.
        """
        if len(condition.ops) > 1:
            raise InterpreterError(
                "Cannot evaluate conditions with multiple operators"
            )

        left = self._execute_ast(condition.left)
        comparator = condition.ops[0]
        right = self._execute_ast(condition.comparators[0])

        comparisons = {
            ast.Eq: lambda a, b: a == b,
            ast.NotEq: lambda a, b: a != b,
            ast.Lt: lambda a, b: a < b,
            ast.LtE: lambda a, b: a <= b,
            ast.Gt: lambda a, b: a > b,
            ast.GtE: lambda a, b: a >= b,
            ast.Is: lambda a, b: a is b,
            ast.IsNot: lambda a, b: a is not b,
            ast.In: lambda a, b: a in b,
            ast.NotIn: lambda a, b: a not in b,
        }
        compare = comparisons.get(type(comparator))
        if compare is None:
            raise InterpreterError(f"Unsupported operator: {comparator}")
        return compare(left, right)

    def _execute_if(self, if_statement: ast.If):
        r"""Execute the branch selected by the test and return the value of
        the last statement in it that produced one.

        Raises:
            InterpreterError: If the test is not a comparison expression.
        """
        if not isinstance(if_statement.test, ast.Compare):
            raise InterpreterError(
                "Only Compare expr supported in if statement, get"
                f" {if_statement.test.__class__.__name__}"
            )
        if self._execute_condition(if_statement.test):
            return self._run_statements(if_statement.body)
        return self._run_statements(if_statement.orelse)

    def _execute_for(self, for_statement: ast.For):
        r"""Run the loop body once per item of the evaluated iterable and
        return the last non-``None`` statement value seen in any iteration.
        ``break``, ``continue`` and the ``else`` clause are not handled."""
        result = None
        for value in self._execute_ast(for_statement.iter):
            self._assign(for_statement.target, value)
            iteration_result = self._run_statements(for_statement.body)
            if iteration_result is not None:
                result = iteration_result

        return result

    def _execute_import(self, import_module: ast.Import) -> None:
        r"""Import each white-listed module and bind it in :obj:`state`
        under its alias, or its full dotted name when no alias is given."""
        for module in import_module.names:
            self._validate_import(module.name)
            alias = module.asname or module.name
            self.state[alias] = importlib.import_module(module.name)

    def _execute_import_from(self, import_from: ast.ImportFrom):
        r"""Handle ``from module import name [as alias]`` for white-listed
        names.

        Raises:
            InterpreterError: For ``from . import ...`` or a name that is not
                covered by the import white list.
        """
        if import_from.module is None:
            raise InterpreterError("\"from . import\" is not supported.")
        for import_name in import_from.names:
            full_name = import_from.module + f".{import_name.name}"
            self._validate_import(full_name)
            # Import only after validation so a rejected name never triggers
            # the module's import side effects.
            imported_module = importlib.import_module(import_from.module)
            alias = import_name.asname or import_name.name
            self.state[alias] = getattr(imported_module, import_name.name)

    def _validate_import(self, full_name: str):
        r"""Check that :obj:`full_name` or one of its dotted prefixes is in
        the import white list.

        Raises:
            InterpreterError: If no prefix of :obj:`full_name` is listed.
        """
        prefix = ""
        for part in full_name.split("."):
            prefix = part if prefix == "" else f"{prefix}.{part}"
            if prefix in self.import_white_list:
                return

        raise InterpreterError(
            f"It is not permitted to import modules "
            f"than module white list (try to import "
            f"{full_name})."
        )

    def _execute_binop(self, binop: ast.BinOp):
        r"""Evaluate both operands (left first) and apply the binary
        operator.

        Raises:
            InterpreterError: If the operator is not supported.
        """
        left = self._execute_ast(binop.left)
        operator = binop.op
        right = self._execute_ast(binop.right)

        operations = {
            ast.Add: lambda a, b: a + b,
            ast.Sub: lambda a, b: a - b,
            ast.Mult: lambda a, b: a * b,
            ast.Div: lambda a, b: a / b,
            ast.FloorDiv: lambda a, b: a // b,
            ast.Mod: lambda a, b: a % b,
            ast.Pow: lambda a, b: a**b,
            ast.LShift: lambda a, b: a << b,
            ast.RShift: lambda a, b: a >> b,
            ast.MatMult: lambda a, b: a @ b,
        }
        apply = operations.get(type(operator))
        if apply is None:
            raise InterpreterError(f"Operator not supported: {operator}")
        return apply(left, right)

    def _execute_unaryop(self, unaryop: ast.UnaryOp):
        r"""Evaluate the operand and apply ``+``, ``-`` or ``not``.

        Raises:
            InterpreterError: If the operator is not supported.
        """
        operand = self._execute_ast(unaryop.operand)
        operator = unaryop.op

        if isinstance(operator, ast.UAdd):
            return +operand
        if isinstance(operator, ast.USub):
            return -operand
        if isinstance(operator, ast.Not):
            return not operand
        raise InterpreterError(f"Operator not supported: {operator}")

    def _get_value_from_state(self, key: str) -> Any:
        r"""Look up :obj:`key` in :obj:`state`, falling back to the closest
        fuzzy match among the :obj:`fuzz_state` names.

        Raises:
            InterpreterError: If the name is found in neither.
        """
        if key in self.state:
            return self.state[key]

        close_matches = difflib.get_close_matches(
            key, list(self.fuzz_state.keys()), n=1
        )
        if close_matches:
            return self.fuzz_state[close_matches[0]]
        raise InterpreterError(f"The variable `{key}` is not defined.")

    def execute_command(self, command: str) -> tuple[str, str]:
        r"""Execute a command in the internal python interpreter.

        The command is run through the system shell with the current process
        environment, and the call blocks until the command exits.

        Args:
            command (str): The command to execute.

        Returns:
            tuple: A tuple containing the stdout and stderr of the command.

        Raises:
            InterpreterError: If the command could not be started or its
                output could not be collected.
        """
        # NOTE(review): `shell=True` passes `command` to the shell verbatim;
        # callers must not forward untrusted input here.
        try:
            completed = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                env=os.environ,
                shell=True,
            )
        except Exception as e:
            raise InterpreterError(f"Error executing command: {e}") from e
        return completed.stdout, completed.stderr