eigent/backend/app/utils/toolkit/notion_mcp_toolkit.py
2025-11-07 16:47:13 +08:00

114 lines
4.6 KiB
Python

import os
import json
import asyncio
from textwrap import indent
from typing import Any, Dict, List
from camel.toolkits import FunctionTool
from app.component.environment import env
from app.utils.toolkit.abstract_toolkit import AbstractToolkit
from camel.toolkits.mcp_toolkit import MCPToolkit
from utils import traceroot_wrapper as traceroot
logger = traceroot.get_logger("notion_mcp_toolkit")
def _customize_function_parameters(schema: Dict[str, Any]) -> None:
r"""Customize function parameters for specific functions.
This method allows modifying parameter descriptions or other schema
attributes for specific functions.
"""
function_info = schema.get("function", {})
function_name = function_info.get("name", "")
parameters = function_info.get("parameters", {})
properties = parameters.get("properties", {})
required = parameters.get("required", [])
help_description = "If you need use parent, you can use `notion-search` for the information"
# Modify the notion-create-pages function to make parent optional
if function_name == "notion-create-pages" or function_name == "notion-create-database":
required.remove("parent")
parameters["required"] = required
if "parent" in properties:
# Update the parent parameter description
properties["parent"]["description"] = "Optional. " + properties["parent"]["description"] + help_description
class NotionMCPToolkit(MCPToolkit, AbstractToolkit):
def __init__(
self,
api_task_id: str,
timeout: float | None = None,
):
self.api_task_id = api_task_id
if timeout is None:
timeout = 120.0
config_dict={
"mcpServers": {
"notionMCP": {
"command": "npx",
"args": [
"-y",
"mcp-remote",
"https://mcp.notion.com/mcp",
],
"env": {
"MCP_REMOTE_CONFIG_DIR": env("MCP_REMOTE_CONFIG_DIR", os.path.expanduser("~/.mcp-auth")),
},
}
}
}
super().__init__(config_dict=config_dict, timeout=timeout)
@classmethod
async def get_can_use_tools(cls, api_task_id: str) -> list[FunctionTool]:
# Retry mechanism for remote MCP connection
max_retries = 3
retry_delay = 2 # seconds
for attempt in range(max_retries):
tools = []
toolkit = None
try:
# Create a fresh toolkit instance for each retry
toolkit = cls(api_task_id)
logger.info(f"Attempting to connect to Notion MCP server (attempt {attempt + 1}/{max_retries})")
await toolkit.connect()
# Get tools from the connected toolkit
all_tools = toolkit.get_tools()
tool_schema = [
item.get_openai_tool_schema() for item in all_tools
]
# Adjust tool schema
for item in tool_schema:
_customize_function_parameters(item)
for item in all_tools:
setattr(item, "_toolkit_name", cls.__name__)
tools.append(item)
# Check if we actually got tools
if len(tools) == 0:
logger.warning(f"Connected to Notion MCP server but got 0 tools (attempt {attempt + 1}/{max_retries})")
raise Exception("No tools retrieved from Notion MCP server")
# Success! Got tools
logger.info(f"Successfully connected to Notion MCP server and loaded {len(tools)} tools")
return tools
except Exception as e:
logger.warning(f"Failed to connect to Notion MCP server (attempt {attempt + 1}/{max_retries}): {e}")
# If not the last attempt, wait and retry
if attempt < max_retries - 1:
logger.info(f"Retrying in {retry_delay} seconds...")
await asyncio.sleep(retry_delay)
else:
# Last attempt failed
logger.error(f"All {max_retries} connection attempts to Notion MCP server failed. Notion tools will not be available for this task.")
return []