Add builtin ACP plugin
Some checks are pending
Build And Publish Docker Images / plan (push) Waiting to run
Build And Publish Docker Images / build (push) Blocked by required conditions

Expose Agent Zero over the Agent Client Protocol as a builtin stdio plugin with session lifecycle support, streaming bridges, registry metadata, and editor workspace handling.

Add lazy dependency installation through the ACP plugin hook using the root requirements pin so self-updated instances can recover without a fresh Docker image.

Pin agent-client-protocol, add focused static coverage, and allow ACP sessions to override the per-context workdir for code execution and workdir prompts.
This commit is contained in:
Alessandro 2026-06-01 02:38:23 +02:00
parent a7b4fcd798
commit 46112c9750
20 changed files with 1773 additions and 3 deletions

View file

@ -50,7 +50,7 @@ class IncludeWorkdirExtras(Extension):
max_lines = set["workdir_max_lines"]
gitignore_raw = set["workdir_gitignore"]
folder = set["workdir_path"]
folder = self.agent.context.get_data("workdir_path") or set["workdir_path"]
scan_path = files.get_abs_path_development(folder)
files.create_dir(scan_path)

34
plugins/_acp/README.md Normal file
View file

@ -0,0 +1,34 @@
# Agent Client Protocol
This builtin plugin exposes Agent Zero through the Agent Client Protocol (ACP)
over stdio so ACP-capable editors can create, load, resume, and prompt Agent
Zero sessions directly from a workspace.
## Usage
From the Agent Zero repository or runtime container:
```bash
python -m plugins._acp
```
For the Dockerized Agent Zero runtime, point the editor command at the
framework interpreter inside the container, for example:
```bash
docker exec -i agent-zero /opt/venv-a0/bin/python -m plugins._acp
```
ACP reserves stdout for JSON-RPC, so the adapter writes diagnostics to stderr.
## Checks
```bash
python -m plugins._acp --check
python -m plugins._acp --registry
```
The ACP Python SDK is provided by `agent-client-protocol`.
Fresh Docker images install it through the root `requirements.txt`; self-updated
instances lazy-install the same root pin through `plugins/_acp/hooks.py` the
first time the ACP stdio entrypoint starts.

4
plugins/_acp/__init__.py Normal file
View file

@ -0,0 +1,4 @@
PLUGIN_NAME = "_acp"
PLUGIN_TITLE = "Agent Client Protocol"
AGENT_ZERO_ACP_VERSION = "1.19"
PLUGIN_VERSION = AGENT_ZERO_ACP_VERSION

5
plugins/_acp/__main__.py Normal file
View file

@ -0,0 +1,5 @@
from .entry import main
if __name__ == "__main__":
main()

View file

@ -0,0 +1,15 @@
{
"id": "agent-zero",
"name": "Agent Zero",
"version": "1.19",
"description": "Agent Zero ACP adapter for editor-native agent sessions.",
"repository": "https://github.com/agent0ai/agent-zero",
"authors": ["Agent Zero"],
"license": "MIT",
"distribution": {
"stdio": {
"command": "python",
"args": ["-m", "plugins._acp"]
}
}
}

149
plugins/_acp/entry.py Normal file
View file

@ -0,0 +1,149 @@
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from pathlib import Path
from plugins._acp import PLUGIN_VERSION
_BENIGN_PROBE_METHODS = {"ping", "health", "healthcheck"}
class _BenignProbeFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
if record.getMessage() != "Background task failed":
return True
exc_info = record.exc_info
if not exc_info:
return True
try:
from acp.exceptions import RequestError
except ImportError:
return True
exc = exc_info[1]
if not isinstance(exc, RequestError):
return True
if getattr(exc, "code", None) != -32601:
return True
data = getattr(exc, "data", None)
method = data.get("method") if isinstance(data, dict) else None
return method not in _BENIGN_PROBE_METHODS
def main(argv: list[str] | None = None) -> None:
args = _parse_args(argv)
if args.version:
print(PLUGIN_VERSION)
return
if args.registry:
registry = Path(__file__).resolve().parent / "acp_registry" / "agent.json"
print(registry.read_text(encoding="utf-8"))
return
if args.check:
_run_check()
return
_setup_logging(debug=args.debug)
_ensure_project_root()
acp = _import_acp_or_install()
from helpers import persist_chat
from plugins._acp.helpers.server import AgentZeroACPAgent
persist_chat.load_tmp_chats()
logger = logging.getLogger(__name__)
logger.info("Starting Agent Zero ACP adapter")
try:
asyncio.run(acp.run_agent(AgentZeroACPAgent(), use_unstable_protocol=True))
except KeyboardInterrupt:
logger.info("ACP adapter stopped")
except Exception:
logger.exception("ACP adapter crashed")
sys.exit(1)
def _parse_args(argv: list[str] | None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="agent-zero-acp",
description="Run Agent Zero as an Agent Client Protocol stdio server.",
)
parser.add_argument("--check", action="store_true", help="Verify ACP imports and exit")
parser.add_argument("--registry", action="store_true", help="Print ACP registry metadata")
parser.add_argument("--version", action="store_true", help="Print ACP plugin version")
parser.add_argument("--debug", action="store_true", help="Enable debug logging to stderr")
return parser.parse_args(argv)
def _setup_logging(*, debug: bool = False) -> None:
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(
logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
handler.addFilter(_BenignProbeFilter())
root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
root.setLevel(logging.DEBUG if debug else logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("openai").setLevel(logging.WARNING)
def _ensure_project_root() -> None:
project_root = str(Path(__file__).resolve().parents[2])
if project_root not in sys.path:
sys.path.insert(0, project_root)
def _run_check() -> None:
_ensure_project_root()
_import_acp_or_install()
from plugins._acp.helpers.server import AgentZeroACPAgent # noqa: F401
print("Agent Zero ACP check OK")
def _import_acp_or_install():
try:
import acp
return acp
except ImportError:
pass
from plugins._acp import hooks
if hooks.ensure_dependencies(raise_on_error=False):
try:
import acp
return acp
except ImportError:
pass
_missing_dependency()
def _missing_dependency() -> None:
print(
"Agent Zero ACP requires agent-client-protocol. "
"The plugin tried to install the root requirements pin automatically; "
"manual fallback: pip install agent-client-protocol==0.10.1",
file=sys.stderr,
)
sys.exit(1)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,9 @@
from helpers.extension import Extension
from plugins._acp.helpers import bridge
class ACPReasoningStream(Extension):
async def execute(self, stream_data=None, **kwargs):
if not self.agent or not stream_data:
return
bridge.send_agent_thought_delta(self.agent.context.id, stream_data.get("full", ""))

View file

@ -0,0 +1,16 @@
from helpers.extension import Extension
from plugins._acp.helpers import bridge
class ACPResponseStream(Extension):
async def execute(self, parsed=None, **kwargs):
if not self.agent or not isinstance(parsed, dict):
return
tool_name = parsed.get("tool_name") or parsed.get("tool")
if tool_name != "response":
return
tool_args = parsed.get("tool_args") if isinstance(parsed.get("tool_args"), dict) else parsed.get("args")
if not isinstance(tool_args, dict):
return
text = tool_args.get("text", tool_args.get("message", ""))
bridge.send_agent_delta(self.agent.context.id, str(text or ""))

View file

@ -0,0 +1,9 @@
from helpers.extension import Extension
from plugins._acp.helpers import bridge
class ACPToolFinish(Extension):
async def execute(self, response=None, tool_name="", **kwargs):
if not self.agent:
return
bridge.finish_tool(self.agent.context.id, str(tool_name or ""), response)

View file

@ -0,0 +1,9 @@
from helpers.extension import Extension
from plugins._acp.helpers import bridge
class ACPToolStart(Extension):
async def execute(self, tool_name="", tool_args=None, **kwargs):
if not self.agent:
return
bridge.start_tool(self.agent.context.id, str(tool_name or ""), tool_args or {})

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,303 @@
from __future__ import annotations
import asyncio
import json
import logging
import threading
import uuid
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Any, Deque
logger = logging.getLogger(__name__)
CTX_IS_ACP = "acp_session"
CTX_CWD = "acp_cwd"
CTX_ADDITIONAL_DIRECTORIES = "acp_additional_directories"
CTX_MODE = "acp_mode"
CTX_CONFIG_OPTIONS = "acp_config_options"
CTX_MODEL_ID = "acp_model_id"
CTX_WORKDIR = "workdir_path"
DEFAULT_MODE = "default"
_MAX_RAW_OUTPUT = 12000
@dataclass
class SessionBridge:
context_id: str
session_id: str
conn: Any
loop: asyncio.AbstractEventLoop
response_text_sent: str = ""
thought_text_sent: str = ""
message_id: str = ""
tool_queues: dict[str, Deque[str]] = field(default_factory=lambda: defaultdict(deque))
lock: threading.RLock = field(default_factory=threading.RLock)
_bridges_by_context: dict[str, SessionBridge] = {}
_bridges_by_session: dict[str, SessionBridge] = {}
_registry_lock = threading.RLock()
def register_bridge(
*,
context_id: str,
session_id: str,
conn: Any,
loop: asyncio.AbstractEventLoop,
) -> SessionBridge:
bridge = SessionBridge(
context_id=context_id,
session_id=session_id,
conn=conn,
loop=loop,
)
with _registry_lock:
_bridges_by_context[context_id] = bridge
_bridges_by_session[session_id] = bridge
return bridge
def unregister_bridge(context_id: str | None = None, session_id: str | None = None) -> None:
with _registry_lock:
bridge = None
if context_id:
bridge = _bridges_by_context.pop(context_id, None)
if session_id:
bridge = _bridges_by_session.pop(session_id, None) or bridge
if bridge:
_bridges_by_context.pop(bridge.context_id, None)
_bridges_by_session.pop(bridge.session_id, None)
def get_bridge_for_context(context_id: str) -> SessionBridge | None:
with _registry_lock:
return _bridges_by_context.get(context_id)
def get_bridge_for_session(session_id: str) -> SessionBridge | None:
with _registry_lock:
return _bridges_by_session.get(session_id)
def reset_turn(context_id: str, message_id: str = "") -> None:
bridge = get_bridge_for_context(context_id)
if not bridge:
return
with bridge.lock:
bridge.response_text_sent = ""
bridge.thought_text_sent = ""
bridge.message_id = message_id
bridge.tool_queues.clear()
def send_agent_delta(context_id: str, full_text: str) -> bool:
bridge = get_bridge_for_context(context_id)
if not bridge:
return False
full_text = str(full_text or "")
with bridge.lock:
previous = bridge.response_text_sent
if full_text == previous:
return False
if full_text.startswith(previous):
delta = full_text[len(previous) :]
else:
delta = full_text
bridge.response_text_sent = full_text
if not delta:
return False
return _send_acp_helper_update(bridge, "update_agent_message_text", delta)
def send_agent_text(context_id: str, text: str) -> bool:
bridge = get_bridge_for_context(context_id)
if not bridge:
return False
text = str(text or "")
if not text:
return False
with bridge.lock:
bridge.response_text_sent += text
return _send_acp_helper_update(bridge, "update_agent_message_text", text)
def send_agent_thought_delta(context_id: str, full_text: str) -> bool:
bridge = get_bridge_for_context(context_id)
if not bridge:
return False
full_text = str(full_text or "")
with bridge.lock:
previous = bridge.thought_text_sent
if full_text == previous:
return False
if full_text.startswith(previous):
delta = full_text[len(previous) :]
else:
delta = full_text
bridge.thought_text_sent = full_text
if not delta:
return False
return _send_acp_helper_update(bridge, "update_agent_thought_text", delta)
def start_tool(context_id: str, tool_name: str, tool_args: dict[str, Any] | None = None) -> str:
bridge = get_bridge_for_context(context_id)
if not bridge:
return ""
normalized_name = str(tool_name or "tool")
if normalized_name == "response":
return ""
tool_call_id = f"a0-{uuid.uuid4().hex}"
title = _tool_title(normalized_name, tool_args or {})
kind = _tool_kind(normalized_name, tool_args or {})
raw_input = _json_safe(tool_args or {})
try:
import acp
update = acp.start_tool_call(
tool_call_id,
title,
kind=kind,
status="in_progress",
raw_input=raw_input,
)
except Exception:
logger.debug("Could not build ACP tool start", exc_info=True)
return ""
with bridge.lock:
bridge.tool_queues[normalized_name].append(tool_call_id)
_send_update(bridge, update)
return tool_call_id
def finish_tool(context_id: str, tool_name: str, response: Any) -> bool:
bridge = get_bridge_for_context(context_id)
if not bridge:
return False
normalized_name = str(tool_name or "tool")
if normalized_name == "response":
return False
with bridge.lock:
queue = bridge.tool_queues.get(normalized_name)
if not queue:
return False
tool_call_id = queue.popleft()
if not queue:
bridge.tool_queues.pop(normalized_name, None)
message = getattr(response, "message", response)
raw_output = _truncate_raw_output(message)
try:
import acp
update = acp.update_tool_call(
tool_call_id,
status="completed",
raw_output=raw_output,
)
except Exception:
logger.debug("Could not build ACP tool completion", exc_info=True)
return False
_send_update(bridge, update)
return True
async def send_update_async(session_id: str, update: Any) -> bool:
bridge = get_bridge_for_session(session_id)
if not bridge:
return False
try:
await bridge.conn.session_update(session_id, update)
return True
except Exception:
logger.debug("Could not send ACP update", exc_info=True)
return False
def _send_acp_helper_update(bridge: SessionBridge, helper_name: str, text: str) -> bool:
try:
import acp
helper = getattr(acp, helper_name)
update = helper(text)
except Exception:
logger.debug("Could not build ACP text update", exc_info=True)
return False
_send_update(bridge, update)
return True
def _send_update(bridge: SessionBridge, update: Any) -> None:
if bridge.loop.is_closed():
return
async def _deliver() -> None:
await bridge.conn.session_update(bridge.session_id, update)
future = asyncio.run_coroutine_threadsafe(_deliver(), bridge.loop)
def _log_failure(done_future: Any) -> None:
try:
done_future.result()
except Exception:
logger.debug("Failed to send ACP session update", exc_info=True)
future.add_done_callback(_log_failure)
def _tool_title(tool_name: str, tool_args: dict[str, Any]) -> str:
action = tool_args.get("action") or tool_args.get("method") or ""
if action:
return f"{tool_name}: {action}"
return tool_name.replace("_", " ").strip().title() or "Tool"
def _tool_kind(tool_name: str, tool_args: dict[str, Any]) -> str:
probe = " ".join(
str(value).lower()
for value in (tool_name, tool_args.get("action", ""), tool_args.get("method", ""))
)
if any(token in probe for token in ("read", "list", "show", "inspect")):
return "read"
if any(token in probe for token in ("write", "edit", "patch", "replace", "create")):
return "edit"
if any(token in probe for token in ("delete", "remove")):
return "delete"
if any(token in probe for token in ("move", "rename")):
return "move"
if any(token in probe for token in ("search", "find", "grep")):
return "search"
if any(token in probe for token in ("terminal", "shell", "python", "node", "code_execution")):
return "execute"
if any(token in probe for token in ("browser", "fetch", "http")):
return "fetch"
return "other"
def _json_safe(value: Any) -> Any:
try:
json.dumps(value)
return value
except Exception:
return str(value)
def _truncate_raw_output(value: Any) -> Any:
if value is None:
return None
if isinstance(value, str):
if len(value) <= _MAX_RAW_OUTPUT:
return value
hidden = len(value) - _MAX_RAW_OUTPUT
return value[:_MAX_RAW_OUTPUT] + f"\n\n[ACP output truncated: {hidden} characters hidden]"
return _json_safe(value)

View file

@ -0,0 +1,370 @@
from __future__ import annotations
import base64
import json
import mimetypes
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlparse
from helpers import files
from helpers.security import safe_filename
MAX_INLINE_RESOURCE_BYTES = 512 * 1024
MAX_ATTACHMENT_BYTES = 16 * 1024 * 1024
_TEXT_MIME_PREFIXES = ("text/",)
_TEXT_MIME_TYPES = {
"application/json",
"application/javascript",
"application/typescript",
"application/xml",
"application/x-yaml",
"application/yaml",
"application/toml",
"application/sql",
}
@dataclass
class PromptParts:
text: str = ""
attachments: list[str] = field(default_factory=list)
def normalize_cwd(cwd: str | None) -> str:
raw = str(cwd or "").strip() or os.getcwd()
raw = os.path.expanduser(raw)
translated = translate_windows_drive_path(raw)
if translated:
raw = translated
return os.path.abspath(raw)
def normalize_path_for_compare(path: str | None) -> str:
return os.path.normcase(os.path.normpath(normalize_cwd(path)))
def translate_windows_drive_path(path: str) -> str | None:
match = re.match(r"^/?([A-Za-z]):[\\/](.*)$", str(path or ""))
if not match:
return None
drive = match.group(1).lower()
tail = match.group(2).replace("\\", "/").lstrip("/")
return f"/mnt/{drive}/{tail}"
def path_from_file_uri(uri: str) -> Path | None:
raw = str(uri or "").strip()
if not raw:
return None
parsed = urlparse(raw)
if parsed.scheme and parsed.scheme != "file":
return None
if parsed.scheme == "file":
if parsed.netloc and parsed.netloc not in {"", "localhost"}:
return None
path_text = unquote(parsed.path or "")
else:
path_text = unquote(raw)
translated = translate_windows_drive_path(path_text)
if translated:
return Path(translated)
return Path(path_text)
def stringify_message_content(content: Any) -> str:
if isinstance(content, str):
return content
if isinstance(content, dict):
preview = content.get("preview")
if isinstance(preview, str) and preview.strip():
return preview
raw = content.get("raw_content")
if raw is not None:
return stringify_message_content(raw)
if isinstance(content, list):
parts = [stringify_message_content(item) for item in content]
return "\n".join(part for part in parts if part)
try:
return json.dumps(content, ensure_ascii=False)
except Exception:
return str(content)
def prompt_blocks_to_user_message(
prompt: list[Any],
*,
context_id: str,
message_id: str,
) -> PromptParts:
text_parts: list[str] = []
attachments: list[str] = []
for index, block in enumerate(prompt or []):
block_type = str(getattr(block, "type", "") or "").strip()
if block_type == "text" or hasattr(block, "text"):
text = str(getattr(block, "text", "") or "")
if text:
text_parts.append(text)
continue
if block_type == "image":
_append_media_block(
block,
context_id=context_id,
message_id=message_id,
index=index,
kind="image",
text_parts=text_parts,
attachments=attachments,
)
continue
if block_type == "audio":
_append_media_block(
block,
context_id=context_id,
message_id=message_id,
index=index,
kind="audio",
text_parts=text_parts,
attachments=attachments,
)
continue
if block_type == "resource_link":
_append_resource_link(block, text_parts=text_parts, attachments=attachments)
continue
if block_type == "resource":
_append_embedded_resource(
block,
context_id=context_id,
message_id=message_id,
index=index,
text_parts=text_parts,
attachments=attachments,
)
text = "\n\n".join(part for part in text_parts if part).strip()
if not text and attachments:
text = "Please inspect the attached file(s)."
return PromptParts(text=text, attachments=attachments)
def _append_media_block(
block: Any,
*,
context_id: str,
message_id: str,
index: int,
kind: str,
text_parts: list[str],
attachments: list[str],
) -> None:
uri = str(getattr(block, "uri", "") or "").strip()
mime_type = str(getattr(block, "mime_type", "") or "").strip() or None
data = str(getattr(block, "data", "") or "").strip()
if uri:
path = path_from_file_uri(uri)
if path and path.exists():
attachments.append(str(path))
text_parts.append(f"[Attached {kind}: {path}]")
return
text_parts.append(f"[Attached {kind} reference: {uri}]")
return
if not data:
return
try:
raw = base64.b64decode(data.split(",", 1)[-1], validate=False)
except Exception:
raw = data.encode("utf-8", errors="replace")
saved = _save_attachment_bytes(
context_id=context_id,
message_id=message_id,
index=index,
label=kind,
data=raw,
mime_type=mime_type,
)
attachments.append(saved)
text_parts.append(f"[Attached {kind}: {saved}]")
def _append_resource_link(
block: Any,
*,
text_parts: list[str],
attachments: list[str],
) -> None:
uri = str(getattr(block, "uri", "") or "").strip()
if not uri:
return
name = _resource_display_name(block, uri)
mime_type = str(getattr(block, "mime_type", "") or "").strip() or None
path = path_from_file_uri(uri)
if path is None:
text_parts.append(f"[Attached resource: {name}]\nURI: {uri}")
return
if not path.exists():
text_parts.append(f"[Attached resource unavailable: {name}]\nURI: {uri}\nPath: {path}")
return
if _is_probably_text(path, mime_type):
try:
size = path.stat().st_size
data = path.read_bytes()[:MAX_INLINE_RESOURCE_BYTES]
body = decode_text_bytes(data, mime_type) or ""
if size > MAX_INLINE_RESOURCE_BYTES:
body += f"\n\n[Truncated to {MAX_INLINE_RESOURCE_BYTES} of {size} bytes]"
text_parts.append(f"[Attached file: {name}]\nURI: {uri}\n\n{body}")
return
except OSError as exc:
text_parts.append(f"[Attached file unreadable: {name}]\nURI: {uri}\nError: {exc}")
return
attachments.append(str(path))
text_parts.append(f"[Attached file: {path}]")
def _append_embedded_resource(
block: Any,
*,
context_id: str,
message_id: str,
index: int,
text_parts: list[str],
attachments: list[str],
) -> None:
resource = getattr(block, "resource", None)
if resource is None:
return
uri = str(getattr(resource, "uri", "") or "").strip()
mime_type = str(getattr(resource, "mime_type", "") or "").strip() or None
if hasattr(resource, "text"):
body = str(getattr(resource, "text", "") or "")
text_parts.append(f"[Attached resource: {uri or 'embedded text'}]\n\n{body}")
return
blob = str(getattr(resource, "blob", "") or "")
if not blob:
return
try:
data = base64.b64decode(blob, validate=True)
except Exception:
data = blob.encode("utf-8", errors="replace")
text = decode_text_bytes(data[:MAX_INLINE_RESOURCE_BYTES], mime_type)
if text is not None and _is_text_mime(mime_type):
if len(data) > MAX_INLINE_RESOURCE_BYTES:
text += f"\n\n[Truncated to {MAX_INLINE_RESOURCE_BYTES} of {len(data)} bytes]"
text_parts.append(f"[Attached resource: {uri or 'embedded text'}]\n\n{text}")
return
saved = _save_attachment_bytes(
context_id=context_id,
message_id=message_id,
index=index,
label="resource",
data=data,
mime_type=mime_type,
)
attachments.append(saved)
text_parts.append(f"[Attached resource: {saved}]")
def _save_attachment_bytes(
*,
context_id: str,
message_id: str,
index: int,
label: str,
data: bytes,
mime_type: str | None,
) -> str:
if len(data) > MAX_ATTACHMENT_BYTES:
data = data[:MAX_ATTACHMENT_BYTES]
extension = mimetypes.guess_extension((mime_type or "").split(";", 1)[0]) or ".bin"
base_name = safe_filename(f"{label}-{message_id}-{index}{extension}") or f"{label}-{index}{extension}"
from helpers import persist_chat
attach_dir = files.get_abs_path(persist_chat.get_chat_folder_path(context_id), "acp")
os.makedirs(attach_dir, exist_ok=True)
path = os.path.join(attach_dir, base_name)
with open(path, "wb") as handle:
handle.write(data)
return files.normalize_a0_path(path)
def _resource_display_name(block: Any, uri: str) -> str:
title = str(getattr(block, "title", "") or "").strip()
name = str(getattr(block, "name", "") or "").strip()
if title and name and title != name:
return f"{title} ({name})"
if title:
return title
if name:
return name
parsed = urlparse(uri)
return Path(unquote(parsed.path or uri)).name or uri or "resource"
def _is_text_mime(mime_type: str | None) -> bool:
mime = (mime_type or "").split(";", 1)[0].strip().lower()
if not mime:
return False
return mime.startswith(_TEXT_MIME_PREFIXES) or mime in _TEXT_MIME_TYPES
def _is_probably_text(path: Path, mime_type: str | None) -> bool:
if _is_text_mime(mime_type):
return True
guessed, _encoding = mimetypes.guess_type(str(path))
if _is_text_mime(guessed):
return True
return path.suffix.lower() in {
".md",
".txt",
".py",
".js",
".ts",
".tsx",
".jsx",
".json",
".yaml",
".yml",
".toml",
".xml",
".html",
".css",
".sql",
".sh",
}
def decode_text_bytes(data: bytes, mime_type: str | None = None) -> str | None:
if b"\x00" in data and not _is_text_mime(mime_type):
return None
for encoding in ("utf-8-sig", "utf-8", "latin-1"):
try:
return data.decode(encoding)
except UnicodeDecodeError:
continue
return data.decode("utf-8", errors="replace")

View file

@ -0,0 +1,627 @@
from __future__ import annotations
import asyncio
import logging
import uuid
from concurrent.futures import CancelledError
from pathlib import Path
from typing import Any
import acp
from acp.schema import (
AgentCapabilities,
AvailableCommand,
AvailableCommandsUpdate,
ClientCapabilities,
CloseSessionResponse,
CurrentModeUpdate,
ForkSessionResponse,
Implementation,
InitializeResponse,
ListSessionsResponse,
LoadSessionResponse,
NewSessionResponse,
PromptCapabilities,
PromptResponse,
ResumeSessionResponse,
SessionCapabilities,
SessionForkCapabilities,
SessionInfo,
SessionListCapabilities,
SessionMode,
SessionModeState,
SessionResumeCapabilities,
SetSessionConfigOptionResponse,
SetSessionModelResponse,
SetSessionModeResponse,
UnstructuredCommandInput,
Usage,
)
from agent import Agent, AgentContext, AgentContextType, UserMessage
from helpers import git, message_queue as mq, persist_chat, tokens
from helpers.localization import Localization
from initialize import initialize_agent
from plugins._acp import AGENT_ZERO_ACP_VERSION, PLUGIN_VERSION
from plugins._acp.helpers import bridge
from plugins._acp.helpers.content import (
normalize_cwd,
normalize_path_for_compare,
prompt_blocks_to_user_message,
stringify_message_content,
)
logger = logging.getLogger(__name__)
class AgentZeroACPAgent(acp.Agent):
_ADVERTISED_COMMANDS = (
{
"name": "help",
"description": "List Agent Zero ACP commands",
},
{
"name": "context",
"description": "Show conversation and context-window status",
},
{
"name": "reset",
"description": "Clear the current Agent Zero conversation",
},
{
"name": "version",
"description": "Show Agent Zero and ACP plugin versions",
},
)
_MODE_DEFAULT = bridge.DEFAULT_MODE
_MODE_PLAN = "plan"
_MODE_ACT = "act"
_MODE_INSTRUCTIONS = {
_MODE_PLAN: (
"ACP session mode: plan first. Prefer analysis, architecture, and tradeoffs. "
"Do not modify files unless the user explicitly asks you to proceed."
),
_MODE_ACT: (
"ACP session mode: act. Complete the requested work end-to-end with focused "
"implementation and validation."
),
}
def __init__(self) -> None:
super().__init__()
self._conn: acp.Client | None = None
def on_connect(self, conn: acp.Client) -> None:
self._conn = conn
logger.info("ACP client connected")
async def initialize(
self,
protocol_version: int | None = None,
client_capabilities: ClientCapabilities | None = None,
client_info: Implementation | None = None,
**kwargs: Any,
) -> InitializeResponse:
client_name = client_info.name if client_info else "unknown"
logger.info("ACP initialize from %s (protocol v%s)", client_name, protocol_version)
return InitializeResponse(
protocol_version=acp.PROTOCOL_VERSION,
agent_info=Implementation(
name="agent-zero",
title="Agent Zero",
version=AGENT_ZERO_ACP_VERSION,
),
agent_capabilities=AgentCapabilities(
load_session=True,
prompt_capabilities=PromptCapabilities(
embedded_context=True,
image=True,
audio=True,
),
session_capabilities=SessionCapabilities(
fork=SessionForkCapabilities(),
list=SessionListCapabilities(),
resume=SessionResumeCapabilities(),
),
),
auth_methods=[],
)
async def new_session(
self,
cwd: str,
additional_directories: list[str] | None = None,
**kwargs: Any,
) -> NewSessionResponse:
context = self._create_context(cwd=cwd, additional_directories=additional_directories)
self._register(context)
await self._send_session_start_updates(context)
persist_chat.save_tmp_chat(context)
return NewSessionResponse(session_id=context.id, modes=self._session_modes(context))
async def load_session(
self,
cwd: str,
session_id: str,
additional_directories: list[str] | None = None,
**kwargs: Any,
) -> LoadSessionResponse | None:
context = self._get_context(session_id)
if context is None:
logger.warning("ACP load_session: missing session %s", session_id)
return None
self._apply_workspace(context, cwd=cwd, additional_directories=additional_directories)
self._register(context)
await self._replay_history(context)
await self._send_session_start_updates(context)
persist_chat.save_tmp_chat(context)
return LoadSessionResponse(modes=self._session_modes(context))
async def resume_session(
self,
cwd: str,
session_id: str,
additional_directories: list[str] | None = None,
**kwargs: Any,
) -> ResumeSessionResponse:
context = self._get_context(session_id)
if context is None:
context = self._create_context(
cwd=cwd,
additional_directories=additional_directories,
context_id=session_id,
)
else:
self._apply_workspace(context, cwd=cwd, additional_directories=additional_directories)
self._register(context)
await self._replay_history(context)
await self._send_session_start_updates(context)
persist_chat.save_tmp_chat(context)
return ResumeSessionResponse(modes=self._session_modes(context))
async def fork_session(
self,
cwd: str,
session_id: str,
additional_directories: list[str] | None = None,
**kwargs: Any,
) -> ForkSessionResponse:
original = self._get_context(session_id)
if original is None:
return ForkSessionResponse(session_id="")
new_ids = persist_chat.load_json_chats([persist_chat.export_json_chat(original)])
new_id = new_ids[0] if new_ids else ""
context = self._get_context(new_id) if new_id else None
if context is None:
return ForkSessionResponse(session_id="")
context.name = f"{original.name or 'ACP session'} (fork)"
self._apply_workspace(context, cwd=cwd, additional_directories=additional_directories)
self._register(context)
await self._send_session_start_updates(context)
persist_chat.save_tmp_chat(context)
return ForkSessionResponse(session_id=context.id, modes=self._session_modes(context))
async def list_sessions(
self,
cursor: str | None = None,
cwd: str | None = None,
**kwargs: Any,
) -> ListSessionsResponse:
persist_chat.load_tmp_chats()
contexts = [ctx for ctx in AgentContext.all() if ctx.get_data(bridge.CTX_IS_ACP)]
if cwd:
normalized = normalize_path_for_compare(cwd)
contexts = [
ctx
for ctx in contexts
if normalize_path_for_compare(ctx.get_data(bridge.CTX_CWD) or ctx.get_data(bridge.CTX_WORKDIR))
== normalized
]
contexts.sort(key=lambda ctx: ctx.last_message or ctx.created_at, reverse=True)
if cursor:
for idx, context in enumerate(contexts):
if context.id == cursor:
contexts = contexts[idx + 1 :]
break
else:
contexts = []
page = contexts[:50]
next_cursor = contexts[50].id if len(contexts) > 50 else None
sessions = [self._session_info(context) for context in page]
return ListSessionsResponse(sessions=sessions, next_cursor=next_cursor)
async def prompt(
self,
prompt: list[Any],
session_id: str,
message_id: str | None = None,
**kwargs: Any,
) -> PromptResponse:
context = self._get_context(session_id)
if context is None:
logger.warning("ACP prompt: missing session %s", session_id)
return PromptResponse(stop_reason="refusal", user_message_id=message_id)
self._register(context)
msg_id = message_id or str(uuid.uuid4())
parts = prompt_blocks_to_user_message(
prompt,
context_id=context.id,
message_id=msg_id,
)
user_text = parts.text.strip()
if not user_text and not parts.attachments:
return PromptResponse(stop_reason="end_turn", user_message_id=msg_id)
context.last_message = Localization.get().now()
text_only = bool(prompt) and all(str(getattr(block, "type", "") or "") == "text" for block in prompt)
if text_only and not parts.attachments and user_text.startswith("/"):
response_text = await self._handle_slash_command(user_text, context)
if response_text is not None:
await self._send_agent_message(context.id, response_text)
persist_chat.save_tmp_chat(context)
return PromptResponse(stop_reason="end_turn", user_message_id=msg_id)
if context.is_running():
await self._send_agent_message(
context.id,
"A turn is already running. Wait for it to finish or send cancel from the ACP client.",
)
return PromptResponse(stop_reason="end_turn", user_message_id=msg_id)
mode_instruction = self._mode_instruction(context)
system_message = [mode_instruction] if mode_instruction else []
mq.log_user_message(
context,
user_text,
parts.attachments,
message_id=msg_id,
source=" (ACP)",
)
bridge.reset_turn(context.id, msg_id)
task = context.communicate(
UserMessage(
message=user_text,
attachments=parts.attachments,
system_message=system_message,
id=msg_id,
)
)
stop_reason = "end_turn"
final_text = ""
try:
result = await task.result()
final_text = "" if result is None else str(result)
except (asyncio.CancelledError, CancelledError):
stop_reason = "cancelled"
except Exception as exc:
logger.exception("ACP prompt failed for session %s", session_id)
final_text = f"Error: {exc}"
stop_reason = "end_turn"
session_bridge = bridge.get_bridge_for_context(context.id)
if final_text and session_bridge and not session_bridge.response_text_sent.strip():
await self._send_agent_message(context.id, final_text)
elif final_text and session_bridge is None:
await self._send_agent_message(context.id, final_text)
persist_chat.save_tmp_chat(context)
return PromptResponse(
stop_reason=stop_reason,
usage=self._usage(context, final_text),
user_message_id=msg_id,
)
async def cancel(self, session_id: str, **kwargs: Any) -> None:
context = self._get_context(session_id)
if context:
context.kill_process()
bridge.reset_turn(context.id)
logger.info("ACP cancelled session %s", session_id)
async def close_session(self, session_id: str, **kwargs: Any) -> CloseSessionResponse | None:
context = self._get_context(session_id)
if context:
context.kill_process()
AgentContext.remove(context.id)
persist_chat.remove_chat(context.id)
bridge.unregister_bridge(session_id=session_id)
return CloseSessionResponse()
async def set_session_mode(
self,
mode_id: str,
session_id: str,
**kwargs: Any,
) -> SetSessionModeResponse | None:
context = self._get_context(session_id)
if context is None:
return None
normalized = str(mode_id or self._MODE_DEFAULT).strip()
if normalized not in {self._MODE_DEFAULT, self._MODE_PLAN, self._MODE_ACT}:
normalized = self._MODE_DEFAULT
context.set_data(bridge.CTX_MODE, normalized)
persist_chat.save_tmp_chat(context)
await self._send_current_mode(context)
return SetSessionModeResponse()
async def set_session_model(
self,
model_id: str,
session_id: str,
**kwargs: Any,
) -> SetSessionModelResponse | None:
context = self._get_context(session_id)
if context is None:
return None
context.set_data(bridge.CTX_MODEL_ID, str(model_id or ""))
persist_chat.save_tmp_chat(context)
return SetSessionModelResponse()
async def set_config_option(
self,
config_id: str,
session_id: str,
value: str | bool,
**kwargs: Any,
) -> SetSessionConfigOptionResponse | None:
context = self._get_context(session_id)
if context is None:
return None
options = context.get_data(bridge.CTX_CONFIG_OPTIONS) or {}
if not isinstance(options, dict):
options = {}
options[str(config_id)] = value
context.set_data(bridge.CTX_CONFIG_OPTIONS, options)
persist_chat.save_tmp_chat(context)
return SetSessionConfigOptionResponse(config_options=[])
async def ext_method(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
if method in {"ping", "health", "healthcheck"}:
return {"ok": True}
return {}
async def ext_notification(self, method: str, params: dict[str, Any]) -> None:
return None
def _create_context(
self,
*,
cwd: str,
additional_directories: list[str] | None = None,
context_id: str | None = None,
) -> AgentContext:
resolved_cwd = normalize_cwd(cwd)
context = AgentContext(
config=initialize_agent(),
id=context_id or None,
name=self._title_for_cwd(resolved_cwd),
type=AgentContextType.USER,
)
self._apply_workspace(context, cwd=resolved_cwd, additional_directories=additional_directories)
return context
def _get_context(self, session_id: str) -> AgentContext | None:
context = AgentContext.get(session_id)
if context is not None:
return context
persist_chat.load_tmp_chats()
return AgentContext.get(session_id)
def _apply_workspace(
self,
context: AgentContext,
*,
cwd: str,
additional_directories: list[str] | None = None,
) -> None:
resolved_cwd = normalize_cwd(cwd)
context.set_data(bridge.CTX_IS_ACP, True)
context.set_data(bridge.CTX_CWD, resolved_cwd)
context.set_data(bridge.CTX_WORKDIR, resolved_cwd)
context.set_data(
bridge.CTX_ADDITIONAL_DIRECTORIES,
[normalize_cwd(path) for path in (additional_directories or []) if path],
)
context.set_data(bridge.CTX_MODE, context.get_data(bridge.CTX_MODE) or self._MODE_DEFAULT)
if not context.name:
context.name = self._title_for_cwd(resolved_cwd)
def _register(self, context: AgentContext) -> None:
if not self._conn:
return
bridge.register_bridge(
context_id=context.id,
session_id=context.id,
conn=self._conn,
loop=asyncio.get_running_loop(),
)
def _session_modes(self, context: AgentContext) -> SessionModeState:
current = str(context.get_data(bridge.CTX_MODE) or self._MODE_DEFAULT)
if current not in {self._MODE_DEFAULT, self._MODE_PLAN, self._MODE_ACT}:
current = self._MODE_DEFAULT
return SessionModeState(
current_mode_id=current,
available_modes=[
SessionMode(
id=self._MODE_DEFAULT,
name="Default",
description="Use the normal Agent Zero behavior.",
),
SessionMode(
id=self._MODE_PLAN,
name="Plan",
description="Prefer planning and analysis before changing files.",
),
SessionMode(
id=self._MODE_ACT,
name="Act",
description="Proceed end-to-end when the request is actionable.",
),
],
)
def _mode_instruction(self, context: AgentContext) -> str:
return self._MODE_INSTRUCTIONS.get(str(context.get_data(bridge.CTX_MODE) or ""), "")
async def _send_session_start_updates(self, context: AgentContext) -> None:
await self._send_available_commands(context)
await self._send_current_mode(context)
async def _send_available_commands(self, context: AgentContext) -> None:
if not self._conn:
return
try:
await self._conn.session_update(
context.id,
AvailableCommandsUpdate(
session_update="available_commands_update",
available_commands=self._available_commands(),
),
)
except Exception:
logger.debug("Could not advertise ACP commands", exc_info=True)
async def _send_current_mode(self, context: AgentContext) -> None:
if not self._conn:
return
try:
update = CurrentModeUpdate(
session_update="current_mode_update",
current_mode_id=str(context.get_data(bridge.CTX_MODE) or self._MODE_DEFAULT),
)
await self._conn.session_update(context.id, update)
except Exception:
logger.debug("Could not send ACP current mode", exc_info=True)
@classmethod
def _available_commands(cls) -> list[AvailableCommand]:
commands: list[AvailableCommand] = []
for spec in cls._ADVERTISED_COMMANDS:
input_hint = spec.get("input_hint")
commands.append(
AvailableCommand(
name=spec["name"],
description=spec["description"],
input=UnstructuredCommandInput(hint=input_hint) if input_hint else None,
)
)
return commands
async def _replay_history(self, context: AgentContext) -> None:
if not self._conn:
return
try:
outputs = context.agent0.history.output()
except Exception:
logger.debug("Could not read Agent Zero history for ACP replay", exc_info=True)
return
for item in outputs:
text = stringify_message_content(item.get("content")).strip()
if not text:
continue
update = (
acp.update_agent_message_text(text)
if item.get("ai")
else acp.update_user_message_text(text)
)
try:
await self._conn.session_update(context.id, update)
except Exception:
logger.debug("Could not replay ACP history", exc_info=True)
return
async def _send_agent_message(self, session_id: str, text: str) -> None:
if not self._conn or not text:
return
try:
await self._conn.session_update(session_id, acp.update_agent_message_text(text))
except Exception:
logger.debug("Could not send ACP agent message", exc_info=True)
async def _handle_slash_command(self, text: str, context: AgentContext) -> str | None:
command, _, args = text.partition(" ")
command = command.lstrip("/").lower().strip()
args = args.strip()
if command == "help":
lines = ["Available commands:", ""]
for spec in self._ADVERTISED_COMMANDS:
lines.append(f"/{spec['name']}: {spec['description']}")
return "\n".join(lines)
if command == "context":
return self._context_summary(context)
if command == "reset":
context.reset()
self._apply_workspace(
context,
cwd=context.get_data(bridge.CTX_CWD) or context.get_data(bridge.CTX_WORKDIR),
additional_directories=context.get_data(bridge.CTX_ADDITIONAL_DIRECTORIES) or [],
)
return "Conversation history cleared."
if command == "version":
return f"Agent Zero {git.get_version()}\nACP plugin {PLUGIN_VERSION}\nACP protocol {acp.PROTOCOL_VERSION}"
return None
def _context_summary(self, context: AgentContext) -> str:
outputs = context.agent0.history.output()
user_count = sum(1 for item in outputs if not item.get("ai"))
assistant_count = sum(1 for item in outputs if item.get("ai"))
history_tokens = context.agent0.history.get_tokens()
window = context.agent0.get_data(Agent.DATA_NAME_CTX_WINDOW) or {}
window_tokens = window.get("tokens", 0) if isinstance(window, dict) else 0
cwd = context.get_data(bridge.CTX_CWD) or context.get_data(bridge.CTX_WORKDIR) or ""
lines = [
f"Session: {context.id}",
f"Workspace: {cwd}",
f"Messages: {user_count} user, {assistant_count} assistant",
f"History tokens: ~{history_tokens:,}",
]
if window_tokens:
lines.append(f"Last context window: ~{int(window_tokens):,} tokens")
mode = context.get_data(bridge.CTX_MODE) or self._MODE_DEFAULT
lines.append(f"Mode: {mode}")
return "\n".join(lines)
def _usage(self, context: AgentContext, final_text: str) -> Usage | None:
window = context.agent0.get_data(Agent.DATA_NAME_CTX_WINDOW) or {}
input_tokens = int(window.get("tokens", 0) or 0) if isinstance(window, dict) else 0
output_tokens = tokens.approximate_tokens(final_text or "")
total = input_tokens + output_tokens
if total <= 0:
return None
return Usage(
input_tokens=max(input_tokens, 0),
output_tokens=max(output_tokens, 0),
total_tokens=max(total, 0),
)
def _session_info(self, context: AgentContext) -> SessionInfo:
cwd = context.get_data(bridge.CTX_CWD) or context.get_data(bridge.CTX_WORKDIR) or "."
updated_at = context.last_message or context.created_at or Localization.get().now()
if hasattr(updated_at, "isoformat"):
updated_at_str = updated_at.isoformat()
else:
updated_at_str = str(updated_at)
return SessionInfo(
session_id=context.id,
cwd=normalize_cwd(cwd),
title=context.name or self._title_for_cwd(cwd),
updated_at=updated_at_str,
additional_directories=context.get_data(bridge.CTX_ADDITIONAL_DIRECTORIES) or None,
)
@staticmethod
def _title_for_cwd(cwd: str) -> str:
name = Path(str(cwd or "")).name
return f"ACP: {name or 'workspace'}"

103
plugins/_acp/hooks.py Normal file
View file

@ -0,0 +1,103 @@
from __future__ import annotations
import importlib
import importlib.util
import re
import shutil
import subprocess
import sys
import threading
from pathlib import Path
from helpers.errors import format_error
from helpers.print_style import PrintStyle
PLUGIN_NAME = "_acp"
IMPORT_NAME = "acp"
DISTRIBUTION_NAME = "agent-client-protocol"
FALLBACK_REQUIREMENT = "agent-client-protocol==0.10.1"
_LOCK = threading.Lock()
_CHECKED = False
_PLUGIN_DIR = Path(__file__).resolve().parent
_PROJECT_ROOT = _PLUGIN_DIR.parents[1]
_ROOT_REQUIREMENTS_FILE = _PROJECT_ROOT / "requirements.txt"
def has_acp() -> bool:
return importlib.util.find_spec(IMPORT_NAME) is not None
def get_acp_requirement() -> str:
if not _ROOT_REQUIREMENTS_FILE.is_file():
return FALLBACK_REQUIREMENT
pattern = re.compile(rf"^\s*{re.escape(DISTRIBUTION_NAME)}\s*(?:[<>=!~]=?|===).*$")
for raw_line in _ROOT_REQUIREMENTS_FILE.read_text(encoding="utf-8").splitlines():
line = raw_line.split("#", 1)[0].strip()
if line and pattern.match(line):
return line
return FALLBACK_REQUIREMENT
def ensure_dependencies(raise_on_error: bool = True) -> bool:
"""Install the ACP SDK into the framework runtime when self-updates need it."""
global _CHECKED
if _CHECKED and has_acp():
return True
with _LOCK:
if _CHECKED and has_acp():
return True
if has_acp():
_CHECKED = True
return True
requirement = get_acp_requirement()
try:
_install_requirement(requirement)
importlib.invalidate_caches()
if not has_acp():
raise RuntimeError(
f"ACP dependency '{requirement}' is still unavailable after installation"
)
_CHECKED = True
return True
except Exception as exc:
message = f"Agent Zero ACP: failed to install {requirement}: {format_error(exc)}"
if raise_on_error:
raise RuntimeError(message) from exc
PrintStyle.error(message)
return False
def install() -> bool:
return ensure_dependencies(raise_on_error=True)
def _install_requirement(requirement: str) -> None:
cmd = _install_command(requirement)
PrintStyle.info(f"Agent Zero ACP: installing {requirement}")
subprocess.check_call(cmd, cwd=str(_PROJECT_ROOT))
def _install_command(requirement: str) -> list[str]:
uv = shutil.which("uv")
if uv:
return [
uv,
"pip",
"install",
"--python",
sys.executable,
requirement,
]
return [
sys.executable,
"-m",
"pip",
"install",
requirement,
]

9
plugins/_acp/plugin.yaml Normal file
View file

@ -0,0 +1,9 @@
name: _acp
title: Agent Client Protocol
description: Exposes Agent Zero as an ACP stdio agent for editor clients.
version: "1.19"
settings_sections:
- external
per_project_config: false
per_agent_config: false
always_enabled: true

View file

@ -0,0 +1,8 @@
<div class="space-y-4">
<div>
<h3 class="text-lg font-semibold">Agent Client Protocol</h3>
<p class="text-sm opacity-70">Expose Agent Zero to ACP-capable editors over stdio.</p>
</div>
<pre class="rounded bg-base-300 p-3 text-xs overflow-auto">python -m plugins._acp</pre>
<pre class="rounded bg-base-300 p-3 text-xs overflow-auto">docker exec -i agent-zero /opt/venv-a0/bin/python -m plugins._acp</pre>
</div>

View file

@ -480,8 +480,10 @@ class CodeExecution(Tool):
if project_name:
path = projects.get_project_folder(project_name)
else:
set = settings.get_settings()
path = set.get("workdir_path")
path = self.agent.context.get_data("workdir_path")
if not path:
set = settings.get_settings()
path = set.get("workdir_path")
if not path:
return None

View file

@ -1,5 +1,6 @@
a2wsgi==1.10.8
ansio==0.0.1
agent-client-protocol==0.10.1
docker==7.1.0
duckduckgo-search==6.1.12
pyreqwest-impersonate==0.5.3 # freeze nearest wheel-backed release; 0.5.5 is source-only

View file

@ -0,0 +1,96 @@
from __future__ import annotations
from pathlib import Path
import sys
from types import SimpleNamespace
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from plugins._acp.helpers.content import (
normalize_cwd,
path_from_file_uri,
prompt_blocks_to_user_message,
)
from plugins._acp import hooks
def test_acp_manifest_declares_builtin_plugin() -> None:
manifest = (ROOT / "plugins" / "_acp" / "plugin.yaml").read_text(encoding="utf-8")
assert "name: _acp" in manifest
assert "always_enabled: true" in manifest
assert 'version: "1.19"' in manifest
def test_acp_registry_metadata_matches_release() -> None:
registry = (ROOT / "plugins" / "_acp" / "acp_registry" / "agent.json").read_text(
encoding="utf-8"
)
assert '"version": "1.19"' in registry
assert '"license": "MIT"' in registry
def test_acp_dependency_is_pinned() -> None:
requirements = (ROOT / "requirements.txt").read_text(encoding="utf-8")
assert "agent-client-protocol==0.10.1" in requirements
def test_acp_hook_uses_root_requirement_pin() -> None:
assert hooks.get_acp_requirement() == "agent-client-protocol==0.10.1"
assert not (ROOT / "plugins" / "_acp" / "requirements.txt").exists()
def test_acp_entrypoint_attempts_lazy_dependency_install() -> None:
entry = (ROOT / "plugins" / "_acp" / "entry.py").read_text(encoding="utf-8")
assert "_import_acp_or_install()" in entry
assert "hooks.ensure_dependencies(raise_on_error=False)" in entry
def test_acp_initialize_metadata_uses_release_version_constant() -> None:
server = (ROOT / "plugins" / "_acp" / "helpers" / "server.py").read_text(
encoding="utf-8"
)
assert "version=AGENT_ZERO_ACP_VERSION" in server
def test_context_workdir_override_is_wired() -> None:
code_execution = (
ROOT / "plugins" / "_code_execution" / "tools" / "code_execution_tool.py"
).read_text(encoding="utf-8")
workdir_prompt = (
ROOT / "extensions" / "python" / "message_loop_prompts_after" / "_75_include_workdir_extras.py"
).read_text(encoding="utf-8")
assert 'get_data("workdir_path")' in code_execution
assert 'get_data("workdir_path")' in workdir_prompt
def test_file_uri_path_translation() -> None:
assert str(path_from_file_uri("file:///tmp/example.py")) == "/tmp/example.py"
assert str(path_from_file_uri("file:///C:/Users/Ada/work/app.py")) == "/mnt/c/Users/Ada/work/app.py"
assert path_from_file_uri("https://example.com/app.py") is None
def test_prompt_blocks_inline_text_resource(tmp_path: Path) -> None:
source = tmp_path / "notes.md"
source.write_text("hello ACP", encoding="utf-8")
block = SimpleNamespace(
type="resource_link",
uri=source.as_uri(),
name="notes.md",
mime_type="text/markdown",
)
parts = prompt_blocks_to_user_message(
[SimpleNamespace(type="text", text="Read this"), block],
context_id="static-test",
message_id="msg",
)
assert "Read this" in parts.text
assert "hello ACP" in parts.text
assert parts.attachments == []
def test_normalize_cwd_returns_absolute_path() -> None:
assert Path(normalize_cwd(".")).is_absolute()