From aa63ebbb57cbf03a0c10448846575b07c1eb126f Mon Sep 17 00:00:00 2001 From: frdel <38891707+frdel@users.noreply.github.com> Date: Fri, 30 May 2025 13:48:42 +0200 Subject: [PATCH] MCP Server finalizing --- agent.py | 17 +-- python/helpers/mcp_server.py | 231 +++++++++++++++++++++-------------- python/helpers/settings.py | 27 ++++ run_ui.py | 27 +++- webui/index.html | 4 + webui/public/mcp_client.svg | 35 ++++++ webui/public/mcp_server.svg | 27 ++++ 7 files changed, 262 insertions(+), 106 deletions(-) create mode 100644 webui/public/mcp_client.svg create mode 100644 webui/public/mcp_server.svg diff --git a/agent.py b/agent.py index 7636f38b9..9785ddd51 100644 --- a/agent.py +++ b/agent.py @@ -13,7 +13,7 @@ from python.helpers.print_style import PrintStyle from langchain_core.prompts import ( ChatPromptTemplate, ) -from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, BaseMessage +from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage import python.helpers.log as Log from python.helpers.dirty_json import DirtyJson @@ -121,21 +121,16 @@ class AgentContext: def nudge(self): self.kill_process() self.paused = False - if self.streaming_agent: - current_agent = self.streaming_agent - else: - current_agent = self.agent0 - - self.task = self.run_task(current_agent.monologue) + self.task = self.run_task(self.get_agent().monologue) return self.task + def get_agent(self): + return self.streaming_agent or self.agent0 + def communicate(self, msg: "UserMessage", broadcast_level: int = 1): self.paused = False # unpause if paused - if self.streaming_agent: - current_agent = self.streaming_agent - else: - current_agent = self.agent0 + current_agent = self.get_agent() if self.task and self.task.is_alive(): # set intervention messages to agent(s): diff --git a/python/helpers/mcp_server.py b/python/helpers/mcp_server.py index 5356f79d9..36985a8b1 100644 --- a/python/helpers/mcp_server.py +++ b/python/helpers/mcp_server.py @@ -20,21 +20,31 @@ _PRINTER = PrintStyle(italic=True, font_color="green", padding=False) mcp_server: FastMCP = FastMCP( name="Agent Zero integrated MCP Server", instructions=""" - This server connects you to the Agent Zero instance running on the remote server. - It exposes tools to interact with the remote Agent Zero instance. + Connect to remote Agent Zero instance. + Agent Zero is a general AI assistant controlling it's linux environment. + Agent Zero can install software, manage files, execute commands, code, use internet, etc. + Agent Zero's environment is isolated unless configured otherwise. """, ) class ToolResponse(BaseModel): - status: Literal["success"] = Field(description="The status of the response", default="success") - response: str = Field(description="The response from the remote Agent Zero Instance") + status: Literal["success"] = Field( + description="The status of the response", default="success" + ) + response: str = Field( + description="The response from the remote Agent Zero Instance" + ) chat_id: str = Field(description="The id of the chat this message belongs to.") class ToolError(BaseModel): - status: Literal["error"] = Field(description="The status of the response", default="error") - error: str = Field(description="The error message from the remote Agent Zero Instance") + status: Literal["error"] = Field( + description="The status of the response", default="error" + ) + error: str = Field( + description="The error message from the remote Agent Zero Instance" + ) chat_id: str = Field(description="The id of the chat this message belongs to.") @@ -47,7 +57,19 @@ This tool is used to send a message to the remote Agent Zero Instance connected @mcp_server.tool( name="send_message", description=SEND_MESSAGE_DESCRIPTION, - tags={"agent_zero", "chat", "remote", "communication", "dialogue", "sse", "send", "message", "start", "new", "continue"}, + tags={ + "agent_zero", + "chat", + "remote", + "communication", + "dialogue", + "sse", + "send", + "message", + "start", + "new", + "continue", + }, annotations={ "remote": True, "readOnlyHint": False, @@ -58,20 +80,49 @@ This tool is used to send a message to the remote Agent Zero Instance connected }, ) async def send_message( - message: Annotated[str, Field(description="The message to send to the remote Agent Zero Instance", title="message")], - attachments: Annotated[list[str], Field( - description="Optional: A list of attachments (file paths or web urls) to send to the remote Agent Zero Instance with the message. Default: Empty list", - title="attachments", - )] | None = None, - chat_id: Annotated[str, Field( - description="Optional: ID of the chat. Used to continue a chat. This value is returned in response to sending previous message. Default: Empty string", - title="chat_id", - )] | None = None, - persistent_chat: Annotated[bool, Field( - description="Optional: Whether to use a persistent chat. If true, the chat will be saved and can be continued later. Default: False.", - title="persistent_chat", - )] | None = None, -) -> Annotated[Union[ToolResponse, ToolError], Field(description="The response from the remote Agent Zero Instance", title="response")]: + message: Annotated[ + str, + Field( + description="The message to send to the remote Agent Zero Instance", + title="message", + ), + ], + attachments: ( + Annotated[ + list[str], + Field( + description="Optional: A list of attachments (file paths or web urls) to send to the remote Agent Zero Instance with the message. Default: Empty list", + title="attachments", + ), + ] + | None + ) = None, + chat_id: ( + Annotated[ + str, + Field( + description="Optional: ID of the chat. Used to continue a chat. This value is returned in response to sending previous message. Default: Empty string", + title="chat_id", + ), + ] + | None + ) = None, + persistent_chat: ( + Annotated[ + bool, + Field( + description="Optional: Whether to use a persistent chat. If true, the chat will be saved and can be continued later. Default: False.", + title="persistent_chat", + ), + ] + | None + ) = None, +) -> Annotated[ + Union[ToolResponse, ToolError], + Field( + description="The response from the remote Agent Zero Instance", title="response" + ), +]: context: AgentContext | None = None if chat_id: context = AgentContext.get(chat_id) @@ -87,7 +138,9 @@ async def send_message( context = AgentContext(config=config, type=AgentContextType.MCP) if not message: - return ToolError(error="Message is required", chat_id=context.id if persistent_chat else "") + return ToolError( + error="Message is required", chat_id=context.id if persistent_chat else "" + ) try: response = await _run_chat(context, message, attachments) @@ -95,7 +148,9 @@ async def send_message( context.reset() AgentContext.remove(context.id) remove_chat(context.id) - return ToolResponse(response=response, chat_id=context.id if persistent_chat else "") + return ToolResponse( + response=response, chat_id=context.id if persistent_chat else "" + ) except Exception as e: return ToolError(error=str(e), chat_id=context.id if persistent_chat else "") @@ -111,7 +166,18 @@ Always use this tool to finish persistent chat conversations with remote Agent Z @mcp_server.tool( name="finish_chat", description=FINISH_CHAT_DESCRIPTION, - tags={"agent_zero", "chat", "remote", "communication", "dialogue", "sse", "finish", "close", "end", "stop"}, + tags={ + "agent_zero", + "chat", + "remote", + "communication", + "dialogue", + "sse", + "finish", + "close", + "end", + "stop", + }, annotations={ "remote": True, "readOnlyHint": False, @@ -122,11 +188,19 @@ Always use this tool to finish persistent chat conversations with remote Agent Z }, ) async def finish_chat( - chat_id: Annotated[str, Field( - description="ID of the chat to be finished. This value is returned in response to sending previous message.", - title="chat_id", - )] -) -> Annotated[Union[ToolResponse, ToolError], Field(description="The response from the remote Agent Zero Instance", title="response")]: + chat_id: Annotated[ + str, + Field( + description="ID of the chat to be finished. This value is returned in response to sending previous message.", + title="chat_id", + ), + ] +) -> Annotated[ + Union[ToolResponse, ToolError], + Field( + description="The response from the remote Agent Zero Instance", title="response" + ), +]: if not chat_id: return ToolError(error="Chat ID is required", chat_id="") @@ -140,74 +214,49 @@ async def finish_chat( return ToolResponse(response="Chat finished", chat_id=chat_id) -async def _run_chat(context: AgentContext, message: str, attachments: list[str] | None = None): - async def _run_chat_wrapper(context: AgentContext, message: str, attachments: list[str] | None = None): - # the agent instance - init in try block - agent = None +async def _run_chat( + context: AgentContext, message: str, attachments: list[str] | None = None +): + try: + _PRINTER.print("MCP Chat message received") - try: - _PRINTER.print("MCP Chat message received") - - agent = context.streaming_agent or context.agent0 - - # Pcurrent_taskhment filenames for logging - attachment_filenames = [] - if attachments: - for attachment in attachments: - if os.path.exists(attachment): - attachment_filenames.append(attachment) - else: - try: - url = urlparse(attachment) - if url.scheme in ["http", "https", "ftp", "ftps", "sftp"]: - attachment_filenames.append(attachment) - else: - _PRINTER.print(f"Skipping attachment: [{attachment}]") - except Exception: + # Pcurrent_taskhment filenames for logging + attachment_filenames = [] + if attachments: + for attachment in attachments: + if os.path.exists(attachment): + attachment_filenames.append(attachment) + else: + try: + url = urlparse(attachment) + if url.scheme in ["http", "https", "ftp", "ftps", "sftp"]: + attachment_filenames.append(attachment) + else: _PRINTER.print(f"Skipping attachment: [{attachment}]") + except Exception: + _PRINTER.print(f"Skipping attachment: [{attachment}]") - _PRINTER.print("User message:") - _PRINTER.print(f"> {message}") - if attachment_filenames: - _PRINTER.print("Attachments:") - for filename in attachment_filenames: - _PRINTER.print(f"- {filename}") + _PRINTER.print("User message:") + _PRINTER.print(f"> {message}") + if attachment_filenames: + _PRINTER.print("Attachments:") + for filename in attachment_filenames: + _PRINTER.print(f"- {filename}") - # Log the message with message_id and attachments - context.log.log( - type="user", - heading="User message", - content=message, - kvps={"attachments": attachment_filenames}, - id=str(uuid.uuid4()), + task = context.communicate( + UserMessage( + message=message, system_message=[], attachments=attachment_filenames ) + ) + result = await task.result() - agent.hist_add_user_message( - UserMessage( - message=message, - system_message=[], - attachments=attachment_filenames)) + # Success + _PRINTER.print(f"MCP Chat message completed: {result}") - # Persist after setting up the context but before running the agent - save_tmp_chat(context) + return result - result = await agent.monologue() + except Exception as e: + # Error + _PRINTER.print(f"MCP Chat message failed: {e}") - # Success - _PRINTER.print(f"MCP Chat message completed: {result}") - save_tmp_chat(context) - - return result - - except Exception as e: - # Error - _PRINTER.print(f"MCP Chat message failed: {e}") - if agent: - agent.handle_critical_exception(e) - - raise RuntimeError(f"MCP Chat message failed: {e}") from e - - deferred_task = DeferredTask(thread_name="mcp_chat_" + context.id) - deferred_task.start_task(_run_chat_wrapper, context, message, attachments) - asyncio.create_task(asyncio.sleep(0.1)) # Ensure background execution doesn't exit immediately on async await - return await deferred_task.result() + raise RuntimeError(f"MCP Chat message failed: {e}") from e diff --git a/python/helpers/settings.py b/python/helpers/settings.py index d0a1e17bc..7cde0f808 100644 --- a/python/helpers/settings.py +++ b/python/helpers/settings.py @@ -62,6 +62,9 @@ class Settings(TypedDict): stt_silence_duration: int stt_waiting_timeout: int + mcp_server_enabled: bool + + class PartialSettings(Settings, total=False): pass @@ -677,6 +680,28 @@ def convert_out(settings: Settings) -> SettingsOutput: "tab": "agent", } + # MCP section + mcp_server_fields: list[SettingsField] = [] + + mcp_server_fields.append( + { + "id": "mcp_server_enabled", + "title": "Enable A0 MCP Server", + "description": "Expose Agent Zero as an SSE MCP server. This will make this A0 instance available to MCP clients.", + "type": "switch", + "value": settings["mcp_server_enabled"], + } + ) + + mcp_server_section: SettingsSection = { + "id": "mcp_server", + "title": "A0 MCP Server", + "description": "Agent Zero can be exposed as an SSE MCP server. It can then be accessed by MCP clients on the URL and port of the web UI + /mcp/sse, for example http://localhost:5000/mcp/sse. The same applies to public URL using Cloudflare Tunnel.", + "fields": mcp_server_fields, + "tab": "mcp", + } + + # Add the section to the result result: SettingsOutput = { "sections": [ @@ -689,6 +714,7 @@ def convert_out(settings: Settings) -> SettingsOutput: stt_section, api_keys_section, auth_section, + mcp_server_section, dev_section, ] } @@ -839,6 +865,7 @@ def get_default_settings() -> Settings: stt_silence_threshold=0.3, stt_silence_duration=1000, stt_waiting_timeout=2000, + mcp_server_enabled=False, ) diff --git a/run_ui.py b/run_ui.py index 6f75d975f..6353453a3 100644 --- a/run_ui.py +++ b/run_ui.py @@ -9,7 +9,7 @@ import threading import signal from flask import Flask, request, Response from flask_basicauth import BasicAuth -from python.helpers import errors, files, git +from python.helpers import errors, files, git, settings from python.helpers.files import get_abs_path from python.helpers import persist_chat, runtime, dotenv, process from python.helpers.cloudflare_tunnel import CloudflareTunnel @@ -19,6 +19,9 @@ from python.helpers.job_loop import run_loop from python.helpers.print_style import PrintStyle from python.helpers.task_scheduler import TaskScheduler from python.helpers.defer import DeferredTask +from starlette.middleware import Middleware +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.exceptions import HTTPException as StarletteHTTPException # Set the new timezone to 'UTC' @@ -234,6 +237,22 @@ def run(): for handler in handlers: register_api_handler(webapp, handler) + + # define a Starlette-compatible middleware handler + + + async def mcp_middleware(request, call_next): + set = settings.get_settings() + if not set["mcp_server_enabled"]: + # raise a proper Starlette HTTPException with a clear message + PrintStyle.error("[MCP] Access denied: MCP server is disabled in settings.") + raise StarletteHTTPException(status_code=403, detail="MCP server is disabled in settings.") + return await call_next(request) + + mcp_middlewares = [ + Middleware(BaseHTTPMiddleware, dispatch=mcp_middleware) + ] + mcp_app = create_sse_app( server=mcp_server_instance, message_path=mcp_server_instance.settings.message_path, @@ -242,13 +261,13 @@ def run(): auth_settings=mcp_server_instance.settings.auth, debug=mcp_server_instance.settings.debug, routes=mcp_server_instance._additional_http_routes, - middleware=None + middleware=mcp_middlewares ) # add the webapp and mcp to the app app = DispatcherMiddleware(webapp, { - "/mcp": ASGIMiddleware(app=mcp_app), - }) + "/mcp": ASGIMiddleware(app=mcp_app), # type: ignore + }) # type: ignore PrintStyle().debug("Registered middleware for MCP") try: diff --git a/webui/index.html b/webui/index.html index c088972a9..8d33a7d01 100644 --- a/webui/index.html +++ b/webui/index.html @@ -583,6 +583,10 @@ :class="{'active': activeTab === 'external'}" @click="switchTab('external')" title="External Services">External Services +