# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. ========= import os import re from typing import Any, Dict, List, Literal, Optional, Union from camel.logger import get_logger from camel.toolkits import FunctionTool from camel.toolkits.base import BaseToolkit from camel.utils import MCPServer logger = get_logger(__name__) SCOPES = [ 'https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/gmail.send', 'https://www.googleapis.com/auth/gmail.modify', 'https://www.googleapis.com/auth/gmail.compose', 'https://www.googleapis.com/auth/gmail.labels', 'https://www.googleapis.com/auth/contacts.readonly', 'https://www.googleapis.com/auth/userinfo.profile', ] @MCPServer() class GmailToolkit(BaseToolkit): r"""A comprehensive toolkit for Gmail operations. This class provides methods for Gmail operations including sending emails, managing drafts, fetching messages, managing labels, and handling contacts. API keys can be accessed in google cloud console (https://console.cloud.google.com/) """ def __init__( self, timeout: Optional[float] = None, ): r"""Initializes a new instance of the GmailToolkit class. Args: timeout (Optional[float]): The timeout value for API requests in seconds. If None, no timeout is applied. (default: :obj:`None`) """ super().__init__(timeout=timeout) self._credentials = self._authenticate() self.gmail_service: Any = self._get_gmail_service() self._people_service: Any = None @property def people_service(self) -> Any: r"""Lazily initialize and return the Google People service.""" if self._people_service is None: self._people_service = self._get_people_service() return self._people_service @people_service.setter def people_service(self, service: Any) -> None: r"""Allow overriding/injecting the People service (e.g., in tests).""" self._people_service = service def gmail_send_email( self, to: Union[str, List[str]], subject: str, body: str, cc: Optional[Union[str, List[str]]] = None, bcc: Optional[Union[str, List[str]]] = None, attachments: Optional[List[str]] = None, is_html: bool = False, ) -> Dict[str, Any]: r"""Send an email through Gmail. Args: to (Union[str, List[str]]): Recipient email address(es). subject (str): Email subject. body (str): Email body content. cc (Optional[Union[str, List[str]]]): CC recipient email address(es). bcc (Optional[Union[str, List[str]]]): BCC recipient email address(es). attachments (Optional[List[str]]): List of file paths to attach. is_html (bool): Whether the body is HTML format. Set to True when sending formatted emails with HTML tags (e.g., bold, links, images). Use False (default) for plain text emails. Returns: Dict[str, Any]: A dictionary containing the result of the operation. """ try: # Normalize recipients to lists to_list = [to] if isinstance(to, str) else to cc_list = [cc] if isinstance(cc, str) else (cc or []) bcc_list = [bcc] if isinstance(bcc, str) else (bcc or []) # Validate email addresses all_recipients = to_list + cc_list + bcc_list for email in all_recipients: if not self._is_valid_email(email): return {"error": f"Invalid email address: {email}"} # Create message message = self._create_message( to_list, subject, body, cc_list, bcc_list, attachments, is_html ) # Send message sent_message = ( self.gmail_service.users() .messages() .send(userId='me', body=message) .execute() ) return { "success": True, "message_id": sent_message.get('id'), "thread_id": sent_message.get('threadId'), "message": "Email sent successfully", } except Exception as e: logger.error("Failed to send email: %s", e) return {"error": f"Failed to send email: {e!s}"} def gmail_reply_to_email( self, message_id: str, reply_body: str, reply_all: bool = False, is_html: bool = False, ) -> Dict[str, Any]: r"""Reply to an email message. Args: message_id (str): The unique identifier of the message to reply to. To get a message ID, first use fetch_emails() to list messages, or use the 'message_id' returned from send_email() or create_email_draft(). reply_body (str): The reply message body. reply_all (bool): Whether to reply to all recipients. is_html (bool): Whether the body is HTML format. Set to True when sending formatted emails with HTML tags (e.g., bold, links, images). Use False (default) for plain text emails. Returns: Dict[str, Any]: A dictionary containing the result of the operation. """ try: # Get the original message original_message = ( self.gmail_service.users() .messages() .get(userId='me', id=message_id) .execute() ) # Extract headers (single pass, case-insensitive) headers = original_message['payload'].get('headers', []) subject = from_email = to_emails = cc_emails = None missing = {'subject', 'from', 'to', 'cc'} for header in headers: name = (header.get('name') or '').lower() if name not in missing: continue value = header.get('value') if name == 'subject': subject = value elif name == 'from': from_email = value elif name == 'to': to_emails = value elif name == 'cc': cc_emails = value missing.discard(name) if not missing: break # Extract identifiers for reply context message_id_header = self._get_header_value( headers, 'Message-Id' ) or self._get_header_value(headers, 'Message-ID') thread_id = original_message.get('threadId') # Prepare reply subject if subject and not subject.startswith('Re: '): subject = f"Re: {subject}" elif not subject: subject = "Re: (No Subject)" # Validate from_email if not from_email: return {"error": "Original message has no sender address"} # Prepare recipients if reply_all: recipients = [from_email] if to_emails: recipients.extend( [email.strip() for email in to_emails.split(',')] ) if cc_emails: recipients.extend( [email.strip() for email in cc_emails.split(',')] ) # Remove duplicates and None values recipients = [r for r in list(set(recipients)) if r] # Get current user's email and remove it from recipients try: profile_result = self.gmail_get_profile() if profile_result.get('success'): current_user_email = profile_result['profile'][ 'email_address' ] # Remove current user from recipients (handle both # plain email and "Name " format) filtered_recipients = [] for email in recipients: # Extract email from "Name " format match = re.search(r'<([^>]+)>$', email.strip()) email_addr = ( match.group(1) if match else email.strip() ) if email_addr != current_user_email: filtered_recipients.append(email) recipients = filtered_recipients except Exception as e: logger.warning( "Could not get current user email to filter from " "recipients: %s", e, ) else: recipients = [from_email] # Create reply message with reply headers message = self._create_message( recipients, subject, reply_body, is_html=is_html, in_reply_to=message_id_header or original_message.get('id'), references=[message_id_header] if message_id_header else None, ) # Send reply in the same thread sent_message = ( self.gmail_service.users() .messages() .send(userId='me', body={**message, 'threadId': thread_id}) .execute() ) return { "success": True, "message_id": sent_message.get('id'), "thread_id": sent_message.get('threadId'), "message": "Reply sent successfully", } except Exception as e: logger.error("Failed to reply to email: %s", e) return {"error": f"Failed to reply to email: {e!s}"} def gmail_forward_email( self, message_id: str, to: Union[str, List[str]], forward_body: Optional[str] = None, cc: Optional[Union[str, List[str]]] = None, bcc: Optional[Union[str, List[str]]] = None, include_attachments: bool = True, ) -> Dict[str, Any]: r"""Forward an email message. Args: message_id (str): The unique identifier of the message to forward. To get a message ID, first use fetch_emails() to list messages, or use the 'message_id' returned from send_email() or create_email_draft(). to (Union[str, List[str]]): Recipient email address(es). forward_body (Optional[str]): Additional message to include at the top of the forwarded email, before the original message content. If not provided, only the original message will be forwarded. cc (Optional[Union[str, List[str]]]): CC recipient email address(es). bcc (Optional[Union[str, List[str]]]): BCC recipient email address(es). include_attachments (bool): Whether to include original attachments. Defaults to True. Only includes real attachments, not inline images. Returns: Dict[str, Any]: A dictionary containing the result of the operation, including the number of attachments forwarded. """ try: import tempfile # Get the original message original_message = ( self.gmail_service.users() .messages() .get(userId='me', id=message_id) .execute() ) # Extract headers (single pass, case-insensitive) headers = original_message['payload'].get('headers', []) subject = from_email = date = None missing = {'subject', 'from', 'date'} for header in headers: name = (header.get('name') or '').lower() if name not in missing: continue value = header.get('value') if name == 'subject': subject = value elif name == 'from': from_email = value elif name == 'date': date = value missing.discard(name) if not missing: break # Prepare forward subject if subject and not subject.startswith('Fwd: '): subject = f"Fwd: {subject}" elif not subject: subject = "Fwd: (No Subject)" # Prepare forward body if forward_body: body = f"{forward_body}\n\n--- Forwarded message ---\n" else: body = "--- Forwarded message ---\n" body += f"From: {from_email}\n" body += f"Date: {date}\n" body += f"Subject: {subject.replace('Fwd: ', '')}\n\n" # Add original message body body += self._extract_message_body(original_message) # Normalize recipients to_list = [to] if isinstance(to, str) else to cc_list = [cc] if isinstance(cc, str) else (cc or []) bcc_list = [bcc] if isinstance(bcc, str) else (bcc or []) # Handle attachments attachment_paths = [] temp_files: List[str] = [] try: if include_attachments: # Extract attachment metadata attachments = self._extract_attachments(original_message) for att in attachments: try: # Create temp file temp_file = tempfile.NamedTemporaryFile( delete=False, suffix=f"_{att['filename']}" ) temp_files.append(temp_file.name) # Download attachment result = self.gmail_get_attachment( message_id=message_id, attachment_id=att['attachment_id'], save_path=temp_file.name, ) if result.get('success'): attachment_paths.append(temp_file.name) except Exception as e: logger.warning( f"Failed to download attachment " f"{att['filename']}: {e}" ) # Create forward message (now with attachments!) message = self._create_message( to_list, subject, body, cc_list, bcc_list, attachments=attachment_paths if attachment_paths else None, ) # Send forward sent_message = ( self.gmail_service.users() .messages() .send(userId='me', body=message) .execute() ) return { "success": True, "message_id": sent_message.get('id'), "thread_id": sent_message.get('threadId'), "message": "Email forwarded successfully", "attachments_forwarded": len(attachment_paths), } finally: # Clean up temp files for temp_file_path in temp_files: try: os.unlink(temp_file_path) except Exception as e: logger.warning( f"Failed to delete temp file {temp_file_path}: {e}" ) except Exception as e: logger.error("Failed to forward email: %s", e) return {"error": f"Failed to forward email: {e!s}"} def gmail_create_draft( self, to: Union[str, List[str]], subject: str, body: str, cc: Optional[Union[str, List[str]]] = None, bcc: Optional[Union[str, List[str]]] = None, attachments: Optional[List[str]] = None, is_html: bool = False, ) -> Dict[str, Any]: r"""Create an email draft. Args: to (Union[str, List[str]]): Recipient email address(es). subject (str): Email subject. body (str): Email body content. cc (Optional[Union[str, List[str]]]): CC recipient email address(es). bcc (Optional[Union[str, List[str]]]): BCC recipient email address(es). attachments (Optional[List[str]]): List of file paths to attach. is_html (bool): Whether the body is HTML format. Set to True when sending formatted emails with HTML tags (e.g., bold, links, images). Use False (default) for plain text emails. Returns: Dict[str, Any]: A dictionary containing the result of the operation. """ try: # Normalize recipients to lists to_list = [to] if isinstance(to, str) else to cc_list = [cc] if isinstance(cc, str) else (cc or []) bcc_list = [bcc] if isinstance(bcc, str) else (bcc or []) # Validate email addresses all_recipients = to_list + cc_list + bcc_list for email in all_recipients: if not self._is_valid_email(email): return {"error": f"Invalid email address: {email}"} # Create message message = self._create_message( to_list, subject, body, cc_list, bcc_list, attachments, is_html ) # Create draft draft = ( self.gmail_service.users() .drafts() .create(userId='me', body={'message': message}) .execute() ) return { "success": True, "draft_id": draft.get('id'), "message_id": draft.get('message', {}).get('id'), "message": "Draft created successfully", } except Exception as e: logger.error("Failed to create draft: %s", e) return {"error": f"Failed to create draft: {e!s}"} def gmail_send_draft(self, draft_id: str) -> Dict[str, Any]: r"""Send a draft email. Args: draft_id (str): The unique identifier of the draft to send. To get a draft ID, first use list_drafts() to list drafts, or use the 'draft_id' returned from create_email_draft(). Returns: Dict[str, Any]: A dictionary containing the result of the operation. """ try: # Send draft sent_message = ( self.gmail_service.users() .drafts() .send(userId='me', body={'id': draft_id}) .execute() ) return { "success": True, "message_id": sent_message.get('id'), "thread_id": sent_message.get('threadId'), "message": "Draft sent successfully", } except Exception as e: logger.error("Failed to send draft: %s", e) return {"error": f"Failed to send draft: {e!s}"} def gmail_fetch_emails( self, query: str = "", max_results: int = 10, include_spam_trash: bool = False, label_ids: Optional[List[str]] = None, page_token: Optional[str] = None, ) -> Dict[str, Any]: r"""Fetch emails with filters and pagination. Args: query (str): Gmail search query string. Use Gmail's search syntax: - 'from:example@domain.com' - emails from specific sender - 'subject:meeting' - emails with specific subject text - 'has:attachment' - emails with attachments - 'is:unread' - unread emails - 'in:sent' - emails in sent folder - 'after:2024/01/01 before:2024/12/31' - date range Examples: 'from:john@example.com subject:project', 'is:unread has:attachment' max_results (int): Maximum number of emails to fetch. include_spam_trash (bool): Whether to include spam and trash. label_ids (Optional[List[str]]): List of label IDs to filter emails by. Only emails with ALL of the specified labels will be returned. Label IDs can be: - System labels: 'INBOX', 'SENT', 'DRAFT', 'SPAM', 'TRASH', 'UNREAD', 'STARRED', 'IMPORTANT', 'CATEGORY_PERSONAL', etc. - Custom label IDs: Retrieved from list_gmail_labels() method. page_token (Optional[str]): Pagination token from a previous response. If provided, fetches the next page of results. Returns: Dict[str, Any]: A dictionary containing the fetched emails. """ try: # Build request parameters request_params = { 'userId': 'me', 'maxResults': max_results, 'includeSpamTrash': include_spam_trash, } if query: request_params['q'] = query if label_ids: request_params['labelIds'] = label_ids # List messages if page_token: request_params['pageToken'] = page_token messages_result = ( self.gmail_service.users() .messages() .list(**request_params) .execute() ) messages = messages_result.get('messages', []) emails = [] # Fetch detailed information for each message for msg in messages: email_detail = self._get_message_details(msg['id']) if email_detail: emails.append(email_detail) return { "success": True, "emails": emails, "total_count": len(emails), "next_page_token": messages_result.get('nextPageToken'), } except Exception as e: logger.error("Failed to fetch emails: %s", e) return {"error": f"Failed to fetch emails: {e!s}"} def gmail_fetch_thread_by_id(self, thread_id: str) -> Dict[str, Any]: r"""Fetch a thread by ID. Args: thread_id (str): The unique identifier of the thread to fetch. To get a thread ID, first use list_threads() to list threads, or use the 'thread_id' returned from send_email() or reply_to_email(). Returns: Dict[str, Any]: A dictionary containing the thread details. """ try: thread = ( self.gmail_service.users() .threads() .get(userId='me', id=thread_id) .execute() ) messages = [] for message in thread.get('messages', []): message_detail = self._get_message_details(message['id']) if message_detail: messages.append(message_detail) return { "success": True, "thread_id": thread_id, "messages": messages, "message_count": len(messages), } except Exception as e: logger.error("Failed to fetch thread: %s", e) return {"error": f"Failed to fetch thread: {e!s}"} def gmail_modify_email_labels( self, message_id: str, add_labels: Optional[List[str]] = None, remove_labels: Optional[List[str]] = None, ) -> Dict[str, Any]: r"""Modify labels on an email message. Args: message_id (str): The unique identifier of the message to modify. To get a message ID, first use fetch_emails() to list messages, or use the 'message_id' returned from send_email() or create_email_draft(). add_labels (Optional[List[str]]): List of label IDs to add to the message. Label IDs can be: - System labels: 'INBOX', 'STARRED', 'IMPORTANT', 'UNREAD', etc. - Custom label IDs: Retrieved from list_gmail_labels() method. Example: ['STARRED', 'IMPORTANT'] marks email as starred and important. remove_labels (Optional[List[str]]): List of label IDs to remove from the message. Uses the same format as add_labels. Example: ['UNREAD'] marks the email as read. Returns: Dict[str, Any]: A dictionary containing the result of the operation. """ try: body = {} if add_labels: body['addLabelIds'] = add_labels if remove_labels: body['removeLabelIds'] = remove_labels if not body: return {"error": "No labels to add or remove"} modified_message = ( self.gmail_service.users() .messages() .modify(userId='me', id=message_id, body=body) .execute() ) return { "success": True, "message_id": message_id, "label_ids": modified_message.get('labelIds', []), "message": "Labels modified successfully", } except Exception as e: logger.error("Failed to modify labels: %s", e) return {"error": f"Failed to modify labels: {e!s}"} def gmail_move_to_trash(self, message_id: str) -> Dict[str, Any]: r"""Move a message to trash. Args: message_id (str): The unique identifier of the message to move to trash. To get a message ID, first use fetch_emails() to list messages, or use the 'message_id' returned from send_email() or create_email_draft(). Returns: Dict[str, Any]: A dictionary containing the result of the operation. """ try: trashed_message = ( self.gmail_service.users() .messages() .trash(userId='me', id=message_id) .execute() ) return { "success": True, "message_id": message_id, "label_ids": trashed_message.get('labelIds', []), "message": "Message moved to trash successfully", } except Exception as e: logger.error("Failed to move message to trash: %s", e) return {"error": f"Failed to move message to trash: {e!s}"} def gmail_get_attachment( self, message_id: str, attachment_id: str, save_path: Optional[str] = None, ) -> Dict[str, Any]: r"""Get an attachment from a message. Args: message_id (str): The unique identifier of the message containing the attachment. To get a message ID, first use fetch_emails() to list messages, or use the 'message_id' returned from send_email() or create_email_draft(). attachment_id (str): The unique identifier of the attachment to download. To get an attachment ID, first use fetch_emails() to get message details, then look for 'attachment_id' in the 'attachments' list of each message. save_path (Optional[str]): Local file path where the attachment should be saved. If provided, the attachment will be saved to this location and the response will include a success message. If not provided, the attachment data will be returned as base64-encoded content in the response. Returns: Dict[str, Any]: A dictionary containing the attachment data or save result. """ try: import base64 attachment = ( self.gmail_service.users() .messages() .attachments() .get(userId='me', messageId=message_id, id=attachment_id) .execute() ) # Decode the attachment data file_data = base64.urlsafe_b64decode(attachment['data']) if save_path: with open(save_path, 'wb') as f: f.write(file_data) return { "success": True, "message": f"Attachment saved to {save_path}", "file_size": len(file_data), } else: return { "success": True, "data": base64.b64encode(file_data).decode('utf-8'), "file_size": len(file_data), } except Exception as e: logger.error("Failed to get attachment: %s", e) return {"error": f"Failed to get attachment: {e!s}"} def gmail_list_threads( self, query: str = "", max_results: int = 10, include_spam_trash: bool = False, label_ids: Optional[List[str]] = None, page_token: Optional[str] = None, ) -> Dict[str, Any]: r"""List email threads. Args: query (str): Gmail search query string. Use Gmail's search syntax: - 'from:example@domain.com' - threads from specific sender - 'subject:meeting' - threads with specific subject text - 'has:attachment' - threads with attachments - 'is:unread' - unread threads - 'in:sent' - threads in sent folder - 'after:2024/01/01 before:2024/12/31' - date range Examples: 'from:john@example.com subject:project', 'is:unread has:attachment' max_results (int): Maximum number of threads to fetch. include_spam_trash (bool): Whether to include spam and trash. label_ids (Optional[List[str]]): List of label IDs to filter threads by. Only threads with ALL of the specified labels will be returned. Label IDs can be: - System labels: 'INBOX', 'SENT', 'DRAFT', 'SPAM', 'TRASH', 'UNREAD', 'STARRED', 'IMPORTANT', 'CATEGORY_PERSONAL', etc. - Custom label IDs: Retrieved from list_gmail_labels() method. page_token (Optional[str]): Pagination token from a previous response. If provided, fetches the next page of results. Returns: Dict[str, Any]: A dictionary containing the thread list. """ try: # Build request parameters request_params = { 'userId': 'me', 'maxResults': max_results, 'includeSpamTrash': include_spam_trash, } if query: request_params['q'] = query if label_ids: request_params['labelIds'] = label_ids # List threads if page_token: request_params['pageToken'] = page_token threads_result = ( self.gmail_service.users() .threads() .list(**request_params) .execute() ) threads = threads_result.get('threads', []) thread_list = [] for thread in threads: thread_list.append( { "thread_id": thread['id'], "snippet": thread.get('snippet', ''), "history_id": thread.get('historyId', ''), } ) return { "success": True, "threads": thread_list, "total_count": len(thread_list), "next_page_token": threads_result.get('nextPageToken'), } except Exception as e: logger.error("Failed to list threads: %s", e) return {"error": f"Failed to list threads: {e!s}"} def gmail_list_drafts( self, max_results: int = 10, page_token: Optional[str] = None ) -> Dict[str, Any]: r"""List email drafts. Args: max_results (int): Maximum number of drafts to fetch. page_token (Optional[str]): Pagination token from a previous response. If provided, fetches the next page of results. Returns: Dict[str, Any]: A dictionary containing the draft list. """ try: drafts_result = ( self.gmail_service.users() .drafts() .list( userId='me', maxResults=max_results, **({"pageToken": page_token} if page_token else {}), ) .execute() ) drafts = drafts_result.get('drafts', []) draft_list = [] for draft in drafts: draft_info = { "draft_id": draft['id'], "message_id": draft.get('message', {}).get('id', ''), "thread_id": draft.get('message', {}).get('threadId', ''), "snippet": draft.get('message', {}).get('snippet', ''), } draft_list.append(draft_info) return { "success": True, "drafts": draft_list, "total_count": len(draft_list), "next_page_token": drafts_result.get('nextPageToken'), } except Exception as e: logger.error("Failed to list drafts: %s", e) return {"error": f"Failed to list drafts: {e!s}"} def gmail_list_labels(self) -> Dict[str, Any]: r"""List all Gmail labels. Returns: Dict[str, Any]: A dictionary containing the label list. """ try: labels_result = ( self.gmail_service.users().labels().list(userId='me').execute() ) labels = labels_result.get('labels', []) label_list = [] for label in labels: label_info = { "id": label['id'], "name": label['name'], "type": label.get('type', 'user'), "messages_total": label.get('messagesTotal', 0), "messages_unread": label.get('messagesUnread', 0), "threads_total": label.get('threadsTotal', 0), "threads_unread": label.get('threadsUnread', 0), } label_list.append(label_info) return { "success": True, "labels": label_list, "total_count": len(label_list), } except Exception as e: logger.error("Failed to list labels: %s", e) return {"error": f"Failed to list labels: {e!s}"} def gmail_create_label( self, name: str, label_list_visibility: Literal["labelShow", "labelHide"] = "labelShow", message_list_visibility: Literal["show", "hide"] = "show", ) -> Dict[str, Any]: r"""Create a new Gmail label. Args: name (str): The name of the label to create. label_list_visibility (str): How the label appears in Gmail's label list. - 'labelShow': Label is visible in the label list sidebar (default) - 'labelHide': Label is hidden from the label list sidebar message_list_visibility (str): How the label appears in message lists. - 'show': Label is visible on messages in inbox/lists (default) - 'hide': Label is hidden from message displays Returns: Dict[str, Any]: A dictionary containing the result of the operation. """ try: label_object = { 'name': name, 'labelListVisibility': label_list_visibility, 'messageListVisibility': message_list_visibility, } created_label = ( self.gmail_service.users() .labels() .create(userId='me', body=label_object) .execute() ) return { "success": True, "label_id": created_label['id'], "label_name": created_label['name'], "message": "Label created successfully", } except Exception as e: logger.error("Failed to create label: %s", e) return {"error": f"Failed to create label: {e!s}"} def gmail_delete_label(self, label_id: str) -> Dict[str, Any]: r"""Delete a Gmail label. Args: label_id (str): The unique identifier of the user-created label to delete. To get a label ID, first use list_gmail_labels() to list all labels. Note: System labels (e.g., 'INBOX', 'SENT', 'DRAFT', 'SPAM', 'TRASH', 'UNREAD', 'STARRED', 'IMPORTANT', 'CATEGORY_PERSONAL', etc.) cannot be deleted. Returns: Dict[str, Any]: A dictionary containing the result of the operation. """ try: self.gmail_service.users().labels().delete( userId='me', id=label_id ).execute() return { "success": True, "label_id": label_id, "message": "Label deleted successfully", } except Exception as e: logger.error("Failed to delete label: %s", e) return {"error": f"Failed to delete label: {e!s}"} def gmail_modify_thread_labels( self, thread_id: str, add_labels: Optional[List[str]] = None, remove_labels: Optional[List[str]] = None, ) -> Dict[str, Any]: r"""Modify labels on a thread. Args: thread_id (str): The unique identifier of the thread to modify. To get a thread ID, first use list_threads() to list threads, or use the 'thread_id' returned from send_email() or reply_to_email(). add_labels (Optional[List[str]]): List of label IDs to add to all messages in the thread. Label IDs can be: - System labels: 'INBOX', 'STARRED', 'IMPORTANT', 'UNREAD', etc. - Custom label IDs: Retrieved from list_gmail_labels(). Example: ['STARRED', 'IMPORTANT'] marks thread as starred and important. remove_labels (Optional[List[str]]): List of label IDs to remove from all messages in the thread. Uses the same format as add_labels. Example: ['UNREAD'] marks the entire thread as read. Returns: Dict[str, Any]: A dictionary containing the result of the operation. """ try: body = {} if add_labels: body['addLabelIds'] = add_labels if remove_labels: body['removeLabelIds'] = remove_labels if not body: return {"error": "No labels to add or remove"} modified_thread = ( self.gmail_service.users() .threads() .modify(userId='me', id=thread_id, body=body) .execute() ) return { "success": True, "thread_id": thread_id, "label_ids": modified_thread.get('labelIds', []), "message": "Thread labels modified successfully", } except Exception as e: logger.error("Failed to modify thread labels: %s", e) return {"error": f"Failed to modify thread labels: {e!s}"} def gmail_get_profile(self) -> Dict[str, Any]: r"""Get Gmail profile information. Returns: Dict[str, Any]: A dictionary containing the profile information. """ try: profile = ( self.gmail_service.users().getProfile(userId='me').execute() ) return { "success": True, "profile": { "email_address": profile.get('emailAddress', ''), "messages_total": profile.get('messagesTotal', 0), "threads_total": profile.get('threadsTotal', 0), "history_id": profile.get('historyId', ''), }, } except Exception as e: logger.error("Failed to get profile: %s", e) return {"error": f"Failed to get profile: {e!s}"} def gmail_get_contacts( self, max_results: int = 100, page_token: Optional[str] = None, ) -> Dict[str, Any]: r"""List connections from Google People API. Args: max_results (int): Maximum number of contacts to fetch. page_token (Optional[str]): Pagination token from a previous response. If provided, fetches the next page of results. Returns: Dict[str, Any]: A dictionary containing the contacts. """ try: # Build request parameters request_params = { 'resourceName': 'people/me', 'personFields': ( 'names,emailAddresses,phoneNumbers,organizations' ), 'pageSize': max_results, } # Search contacts if page_token: request_params['pageToken'] = page_token contacts_result = ( self.people_service.people() .connections() .list(**request_params) .execute() ) connections = contacts_result.get('connections', []) contact_list = [] for person in connections: contact_info = { "resource_name": person.get('resourceName', ''), "names": person.get('names', []), "email_addresses": person.get('emailAddresses', []), "phone_numbers": person.get('phoneNumbers', []), "organizations": person.get('organizations', []), } contact_list.append(contact_info) return { "success": True, "contacts": contact_list, "total_count": len(contact_list), "next_page_token": contacts_result.get('nextPageToken'), } except Exception as e: logger.error("Failed to get contacts: %s", e) return {"error": f"Failed to get contacts: {e!s}"} def gmail_search_people( self, query: str, max_results: int = 10, ) -> Dict[str, Any]: r"""Search for people in contacts. Args: query (str): Search query for people in contacts. Can search by: - Name: 'John Smith' or partial names like 'John' - Email: 'john@example.com' - Organization: 'Google' or 'Acme Corp' - Phone number: '+1234567890' Examples: 'John Smith', 'john@example.com', 'Google' max_results (int): Maximum number of results to fetch. Returns: Dict[str, Any]: A dictionary containing the search results. """ try: # Search people search_result = ( self.people_service.people() .searchContacts( query=query, readMask='names,emailAddresses,phoneNumbers,organizations', pageSize=max_results, ) .execute() ) results = search_result.get('results', []) people_list = [] for result in results: person = result.get('person', {}) person_info = { "resource_name": person.get('resourceName', ''), "names": person.get('names', []), "email_addresses": person.get('emailAddresses', []), "phone_numbers": person.get('phoneNumbers', []), "organizations": person.get('organizations', []), } people_list.append(person_info) return { "success": True, "people": people_list, "total_count": len(people_list), } except Exception as e: logger.error("Failed to search people: %s", e) return {"error": f"Failed to search people: {e!s}"} # Helper methods def _get_gmail_service(self): r"""Get Gmail service object.""" from googleapiclient.discovery import build try: # Build service with optional timeout if self.timeout is not None: import httplib2 http = httplib2.Http(timeout=self.timeout) http = self._credentials.authorize(http) service = build('gmail', 'v1', http=http) else: service = build('gmail', 'v1', credentials=self._credentials) return service except Exception as e: raise ValueError(f"Failed to build Gmail service: {e}") from e def _get_people_service(self): r"""Get People service object.""" from googleapiclient.discovery import build try: # Build service with optional timeout if self.timeout is not None: import httplib2 http = httplib2.Http(timeout=self.timeout) http = self._credentials.authorize(http) service = build('people', 'v1', http=http) else: service = build('people', 'v1', credentials=self._credentials) return service except Exception as e: raise ValueError(f"Failed to build People service: {e}") from e def _authenticate(self): r"""Authenticate with Google APIs using OAuth2. Automatically saves and loads credentials from ~/.camel/gmail_token.json to avoid repeated browser logins. """ import json from pathlib import Path from dotenv import load_dotenv from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow # Look for .env file in the project root (camel/) env_file = Path(__file__).parent.parent.parent / '.env' load_dotenv(env_file) client_id = os.environ.get('GOOGLE_CLIENT_ID') client_secret = os.environ.get('GOOGLE_CLIENT_SECRET') if not client_id or not client_secret: missing_vars = [] if not client_id: missing_vars.append('GOOGLE_CLIENT_ID') if not client_secret: missing_vars.append('GOOGLE_CLIENT_SECRET') raise ValueError( f"Missing required environment variables: " f"{', '.join(missing_vars)}. " "Please set these in your .env file or environment variables." ) token_file = Path.home() / '.camel' / 'gmail_token.json' creds = None # COMPONENT 1: Load saved credentials if token_file.exists(): try: with open(token_file, 'r') as f: data = json.load(f) creds = Credentials( token=data.get('token'), refresh_token=data.get('refresh_token'), token_uri=data.get( 'token_uri', 'https://oauth2.googleapis.com/token' ), client_id=client_id, client_secret=client_secret, scopes=SCOPES, ) except Exception as e: logger.warning(f"Failed to load saved token: {e}") creds = None # COMPONENT 2: Refresh if expired if creds and creds.expired and creds.refresh_token: try: creds.refresh(Request()) logger.info("Access token refreshed") # Save refreshed credentials to disk token_file.parent.mkdir(parents=True, exist_ok=True) try: with open(token_file, 'w') as f: json.dump( { 'token': creds.token, 'refresh_token': creds.refresh_token, 'token_uri': creds.token_uri, 'scopes': creds.scopes, }, f, ) os.chmod(token_file, 0o600) logger.info(f"Refreshed credentials saved to {token_file}") except Exception as e: logger.warning( f"Failed to save refreshed credentials to " f"{token_file}: {e}. " "Token refreshed but not persisted." ) return creds except Exception as e: logger.warning(f"Token refresh failed: {e}") creds = None # COMPONENT 3: Return if valid if creds and creds.valid: return creds # COMPONENT 4: Browser OAuth (first-time or invalid credentials) client_config = { "installed": { "client_id": client_id, "client_secret": client_secret, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "redirect_uris": ["http://localhost"], } } flow = InstalledAppFlow.from_client_config(client_config, SCOPES) creds = flow.run_local_server(port=0) # Save new credentials token_file.parent.mkdir(parents=True, exist_ok=True) try: with open(token_file, 'w') as f: json.dump( { 'token': creds.token, 'refresh_token': creds.refresh_token, 'token_uri': creds.token_uri, 'scopes': creds.scopes, }, f, ) os.chmod(token_file, 0o600) logger.info(f"Credentials saved to {token_file}") except Exception as e: logger.warning( f"Failed to save credentials to {token_file}: {e}. " "You may need to re-authenticate next time." ) return creds def _create_message( self, to_list: List[str], subject: str, body: str, cc_list: Optional[List[str]] = None, bcc_list: Optional[List[str]] = None, attachments: Optional[List[str]] = None, is_html: bool = False, in_reply_to: Optional[str] = None, references: Optional[List[str]] = None, ) -> Dict[str, str]: r"""Create a message object for sending.""" import base64 from email import encoders from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText message = MIMEMultipart() message['to'] = ', '.join(to_list) message['subject'] = subject if cc_list: message['cc'] = ', '.join(cc_list) if bcc_list: message['bcc'] = ', '.join(bcc_list) # Set reply headers when provided if in_reply_to: message['In-Reply-To'] = in_reply_to if references: message['References'] = ' '.join(references) # Add body if is_html: message.attach(MIMEText(body, 'html')) else: message.attach(MIMEText(body, 'plain')) # Add attachments if attachments: for file_path in attachments: if os.path.isfile(file_path): with open(file_path, "rb") as attachment: part = MIMEBase('application', 'octet-stream') part.set_payload(attachment.read()) encoders.encode_base64(part) part.add_header( 'Content-Disposition', f'attachment; filename= ' f'{os.path.basename(file_path)}', ) message.attach(part) # Encode message raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode( 'utf-8' ) return {'raw': raw_message} def _get_message_details( self, message_id: str ) -> Optional[Dict[str, Any]]: r"""Get detailed information about a message.""" try: message = ( self.gmail_service.users() .messages() .get(userId='me', id=message_id) .execute() ) headers = message['payload'].get('headers', []) # Build a name->value map in one pass (case-insensitive) header_map = {} for header in headers: name = header.get('name') if name: header_map[name.lower()] = header.get('value', '') return { "message_id": message['id'], "thread_id": message['threadId'], "snippet": message.get('snippet', ''), "subject": header_map.get('subject', ''), "from": header_map.get('from', ''), "to": header_map.get('to', ''), "cc": header_map.get('cc', ''), "bcc": header_map.get('bcc', ''), "date": header_map.get('date', ''), "body": self._extract_message_body(message), "attachments": self._extract_attachments( message, include_inline=True ), "label_ids": message.get('labelIds', []), "size_estimate": message.get('sizeEstimate', 0), } except Exception as e: logger.error("Failed to get message details: %s", e) return None def _get_header_value( self, headers: List[Dict[str, str]], name: str ) -> str: r"""Get header value by name.""" for header in headers: if header['name'].lower() == name.lower(): return header['value'] return "" def _extract_message_body(self, message: Dict[str, Any]) -> str: r"""Extract message body from message payload. Recursively traverses the entire message tree and collects all text content from text/plain and text/html parts. Special handling for multipart/alternative containers: recursively searches for one format (preferring plain text) to avoid duplication when both formats contain the same content. All other text parts are collected to ensure no information is lost. Args: message (Dict[str, Any]): The Gmail message dictionary containing the payload to extract text from. Returns: str: The extracted message body text with multiple parts separated by double newlines, or an empty string if no text content is found. """ import base64 import re text_parts = [] def decode_text_data(data: str, mime_type: str) -> Optional[str]: """Helper to decode base64 text data. Args: data: Base64 encoded text data. mime_type: MIME type for logging purposes. Returns: Decoded text string, or None if decoding fails or text is empty. """ if not data: return None try: text = base64.urlsafe_b64decode(data).decode('utf-8') return text if text.strip() else None except Exception as e: logger.warning(f"Failed to decode {mime_type}: {e}") return None def strip_html_tags(html_content: str) -> str: """Strip HTML tags and convert to readable plain text. Uses regex to remove tags and clean up formatting while preserving basic document structure. Args: html_content: HTML content to strip. Returns: Plain text version of HTML content. """ if not html_content or not html_content.strip(): return "" text = html_content # Remove script and style elements completely text = re.sub( r']*>.*?', '', text, flags=re.DOTALL | re.IGNORECASE, ) text = re.sub( r']*>.*?', '', text, flags=re.DOTALL | re.IGNORECASE, ) # Convert common HTML entities text = text.replace(' ', ' ') text = text.replace('&', '&') text = text.replace('<', '<') text = text.replace('>', '>') text = text.replace('"', '"') text = text.replace(''', "'") text = text.replace('’', "'") text = text.replace('‘', "'") text = text.replace('”', '"') text = text.replace('“', '"') text = text.replace('—', '—') text = text.replace('–', '-') # Convert
and
to newlines text = re.sub(r'', '\n', text, flags=re.IGNORECASE) # Convert block-level closing tags to newlines text = re.sub( r'', '\n', text, flags=re.IGNORECASE ) # Convert
to separator text = re.sub(r'', '\n---\n', text, flags=re.IGNORECASE) # Remove all remaining HTML tags text = re.sub(r'<[^>]+>', '', text) # Clean up whitespace text = re.sub( r'\n\s*\n\s*\n+', '\n\n', text ) # Multiple blank lines to double newline text = re.sub(r' +', ' ', text) # Multiple spaces to single space text = re.sub(r'\n ', '\n', text) # Remove leading spaces on lines text = re.sub( r' \n', '\n', text ) # Remove trailing spaces on lines return text.strip() def find_text_recursive( part: Dict[str, Any], target_mime: str ) -> Optional[str]: """Recursively search for text content of a specific MIME type. Args: part: Message part to search in. target_mime: Target MIME type ('text/plain' or 'text/html'). Returns: Decoded text string if found, None otherwise. """ mime = part.get('mimeType', '') # Found the target type at this level if mime == target_mime: data = part.get('body', {}).get('data', '') decoded = decode_text_data(data, target_mime) # Strip HTML tags if this is HTML content if decoded and target_mime == 'text/html': return strip_html_tags(decoded) return decoded # Not found, but has nested parts? Search recursively if 'parts' in part: for nested_part in part['parts']: result = find_text_recursive(nested_part, target_mime) if result: return result return None def extract_from_part(part: Dict[str, Any]): """Recursively collect all text from message parts.""" mime_type = part.get('mimeType', '') # Special handling for multipart/alternative if mime_type == 'multipart/alternative' and 'parts' in part: # Recursively search for one format (prefer plain text) plain_text = None html_text = None # Search each alternative branch recursively for nested_part in part['parts']: if not plain_text: plain_text = find_text_recursive( nested_part, 'text/plain' ) if not html_text: html_text = find_text_recursive( nested_part, 'text/html' ) # Prefer plain text, fall back to HTML chosen_text = plain_text if plain_text else html_text if chosen_text: text_parts.append(chosen_text) # If this part has nested parts (but not multipart/alternative) elif 'parts' in part: for nested_part in part['parts']: extract_from_part(nested_part) # If this is a text leaf, extract and collect it elif mime_type == 'text/plain': data = part.get('body', {}).get('data', '') text = decode_text_data(data, 'plain text body') if text: text_parts.append(text) elif mime_type == 'text/html': data = part.get('body', {}).get('data', '') html_text = decode_text_data(data, 'HTML body') if html_text: text = strip_html_tags(html_text) if text: text_parts.append(text) # Traverse the entire tree and collect all text parts payload = message.get('payload', {}) extract_from_part(payload) if not text_parts: return "" # Return all text parts combined return '\n\n'.join(text_parts) def _extract_attachments( self, message: Dict[str, Any], include_inline: bool = False ) -> List[Dict[str, Any]]: r"""Extract attachment information from message payload. Recursively traverses the message tree to find all attachments and extracts their metadata. Distinguishes between regular attachments and inline images embedded in HTML content. Args: message (Dict[str, Any]): The Gmail message dictionary containing the payload to extract attachments from. Returns: List[Dict[str, Any]]: List of attachment dictionaries, each containing: - attachment_id: Gmail's unique identifier for the attachment - filename: Name of the attached file - mime_type: MIME type of the attachment - size: Size of the attachment in bytes - is_inline: Whether this is an inline image (embedded in HTML) """ attachments = [] def is_inline_image(part: Dict[str, Any]) -> bool: """Check if this part is an inline image.""" headers = part.get('headers', []) for header in headers: name = header.get('name', '').lower() value = header.get('value', '').lower() # Check for Content-Disposition: inline if name == 'content-disposition' and 'inline' in value: return True # Check for Content-ID (usually indicates inline) if name == 'content-id': return True return False def find_attachments(part: Dict[str, Any]): """Recursively find attachments in message parts.""" # Check if this part has an attachmentId (indicates it's an # attachment) if 'body' in part and 'attachmentId' in part['body']: attachment_info = { 'attachment_id': part['body']['attachmentId'], 'filename': part.get('filename', 'unnamed'), 'mime_type': part.get( 'mimeType', 'application/octet-stream' ), 'size': part['body'].get('size', 0), 'is_inline': is_inline_image(part), } attachments.append(attachment_info) # Recurse into nested parts if 'parts' in part: for nested_part in part['parts']: find_attachments(nested_part) # Start traversal from the message payload payload = message.get('payload', {}) if payload: find_attachments(payload) # Return based on include_inline toggle if include_inline: return attachments return [att for att in attachments if not att['is_inline']] def _is_valid_email(self, email: str) -> bool: r"""Validate email address format. Supports both formats: - Plain email: john@example.com - Named email: John Doe """ # Extract email from "Name " format if present match = re.search(r'<([^>]+)>$', email.strip()) email_to_check = match.group(1) if match else email.strip() # Validate the email address pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email_to_check) is not None def get_tools(self) -> List[FunctionTool]: r"""Returns a list of FunctionTool objects representing the functions in the toolkit. Returns: List[FunctionTool]: A list of FunctionTool objects representing the functions in the toolkit. """ return [ FunctionTool(self.gmail_send_email), FunctionTool(self.gmail_reply_to_email), FunctionTool(self.gmail_forward_email), FunctionTool(self.gmail_create_draft), FunctionTool(self.gmail_send_draft), FunctionTool(self.gmail_fetch_emails), FunctionTool(self.gmail_fetch_thread_by_id), FunctionTool(self.gmail_modify_email_labels), FunctionTool(self.gmail_move_to_trash), FunctionTool(self.gmail_get_attachment), FunctionTool(self.gmail_list_threads), FunctionTool(self.gmail_list_drafts), FunctionTool(self.gmail_list_labels), FunctionTool(self.gmail_create_label), FunctionTool(self.gmail_delete_label), FunctionTool(self.gmail_modify_thread_labels), FunctionTool(self.gmail_get_profile), FunctionTool(self.gmail_get_contacts), FunctionTool(self.gmail_search_people), ]