diff --git a/extensions/python/message_loop_prompts_after/_75_include_workdir_extras.py b/extensions/python/message_loop_prompts_after/_75_include_workdir_extras.py index d823b8199..1e8d86058 100644 --- a/extensions/python/message_loop_prompts_after/_75_include_workdir_extras.py +++ b/extensions/python/message_loop_prompts_after/_75_include_workdir_extras.py @@ -50,7 +50,7 @@ class IncludeWorkdirExtras(Extension): max_lines = set["workdir_max_lines"] gitignore_raw = set["workdir_gitignore"] - folder = set["workdir_path"] + folder = self.agent.context.get_data("workdir_path") or set["workdir_path"] scan_path = files.get_abs_path_development(folder) files.create_dir(scan_path) diff --git a/plugins/_acp/README.md b/plugins/_acp/README.md new file mode 100644 index 000000000..6f99df8d2 --- /dev/null +++ b/plugins/_acp/README.md @@ -0,0 +1,34 @@ +# Agent Client Protocol + +This builtin plugin exposes Agent Zero through the Agent Client Protocol (ACP) +over stdio so ACP-capable editors can create, load, resume, and prompt Agent +Zero sessions directly from a workspace. + +## Usage + +From the Agent Zero repository or runtime container: + +```bash +python -m plugins._acp +``` + +For the Dockerized Agent Zero runtime, point the editor command at the +framework interpreter inside the container, for example: + +```bash +docker exec -i agent-zero /opt/venv-a0/bin/python -m plugins._acp +``` + +ACP reserves stdout for JSON-RPC, so the adapter writes diagnostics to stderr. + +## Checks + +```bash +python -m plugins._acp --check +python -m plugins._acp --registry +``` + +The ACP Python SDK is provided by `agent-client-protocol`. +Fresh Docker images install it through the root `requirements.txt`; self-updated +instances lazy-install the same root pin through `plugins/_acp/hooks.py` the +first time the ACP stdio entrypoint starts. diff --git a/plugins/_acp/__init__.py b/plugins/_acp/__init__.py new file mode 100644 index 000000000..d17b30566 --- /dev/null +++ b/plugins/_acp/__init__.py @@ -0,0 +1,4 @@ +PLUGIN_NAME = "_acp" +PLUGIN_TITLE = "Agent Client Protocol" +AGENT_ZERO_ACP_VERSION = "1.19" +PLUGIN_VERSION = AGENT_ZERO_ACP_VERSION diff --git a/plugins/_acp/__main__.py b/plugins/_acp/__main__.py new file mode 100644 index 000000000..56511a45b --- /dev/null +++ b/plugins/_acp/__main__.py @@ -0,0 +1,5 @@ +from .entry import main + + +if __name__ == "__main__": + main() diff --git a/plugins/_acp/acp_registry/agent.json b/plugins/_acp/acp_registry/agent.json new file mode 100644 index 000000000..006c390bf --- /dev/null +++ b/plugins/_acp/acp_registry/agent.json @@ -0,0 +1,15 @@ +{ + "id": "agent-zero", + "name": "Agent Zero", + "version": "1.19", + "description": "Agent Zero ACP adapter for editor-native agent sessions.", + "repository": "https://github.com/agent0ai/agent-zero", + "authors": ["Agent Zero"], + "license": "MIT", + "distribution": { + "stdio": { + "command": "python", + "args": ["-m", "plugins._acp"] + } + } +} diff --git a/plugins/_acp/entry.py b/plugins/_acp/entry.py new file mode 100644 index 000000000..15cc13266 --- /dev/null +++ b/plugins/_acp/entry.py @@ -0,0 +1,149 @@ +from __future__ import annotations + +import argparse +import asyncio +import logging +import sys +from pathlib import Path + +from plugins._acp import PLUGIN_VERSION + + +_BENIGN_PROBE_METHODS = {"ping", "health", "healthcheck"} + + +class _BenignProbeFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + if record.getMessage() != "Background task failed": + return True + exc_info = record.exc_info + if not exc_info: + return True + try: + from acp.exceptions import RequestError + except ImportError: + return True + exc = exc_info[1] + if not isinstance(exc, RequestError): + return True + if getattr(exc, "code", None) != -32601: + return True + data = getattr(exc, "data", None) + method = data.get("method") if isinstance(data, dict) else None + return method not in _BENIGN_PROBE_METHODS + + +def main(argv: list[str] | None = None) -> None: + args = _parse_args(argv) + + if args.version: + print(PLUGIN_VERSION) + return + + if args.registry: + registry = Path(__file__).resolve().parent / "acp_registry" / "agent.json" + print(registry.read_text(encoding="utf-8")) + return + + if args.check: + _run_check() + return + + _setup_logging(debug=args.debug) + _ensure_project_root() + + acp = _import_acp_or_install() + + from helpers import persist_chat + from plugins._acp.helpers.server import AgentZeroACPAgent + + persist_chat.load_tmp_chats() + logger = logging.getLogger(__name__) + logger.info("Starting Agent Zero ACP adapter") + + try: + asyncio.run(acp.run_agent(AgentZeroACPAgent(), use_unstable_protocol=True)) + except KeyboardInterrupt: + logger.info("ACP adapter stopped") + except Exception: + logger.exception("ACP adapter crashed") + sys.exit(1) + + +def _parse_args(argv: list[str] | None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog="agent-zero-acp", + description="Run Agent Zero as an Agent Client Protocol stdio server.", + ) + parser.add_argument("--check", action="store_true", help="Verify ACP imports and exit") + parser.add_argument("--registry", action="store_true", help="Print ACP registry metadata") + parser.add_argument("--version", action="store_true", help="Print ACP plugin version") + parser.add_argument("--debug", action="store_true", help="Enable debug logging to stderr") + return parser.parse_args(argv) + + +def _setup_logging(*, debug: bool = False) -> None: + handler = logging.StreamHandler(sys.stderr) + handler.setFormatter( + logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + ) + handler.addFilter(_BenignProbeFilter()) + root = logging.getLogger() + root.handlers.clear() + root.addHandler(handler) + root.setLevel(logging.DEBUG if debug else logging.INFO) + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("httpcore").setLevel(logging.WARNING) + logging.getLogger("openai").setLevel(logging.WARNING) + + +def _ensure_project_root() -> None: + project_root = str(Path(__file__).resolve().parents[2]) + if project_root not in sys.path: + sys.path.insert(0, project_root) + + +def _run_check() -> None: + _ensure_project_root() + _import_acp_or_install() + from plugins._acp.helpers.server import AgentZeroACPAgent # noqa: F401 + + print("Agent Zero ACP check OK") + + +def _import_acp_or_install(): + try: + import acp + + return acp + except ImportError: + pass + + from plugins._acp import hooks + + if hooks.ensure_dependencies(raise_on_error=False): + try: + import acp + + return acp + except ImportError: + pass + + _missing_dependency() + + +def _missing_dependency() -> None: + print( + "Agent Zero ACP requires agent-client-protocol. " + "The plugin tried to install the root requirements pin automatically; " + "manual fallback: pip install agent-client-protocol==0.10.1", + file=sys.stderr, + ) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/plugins/_acp/extensions/python/reasoning_stream_chunk/_50_acp_stream.py b/plugins/_acp/extensions/python/reasoning_stream_chunk/_50_acp_stream.py new file mode 100644 index 000000000..110f28a53 --- /dev/null +++ b/plugins/_acp/extensions/python/reasoning_stream_chunk/_50_acp_stream.py @@ -0,0 +1,9 @@ +from helpers.extension import Extension +from plugins._acp.helpers import bridge + + +class ACPReasoningStream(Extension): + async def execute(self, stream_data=None, **kwargs): + if not self.agent or not stream_data: + return + bridge.send_agent_thought_delta(self.agent.context.id, stream_data.get("full", "")) diff --git a/plugins/_acp/extensions/python/response_stream/_50_acp_response.py b/plugins/_acp/extensions/python/response_stream/_50_acp_response.py new file mode 100644 index 000000000..aaed2d3af --- /dev/null +++ b/plugins/_acp/extensions/python/response_stream/_50_acp_response.py @@ -0,0 +1,16 @@ +from helpers.extension import Extension +from plugins._acp.helpers import bridge + + +class ACPResponseStream(Extension): + async def execute(self, parsed=None, **kwargs): + if not self.agent or not isinstance(parsed, dict): + return + tool_name = parsed.get("tool_name") or parsed.get("tool") + if tool_name != "response": + return + tool_args = parsed.get("tool_args") if isinstance(parsed.get("tool_args"), dict) else parsed.get("args") + if not isinstance(tool_args, dict): + return + text = tool_args.get("text", tool_args.get("message", "")) + bridge.send_agent_delta(self.agent.context.id, str(text or "")) diff --git a/plugins/_acp/extensions/python/tool_execute_after/_50_acp_tool.py b/plugins/_acp/extensions/python/tool_execute_after/_50_acp_tool.py new file mode 100644 index 000000000..38550b050 --- /dev/null +++ b/plugins/_acp/extensions/python/tool_execute_after/_50_acp_tool.py @@ -0,0 +1,9 @@ +from helpers.extension import Extension +from plugins._acp.helpers import bridge + + +class ACPToolFinish(Extension): + async def execute(self, response=None, tool_name="", **kwargs): + if not self.agent: + return + bridge.finish_tool(self.agent.context.id, str(tool_name or ""), response) diff --git a/plugins/_acp/extensions/python/tool_execute_before/_50_acp_tool.py b/plugins/_acp/extensions/python/tool_execute_before/_50_acp_tool.py new file mode 100644 index 000000000..35acb4118 --- /dev/null +++ b/plugins/_acp/extensions/python/tool_execute_before/_50_acp_tool.py @@ -0,0 +1,9 @@ +from helpers.extension import Extension +from plugins._acp.helpers import bridge + + +class ACPToolStart(Extension): + async def execute(self, tool_name="", tool_args=None, **kwargs): + if not self.agent: + return + bridge.start_tool(self.agent.context.id, str(tool_name or ""), tool_args or {}) diff --git a/plugins/_acp/helpers/__init__.py b/plugins/_acp/helpers/__init__.py new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/plugins/_acp/helpers/__init__.py @@ -0,0 +1 @@ + diff --git a/plugins/_acp/helpers/bridge.py b/plugins/_acp/helpers/bridge.py new file mode 100644 index 000000000..7d4883121 --- /dev/null +++ b/plugins/_acp/helpers/bridge.py @@ -0,0 +1,303 @@ +from __future__ import annotations + +import asyncio +import json +import logging +import threading +import uuid +from collections import defaultdict, deque +from dataclasses import dataclass, field +from typing import Any, Deque + + +logger = logging.getLogger(__name__) + +CTX_IS_ACP = "acp_session" +CTX_CWD = "acp_cwd" +CTX_ADDITIONAL_DIRECTORIES = "acp_additional_directories" +CTX_MODE = "acp_mode" +CTX_CONFIG_OPTIONS = "acp_config_options" +CTX_MODEL_ID = "acp_model_id" +CTX_WORKDIR = "workdir_path" + +DEFAULT_MODE = "default" + +_MAX_RAW_OUTPUT = 12000 + + +@dataclass +class SessionBridge: + context_id: str + session_id: str + conn: Any + loop: asyncio.AbstractEventLoop + response_text_sent: str = "" + thought_text_sent: str = "" + message_id: str = "" + tool_queues: dict[str, Deque[str]] = field(default_factory=lambda: defaultdict(deque)) + lock: threading.RLock = field(default_factory=threading.RLock) + + +_bridges_by_context: dict[str, SessionBridge] = {} +_bridges_by_session: dict[str, SessionBridge] = {} +_registry_lock = threading.RLock() + + +def register_bridge( + *, + context_id: str, + session_id: str, + conn: Any, + loop: asyncio.AbstractEventLoop, +) -> SessionBridge: + bridge = SessionBridge( + context_id=context_id, + session_id=session_id, + conn=conn, + loop=loop, + ) + with _registry_lock: + _bridges_by_context[context_id] = bridge + _bridges_by_session[session_id] = bridge + return bridge + + +def unregister_bridge(context_id: str | None = None, session_id: str | None = None) -> None: + with _registry_lock: + bridge = None + if context_id: + bridge = _bridges_by_context.pop(context_id, None) + if session_id: + bridge = _bridges_by_session.pop(session_id, None) or bridge + if bridge: + _bridges_by_context.pop(bridge.context_id, None) + _bridges_by_session.pop(bridge.session_id, None) + + +def get_bridge_for_context(context_id: str) -> SessionBridge | None: + with _registry_lock: + return _bridges_by_context.get(context_id) + + +def get_bridge_for_session(session_id: str) -> SessionBridge | None: + with _registry_lock: + return _bridges_by_session.get(session_id) + + +def reset_turn(context_id: str, message_id: str = "") -> None: + bridge = get_bridge_for_context(context_id) + if not bridge: + return + with bridge.lock: + bridge.response_text_sent = "" + bridge.thought_text_sent = "" + bridge.message_id = message_id + bridge.tool_queues.clear() + + +def send_agent_delta(context_id: str, full_text: str) -> bool: + bridge = get_bridge_for_context(context_id) + if not bridge: + return False + full_text = str(full_text or "") + with bridge.lock: + previous = bridge.response_text_sent + if full_text == previous: + return False + if full_text.startswith(previous): + delta = full_text[len(previous) :] + else: + delta = full_text + bridge.response_text_sent = full_text + if not delta: + return False + return _send_acp_helper_update(bridge, "update_agent_message_text", delta) + + +def send_agent_text(context_id: str, text: str) -> bool: + bridge = get_bridge_for_context(context_id) + if not bridge: + return False + text = str(text or "") + if not text: + return False + with bridge.lock: + bridge.response_text_sent += text + return _send_acp_helper_update(bridge, "update_agent_message_text", text) + + +def send_agent_thought_delta(context_id: str, full_text: str) -> bool: + bridge = get_bridge_for_context(context_id) + if not bridge: + return False + full_text = str(full_text or "") + with bridge.lock: + previous = bridge.thought_text_sent + if full_text == previous: + return False + if full_text.startswith(previous): + delta = full_text[len(previous) :] + else: + delta = full_text + bridge.thought_text_sent = full_text + if not delta: + return False + return _send_acp_helper_update(bridge, "update_agent_thought_text", delta) + + +def start_tool(context_id: str, tool_name: str, tool_args: dict[str, Any] | None = None) -> str: + bridge = get_bridge_for_context(context_id) + if not bridge: + return "" + + normalized_name = str(tool_name or "tool") + if normalized_name == "response": + return "" + + tool_call_id = f"a0-{uuid.uuid4().hex}" + title = _tool_title(normalized_name, tool_args or {}) + kind = _tool_kind(normalized_name, tool_args or {}) + raw_input = _json_safe(tool_args or {}) + + try: + import acp + + update = acp.start_tool_call( + tool_call_id, + title, + kind=kind, + status="in_progress", + raw_input=raw_input, + ) + except Exception: + logger.debug("Could not build ACP tool start", exc_info=True) + return "" + + with bridge.lock: + bridge.tool_queues[normalized_name].append(tool_call_id) + _send_update(bridge, update) + return tool_call_id + + +def finish_tool(context_id: str, tool_name: str, response: Any) -> bool: + bridge = get_bridge_for_context(context_id) + if not bridge: + return False + normalized_name = str(tool_name or "tool") + if normalized_name == "response": + return False + + with bridge.lock: + queue = bridge.tool_queues.get(normalized_name) + if not queue: + return False + tool_call_id = queue.popleft() + if not queue: + bridge.tool_queues.pop(normalized_name, None) + + message = getattr(response, "message", response) + raw_output = _truncate_raw_output(message) + try: + import acp + + update = acp.update_tool_call( + tool_call_id, + status="completed", + raw_output=raw_output, + ) + except Exception: + logger.debug("Could not build ACP tool completion", exc_info=True) + return False + _send_update(bridge, update) + return True + + +async def send_update_async(session_id: str, update: Any) -> bool: + bridge = get_bridge_for_session(session_id) + if not bridge: + return False + try: + await bridge.conn.session_update(session_id, update) + return True + except Exception: + logger.debug("Could not send ACP update", exc_info=True) + return False + + +def _send_acp_helper_update(bridge: SessionBridge, helper_name: str, text: str) -> bool: + try: + import acp + + helper = getattr(acp, helper_name) + update = helper(text) + except Exception: + logger.debug("Could not build ACP text update", exc_info=True) + return False + _send_update(bridge, update) + return True + + +def _send_update(bridge: SessionBridge, update: Any) -> None: + if bridge.loop.is_closed(): + return + + async def _deliver() -> None: + await bridge.conn.session_update(bridge.session_id, update) + + future = asyncio.run_coroutine_threadsafe(_deliver(), bridge.loop) + + def _log_failure(done_future: Any) -> None: + try: + done_future.result() + except Exception: + logger.debug("Failed to send ACP session update", exc_info=True) + + future.add_done_callback(_log_failure) + + +def _tool_title(tool_name: str, tool_args: dict[str, Any]) -> str: + action = tool_args.get("action") or tool_args.get("method") or "" + if action: + return f"{tool_name}: {action}" + return tool_name.replace("_", " ").strip().title() or "Tool" + + +def _tool_kind(tool_name: str, tool_args: dict[str, Any]) -> str: + probe = " ".join( + str(value).lower() + for value in (tool_name, tool_args.get("action", ""), tool_args.get("method", "")) + ) + if any(token in probe for token in ("read", "list", "show", "inspect")): + return "read" + if any(token in probe for token in ("write", "edit", "patch", "replace", "create")): + return "edit" + if any(token in probe for token in ("delete", "remove")): + return "delete" + if any(token in probe for token in ("move", "rename")): + return "move" + if any(token in probe for token in ("search", "find", "grep")): + return "search" + if any(token in probe for token in ("terminal", "shell", "python", "node", "code_execution")): + return "execute" + if any(token in probe for token in ("browser", "fetch", "http")): + return "fetch" + return "other" + + +def _json_safe(value: Any) -> Any: + try: + json.dumps(value) + return value + except Exception: + return str(value) + + +def _truncate_raw_output(value: Any) -> Any: + if value is None: + return None + if isinstance(value, str): + if len(value) <= _MAX_RAW_OUTPUT: + return value + hidden = len(value) - _MAX_RAW_OUTPUT + return value[:_MAX_RAW_OUTPUT] + f"\n\n[ACP output truncated: {hidden} characters hidden]" + return _json_safe(value) diff --git a/plugins/_acp/helpers/content.py b/plugins/_acp/helpers/content.py new file mode 100644 index 000000000..7351edba2 --- /dev/null +++ b/plugins/_acp/helpers/content.py @@ -0,0 +1,370 @@ +from __future__ import annotations + +import base64 +import json +import mimetypes +import os +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any +from urllib.parse import unquote, urlparse + +from helpers import files +from helpers.security import safe_filename + + +MAX_INLINE_RESOURCE_BYTES = 512 * 1024 +MAX_ATTACHMENT_BYTES = 16 * 1024 * 1024 + +_TEXT_MIME_PREFIXES = ("text/",) +_TEXT_MIME_TYPES = { + "application/json", + "application/javascript", + "application/typescript", + "application/xml", + "application/x-yaml", + "application/yaml", + "application/toml", + "application/sql", +} + + +@dataclass +class PromptParts: + text: str = "" + attachments: list[str] = field(default_factory=list) + + +def normalize_cwd(cwd: str | None) -> str: + raw = str(cwd or "").strip() or os.getcwd() + raw = os.path.expanduser(raw) + translated = translate_windows_drive_path(raw) + if translated: + raw = translated + return os.path.abspath(raw) + + +def normalize_path_for_compare(path: str | None) -> str: + return os.path.normcase(os.path.normpath(normalize_cwd(path))) + + +def translate_windows_drive_path(path: str) -> str | None: + match = re.match(r"^/?([A-Za-z]):[\\/](.*)$", str(path or "")) + if not match: + return None + drive = match.group(1).lower() + tail = match.group(2).replace("\\", "/").lstrip("/") + return f"/mnt/{drive}/{tail}" + + +def path_from_file_uri(uri: str) -> Path | None: + raw = str(uri or "").strip() + if not raw: + return None + + parsed = urlparse(raw) + if parsed.scheme and parsed.scheme != "file": + return None + + if parsed.scheme == "file": + if parsed.netloc and parsed.netloc not in {"", "localhost"}: + return None + path_text = unquote(parsed.path or "") + else: + path_text = unquote(raw) + + translated = translate_windows_drive_path(path_text) + if translated: + return Path(translated) + return Path(path_text) + + +def stringify_message_content(content: Any) -> str: + if isinstance(content, str): + return content + if isinstance(content, dict): + preview = content.get("preview") + if isinstance(preview, str) and preview.strip(): + return preview + raw = content.get("raw_content") + if raw is not None: + return stringify_message_content(raw) + if isinstance(content, list): + parts = [stringify_message_content(item) for item in content] + return "\n".join(part for part in parts if part) + try: + return json.dumps(content, ensure_ascii=False) + except Exception: + return str(content) + + +def prompt_blocks_to_user_message( + prompt: list[Any], + *, + context_id: str, + message_id: str, +) -> PromptParts: + text_parts: list[str] = [] + attachments: list[str] = [] + + for index, block in enumerate(prompt or []): + block_type = str(getattr(block, "type", "") or "").strip() + if block_type == "text" or hasattr(block, "text"): + text = str(getattr(block, "text", "") or "") + if text: + text_parts.append(text) + continue + + if block_type == "image": + _append_media_block( + block, + context_id=context_id, + message_id=message_id, + index=index, + kind="image", + text_parts=text_parts, + attachments=attachments, + ) + continue + + if block_type == "audio": + _append_media_block( + block, + context_id=context_id, + message_id=message_id, + index=index, + kind="audio", + text_parts=text_parts, + attachments=attachments, + ) + continue + + if block_type == "resource_link": + _append_resource_link(block, text_parts=text_parts, attachments=attachments) + continue + + if block_type == "resource": + _append_embedded_resource( + block, + context_id=context_id, + message_id=message_id, + index=index, + text_parts=text_parts, + attachments=attachments, + ) + + text = "\n\n".join(part for part in text_parts if part).strip() + if not text and attachments: + text = "Please inspect the attached file(s)." + return PromptParts(text=text, attachments=attachments) + + +def _append_media_block( + block: Any, + *, + context_id: str, + message_id: str, + index: int, + kind: str, + text_parts: list[str], + attachments: list[str], +) -> None: + uri = str(getattr(block, "uri", "") or "").strip() + mime_type = str(getattr(block, "mime_type", "") or "").strip() or None + data = str(getattr(block, "data", "") or "").strip() + + if uri: + path = path_from_file_uri(uri) + if path and path.exists(): + attachments.append(str(path)) + text_parts.append(f"[Attached {kind}: {path}]") + return + text_parts.append(f"[Attached {kind} reference: {uri}]") + return + + if not data: + return + + try: + raw = base64.b64decode(data.split(",", 1)[-1], validate=False) + except Exception: + raw = data.encode("utf-8", errors="replace") + + saved = _save_attachment_bytes( + context_id=context_id, + message_id=message_id, + index=index, + label=kind, + data=raw, + mime_type=mime_type, + ) + attachments.append(saved) + text_parts.append(f"[Attached {kind}: {saved}]") + + +def _append_resource_link( + block: Any, + *, + text_parts: list[str], + attachments: list[str], +) -> None: + uri = str(getattr(block, "uri", "") or "").strip() + if not uri: + return + name = _resource_display_name(block, uri) + mime_type = str(getattr(block, "mime_type", "") or "").strip() or None + path = path_from_file_uri(uri) + + if path is None: + text_parts.append(f"[Attached resource: {name}]\nURI: {uri}") + return + + if not path.exists(): + text_parts.append(f"[Attached resource unavailable: {name}]\nURI: {uri}\nPath: {path}") + return + + if _is_probably_text(path, mime_type): + try: + size = path.stat().st_size + data = path.read_bytes()[:MAX_INLINE_RESOURCE_BYTES] + body = decode_text_bytes(data, mime_type) or "" + if size > MAX_INLINE_RESOURCE_BYTES: + body += f"\n\n[Truncated to {MAX_INLINE_RESOURCE_BYTES} of {size} bytes]" + text_parts.append(f"[Attached file: {name}]\nURI: {uri}\n\n{body}") + return + except OSError as exc: + text_parts.append(f"[Attached file unreadable: {name}]\nURI: {uri}\nError: {exc}") + return + + attachments.append(str(path)) + text_parts.append(f"[Attached file: {path}]") + + +def _append_embedded_resource( + block: Any, + *, + context_id: str, + message_id: str, + index: int, + text_parts: list[str], + attachments: list[str], +) -> None: + resource = getattr(block, "resource", None) + if resource is None: + return + + uri = str(getattr(resource, "uri", "") or "").strip() + mime_type = str(getattr(resource, "mime_type", "") or "").strip() or None + if hasattr(resource, "text"): + body = str(getattr(resource, "text", "") or "") + text_parts.append(f"[Attached resource: {uri or 'embedded text'}]\n\n{body}") + return + + blob = str(getattr(resource, "blob", "") or "") + if not blob: + return + + try: + data = base64.b64decode(blob, validate=True) + except Exception: + data = blob.encode("utf-8", errors="replace") + + text = decode_text_bytes(data[:MAX_INLINE_RESOURCE_BYTES], mime_type) + if text is not None and _is_text_mime(mime_type): + if len(data) > MAX_INLINE_RESOURCE_BYTES: + text += f"\n\n[Truncated to {MAX_INLINE_RESOURCE_BYTES} of {len(data)} bytes]" + text_parts.append(f"[Attached resource: {uri or 'embedded text'}]\n\n{text}") + return + + saved = _save_attachment_bytes( + context_id=context_id, + message_id=message_id, + index=index, + label="resource", + data=data, + mime_type=mime_type, + ) + attachments.append(saved) + text_parts.append(f"[Attached resource: {saved}]") + + +def _save_attachment_bytes( + *, + context_id: str, + message_id: str, + index: int, + label: str, + data: bytes, + mime_type: str | None, +) -> str: + if len(data) > MAX_ATTACHMENT_BYTES: + data = data[:MAX_ATTACHMENT_BYTES] + + extension = mimetypes.guess_extension((mime_type or "").split(";", 1)[0]) or ".bin" + base_name = safe_filename(f"{label}-{message_id}-{index}{extension}") or f"{label}-{index}{extension}" + from helpers import persist_chat + + attach_dir = files.get_abs_path(persist_chat.get_chat_folder_path(context_id), "acp") + os.makedirs(attach_dir, exist_ok=True) + path = os.path.join(attach_dir, base_name) + with open(path, "wb") as handle: + handle.write(data) + return files.normalize_a0_path(path) + + +def _resource_display_name(block: Any, uri: str) -> str: + title = str(getattr(block, "title", "") or "").strip() + name = str(getattr(block, "name", "") or "").strip() + if title and name and title != name: + return f"{title} ({name})" + if title: + return title + if name: + return name + parsed = urlparse(uri) + return Path(unquote(parsed.path or uri)).name or uri or "resource" + + +def _is_text_mime(mime_type: str | None) -> bool: + mime = (mime_type or "").split(";", 1)[0].strip().lower() + if not mime: + return False + return mime.startswith(_TEXT_MIME_PREFIXES) or mime in _TEXT_MIME_TYPES + + +def _is_probably_text(path: Path, mime_type: str | None) -> bool: + if _is_text_mime(mime_type): + return True + guessed, _encoding = mimetypes.guess_type(str(path)) + if _is_text_mime(guessed): + return True + return path.suffix.lower() in { + ".md", + ".txt", + ".py", + ".js", + ".ts", + ".tsx", + ".jsx", + ".json", + ".yaml", + ".yml", + ".toml", + ".xml", + ".html", + ".css", + ".sql", + ".sh", + } + + +def decode_text_bytes(data: bytes, mime_type: str | None = None) -> str | None: + if b"\x00" in data and not _is_text_mime(mime_type): + return None + for encoding in ("utf-8-sig", "utf-8", "latin-1"): + try: + return data.decode(encoding) + except UnicodeDecodeError: + continue + return data.decode("utf-8", errors="replace") diff --git a/plugins/_acp/helpers/server.py b/plugins/_acp/helpers/server.py new file mode 100644 index 000000000..38945dbea --- /dev/null +++ b/plugins/_acp/helpers/server.py @@ -0,0 +1,627 @@ +from __future__ import annotations + +import asyncio +import logging +import uuid +from concurrent.futures import CancelledError +from pathlib import Path +from typing import Any + +import acp +from acp.schema import ( + AgentCapabilities, + AvailableCommand, + AvailableCommandsUpdate, + ClientCapabilities, + CloseSessionResponse, + CurrentModeUpdate, + ForkSessionResponse, + Implementation, + InitializeResponse, + ListSessionsResponse, + LoadSessionResponse, + NewSessionResponse, + PromptCapabilities, + PromptResponse, + ResumeSessionResponse, + SessionCapabilities, + SessionForkCapabilities, + SessionInfo, + SessionListCapabilities, + SessionMode, + SessionModeState, + SessionResumeCapabilities, + SetSessionConfigOptionResponse, + SetSessionModelResponse, + SetSessionModeResponse, + UnstructuredCommandInput, + Usage, +) + +from agent import Agent, AgentContext, AgentContextType, UserMessage +from helpers import git, message_queue as mq, persist_chat, tokens +from helpers.localization import Localization +from initialize import initialize_agent +from plugins._acp import AGENT_ZERO_ACP_VERSION, PLUGIN_VERSION +from plugins._acp.helpers import bridge +from plugins._acp.helpers.content import ( + normalize_cwd, + normalize_path_for_compare, + prompt_blocks_to_user_message, + stringify_message_content, +) + + +logger = logging.getLogger(__name__) + + +class AgentZeroACPAgent(acp.Agent): + _ADVERTISED_COMMANDS = ( + { + "name": "help", + "description": "List Agent Zero ACP commands", + }, + { + "name": "context", + "description": "Show conversation and context-window status", + }, + { + "name": "reset", + "description": "Clear the current Agent Zero conversation", + }, + { + "name": "version", + "description": "Show Agent Zero and ACP plugin versions", + }, + ) + + _MODE_DEFAULT = bridge.DEFAULT_MODE + _MODE_PLAN = "plan" + _MODE_ACT = "act" + + _MODE_INSTRUCTIONS = { + _MODE_PLAN: ( + "ACP session mode: plan first. Prefer analysis, architecture, and tradeoffs. " + "Do not modify files unless the user explicitly asks you to proceed." + ), + _MODE_ACT: ( + "ACP session mode: act. Complete the requested work end-to-end with focused " + "implementation and validation." + ), + } + + def __init__(self) -> None: + super().__init__() + self._conn: acp.Client | None = None + + def on_connect(self, conn: acp.Client) -> None: + self._conn = conn + logger.info("ACP client connected") + + async def initialize( + self, + protocol_version: int | None = None, + client_capabilities: ClientCapabilities | None = None, + client_info: Implementation | None = None, + **kwargs: Any, + ) -> InitializeResponse: + client_name = client_info.name if client_info else "unknown" + logger.info("ACP initialize from %s (protocol v%s)", client_name, protocol_version) + return InitializeResponse( + protocol_version=acp.PROTOCOL_VERSION, + agent_info=Implementation( + name="agent-zero", + title="Agent Zero", + version=AGENT_ZERO_ACP_VERSION, + ), + agent_capabilities=AgentCapabilities( + load_session=True, + prompt_capabilities=PromptCapabilities( + embedded_context=True, + image=True, + audio=True, + ), + session_capabilities=SessionCapabilities( + fork=SessionForkCapabilities(), + list=SessionListCapabilities(), + resume=SessionResumeCapabilities(), + ), + ), + auth_methods=[], + ) + + async def new_session( + self, + cwd: str, + additional_directories: list[str] | None = None, + **kwargs: Any, + ) -> NewSessionResponse: + context = self._create_context(cwd=cwd, additional_directories=additional_directories) + self._register(context) + await self._send_session_start_updates(context) + persist_chat.save_tmp_chat(context) + return NewSessionResponse(session_id=context.id, modes=self._session_modes(context)) + + async def load_session( + self, + cwd: str, + session_id: str, + additional_directories: list[str] | None = None, + **kwargs: Any, + ) -> LoadSessionResponse | None: + context = self._get_context(session_id) + if context is None: + logger.warning("ACP load_session: missing session %s", session_id) + return None + self._apply_workspace(context, cwd=cwd, additional_directories=additional_directories) + self._register(context) + await self._replay_history(context) + await self._send_session_start_updates(context) + persist_chat.save_tmp_chat(context) + return LoadSessionResponse(modes=self._session_modes(context)) + + async def resume_session( + self, + cwd: str, + session_id: str, + additional_directories: list[str] | None = None, + **kwargs: Any, + ) -> ResumeSessionResponse: + context = self._get_context(session_id) + if context is None: + context = self._create_context( + cwd=cwd, + additional_directories=additional_directories, + context_id=session_id, + ) + else: + self._apply_workspace(context, cwd=cwd, additional_directories=additional_directories) + self._register(context) + await self._replay_history(context) + await self._send_session_start_updates(context) + persist_chat.save_tmp_chat(context) + return ResumeSessionResponse(modes=self._session_modes(context)) + + async def fork_session( + self, + cwd: str, + session_id: str, + additional_directories: list[str] | None = None, + **kwargs: Any, + ) -> ForkSessionResponse: + original = self._get_context(session_id) + if original is None: + return ForkSessionResponse(session_id="") + + new_ids = persist_chat.load_json_chats([persist_chat.export_json_chat(original)]) + new_id = new_ids[0] if new_ids else "" + context = self._get_context(new_id) if new_id else None + if context is None: + return ForkSessionResponse(session_id="") + + context.name = f"{original.name or 'ACP session'} (fork)" + self._apply_workspace(context, cwd=cwd, additional_directories=additional_directories) + self._register(context) + await self._send_session_start_updates(context) + persist_chat.save_tmp_chat(context) + return ForkSessionResponse(session_id=context.id, modes=self._session_modes(context)) + + async def list_sessions( + self, + cursor: str | None = None, + cwd: str | None = None, + **kwargs: Any, + ) -> ListSessionsResponse: + persist_chat.load_tmp_chats() + contexts = [ctx for ctx in AgentContext.all() if ctx.get_data(bridge.CTX_IS_ACP)] + if cwd: + normalized = normalize_path_for_compare(cwd) + contexts = [ + ctx + for ctx in contexts + if normalize_path_for_compare(ctx.get_data(bridge.CTX_CWD) or ctx.get_data(bridge.CTX_WORKDIR)) + == normalized + ] + + contexts.sort(key=lambda ctx: ctx.last_message or ctx.created_at, reverse=True) + if cursor: + for idx, context in enumerate(contexts): + if context.id == cursor: + contexts = contexts[idx + 1 :] + break + else: + contexts = [] + + page = contexts[:50] + next_cursor = contexts[50].id if len(contexts) > 50 else None + sessions = [self._session_info(context) for context in page] + return ListSessionsResponse(sessions=sessions, next_cursor=next_cursor) + + async def prompt( + self, + prompt: list[Any], + session_id: str, + message_id: str | None = None, + **kwargs: Any, + ) -> PromptResponse: + context = self._get_context(session_id) + if context is None: + logger.warning("ACP prompt: missing session %s", session_id) + return PromptResponse(stop_reason="refusal", user_message_id=message_id) + + self._register(context) + msg_id = message_id or str(uuid.uuid4()) + parts = prompt_blocks_to_user_message( + prompt, + context_id=context.id, + message_id=msg_id, + ) + user_text = parts.text.strip() + if not user_text and not parts.attachments: + return PromptResponse(stop_reason="end_turn", user_message_id=msg_id) + + context.last_message = Localization.get().now() + + text_only = bool(prompt) and all(str(getattr(block, "type", "") or "") == "text" for block in prompt) + if text_only and not parts.attachments and user_text.startswith("/"): + response_text = await self._handle_slash_command(user_text, context) + if response_text is not None: + await self._send_agent_message(context.id, response_text) + persist_chat.save_tmp_chat(context) + return PromptResponse(stop_reason="end_turn", user_message_id=msg_id) + + if context.is_running(): + await self._send_agent_message( + context.id, + "A turn is already running. Wait for it to finish or send cancel from the ACP client.", + ) + return PromptResponse(stop_reason="end_turn", user_message_id=msg_id) + + mode_instruction = self._mode_instruction(context) + system_message = [mode_instruction] if mode_instruction else [] + mq.log_user_message( + context, + user_text, + parts.attachments, + message_id=msg_id, + source=" (ACP)", + ) + bridge.reset_turn(context.id, msg_id) + + task = context.communicate( + UserMessage( + message=user_text, + attachments=parts.attachments, + system_message=system_message, + id=msg_id, + ) + ) + + stop_reason = "end_turn" + final_text = "" + try: + result = await task.result() + final_text = "" if result is None else str(result) + except (asyncio.CancelledError, CancelledError): + stop_reason = "cancelled" + except Exception as exc: + logger.exception("ACP prompt failed for session %s", session_id) + final_text = f"Error: {exc}" + stop_reason = "end_turn" + + session_bridge = bridge.get_bridge_for_context(context.id) + if final_text and session_bridge and not session_bridge.response_text_sent.strip(): + await self._send_agent_message(context.id, final_text) + elif final_text and session_bridge is None: + await self._send_agent_message(context.id, final_text) + + persist_chat.save_tmp_chat(context) + return PromptResponse( + stop_reason=stop_reason, + usage=self._usage(context, final_text), + user_message_id=msg_id, + ) + + async def cancel(self, session_id: str, **kwargs: Any) -> None: + context = self._get_context(session_id) + if context: + context.kill_process() + bridge.reset_turn(context.id) + logger.info("ACP cancelled session %s", session_id) + + async def close_session(self, session_id: str, **kwargs: Any) -> CloseSessionResponse | None: + context = self._get_context(session_id) + if context: + context.kill_process() + AgentContext.remove(context.id) + persist_chat.remove_chat(context.id) + bridge.unregister_bridge(session_id=session_id) + return CloseSessionResponse() + + async def set_session_mode( + self, + mode_id: str, + session_id: str, + **kwargs: Any, + ) -> SetSessionModeResponse | None: + context = self._get_context(session_id) + if context is None: + return None + normalized = str(mode_id or self._MODE_DEFAULT).strip() + if normalized not in {self._MODE_DEFAULT, self._MODE_PLAN, self._MODE_ACT}: + normalized = self._MODE_DEFAULT + context.set_data(bridge.CTX_MODE, normalized) + persist_chat.save_tmp_chat(context) + await self._send_current_mode(context) + return SetSessionModeResponse() + + async def set_session_model( + self, + model_id: str, + session_id: str, + **kwargs: Any, + ) -> SetSessionModelResponse | None: + context = self._get_context(session_id) + if context is None: + return None + context.set_data(bridge.CTX_MODEL_ID, str(model_id or "")) + persist_chat.save_tmp_chat(context) + return SetSessionModelResponse() + + async def set_config_option( + self, + config_id: str, + session_id: str, + value: str | bool, + **kwargs: Any, + ) -> SetSessionConfigOptionResponse | None: + context = self._get_context(session_id) + if context is None: + return None + options = context.get_data(bridge.CTX_CONFIG_OPTIONS) or {} + if not isinstance(options, dict): + options = {} + options[str(config_id)] = value + context.set_data(bridge.CTX_CONFIG_OPTIONS, options) + persist_chat.save_tmp_chat(context) + return SetSessionConfigOptionResponse(config_options=[]) + + async def ext_method(self, method: str, params: dict[str, Any]) -> dict[str, Any]: + if method in {"ping", "health", "healthcheck"}: + return {"ok": True} + return {} + + async def ext_notification(self, method: str, params: dict[str, Any]) -> None: + return None + + def _create_context( + self, + *, + cwd: str, + additional_directories: list[str] | None = None, + context_id: str | None = None, + ) -> AgentContext: + resolved_cwd = normalize_cwd(cwd) + context = AgentContext( + config=initialize_agent(), + id=context_id or None, + name=self._title_for_cwd(resolved_cwd), + type=AgentContextType.USER, + ) + self._apply_workspace(context, cwd=resolved_cwd, additional_directories=additional_directories) + return context + + def _get_context(self, session_id: str) -> AgentContext | None: + context = AgentContext.get(session_id) + if context is not None: + return context + persist_chat.load_tmp_chats() + return AgentContext.get(session_id) + + def _apply_workspace( + self, + context: AgentContext, + *, + cwd: str, + additional_directories: list[str] | None = None, + ) -> None: + resolved_cwd = normalize_cwd(cwd) + context.set_data(bridge.CTX_IS_ACP, True) + context.set_data(bridge.CTX_CWD, resolved_cwd) + context.set_data(bridge.CTX_WORKDIR, resolved_cwd) + context.set_data( + bridge.CTX_ADDITIONAL_DIRECTORIES, + [normalize_cwd(path) for path in (additional_directories or []) if path], + ) + context.set_data(bridge.CTX_MODE, context.get_data(bridge.CTX_MODE) or self._MODE_DEFAULT) + if not context.name: + context.name = self._title_for_cwd(resolved_cwd) + + def _register(self, context: AgentContext) -> None: + if not self._conn: + return + bridge.register_bridge( + context_id=context.id, + session_id=context.id, + conn=self._conn, + loop=asyncio.get_running_loop(), + ) + + def _session_modes(self, context: AgentContext) -> SessionModeState: + current = str(context.get_data(bridge.CTX_MODE) or self._MODE_DEFAULT) + if current not in {self._MODE_DEFAULT, self._MODE_PLAN, self._MODE_ACT}: + current = self._MODE_DEFAULT + return SessionModeState( + current_mode_id=current, + available_modes=[ + SessionMode( + id=self._MODE_DEFAULT, + name="Default", + description="Use the normal Agent Zero behavior.", + ), + SessionMode( + id=self._MODE_PLAN, + name="Plan", + description="Prefer planning and analysis before changing files.", + ), + SessionMode( + id=self._MODE_ACT, + name="Act", + description="Proceed end-to-end when the request is actionable.", + ), + ], + ) + + def _mode_instruction(self, context: AgentContext) -> str: + return self._MODE_INSTRUCTIONS.get(str(context.get_data(bridge.CTX_MODE) or ""), "") + + async def _send_session_start_updates(self, context: AgentContext) -> None: + await self._send_available_commands(context) + await self._send_current_mode(context) + + async def _send_available_commands(self, context: AgentContext) -> None: + if not self._conn: + return + try: + await self._conn.session_update( + context.id, + AvailableCommandsUpdate( + session_update="available_commands_update", + available_commands=self._available_commands(), + ), + ) + except Exception: + logger.debug("Could not advertise ACP commands", exc_info=True) + + async def _send_current_mode(self, context: AgentContext) -> None: + if not self._conn: + return + try: + update = CurrentModeUpdate( + session_update="current_mode_update", + current_mode_id=str(context.get_data(bridge.CTX_MODE) or self._MODE_DEFAULT), + ) + await self._conn.session_update(context.id, update) + except Exception: + logger.debug("Could not send ACP current mode", exc_info=True) + + @classmethod + def _available_commands(cls) -> list[AvailableCommand]: + commands: list[AvailableCommand] = [] + for spec in cls._ADVERTISED_COMMANDS: + input_hint = spec.get("input_hint") + commands.append( + AvailableCommand( + name=spec["name"], + description=spec["description"], + input=UnstructuredCommandInput(hint=input_hint) if input_hint else None, + ) + ) + return commands + + async def _replay_history(self, context: AgentContext) -> None: + if not self._conn: + return + try: + outputs = context.agent0.history.output() + except Exception: + logger.debug("Could not read Agent Zero history for ACP replay", exc_info=True) + return + for item in outputs: + text = stringify_message_content(item.get("content")).strip() + if not text: + continue + update = ( + acp.update_agent_message_text(text) + if item.get("ai") + else acp.update_user_message_text(text) + ) + try: + await self._conn.session_update(context.id, update) + except Exception: + logger.debug("Could not replay ACP history", exc_info=True) + return + + async def _send_agent_message(self, session_id: str, text: str) -> None: + if not self._conn or not text: + return + try: + await self._conn.session_update(session_id, acp.update_agent_message_text(text)) + except Exception: + logger.debug("Could not send ACP agent message", exc_info=True) + + async def _handle_slash_command(self, text: str, context: AgentContext) -> str | None: + command, _, args = text.partition(" ") + command = command.lstrip("/").lower().strip() + args = args.strip() + if command == "help": + lines = ["Available commands:", ""] + for spec in self._ADVERTISED_COMMANDS: + lines.append(f"/{spec['name']}: {spec['description']}") + return "\n".join(lines) + if command == "context": + return self._context_summary(context) + if command == "reset": + context.reset() + self._apply_workspace( + context, + cwd=context.get_data(bridge.CTX_CWD) or context.get_data(bridge.CTX_WORKDIR), + additional_directories=context.get_data(bridge.CTX_ADDITIONAL_DIRECTORIES) or [], + ) + return "Conversation history cleared." + if command == "version": + return f"Agent Zero {git.get_version()}\nACP plugin {PLUGIN_VERSION}\nACP protocol {acp.PROTOCOL_VERSION}" + return None + + def _context_summary(self, context: AgentContext) -> str: + outputs = context.agent0.history.output() + user_count = sum(1 for item in outputs if not item.get("ai")) + assistant_count = sum(1 for item in outputs if item.get("ai")) + history_tokens = context.agent0.history.get_tokens() + window = context.agent0.get_data(Agent.DATA_NAME_CTX_WINDOW) or {} + window_tokens = window.get("tokens", 0) if isinstance(window, dict) else 0 + cwd = context.get_data(bridge.CTX_CWD) or context.get_data(bridge.CTX_WORKDIR) or "" + lines = [ + f"Session: {context.id}", + f"Workspace: {cwd}", + f"Messages: {user_count} user, {assistant_count} assistant", + f"History tokens: ~{history_tokens:,}", + ] + if window_tokens: + lines.append(f"Last context window: ~{int(window_tokens):,} tokens") + mode = context.get_data(bridge.CTX_MODE) or self._MODE_DEFAULT + lines.append(f"Mode: {mode}") + return "\n".join(lines) + + def _usage(self, context: AgentContext, final_text: str) -> Usage | None: + window = context.agent0.get_data(Agent.DATA_NAME_CTX_WINDOW) or {} + input_tokens = int(window.get("tokens", 0) or 0) if isinstance(window, dict) else 0 + output_tokens = tokens.approximate_tokens(final_text or "") + total = input_tokens + output_tokens + if total <= 0: + return None + return Usage( + input_tokens=max(input_tokens, 0), + output_tokens=max(output_tokens, 0), + total_tokens=max(total, 0), + ) + + def _session_info(self, context: AgentContext) -> SessionInfo: + cwd = context.get_data(bridge.CTX_CWD) or context.get_data(bridge.CTX_WORKDIR) or "." + updated_at = context.last_message or context.created_at or Localization.get().now() + if hasattr(updated_at, "isoformat"): + updated_at_str = updated_at.isoformat() + else: + updated_at_str = str(updated_at) + return SessionInfo( + session_id=context.id, + cwd=normalize_cwd(cwd), + title=context.name or self._title_for_cwd(cwd), + updated_at=updated_at_str, + additional_directories=context.get_data(bridge.CTX_ADDITIONAL_DIRECTORIES) or None, + ) + + @staticmethod + def _title_for_cwd(cwd: str) -> str: + name = Path(str(cwd or "")).name + return f"ACP: {name or 'workspace'}" diff --git a/plugins/_acp/hooks.py b/plugins/_acp/hooks.py new file mode 100644 index 000000000..71bfa1d5f --- /dev/null +++ b/plugins/_acp/hooks.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import importlib +import importlib.util +import re +import shutil +import subprocess +import sys +import threading +from pathlib import Path + +from helpers.errors import format_error +from helpers.print_style import PrintStyle + + +PLUGIN_NAME = "_acp" +IMPORT_NAME = "acp" +DISTRIBUTION_NAME = "agent-client-protocol" +FALLBACK_REQUIREMENT = "agent-client-protocol==0.10.1" + +_LOCK = threading.Lock() +_CHECKED = False +_PLUGIN_DIR = Path(__file__).resolve().parent +_PROJECT_ROOT = _PLUGIN_DIR.parents[1] +_ROOT_REQUIREMENTS_FILE = _PROJECT_ROOT / "requirements.txt" + + +def has_acp() -> bool: + return importlib.util.find_spec(IMPORT_NAME) is not None + + +def get_acp_requirement() -> str: + if not _ROOT_REQUIREMENTS_FILE.is_file(): + return FALLBACK_REQUIREMENT + + pattern = re.compile(rf"^\s*{re.escape(DISTRIBUTION_NAME)}\s*(?:[<>=!~]=?|===).*$") + for raw_line in _ROOT_REQUIREMENTS_FILE.read_text(encoding="utf-8").splitlines(): + line = raw_line.split("#", 1)[0].strip() + if line and pattern.match(line): + return line + return FALLBACK_REQUIREMENT + + +def ensure_dependencies(raise_on_error: bool = True) -> bool: + """Install the ACP SDK into the framework runtime when self-updates need it.""" + global _CHECKED + + if _CHECKED and has_acp(): + return True + + with _LOCK: + if _CHECKED and has_acp(): + return True + if has_acp(): + _CHECKED = True + return True + + requirement = get_acp_requirement() + try: + _install_requirement(requirement) + importlib.invalidate_caches() + if not has_acp(): + raise RuntimeError( + f"ACP dependency '{requirement}' is still unavailable after installation" + ) + _CHECKED = True + return True + except Exception as exc: + message = f"Agent Zero ACP: failed to install {requirement}: {format_error(exc)}" + if raise_on_error: + raise RuntimeError(message) from exc + PrintStyle.error(message) + return False + + +def install() -> bool: + return ensure_dependencies(raise_on_error=True) + + +def _install_requirement(requirement: str) -> None: + cmd = _install_command(requirement) + PrintStyle.info(f"Agent Zero ACP: installing {requirement}") + subprocess.check_call(cmd, cwd=str(_PROJECT_ROOT)) + + +def _install_command(requirement: str) -> list[str]: + uv = shutil.which("uv") + if uv: + return [ + uv, + "pip", + "install", + "--python", + sys.executable, + requirement, + ] + return [ + sys.executable, + "-m", + "pip", + "install", + requirement, + ] diff --git a/plugins/_acp/plugin.yaml b/plugins/_acp/plugin.yaml new file mode 100644 index 000000000..0a46a2cea --- /dev/null +++ b/plugins/_acp/plugin.yaml @@ -0,0 +1,9 @@ +name: _acp +title: Agent Client Protocol +description: Exposes Agent Zero as an ACP stdio agent for editor clients. +version: "1.19" +settings_sections: + - external +per_project_config: false +per_agent_config: false +always_enabled: true diff --git a/plugins/_acp/webui/config.html b/plugins/_acp/webui/config.html new file mode 100644 index 000000000..529b0c884 --- /dev/null +++ b/plugins/_acp/webui/config.html @@ -0,0 +1,8 @@ +
+
+

Agent Client Protocol

+

Expose Agent Zero to ACP-capable editors over stdio.

+
+
python -m plugins._acp
+
docker exec -i agent-zero /opt/venv-a0/bin/python -m plugins._acp
+
diff --git a/plugins/_code_execution/tools/code_execution_tool.py b/plugins/_code_execution/tools/code_execution_tool.py index 6e06dd461..f18d075fe 100644 --- a/plugins/_code_execution/tools/code_execution_tool.py +++ b/plugins/_code_execution/tools/code_execution_tool.py @@ -480,8 +480,10 @@ class CodeExecution(Tool): if project_name: path = projects.get_project_folder(project_name) else: - set = settings.get_settings() - path = set.get("workdir_path") + path = self.agent.context.get_data("workdir_path") + if not path: + set = settings.get_settings() + path = set.get("workdir_path") if not path: return None diff --git a/requirements.txt b/requirements.txt index bbb94b2b8..445bd2f1a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ a2wsgi==1.10.8 ansio==0.0.1 +agent-client-protocol==0.10.1 docker==7.1.0 duckduckgo-search==6.1.12 pyreqwest-impersonate==0.5.3 # freeze nearest wheel-backed release; 0.5.5 is source-only diff --git a/tests/test_acp_plugin_static.py b/tests/test_acp_plugin_static.py new file mode 100644 index 000000000..d24019051 --- /dev/null +++ b/tests/test_acp_plugin_static.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +from pathlib import Path +import sys +from types import SimpleNamespace + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from plugins._acp.helpers.content import ( + normalize_cwd, + path_from_file_uri, + prompt_blocks_to_user_message, +) +from plugins._acp import hooks + + +def test_acp_manifest_declares_builtin_plugin() -> None: + manifest = (ROOT / "plugins" / "_acp" / "plugin.yaml").read_text(encoding="utf-8") + assert "name: _acp" in manifest + assert "always_enabled: true" in manifest + assert 'version: "1.19"' in manifest + + +def test_acp_registry_metadata_matches_release() -> None: + registry = (ROOT / "plugins" / "_acp" / "acp_registry" / "agent.json").read_text( + encoding="utf-8" + ) + assert '"version": "1.19"' in registry + assert '"license": "MIT"' in registry + + +def test_acp_dependency_is_pinned() -> None: + requirements = (ROOT / "requirements.txt").read_text(encoding="utf-8") + assert "agent-client-protocol==0.10.1" in requirements + + +def test_acp_hook_uses_root_requirement_pin() -> None: + assert hooks.get_acp_requirement() == "agent-client-protocol==0.10.1" + assert not (ROOT / "plugins" / "_acp" / "requirements.txt").exists() + + +def test_acp_entrypoint_attempts_lazy_dependency_install() -> None: + entry = (ROOT / "plugins" / "_acp" / "entry.py").read_text(encoding="utf-8") + assert "_import_acp_or_install()" in entry + assert "hooks.ensure_dependencies(raise_on_error=False)" in entry + + +def test_acp_initialize_metadata_uses_release_version_constant() -> None: + server = (ROOT / "plugins" / "_acp" / "helpers" / "server.py").read_text( + encoding="utf-8" + ) + assert "version=AGENT_ZERO_ACP_VERSION" in server + + +def test_context_workdir_override_is_wired() -> None: + code_execution = ( + ROOT / "plugins" / "_code_execution" / "tools" / "code_execution_tool.py" + ).read_text(encoding="utf-8") + workdir_prompt = ( + ROOT / "extensions" / "python" / "message_loop_prompts_after" / "_75_include_workdir_extras.py" + ).read_text(encoding="utf-8") + assert 'get_data("workdir_path")' in code_execution + assert 'get_data("workdir_path")' in workdir_prompt + + +def test_file_uri_path_translation() -> None: + assert str(path_from_file_uri("file:///tmp/example.py")) == "/tmp/example.py" + assert str(path_from_file_uri("file:///C:/Users/Ada/work/app.py")) == "/mnt/c/Users/Ada/work/app.py" + assert path_from_file_uri("https://example.com/app.py") is None + + +def test_prompt_blocks_inline_text_resource(tmp_path: Path) -> None: + source = tmp_path / "notes.md" + source.write_text("hello ACP", encoding="utf-8") + block = SimpleNamespace( + type="resource_link", + uri=source.as_uri(), + name="notes.md", + mime_type="text/markdown", + ) + + parts = prompt_blocks_to_user_message( + [SimpleNamespace(type="text", text="Read this"), block], + context_id="static-test", + message_id="msg", + ) + + assert "Read this" in parts.text + assert "hello ACP" in parts.text + assert parts.attachments == [] + + +def test_normalize_cwd_returns_absolute_path() -> None: + assert Path(normalize_cwd(".")).is_absolute()