agent-zero/plugins/_a0_connector/tools/code_execution_remote.py
Alessandro b48e31bead Forward remote exec reset flag
Forward reset=true from code_execution_remote replacement commands to the connected CLI and document when to use it versus runtime=reset. This lets the CLI tear down stuck host sessions before running the next command.

Tests from /home/eclypso/a0/a0-connector: PYTHONPATH=src conda run -n a0 pytest tests/test_plugin_backend.py::test_code_execution_remote_forwards_reset_true_with_replacement_command -v; ./.venv/bin/python -m pytest tests/test_plugin_backend.py -k 'code_execution_remote or select_remote_exec or ws_connector_exec_result' -v. Mirrored to live container 07e0288dc04f and health check returned HTTP 200.
2026-05-15 00:28:10 +02:00

241 lines
8.5 KiB
Python

"""code_execution_remote tool — run shell-backed frontend operations on the CLI machine via `/ws`."""
from __future__ import annotations
import asyncio
import uuid
from typing import Any
from helpers.tool import Response, Tool
from helpers.ws import NAMESPACE
from helpers.ws_manager import ConnectionNotFoundError, get_shared_ws_manager
from plugins._a0_connector.helpers.ws_runtime import (
clear_pending_exec_op,
remote_exec_metadata_for_sid,
remote_file_metadata_for_sid,
remote_tool_sids_for_context,
select_remote_exec_target_sid,
store_pending_exec_op,
)
EXEC_OP_TIMEOUT = 120.0
EXEC_OP_EVENT = "connector_exec_op"
class CodeExecutionRemote(Tool):
"""Send shell-backed frontend execution operations to the connected CLI machine."""
@staticmethod
def _runtime_requires_write_access(runtime: str) -> bool:
return runtime in {"terminal", "python", "nodejs", "input"}
@staticmethod
def _coerce_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "on"}
return False
def get_log_object(self):
import uuid
return self.agent.context.log.log(
type="code_exe",
heading=self.get_heading(),
content="",
kvps=self.args,
id=str(uuid.uuid4()),
)
def get_heading(self, text: str = "") -> str:
if not text:
name = str(getattr(self, "name", "code_execution_remote"))
runtime = str(self.args.get("runtime", "unknown") or "unknown")
text = f"{name} - {runtime}"
normalized = " ".join(str(text).split())
if len(normalized) > 200:
normalized = normalized[:197].rstrip() + "..."
session = self.args.get("session", None)
session_text = f"[{session}] " if session or session == 0 else ""
return f"icon://terminal {session_text}{normalized}"
async def execute(self, **kwargs: Any) -> Response:
runtime = str(self.args.get("runtime", "")).strip().lower()
if runtime not in {"terminal", "python", "nodejs", "output", "input", "reset"}:
return Response(
message=(
"runtime is required (terminal, python, nodejs, output, reset, "
"or input [deprecated compatibility alias])"
),
break_loop=False,
)
context_id = self.agent.context.id
candidates = remote_tool_sids_for_context(context_id)
require_writes = self._runtime_requires_write_access(runtime)
sid = select_remote_exec_target_sid(context_id, require_writes=require_writes)
if not sid:
exec_enabled = False
write_blocked = False
for candidate_sid in candidates:
exec_metadata = remote_exec_metadata_for_sid(candidate_sid)
if exec_metadata is None or not exec_metadata.get("enabled"):
continue
exec_enabled = True
if not require_writes:
break
file_metadata = remote_file_metadata_for_sid(candidate_sid)
if file_metadata is None or (
not file_metadata.get("enabled", True)
or not file_metadata.get("write_enabled")
):
write_blocked = True
return Response(
message=(
"code_execution_remote: no connected CLI currently allows "
"shell-backed execution that may modify local files. Press F3 to switch "
"the CLI to Read&Write. `runtime=output` and `runtime=reset` remain "
"available for existing sessions."
if candidates and require_writes and exec_enabled and write_blocked
else "code_execution_remote: no connected CLI currently has "
"remote execution enabled. Connect the CLI and press F4 to switch exec on."
if candidates
else "code_execution_remote: no CLI client connected to Agent Zero. "
"Make sure the CLI is connected to this instance."
),
break_loop=False,
)
try:
session = int(self.args.get("session", 0) or 0)
except (TypeError, ValueError):
return Response(
message="session must be an integer",
break_loop=False,
)
op_id = str(uuid.uuid4())
payload: dict[str, Any] = {
"op_id": op_id,
"runtime": runtime,
"session": session,
"context_id": context_id,
}
if runtime != "reset" and self._coerce_bool(self.args.get("reset")):
payload["reset"] = True
if runtime in {"terminal", "python", "nodejs"}:
code = self.args.get("code")
if code is None or not str(code).strip():
return Response(
message=f"code is required for runtime={runtime}",
break_loop=False,
)
payload["code"] = str(code)
elif runtime == "input":
keyboard = self.args.get("keyboard")
if keyboard is None:
keyboard = self.args.get("code")
if keyboard is None:
return Response(
message="keyboard is required for runtime=input",
break_loop=False,
)
payload["keyboard"] = str(keyboard)
elif runtime == "reset":
reason = self.args.get("reason")
if reason is not None:
payload["reason"] = str(reason)
loop = asyncio.get_running_loop()
future: asyncio.Future[dict[str, Any]] = loop.create_future()
store_pending_exec_op(
op_id,
sid=sid,
future=future,
loop=loop,
context_id=context_id,
)
try:
await get_shared_ws_manager().emit_to(
NAMESPACE,
sid,
EXEC_OP_EVENT,
payload,
handler_id=f"{self.__class__.__module__}.{self.__class__.__name__}",
)
result = await asyncio.wait_for(future, timeout=EXEC_OP_TIMEOUT)
except ConnectionNotFoundError:
clear_pending_exec_op(op_id)
return Response(
message=(
"code_execution_remote: the selected CLI client disconnected before "
"the execution request could be delivered"
),
break_loop=False,
)
except asyncio.TimeoutError:
clear_pending_exec_op(op_id)
return Response(
message=(
"code_execution_remote: timed out waiting for CLI to respond "
f"to runtime={runtime!r} in session {session}"
),
break_loop=False,
)
except Exception as exc:
clear_pending_exec_op(op_id)
return Response(
message=f"code_execution_remote: error sending exec_op: {exc}",
break_loop=False,
)
finally:
clear_pending_exec_op(op_id)
return Response(
message=self._extract_result(result, runtime, session),
break_loop=False,
)
def _extract_result(self, result: Any, runtime: str, session: int) -> str:
if not isinstance(result, dict):
return f"Unexpected response format from CLI: {result!r}"
ok = bool(result.get("ok"))
data = result.get("result")
error = result.get("error")
if not ok:
return (
f"Error (runtime={runtime!r}, session={session}): "
f"{error or 'Unknown error'}"
)
if not isinstance(data, dict):
data = {}
output = str(data.get("output") or data.get("text") or "").strip()
message = str(data.get("message") or "").strip()
running = bool(data.get("running"))
parts: list[str] = []
if message:
parts.append(message)
if output:
parts.append(output)
if not parts:
state = "running" if running else "completed"
parts.append(f"Session {session} {state}.")
return "\n\n".join(parts)