From dffd615018cd8ed3586eb3a2dd428a777745a3d5 Mon Sep 17 00:00:00 2001 From: Sun Tao <2605127667@qq.com> Date: Thu, 16 Oct 2025 00:48:00 +0800 Subject: [PATCH] update --- backend/app/controller/tool_controller.py | 101 +++++++-- backend/app/utils/oauth_state_manager.py | 109 ++++++++++ .../utils/toolkit/google_calendar_toolkit.py | 203 +++++++++++++++--- src/components/AddWorker/IntegrationList.tsx | 90 +++++--- src/components/AddWorker/ToolSelect.tsx | 150 ++++++++----- src/pages/Setting/MCP.tsx | 112 ++++++++-- .../Setting/components/IntegrationList.tsx | 53 +++-- 7 files changed, 656 insertions(+), 162 deletions(-) create mode 100644 backend/app/utils/oauth_state_manager.py diff --git a/backend/app/controller/tool_controller.py b/backend/app/controller/tool_controller.py index 4a18857fe..c8eb581c3 100644 --- a/backend/app/controller/tool_controller.py +++ b/backend/app/controller/tool_controller.py @@ -2,6 +2,7 @@ from fastapi import APIRouter, HTTPException from loguru import logger from app.utils.toolkit.notion_mcp_toolkit import NotionMCPToolkit from app.utils.toolkit.google_calendar_toolkit import GoogleCalendarToolkit +from app.utils.oauth_state_manager import oauth_state_manager router = APIRouter(tags=["task"]) @@ -60,20 +61,34 @@ async def install_tool(tool: str): ) elif tool == "google_calendar": try: - # Use a dummy task_id for installation, as this is just for pre-authentication - toolkit = GoogleCalendarToolkit("install_auth") - - # Get available tools to verify connection - tools = [tool_func.func.__name__ for tool_func in toolkit.get_tools()] - logger.info(f"Successfully pre-instantiated {tool} toolkit with {len(tools)} tools") - - return { - "success": True, - "tools": tools, - "message": f"Successfully installed {tool} toolkit", - "count": len(tools), - "toolkit_name": "GoogleCalendarToolkit" - } + # Try to initialize toolkit - will succeed if credentials exist + try: + toolkit = GoogleCalendarToolkit("install_auth") + tools = [tool_func.func.__name__ for tool_func in toolkit.get_tools()] + logger.info(f"Successfully initialized Google Calendar toolkit with {len(tools)} tools") + + return { + "success": True, + "tools": tools, + "message": f"Successfully installed {tool} toolkit", + "count": len(tools), + "toolkit_name": "GoogleCalendarToolkit" + } + except ValueError as auth_error: + # No credentials - need authorization + logger.info(f"No credentials found, starting authorization: {auth_error}") + + # Start background authorization in a new thread + logger.info("Starting background Google Calendar authorization") + GoogleCalendarToolkit.start_background_auth("install_auth") + + return { + "success": False, + "status": "authorizing", + "message": "Authorization required. Browser should open automatically. Complete authorization and try installing again.", + "toolkit_name": "GoogleCalendarToolkit", + "requires_auth": True + } except Exception as e: logger.error(f"Failed to install {tool} toolkit: {e}") raise HTTPException( @@ -113,3 +128,61 @@ async def list_available_tools(): } ] } + + +@router.get("/oauth/status/{provider}", name="get oauth status") +async def get_oauth_status(provider: str): + """ + Get the current OAuth authorization status for a provider + + Args: + provider: OAuth provider name (e.g., 'google_calendar') + + Returns: + Current authorization status + """ + state = oauth_state_manager.get_state(provider) + + if not state: + return { + "provider": provider, + "status": "not_started", + "message": "No authorization in progress" + } + + return state.to_dict() + + +@router.post("/oauth/cancel/{provider}", name="cancel oauth") +async def cancel_oauth(provider: str): + """ + Cancel an ongoing OAuth authorization flow + + Args: + provider: OAuth provider name (e.g., 'google_calendar') + + Returns: + Cancellation result + """ + state = oauth_state_manager.get_state(provider) + + if not state: + raise HTTPException( + status_code=404, + detail=f"No authorization found for provider '{provider}'" + ) + + if state.status not in ["pending", "authorizing"]: + raise HTTPException( + status_code=400, + detail=f"Cannot cancel authorization with status '{state.status}'" + ) + + state.cancel() + logger.info(f"Cancelled OAuth authorization for {provider}") + + return { + "success": True, + "provider": provider, + "message": "Authorization cancelled successfully" + } diff --git a/backend/app/utils/oauth_state_manager.py b/backend/app/utils/oauth_state_manager.py new file mode 100644 index 000000000..894ee8c47 --- /dev/null +++ b/backend/app/utils/oauth_state_manager.py @@ -0,0 +1,109 @@ +""" +OAuth authorization state manager for background authorization flows +""" +import threading +from typing import Dict, Optional, Literal +from datetime import datetime +from loguru import logger + +AuthStatus = Literal["pending", "authorizing", "success", "failed", "cancelled"] + + +class OAuthState: + """Represents the state of an OAuth authorization flow""" + + def __init__(self, provider: str): + self.provider = provider + self.status: AuthStatus = "pending" + self.error: Optional[str] = None + self.thread: Optional[threading.Thread] = None + self.result: Optional[any] = None + self.started_at = datetime.now() + self.completed_at: Optional[datetime] = None + self._cancel_event = threading.Event() + self.server = None # Store the local server instance for forced shutdown + + def is_cancelled(self) -> bool: + """Check if cancellation has been requested""" + return self._cancel_event.is_set() + + def cancel(self): + """Request cancellation of the authorization flow""" + self._cancel_event.set() + self.status = "cancelled" + self.completed_at = datetime.now() + + def to_dict(self) -> Dict: + """Convert state to dictionary for API response""" + return { + "provider": self.provider, + "status": self.status, + "error": self.error, + "started_at": self.started_at.isoformat(), + "completed_at": self.completed_at.isoformat() if self.completed_at else None, + } + + +class OAuthStateManager: + """Manager for tracking OAuth authorization flows""" + + def __init__(self): + self._states: Dict[str, OAuthState] = {} + self._lock = threading.Lock() + + def create_state(self, provider: str) -> OAuthState: + """Create a new OAuth state for a provider""" + with self._lock: + # Cancel any existing authorization for this provider + if provider in self._states: + old_state = self._states[provider] + if old_state.status in ["pending", "authorizing"]: + old_state.cancel() + logger.info(f"Cancelled previous {provider} authorization") + + state = OAuthState(provider) + self._states[provider] = state + return state + + def get_state(self, provider: str) -> Optional[OAuthState]: + """Get the current state for a provider""" + with self._lock: + return self._states.get(provider) + + def update_status( + self, + provider: str, + status: AuthStatus, + error: Optional[str] = None, + result: Optional[any] = None + ): + """Update the status of an authorization flow""" + with self._lock: + if provider in self._states: + state = self._states[provider] + state.status = status + state.error = error + state.result = result + if status in ["success", "failed", "cancelled"]: + state.completed_at = datetime.now() + logger.info(f"Updated {provider} OAuth status to {status}") + + def cleanup_old_states(self, max_age_seconds: int = 3600): + """Clean up old completed states""" + with self._lock: + now = datetime.now() + to_remove = [] + for provider, state in self._states.items(): + if state.completed_at: + age = (now - state.completed_at).total_seconds() + if age > max_age_seconds: + to_remove.append(provider) + + for provider in to_remove: + del self._states[provider] + logger.debug(f"Cleaned up old OAuth state for {provider}") + + +# Global instance +oauth_state_manager = OAuthStateManager() + diff --git a/backend/app/utils/toolkit/google_calendar_toolkit.py b/backend/app/utils/toolkit/google_calendar_toolkit.py index 08aaeb162..fb7e77780 100644 --- a/backend/app/utils/toolkit/google_calendar_toolkit.py +++ b/backend/app/utils/toolkit/google_calendar_toolkit.py @@ -1,11 +1,14 @@ from typing import Any, Dict, List import os +import threading from app.component.environment import env from app.service.task import Agents from app.utils.listen.toolkit_listen import listen_toolkit from app.utils.toolkit.abstract_toolkit import AbstractToolkit +from app.utils.oauth_state_manager import oauth_state_manager from camel.toolkits import GoogleCalendarToolkit as BaseGoogleCalendarToolkit +from loguru import logger SCOPES = ['https://www.googleapis.com/auth/calendar'] @@ -99,52 +102,188 @@ class GoogleCalendarToolkit(BaseGoogleCalendarToolkit, AbstractToolkit): from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request - client_id = os.environ.get("GOOGLE_CLIENT_ID") - client_secret = os.environ.get("GOOGLE_CLIENT_SECRET") - refresh_token = os.environ.get("GOOGLE_REFRESH_TOKEN") - token_uri = os.environ.get("GOOGLE_TOKEN_URI", "https://oauth2.googleapis.com/token") - creds = None + # First, try to load from token file try: if os.path.exists(self._token_path): + logger.info(f"Loading credentials from token file: {self._token_path}") creds = Credentials.from_authorized_user_file(self._token_path, SCOPES) - except Exception: + logger.info("Successfully loaded credentials from token file") + except Exception as e: + logger.warning(f"Could not load from token file: {e}") creds = None - if not creds and refresh_token: - creds = Credentials( - None, - refresh_token=refresh_token, - token_uri=token_uri, - client_id=client_id, - client_secret=client_secret, - scopes=SCOPES, - ) - + # If no token file, try environment variables if not creds: - client_config = { - "installed": { - "client_id": client_id, - "client_secret": client_secret, - "auth_uri": "https://accounts.google.com/o/oauth2/auth", - "token_uri": token_uri, - "redirect_uris": ["http://localhost"], - } - } - flow = InstalledAppFlow.from_client_config(client_config, SCOPES) - creds = flow.run_local_server(port=0) + client_id = os.environ.get("GOOGLE_CLIENT_ID") + client_secret = os.environ.get("GOOGLE_CLIENT_SECRET") + refresh_token = os.environ.get("GOOGLE_REFRESH_TOKEN") + token_uri = os.environ.get("GOOGLE_TOKEN_URI", "https://oauth2.googleapis.com/token") + + if refresh_token and client_id and client_secret: + logger.info("Creating credentials from environment variables") + creds = Credentials( + None, + refresh_token=refresh_token, + token_uri=token_uri, + client_id=client_id, + client_secret=client_secret, + scopes=SCOPES, + ) + # If still no creds, check background authorization + if not creds: + state = oauth_state_manager.get_state("google_calendar") + if state and state.status == "success" and state.result: + logger.info("Using credentials from background authorization") + creds = state.result + else: + # No credentials available + raise ValueError("No credentials available. Please run authorization first via /api/install/tool/google_calendar") + # Refresh if expired if creds and creds.expired and creds.refresh_token: - creds.refresh(Request()) - + try: + logger.info("Token expired, refreshing...") + creds.refresh(Request()) + logger.info("Token refreshed successfully") + except Exception as e: + logger.error(f"Failed to refresh token: {e}") + raise ValueError("Failed to refresh expired token. Please re-authorize.") + # Save credentials try: os.makedirs(os.path.dirname(self._token_path), exist_ok=True) with open(self._token_path, "w") as f: f.write(creds.to_json()) - except Exception: - pass + except Exception as e: + logger.warning(f"Could not save credentials: {e}") - return creds \ No newline at end of file + return creds + + @staticmethod + def start_background_auth(api_task_id: str = "install_auth") -> str: + """ + Start background OAuth authorization flow with timeout + Returns the status of the authorization + """ + from google_auth_oauthlib.flow import InstalledAppFlow + from wsgiref import simple_server + import socket + + # Check if there's an existing authorization and force stop it + old_state = oauth_state_manager.get_state("google_calendar") + if old_state and old_state.status in ["pending", "authorizing"]: + logger.info("Found existing authorization, forcing shutdown...") + old_state.cancel() + # Try to shutdown the old server if it exists + if hasattr(old_state, 'server') and old_state.server: + try: + old_state.server.shutdown() + logger.info("Old server shutdown successfully") + except Exception as e: + logger.warning(f"Could not shutdown old server: {e}") + + # Create new state for this authorization + state = oauth_state_manager.create_state("google_calendar") + + def auth_flow(): + local_server = None + try: + state.status = "authorizing" + oauth_state_manager.update_status("google_calendar", "authorizing") + + client_id = os.environ.get("GOOGLE_CLIENT_ID") + client_secret = os.environ.get("GOOGLE_CLIENT_SECRET") + token_uri = os.environ.get("GOOGLE_TOKEN_URI", "https://oauth2.googleapis.com/token") + + logger.info(f"Google Calendar auth - client_id present: {bool(client_id)}, client_secret present: {bool(client_secret)}") + + if not client_id or not client_secret: + error_msg = "GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET must be set in environment variables" + logger.error(error_msg) + raise ValueError(error_msg) + + client_config = { + "installed": { + "client_id": client_id, + "client_secret": client_secret, + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": token_uri, + "redirect_uris": ["http://localhost"], + } + } + + flow = InstalledAppFlow.from_client_config(client_config, SCOPES) + + # Check for cancellation before starting + if state.is_cancelled(): + logger.info("Authorization cancelled before starting") + return + + # This will automatically open browser and wait for user authorization + logger.info("=" * 80) + logger.info(f"[Thread {threading.current_thread().name}] Starting local server for Google Calendar authorization") + logger.info("Browser should open automatically in a moment...") + logger.info("=" * 80) + + # Run local server - this will block until authorization completes + # Note: Each call uses a random port (port=0), so multiple concurrent attempts won't conflict + try: + creds = flow.run_local_server( + port=0, + authorization_prompt_message="", + success_message="
You can close this window and return to Eigent.
", + open_browser=True + ) + logger.info("Authorization flow completed successfully!") + except Exception as server_error: + logger.error(f"Error during run_local_server: {server_error}") + raise + + # Check for cancellation after auth + if state.is_cancelled(): + logger.info("Authorization cancelled after completion") + return + + # Save credentials to token file + token_path = os.path.join( + os.path.expanduser("~"), + ".eigent", + "tokens", + "google_calendar", + f"google_calendar_token_{api_task_id}.json", + ) + + try: + os.makedirs(os.path.dirname(token_path), exist_ok=True) + with open(token_path, "w") as f: + f.write(creds.to_json()) + logger.info(f"Saved Google Calendar credentials to {token_path}") + except Exception as e: + logger.warning(f"Could not save credentials: {e}") + + # Update state with success + oauth_state_manager.update_status("google_calendar", "success", result=creds) + logger.info("Google Calendar authorization successful!") + + except Exception as e: + if state.is_cancelled(): + logger.info("Authorization was cancelled") + oauth_state_manager.update_status("google_calendar", "cancelled") + else: + error_msg = str(e) + logger.error(f"Google Calendar authorization failed: {error_msg}") + oauth_state_manager.update_status("google_calendar", "failed", error=error_msg) + finally: + # Clean up server reference + state.server = None + + # Start authorization in background thread + thread = threading.Thread(target=auth_flow, daemon=True, name=f"GoogleCalendar-OAuth-{state.started_at.timestamp()}") + state.thread = thread + thread.start() + + logger.info("Started background Google Calendar authorization") + return "authorizing" \ No newline at end of file diff --git a/src/components/AddWorker/IntegrationList.tsx b/src/components/AddWorker/IntegrationList.tsx index 7345febb7..089a3e87f 100644 --- a/src/components/AddWorker/IntegrationList.tsx +++ b/src/components/AddWorker/IntegrationList.tsx @@ -78,18 +78,29 @@ export default function IntegrationList({ // items or configs change, recalculate installed useEffect(() => { - // remove duplicates by config_group - const groupSet = new Set