mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-17 04:01:13 +00:00
Forward reset=true from code_execution_remote replacement commands to the connected CLI and document when to use it versus runtime=reset. This lets the CLI tear down stuck host sessions before running the next command. Tests from /home/eclypso/a0/a0-connector: PYTHONPATH=src conda run -n a0 pytest tests/test_plugin_backend.py::test_code_execution_remote_forwards_reset_true_with_replacement_command -v; ./.venv/bin/python -m pytest tests/test_plugin_backend.py -k 'code_execution_remote or select_remote_exec or ws_connector_exec_result' -v. Mirrored to live container 07e0288dc04f and health check returned HTTP 200.
241 lines
8.5 KiB
Python
241 lines
8.5 KiB
Python
"""code_execution_remote tool — run shell-backed frontend operations on the CLI machine via `/ws`."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import uuid
|
|
from typing import Any
|
|
|
|
from helpers.tool import Response, Tool
|
|
from helpers.ws import NAMESPACE
|
|
from helpers.ws_manager import ConnectionNotFoundError, get_shared_ws_manager
|
|
|
|
from plugins._a0_connector.helpers.ws_runtime import (
|
|
clear_pending_exec_op,
|
|
remote_exec_metadata_for_sid,
|
|
remote_file_metadata_for_sid,
|
|
remote_tool_sids_for_context,
|
|
select_remote_exec_target_sid,
|
|
store_pending_exec_op,
|
|
)
|
|
|
|
|
|
# Seconds `execute` waits on the pending-op future (via asyncio.wait_for)
# before reporting that the CLI did not respond.
EXEC_OP_TIMEOUT = 120.0
|
|
# Event name passed to the shared WS manager's `emit_to` when an exec
# operation payload is sent to the selected CLI sid.
EXEC_OP_EVENT = "connector_exec_op"
|
|
|
|
|
|
class CodeExecutionRemote(Tool):
    """Send shell-backed frontend execution operations to the connected CLI machine.

    The tool validates its arguments, picks a target CLI connection for the
    current agent context, emits a ``connector_exec_op`` event to it and waits
    (up to ``EXEC_OP_TIMEOUT`` seconds) for the matching result.

    Recognised ``self.args`` keys:
        runtime:  one of ``terminal``, ``python``, ``nodejs``, ``output``,
                  ``input`` (deprecated compatibility alias) or ``reset``.
        session:  integer session number; missing or falsy values become 0.
        reset:    truthy flag forwarded as ``reset=True`` in the payload for
                  every runtime except ``reset`` itself.
        code:     required for ``terminal`` / ``python`` / ``nodejs``; also
                  used as a fallback for ``keyboard`` when runtime is ``input``.
        keyboard: text sent for ``runtime=input``.
        reason:   optional text forwarded for ``runtime=reset``.
    """

    @staticmethod
    def _runtime_requires_write_access(runtime: str) -> bool:
        """Return True when *runtime* needs a CLI that permits writes."""
        return runtime in {"terminal", "python", "nodejs", "input"}

    @staticmethod
    def _coerce_bool(value: Any) -> bool:
        """Interpret a loosely typed argument as a boolean.

        Booleans pass through, numbers use their truthiness, and strings are
        true only for ``1``, ``true``, ``yes`` or ``on`` (case-insensitive,
        surrounding whitespace ignored). Anything else is False.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, (int, float)):
            return bool(value)
        if isinstance(value, str):
            return value.strip().lower() in {"1", "true", "yes", "on"}
        return False

    def get_log_object(self):
        """Create the ``code_exe`` log entry for this call, with a fresh id."""
        # `uuid` is imported at module level; no local import is needed.
        return self.agent.context.log.log(
            type="code_exe",
            heading=self.get_heading(),
            content="",
            kvps=self.args,
            id=str(uuid.uuid4()),
        )

    def get_heading(self, text: str = "") -> str:
        """Build the log heading string.

        Args:
            text: Heading text. When empty, ``"<tool name> - <runtime>"`` is
                used instead.

        Returns:
            ``"icon://terminal "`` followed by an optional ``"[session] "``
            tag and the text with whitespace runs collapsed to single spaces
            and capped at 200 characters (ending in ``...`` when truncated).
        """
        if not text:
            name = str(getattr(self, "name", "code_execution_remote"))
            runtime = str(self.args.get("runtime", "unknown") or "unknown")
            text = f"{name} - {runtime}"

        normalized = " ".join(str(text).split())
        if len(normalized) > 200:
            normalized = normalized[:197].rstrip() + "..."

        session = self.args.get("session", None)
        # `session == 0` keeps session 0 visible even though 0 is falsy.
        session_text = f"[{session}] " if session or session == 0 else ""
        return f"icon://terminal {session_text}{normalized}"

    @staticmethod
    def _no_target_message(candidates: Any, require_writes: bool) -> str:
        """Explain why no CLI connection could be selected for execution.

        Args:
            candidates: Sids returned by ``remote_tool_sids_for_context`` for
                the current context.
            require_writes: Whether the requested runtime needs write access.

        Returns:
            One of three messages: write access is blocked, remote execution
            is not enabled on any candidate, or no CLI is connected at all.
        """
        exec_enabled = False
        write_blocked = False
        for candidate_sid in candidates:
            exec_metadata = remote_exec_metadata_for_sid(candidate_sid)
            if exec_metadata is None or not exec_metadata.get("enabled"):
                continue
            exec_enabled = True
            if not require_writes:
                break
            file_metadata = remote_file_metadata_for_sid(candidate_sid)
            if file_metadata is None or (
                not file_metadata.get("enabled", True)
                or not file_metadata.get("write_enabled")
            ):
                write_blocked = True

        if candidates and require_writes and exec_enabled and write_blocked:
            return (
                "code_execution_remote: no connected CLI currently allows "
                "shell-backed execution that may modify local files. Press F3 to switch "
                "the CLI to Read&Write. `runtime=output` and `runtime=reset` remain "
                "available for existing sessions."
            )
        if candidates:
            return (
                "code_execution_remote: no connected CLI currently has "
                "remote execution enabled. Connect the CLI and press F4 to switch exec on."
            )
        return (
            "code_execution_remote: no CLI client connected to Agent Zero. "
            "Make sure the CLI is connected to this instance."
        )

    async def execute(self, **kwargs: Any) -> Response:
        """Validate arguments, send the operation to the CLI and await its result.

        Every outcome, including validation failures, a missing target,
        disconnects, timeouts and send errors, is reported as a ``Response``
        with ``break_loop=False``; the failure cases handled here do not raise.

        Returns:
            A ``Response`` whose message is either an error description or the
            text produced by ``_extract_result`` from the CLI's reply.
        """
        runtime = str(self.args.get("runtime", "")).strip().lower()
        if runtime not in {"terminal", "python", "nodejs", "output", "input", "reset"}:
            return Response(
                message=(
                    "runtime is required (terminal, python, nodejs, output, reset, "
                    "or input [deprecated compatibility alias])"
                ),
                break_loop=False,
            )

        context_id = self.agent.context.id
        candidates = remote_tool_sids_for_context(context_id)
        require_writes = self._runtime_requires_write_access(runtime)
        sid = select_remote_exec_target_sid(context_id, require_writes=require_writes)
        if not sid:
            return Response(
                message=self._no_target_message(candidates, require_writes),
                break_loop=False,
            )

        try:
            session = int(self.args.get("session", 0) or 0)
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers float infinity, which int() rejects with a
            # different exception type than other malformed values.
            return Response(
                message="session must be an integer",
                break_loop=False,
            )

        op_id = str(uuid.uuid4())
        payload: dict[str, Any] = {
            "op_id": op_id,
            "runtime": runtime,
            "session": session,
            "context_id": context_id,
        }
        # The reset flag is forwarded only alongside another runtime;
        # runtime=reset is already a reset request on its own.
        if runtime != "reset" and self._coerce_bool(self.args.get("reset")):
            payload["reset"] = True

        if runtime in {"terminal", "python", "nodejs"}:
            code = self.args.get("code")
            if code is None or not str(code).strip():
                return Response(
                    message=f"code is required for runtime={runtime}",
                    break_loop=False,
                )
            payload["code"] = str(code)
        elif runtime == "input":
            keyboard = self.args.get("keyboard")
            if keyboard is None:
                # Fall back to `code` when `keyboard` is not supplied.
                keyboard = self.args.get("code")
            if keyboard is None:
                return Response(
                    message="keyboard is required for runtime=input",
                    break_loop=False,
                )
            payload["keyboard"] = str(keyboard)
        elif runtime == "reset":
            reason = self.args.get("reason")
            if reason is not None:
                payload["reason"] = str(reason)

        loop = asyncio.get_running_loop()
        future: asyncio.Future[dict[str, Any]] = loop.create_future()
        # Register the future before emitting so it is already pending when
        # the operation goes out.
        store_pending_exec_op(
            op_id,
            sid=sid,
            future=future,
            loop=loop,
            context_id=context_id,
        )

        try:
            await get_shared_ws_manager().emit_to(
                NAMESPACE,
                sid,
                EXEC_OP_EVENT,
                payload,
                handler_id=f"{self.__class__.__module__}.{self.__class__.__name__}",
            )
            result = await asyncio.wait_for(future, timeout=EXEC_OP_TIMEOUT)
        except ConnectionNotFoundError:
            return Response(
                message=(
                    "code_execution_remote: the selected CLI client disconnected before "
                    "the execution request could be delivered"
                ),
                break_loop=False,
            )
        except asyncio.TimeoutError:
            return Response(
                message=(
                    "code_execution_remote: timed out waiting for CLI to respond "
                    f"to runtime={runtime!r} in session {session}"
                ),
                break_loop=False,
            )
        except Exception as exc:
            return Response(
                message=f"code_execution_remote: error sending exec_op: {exc}",
                break_loop=False,
            )
        finally:
            # Single cleanup point: runs on success, on every handled error
            # above, and on cancellation.
            clear_pending_exec_op(op_id)

        return Response(
            message=self._extract_result(result, runtime, session),
            break_loop=False,
        )

    def _extract_result(self, result: Any, runtime: str, session: int) -> str:
        """Turn the CLI's reply into the text returned to the agent.

        Args:
            result: Reply object; expected to be a dict with ``ok``,
                ``result`` and ``error`` keys.
            runtime: Runtime that was requested (used in error text).
            session: Session number that was requested (used in messages).

        Returns:
            An error description when the reply is not a dict or ``ok`` is
            falsy; otherwise the reply's ``message`` and ``output`` (or
            ``text``) joined by a blank line, or a
            ``"Session N running|completed."`` line when both are empty.
        """
        if not isinstance(result, dict):
            return f"Unexpected response format from CLI: {result!r}"

        ok = bool(result.get("ok"))
        data = result.get("result")
        error = result.get("error")

        if not ok:
            return (
                f"Error (runtime={runtime!r}, session={session}): "
                f"{error or 'Unknown error'}"
            )

        if not isinstance(data, dict):
            data = {}

        output = str(data.get("output") or data.get("text") or "").strip()
        message = str(data.get("message") or "").strip()
        running = bool(data.get("running"))

        parts: list[str] = []
        if message:
            parts.append(message)
        if output:
            parts.append(output)

        if not parts:
            state = "running" if running else "completed"
            parts.append(f"Session {session} {state}.")

        return "\n\n".join(parts)
|