eigent/backend/camel/toolkits/gmail_toolkit.py
2026-03-31 17:20:08 +08:00

1839 lines
68 KiB
Python

# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
import os
import re
from typing import Any, Dict, List, Literal, Optional, Union
from camel.logger import get_logger
from camel.toolkits import FunctionTool
from camel.toolkits.base import BaseToolkit
from camel.utils import MCPServer
logger = get_logger(__name__)
SCOPES = [
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.modify',
'https://www.googleapis.com/auth/gmail.compose',
'https://www.googleapis.com/auth/gmail.labels',
'https://www.googleapis.com/auth/contacts.readonly',
'https://www.googleapis.com/auth/userinfo.profile',
]
@MCPServer()
class GmailToolkit(BaseToolkit):
r"""A comprehensive toolkit for Gmail operations.
This class provides methods for Gmail operations including sending emails,
managing drafts, fetching messages, managing labels, and handling contacts.
API keys can be accessed in google cloud console (https://console.cloud.google.com/)
"""
def __init__(
self,
timeout: Optional[float] = None,
):
r"""Initializes a new instance of the GmailToolkit class.
Args:
timeout (Optional[float]): The timeout value for API requests
in seconds. If None, no timeout is applied.
(default: :obj:`None`)
"""
super().__init__(timeout=timeout)
self._credentials = self._authenticate()
self.gmail_service: Any = self._get_gmail_service()
self._people_service: Any = None
@property
def people_service(self) -> Any:
r"""Lazily initialize and return the Google People service."""
if self._people_service is None:
self._people_service = self._get_people_service()
return self._people_service
@people_service.setter
def people_service(self, service: Any) -> None:
r"""Allow overriding/injecting the People service (e.g., in tests)."""
self._people_service = service
def gmail_send_email(
self,
to: Union[str, List[str]],
subject: str,
body: str,
cc: Optional[Union[str, List[str]]] = None,
bcc: Optional[Union[str, List[str]]] = None,
attachments: Optional[List[str]] = None,
is_html: bool = False,
) -> Dict[str, Any]:
r"""Send an email through Gmail.
Args:
to (Union[str, List[str]]): Recipient email address(es).
subject (str): Email subject.
body (str): Email body content.
cc (Optional[Union[str, List[str]]]): CC recipient email
address(es).
bcc (Optional[Union[str, List[str]]]): BCC recipient email
address(es).
attachments (Optional[List[str]]): List of file paths to attach.
is_html (bool): Whether the body is HTML format. Set to True when
sending formatted emails with HTML tags (e.g., bold,
links, images). Use False (default) for plain text emails.
Returns:
Dict[str, Any]: A dictionary containing the result of the
operation.
"""
try:
# Normalize recipients to lists
to_list = [to] if isinstance(to, str) else to
cc_list = [cc] if isinstance(cc, str) else (cc or [])
bcc_list = [bcc] if isinstance(bcc, str) else (bcc or [])
# Validate email addresses
all_recipients = to_list + cc_list + bcc_list
for email in all_recipients:
if not self._is_valid_email(email):
return {"error": f"Invalid email address: {email}"}
# Create message
message = self._create_message(
to_list, subject, body, cc_list, bcc_list, attachments, is_html
)
# Send message
sent_message = (
self.gmail_service.users()
.messages()
.send(userId='me', body=message)
.execute()
)
return {
"success": True,
"message_id": sent_message.get('id'),
"thread_id": sent_message.get('threadId'),
"message": "Email sent successfully",
}
except Exception as e:
logger.error("Failed to send email: %s", e)
return {"error": f"Failed to send email: {e!s}"}
def gmail_reply_to_email(
self,
message_id: str,
reply_body: str,
reply_all: bool = False,
is_html: bool = False,
) -> Dict[str, Any]:
r"""Reply to an email message.
Args:
message_id (str): The unique identifier of the message to reply to.
To get a message ID, first use fetch_emails() to list messages,
or use the 'message_id' returned from send_email() or
create_email_draft().
reply_body (str): The reply message body.
reply_all (bool): Whether to reply to all recipients.
is_html (bool): Whether the body is HTML format. Set to True when
sending formatted emails with HTML tags (e.g., bold,
links, images). Use False (default) for plain text emails.
Returns:
Dict[str, Any]: A dictionary containing the result of the
operation.
"""
try:
# Get the original message
original_message = (
self.gmail_service.users()
.messages()
.get(userId='me', id=message_id)
.execute()
)
# Extract headers (single pass, case-insensitive)
headers = original_message['payload'].get('headers', [])
subject = from_email = to_emails = cc_emails = None
missing = {'subject', 'from', 'to', 'cc'}
for header in headers:
name = (header.get('name') or '').lower()
if name not in missing:
continue
value = header.get('value')
if name == 'subject':
subject = value
elif name == 'from':
from_email = value
elif name == 'to':
to_emails = value
elif name == 'cc':
cc_emails = value
missing.discard(name)
if not missing:
break
# Extract identifiers for reply context
message_id_header = self._get_header_value(
headers, 'Message-Id'
) or self._get_header_value(headers, 'Message-ID')
thread_id = original_message.get('threadId')
# Prepare reply subject
if subject and not subject.startswith('Re: '):
subject = f"Re: {subject}"
elif not subject:
subject = "Re: (No Subject)"
# Validate from_email
if not from_email:
return {"error": "Original message has no sender address"}
# Prepare recipients
if reply_all:
recipients = [from_email]
if to_emails:
recipients.extend(
[email.strip() for email in to_emails.split(',')]
)
if cc_emails:
recipients.extend(
[email.strip() for email in cc_emails.split(',')]
)
# Remove duplicates and None values
recipients = [r for r in list(set(recipients)) if r]
# Get current user's email and remove it from recipients
try:
profile_result = self.gmail_get_profile()
if profile_result.get('success'):
current_user_email = profile_result['profile'][
'email_address'
]
# Remove current user from recipients (handle both
# plain email and "Name <email>" format)
filtered_recipients = []
for email in recipients:
# Extract email from "Name <email>" format
match = re.search(r'<([^>]+)>$', email.strip())
email_addr = (
match.group(1) if match else email.strip()
)
if email_addr != current_user_email:
filtered_recipients.append(email)
recipients = filtered_recipients
except Exception as e:
logger.warning(
"Could not get current user email to filter from "
"recipients: %s",
e,
)
else:
recipients = [from_email]
# Create reply message with reply headers
message = self._create_message(
recipients,
subject,
reply_body,
is_html=is_html,
in_reply_to=message_id_header or original_message.get('id'),
references=[message_id_header] if message_id_header else None,
)
# Send reply in the same thread
sent_message = (
self.gmail_service.users()
.messages()
.send(userId='me', body={**message, 'threadId': thread_id})
.execute()
)
return {
"success": True,
"message_id": sent_message.get('id'),
"thread_id": sent_message.get('threadId'),
"message": "Reply sent successfully",
}
except Exception as e:
logger.error("Failed to reply to email: %s", e)
return {"error": f"Failed to reply to email: {e!s}"}
def gmail_forward_email(
self,
message_id: str,
to: Union[str, List[str]],
forward_body: Optional[str] = None,
cc: Optional[Union[str, List[str]]] = None,
bcc: Optional[Union[str, List[str]]] = None,
include_attachments: bool = True,
) -> Dict[str, Any]:
r"""Forward an email message.
Args:
message_id (str): The unique identifier of the message to forward.
To get a message ID, first use fetch_emails() to list messages,
or use the 'message_id' returned from send_email() or
create_email_draft().
to (Union[str, List[str]]): Recipient email address(es).
forward_body (Optional[str]): Additional message to include at
the top of the forwarded email, before the original message
content. If not provided, only the original message will be
forwarded.
cc (Optional[Union[str, List[str]]]): CC recipient email
address(es).
bcc (Optional[Union[str, List[str]]]): BCC recipient email
address(es).
include_attachments (bool): Whether to include original
attachments. Defaults to True. Only includes real
attachments, not inline images.
Returns:
Dict[str, Any]: A dictionary containing the result of the
operation, including the number of attachments forwarded.
"""
try:
import tempfile
# Get the original message
original_message = (
self.gmail_service.users()
.messages()
.get(userId='me', id=message_id)
.execute()
)
# Extract headers (single pass, case-insensitive)
headers = original_message['payload'].get('headers', [])
subject = from_email = date = None
missing = {'subject', 'from', 'date'}
for header in headers:
name = (header.get('name') or '').lower()
if name not in missing:
continue
value = header.get('value')
if name == 'subject':
subject = value
elif name == 'from':
from_email = value
elif name == 'date':
date = value
missing.discard(name)
if not missing:
break
# Prepare forward subject
if subject and not subject.startswith('Fwd: '):
subject = f"Fwd: {subject}"
elif not subject:
subject = "Fwd: (No Subject)"
# Prepare forward body
if forward_body:
body = f"{forward_body}\n\n--- Forwarded message ---\n"
else:
body = "--- Forwarded message ---\n"
body += f"From: {from_email}\n"
body += f"Date: {date}\n"
body += f"Subject: {subject.replace('Fwd: ', '')}\n\n"
# Add original message body
body += self._extract_message_body(original_message)
# Normalize recipients
to_list = [to] if isinstance(to, str) else to
cc_list = [cc] if isinstance(cc, str) else (cc or [])
bcc_list = [bcc] if isinstance(bcc, str) else (bcc or [])
# Handle attachments
attachment_paths = []
temp_files: List[str] = []
try:
if include_attachments:
# Extract attachment metadata
attachments = self._extract_attachments(original_message)
for att in attachments:
try:
# Create temp file
temp_file = tempfile.NamedTemporaryFile(
delete=False, suffix=f"_{att['filename']}"
)
temp_files.append(temp_file.name)
# Download attachment
result = self.gmail_get_attachment(
message_id=message_id,
attachment_id=att['attachment_id'],
save_path=temp_file.name,
)
if result.get('success'):
attachment_paths.append(temp_file.name)
except Exception as e:
logger.warning(
f"Failed to download attachment "
f"{att['filename']}: {e}"
)
# Create forward message (now with attachments!)
message = self._create_message(
to_list,
subject,
body,
cc_list,
bcc_list,
attachments=attachment_paths if attachment_paths else None,
)
# Send forward
sent_message = (
self.gmail_service.users()
.messages()
.send(userId='me', body=message)
.execute()
)
return {
"success": True,
"message_id": sent_message.get('id'),
"thread_id": sent_message.get('threadId'),
"message": "Email forwarded successfully",
"attachments_forwarded": len(attachment_paths),
}
finally:
# Clean up temp files
for temp_file_path in temp_files:
try:
os.unlink(temp_file_path)
except Exception as e:
logger.warning(
f"Failed to delete temp file {temp_file_path}: {e}"
)
except Exception as e:
logger.error("Failed to forward email: %s", e)
return {"error": f"Failed to forward email: {e!s}"}
def gmail_create_draft(
self,
to: Union[str, List[str]],
subject: str,
body: str,
cc: Optional[Union[str, List[str]]] = None,
bcc: Optional[Union[str, List[str]]] = None,
attachments: Optional[List[str]] = None,
is_html: bool = False,
) -> Dict[str, Any]:
r"""Create an email draft.
Args:
to (Union[str, List[str]]): Recipient email address(es).
subject (str): Email subject.
body (str): Email body content.
cc (Optional[Union[str, List[str]]]): CC recipient email
address(es).
bcc (Optional[Union[str, List[str]]]): BCC recipient email
address(es).
attachments (Optional[List[str]]): List of file paths to attach.
is_html (bool): Whether the body is HTML format. Set to True when
sending formatted emails with HTML tags (e.g., bold,
links, images). Use False (default) for plain text emails.
Returns:
Dict[str, Any]: A dictionary containing the result of the
operation.
"""
try:
# Normalize recipients to lists
to_list = [to] if isinstance(to, str) else to
cc_list = [cc] if isinstance(cc, str) else (cc or [])
bcc_list = [bcc] if isinstance(bcc, str) else (bcc or [])
# Validate email addresses
all_recipients = to_list + cc_list + bcc_list
for email in all_recipients:
if not self._is_valid_email(email):
return {"error": f"Invalid email address: {email}"}
# Create message
message = self._create_message(
to_list, subject, body, cc_list, bcc_list, attachments, is_html
)
# Create draft
draft = (
self.gmail_service.users()
.drafts()
.create(userId='me', body={'message': message})
.execute()
)
return {
"success": True,
"draft_id": draft.get('id'),
"message_id": draft.get('message', {}).get('id'),
"message": "Draft created successfully",
}
except Exception as e:
logger.error("Failed to create draft: %s", e)
return {"error": f"Failed to create draft: {e!s}"}
def gmail_send_draft(self, draft_id: str) -> Dict[str, Any]:
r"""Send a draft email.
Args:
draft_id (str): The unique identifier of the draft to send.
To get a draft ID, first use list_drafts() to list drafts,
or use the 'draft_id' returned from create_email_draft().
Returns:
Dict[str, Any]: A dictionary containing the result of the
operation.
"""
try:
# Send draft
sent_message = (
self.gmail_service.users()
.drafts()
.send(userId='me', body={'id': draft_id})
.execute()
)
return {
"success": True,
"message_id": sent_message.get('id'),
"thread_id": sent_message.get('threadId'),
"message": "Draft sent successfully",
}
except Exception as e:
logger.error("Failed to send draft: %s", e)
return {"error": f"Failed to send draft: {e!s}"}
def gmail_fetch_emails(
self,
query: str = "",
max_results: int = 10,
include_spam_trash: bool = False,
label_ids: Optional[List[str]] = None,
page_token: Optional[str] = None,
) -> Dict[str, Any]:
r"""Fetch emails with filters and pagination.
Args:
query (str): Gmail search query string. Use Gmail's search syntax:
- 'from:example@domain.com' - emails from specific sender
- 'subject:meeting' - emails with specific subject text
- 'has:attachment' - emails with attachments
- 'is:unread' - unread emails
- 'in:sent' - emails in sent folder
- 'after:2024/01/01 before:2024/12/31' - date range
Examples: 'from:john@example.com subject:project', 'is:unread
has:attachment'
max_results (int): Maximum number of emails to fetch.
include_spam_trash (bool): Whether to include spam and trash.
label_ids (Optional[List[str]]): List of label IDs to filter
emails by. Only emails with ALL of the specified
labels will be returned.
Label IDs can be:
- System labels: 'INBOX', 'SENT', 'DRAFT', 'SPAM', 'TRASH',
'UNREAD', 'STARRED', 'IMPORTANT', 'CATEGORY_PERSONAL', etc.
- Custom label IDs: Retrieved from list_gmail_labels() method.
page_token (Optional[str]): Pagination token from a previous
response. If provided, fetches the next page of results.
Returns:
Dict[str, Any]: A dictionary containing the fetched emails.
"""
try:
# Build request parameters
request_params = {
'userId': 'me',
'maxResults': max_results,
'includeSpamTrash': include_spam_trash,
}
if query:
request_params['q'] = query
if label_ids:
request_params['labelIds'] = label_ids
# List messages
if page_token:
request_params['pageToken'] = page_token
messages_result = (
self.gmail_service.users()
.messages()
.list(**request_params)
.execute()
)
messages = messages_result.get('messages', [])
emails = []
# Fetch detailed information for each message
for msg in messages:
email_detail = self._get_message_details(msg['id'])
if email_detail:
emails.append(email_detail)
return {
"success": True,
"emails": emails,
"total_count": len(emails),
"next_page_token": messages_result.get('nextPageToken'),
}
except Exception as e:
logger.error("Failed to fetch emails: %s", e)
return {"error": f"Failed to fetch emails: {e!s}"}
def gmail_fetch_thread_by_id(self, thread_id: str) -> Dict[str, Any]:
r"""Fetch a thread by ID.
Args:
thread_id (str): The unique identifier of the thread to fetch.
To get a thread ID, first use list_threads() to list threads,
or use the 'thread_id' returned from send_email() or
reply_to_email().
Returns:
Dict[str, Any]: A dictionary containing the thread details.
"""
try:
thread = (
self.gmail_service.users()
.threads()
.get(userId='me', id=thread_id)
.execute()
)
messages = []
for message in thread.get('messages', []):
message_detail = self._get_message_details(message['id'])
if message_detail:
messages.append(message_detail)
return {
"success": True,
"thread_id": thread_id,
"messages": messages,
"message_count": len(messages),
}
except Exception as e:
logger.error("Failed to fetch thread: %s", e)
return {"error": f"Failed to fetch thread: {e!s}"}
def gmail_modify_email_labels(
self,
message_id: str,
add_labels: Optional[List[str]] = None,
remove_labels: Optional[List[str]] = None,
) -> Dict[str, Any]:
r"""Modify labels on an email message.
Args:
message_id (str): The unique identifier of the message to modify.
To get a message ID, first use fetch_emails() to list messages,
or use the 'message_id' returned from send_email() or
create_email_draft().
add_labels (Optional[List[str]]): List of label IDs to add to
the message.
Label IDs can be:
- System labels: 'INBOX', 'STARRED', 'IMPORTANT',
'UNREAD', etc.
- Custom label IDs: Retrieved from list_gmail_labels() method.
Example: ['STARRED', 'IMPORTANT'] marks email as starred
and important.
remove_labels (Optional[List[str]]): List of label IDs to
remove from the message. Uses the same format as add_labels.
Example: ['UNREAD'] marks the email as read.
Returns:
Dict[str, Any]: A dictionary containing the result of the
operation.
"""
try:
body = {}
if add_labels:
body['addLabelIds'] = add_labels
if remove_labels:
body['removeLabelIds'] = remove_labels
if not body:
return {"error": "No labels to add or remove"}
modified_message = (
self.gmail_service.users()
.messages()
.modify(userId='me', id=message_id, body=body)
.execute()
)
return {
"success": True,
"message_id": message_id,
"label_ids": modified_message.get('labelIds', []),
"message": "Labels modified successfully",
}
except Exception as e:
logger.error("Failed to modify labels: %s", e)
return {"error": f"Failed to modify labels: {e!s}"}
def gmail_move_to_trash(self, message_id: str) -> Dict[str, Any]:
r"""Move a message to trash.
Args:
message_id (str): The unique identifier of the message to move to
trash. To get a message ID, first use fetch_emails() to list
messages, or use the 'message_id' returned from send_email()
or create_email_draft().
Returns:
Dict[str, Any]: A dictionary containing the result of the
operation.
"""
try:
trashed_message = (
self.gmail_service.users()
.messages()
.trash(userId='me', id=message_id)
.execute()
)
return {
"success": True,
"message_id": message_id,
"label_ids": trashed_message.get('labelIds', []),
"message": "Message moved to trash successfully",
}
except Exception as e:
logger.error("Failed to move message to trash: %s", e)
return {"error": f"Failed to move message to trash: {e!s}"}
def gmail_get_attachment(
self,
message_id: str,
attachment_id: str,
save_path: Optional[str] = None,
) -> Dict[str, Any]:
r"""Get an attachment from a message.
Args:
message_id (str): The unique identifier of the message containing
the attachment. To get a message ID, first use fetch_emails()
to list messages, or use the 'message_id' returned from
send_email() or create_email_draft().
attachment_id (str): The unique identifier of the attachment to
download. To get an attachment ID, first use fetch_emails() to
get message details, then look for 'attachment_id' in the
'attachments' list of each message.
save_path (Optional[str]): Local file path where the attachment
should be saved. If provided, the attachment will be saved to
this location and the response will include a success message.
If not provided, the attachment data will be returned as
base64-encoded content in the response.
Returns:
Dict[str, Any]: A dictionary containing the attachment data or
save result.
"""
try:
import base64
attachment = (
self.gmail_service.users()
.messages()
.attachments()
.get(userId='me', messageId=message_id, id=attachment_id)
.execute()
)
# Decode the attachment data
file_data = base64.urlsafe_b64decode(attachment['data'])
if save_path:
with open(save_path, 'wb') as f:
f.write(file_data)
return {
"success": True,
"message": f"Attachment saved to {save_path}",
"file_size": len(file_data),
}
else:
return {
"success": True,
"data": base64.b64encode(file_data).decode('utf-8'),
"file_size": len(file_data),
}
except Exception as e:
logger.error("Failed to get attachment: %s", e)
return {"error": f"Failed to get attachment: {e!s}"}
def gmail_list_threads(
self,
query: str = "",
max_results: int = 10,
include_spam_trash: bool = False,
label_ids: Optional[List[str]] = None,
page_token: Optional[str] = None,
) -> Dict[str, Any]:
r"""List email threads.
Args:
query (str): Gmail search query string. Use Gmail's search syntax:
- 'from:example@domain.com' - threads from specific sender
- 'subject:meeting' - threads with specific subject text
- 'has:attachment' - threads with attachments
- 'is:unread' - unread threads
- 'in:sent' - threads in sent folder
- 'after:2024/01/01 before:2024/12/31' - date range
Examples: 'from:john@example.com subject:project', 'is:unread
has:attachment'
max_results (int): Maximum number of threads to fetch.
include_spam_trash (bool): Whether to include spam and trash.
label_ids (Optional[List[str]]): List of label IDs to filter
threads by. Only threads with ALL of the specified labels
will be returned.
Label IDs can be:
- System labels: 'INBOX', 'SENT', 'DRAFT', 'SPAM', 'TRASH',
'UNREAD', 'STARRED', 'IMPORTANT', 'CATEGORY_PERSONAL', etc.
- Custom label IDs: Retrieved from list_gmail_labels() method.
page_token (Optional[str]): Pagination token from a previous
response. If provided, fetches the next page of results.
Returns:
Dict[str, Any]: A dictionary containing the thread list.
"""
try:
# Build request parameters
request_params = {
'userId': 'me',
'maxResults': max_results,
'includeSpamTrash': include_spam_trash,
}
if query:
request_params['q'] = query
if label_ids:
request_params['labelIds'] = label_ids
# List threads
if page_token:
request_params['pageToken'] = page_token
threads_result = (
self.gmail_service.users()
.threads()
.list(**request_params)
.execute()
)
threads = threads_result.get('threads', [])
thread_list = []
for thread in threads:
thread_list.append(
{
"thread_id": thread['id'],
"snippet": thread.get('snippet', ''),
"history_id": thread.get('historyId', ''),
}
)
return {
"success": True,
"threads": thread_list,
"total_count": len(thread_list),
"next_page_token": threads_result.get('nextPageToken'),
}
except Exception as e:
logger.error("Failed to list threads: %s", e)
return {"error": f"Failed to list threads: {e!s}"}
def gmail_list_drafts(
self, max_results: int = 10, page_token: Optional[str] = None
) -> Dict[str, Any]:
r"""List email drafts.
Args:
max_results (int): Maximum number of drafts to fetch.
page_token (Optional[str]): Pagination token from a previous
response. If provided, fetches the next page of results.
Returns:
Dict[str, Any]: A dictionary containing the draft list.
"""
try:
drafts_result = (
self.gmail_service.users()
.drafts()
.list(
userId='me',
maxResults=max_results,
**({"pageToken": page_token} if page_token else {}),
)
.execute()
)
drafts = drafts_result.get('drafts', [])
draft_list = []
for draft in drafts:
draft_info = {
"draft_id": draft['id'],
"message_id": draft.get('message', {}).get('id', ''),
"thread_id": draft.get('message', {}).get('threadId', ''),
"snippet": draft.get('message', {}).get('snippet', ''),
}
draft_list.append(draft_info)
return {
"success": True,
"drafts": draft_list,
"total_count": len(draft_list),
"next_page_token": drafts_result.get('nextPageToken'),
}
except Exception as e:
logger.error("Failed to list drafts: %s", e)
return {"error": f"Failed to list drafts: {e!s}"}
def gmail_list_labels(self) -> Dict[str, Any]:
r"""List all Gmail labels.
Returns:
Dict[str, Any]: A dictionary containing the label list.
"""
try:
labels_result = (
self.gmail_service.users().labels().list(userId='me').execute()
)
labels = labels_result.get('labels', [])
label_list = []
for label in labels:
label_info = {
"id": label['id'],
"name": label['name'],
"type": label.get('type', 'user'),
"messages_total": label.get('messagesTotal', 0),
"messages_unread": label.get('messagesUnread', 0),
"threads_total": label.get('threadsTotal', 0),
"threads_unread": label.get('threadsUnread', 0),
}
label_list.append(label_info)
return {
"success": True,
"labels": label_list,
"total_count": len(label_list),
}
except Exception as e:
logger.error("Failed to list labels: %s", e)
return {"error": f"Failed to list labels: {e!s}"}
def gmail_create_label(
self,
name: str,
label_list_visibility: Literal["labelShow", "labelHide"] = "labelShow",
message_list_visibility: Literal["show", "hide"] = "show",
) -> Dict[str, Any]:
r"""Create a new Gmail label.
Args:
name (str): The name of the label to create.
label_list_visibility (str): How the label appears in Gmail's
label list. - 'labelShow': Label is visible in the label list
sidebar (default) - 'labelHide': Label is hidden from the
label list sidebar
message_list_visibility (str): How the label appears in message
lists. - 'show': Label is visible on messages in inbox/lists
(default) - 'hide': Label is hidden from message displays
Returns:
Dict[str, Any]: A dictionary containing the result of the
operation.
"""
try:
label_object = {
'name': name,
'labelListVisibility': label_list_visibility,
'messageListVisibility': message_list_visibility,
}
created_label = (
self.gmail_service.users()
.labels()
.create(userId='me', body=label_object)
.execute()
)
return {
"success": True,
"label_id": created_label['id'],
"label_name": created_label['name'],
"message": "Label created successfully",
}
except Exception as e:
logger.error("Failed to create label: %s", e)
return {"error": f"Failed to create label: {e!s}"}
def gmail_delete_label(self, label_id: str) -> Dict[str, Any]:
r"""Delete a Gmail label.
Args:
label_id (str): The unique identifier of the user-created label to
delete. To get a label ID, first use list_gmail_labels() to
list all labels. Note: System labels (e.g., 'INBOX', 'SENT',
'DRAFT', 'SPAM', 'TRASH', 'UNREAD', 'STARRED', 'IMPORTANT',
'CATEGORY_PERSONAL', etc.) cannot be deleted.
Returns:
Dict[str, Any]: A dictionary containing the result of the
operation.
"""
try:
self.gmail_service.users().labels().delete(
userId='me', id=label_id
).execute()
return {
"success": True,
"label_id": label_id,
"message": "Label deleted successfully",
}
except Exception as e:
logger.error("Failed to delete label: %s", e)
return {"error": f"Failed to delete label: {e!s}"}
def gmail_modify_thread_labels(
self,
thread_id: str,
add_labels: Optional[List[str]] = None,
remove_labels: Optional[List[str]] = None,
) -> Dict[str, Any]:
r"""Modify labels on a thread.
Args:
thread_id (str): The unique identifier of the thread to modify.
To get a thread ID, first use list_threads() to list threads,
or use the 'thread_id' returned from send_email() or
reply_to_email().
add_labels (Optional[List[str]]): List of label IDs to add to all
messages in the thread.
Label IDs can be:
- System labels: 'INBOX', 'STARRED', 'IMPORTANT',
'UNREAD', etc.
- Custom label IDs: Retrieved from list_gmail_labels().
Example: ['STARRED', 'IMPORTANT'] marks thread as
starred and important.
remove_labels (Optional[List[str]]): List of label IDs to
remove from all messages in the thread. Uses the same
format as add_labels.
Example: ['UNREAD'] marks the entire thread as read.
Returns:
Dict[str, Any]: A dictionary containing the result of the
operation.
"""
try:
body = {}
if add_labels:
body['addLabelIds'] = add_labels
if remove_labels:
body['removeLabelIds'] = remove_labels
if not body:
return {"error": "No labels to add or remove"}
modified_thread = (
self.gmail_service.users()
.threads()
.modify(userId='me', id=thread_id, body=body)
.execute()
)
return {
"success": True,
"thread_id": thread_id,
"label_ids": modified_thread.get('labelIds', []),
"message": "Thread labels modified successfully",
}
except Exception as e:
logger.error("Failed to modify thread labels: %s", e)
return {"error": f"Failed to modify thread labels: {e!s}"}
def gmail_get_profile(self) -> Dict[str, Any]:
r"""Get Gmail profile information.
Returns:
Dict[str, Any]: A dictionary containing the profile information.
"""
try:
profile = (
self.gmail_service.users().getProfile(userId='me').execute()
)
return {
"success": True,
"profile": {
"email_address": profile.get('emailAddress', ''),
"messages_total": profile.get('messagesTotal', 0),
"threads_total": profile.get('threadsTotal', 0),
"history_id": profile.get('historyId', ''),
},
}
except Exception as e:
logger.error("Failed to get profile: %s", e)
return {"error": f"Failed to get profile: {e!s}"}
def gmail_get_contacts(
self,
max_results: int = 100,
page_token: Optional[str] = None,
) -> Dict[str, Any]:
r"""List connections from Google People API.
Args:
max_results (int): Maximum number of contacts to fetch.
page_token (Optional[str]): Pagination token from a previous
response. If provided, fetches the next page of results.
Returns:
Dict[str, Any]: A dictionary containing the contacts.
"""
try:
# Build request parameters
request_params = {
'resourceName': 'people/me',
'personFields': (
'names,emailAddresses,phoneNumbers,organizations'
),
'pageSize': max_results,
}
# Search contacts
if page_token:
request_params['pageToken'] = page_token
contacts_result = (
self.people_service.people()
.connections()
.list(**request_params)
.execute()
)
connections = contacts_result.get('connections', [])
contact_list = []
for person in connections:
contact_info = {
"resource_name": person.get('resourceName', ''),
"names": person.get('names', []),
"email_addresses": person.get('emailAddresses', []),
"phone_numbers": person.get('phoneNumbers', []),
"organizations": person.get('organizations', []),
}
contact_list.append(contact_info)
return {
"success": True,
"contacts": contact_list,
"total_count": len(contact_list),
"next_page_token": contacts_result.get('nextPageToken'),
}
except Exception as e:
logger.error("Failed to get contacts: %s", e)
return {"error": f"Failed to get contacts: {e!s}"}
def gmail_search_people(
self,
query: str,
max_results: int = 10,
) -> Dict[str, Any]:
r"""Search for people in contacts.
Args:
query (str): Search query for people in contacts. Can search by:
- Name: 'John Smith' or partial names like 'John'
- Email: 'john@example.com'
- Organization: 'Google' or 'Acme Corp'
- Phone number: '+1234567890'
Examples: 'John Smith', 'john@example.com', 'Google'
max_results (int): Maximum number of results to fetch.
Returns:
Dict[str, Any]: A dictionary containing the search results.
"""
try:
# Search people
search_result = (
self.people_service.people()
.searchContacts(
query=query,
readMask='names,emailAddresses,phoneNumbers,organizations',
pageSize=max_results,
)
.execute()
)
results = search_result.get('results', [])
people_list = []
for result in results:
person = result.get('person', {})
person_info = {
"resource_name": person.get('resourceName', ''),
"names": person.get('names', []),
"email_addresses": person.get('emailAddresses', []),
"phone_numbers": person.get('phoneNumbers', []),
"organizations": person.get('organizations', []),
}
people_list.append(person_info)
return {
"success": True,
"people": people_list,
"total_count": len(people_list),
}
except Exception as e:
logger.error("Failed to search people: %s", e)
return {"error": f"Failed to search people: {e!s}"}
# Helper methods
def _get_gmail_service(self):
r"""Get Gmail service object."""
from googleapiclient.discovery import build
try:
# Build service with optional timeout
if self.timeout is not None:
import httplib2
http = httplib2.Http(timeout=self.timeout)
http = self._credentials.authorize(http)
service = build('gmail', 'v1', http=http)
else:
service = build('gmail', 'v1', credentials=self._credentials)
return service
except Exception as e:
raise ValueError(f"Failed to build Gmail service: {e}") from e
def _get_people_service(self):
r"""Get People service object."""
from googleapiclient.discovery import build
try:
# Build service with optional timeout
if self.timeout is not None:
import httplib2
http = httplib2.Http(timeout=self.timeout)
http = self._credentials.authorize(http)
service = build('people', 'v1', http=http)
else:
service = build('people', 'v1', credentials=self._credentials)
return service
except Exception as e:
raise ValueError(f"Failed to build People service: {e}") from e
def _authenticate(self):
r"""Authenticate with Google APIs using OAuth2.
Automatically saves and loads credentials from
~/.camel/gmail_token.json to avoid repeated
browser logins.
"""
import json
from pathlib import Path
from dotenv import load_dotenv
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
# Look for .env file in the project root (camel/)
env_file = Path(__file__).parent.parent.parent / '.env'
load_dotenv(env_file)
client_id = os.environ.get('GOOGLE_CLIENT_ID')
client_secret = os.environ.get('GOOGLE_CLIENT_SECRET')
if not client_id or not client_secret:
missing_vars = []
if not client_id:
missing_vars.append('GOOGLE_CLIENT_ID')
if not client_secret:
missing_vars.append('GOOGLE_CLIENT_SECRET')
raise ValueError(
f"Missing required environment variables: "
f"{', '.join(missing_vars)}. "
"Please set these in your .env file or environment variables."
)
token_file = Path.home() / '.camel' / 'gmail_token.json'
creds = None
# COMPONENT 1: Load saved credentials
if token_file.exists():
try:
with open(token_file, 'r') as f:
data = json.load(f)
creds = Credentials(
token=data.get('token'),
refresh_token=data.get('refresh_token'),
token_uri=data.get(
'token_uri', 'https://oauth2.googleapis.com/token'
),
client_id=client_id,
client_secret=client_secret,
scopes=SCOPES,
)
except Exception as e:
logger.warning(f"Failed to load saved token: {e}")
creds = None
# COMPONENT 2: Refresh if expired
if creds and creds.expired and creds.refresh_token:
try:
creds.refresh(Request())
logger.info("Access token refreshed")
# Save refreshed credentials to disk
token_file.parent.mkdir(parents=True, exist_ok=True)
try:
with open(token_file, 'w') as f:
json.dump(
{
'token': creds.token,
'refresh_token': creds.refresh_token,
'token_uri': creds.token_uri,
'scopes': creds.scopes,
},
f,
)
os.chmod(token_file, 0o600)
logger.info(f"Refreshed credentials saved to {token_file}")
except Exception as e:
logger.warning(
f"Failed to save refreshed credentials to "
f"{token_file}: {e}. "
"Token refreshed but not persisted."
)
return creds
except Exception as e:
logger.warning(f"Token refresh failed: {e}")
creds = None
# COMPONENT 3: Return if valid
if creds and creds.valid:
return creds
# COMPONENT 4: Browser OAuth (first-time or invalid credentials)
client_config = {
"installed": {
"client_id": client_id,
"client_secret": client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": ["http://localhost"],
}
}
flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
creds = flow.run_local_server(port=0)
# Save new credentials
token_file.parent.mkdir(parents=True, exist_ok=True)
try:
with open(token_file, 'w') as f:
json.dump(
{
'token': creds.token,
'refresh_token': creds.refresh_token,
'token_uri': creds.token_uri,
'scopes': creds.scopes,
},
f,
)
os.chmod(token_file, 0o600)
logger.info(f"Credentials saved to {token_file}")
except Exception as e:
logger.warning(
f"Failed to save credentials to {token_file}: {e}. "
"You may need to re-authenticate next time."
)
return creds
def _create_message(
self,
to_list: List[str],
subject: str,
body: str,
cc_list: Optional[List[str]] = None,
bcc_list: Optional[List[str]] = None,
attachments: Optional[List[str]] = None,
is_html: bool = False,
in_reply_to: Optional[str] = None,
references: Optional[List[str]] = None,
) -> Dict[str, str]:
r"""Create a message object for sending."""
import base64
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
message = MIMEMultipart()
message['to'] = ', '.join(to_list)
message['subject'] = subject
if cc_list:
message['cc'] = ', '.join(cc_list)
if bcc_list:
message['bcc'] = ', '.join(bcc_list)
# Set reply headers when provided
if in_reply_to:
message['In-Reply-To'] = in_reply_to
if references:
message['References'] = ' '.join(references)
# Add body
if is_html:
message.attach(MIMEText(body, 'html'))
else:
message.attach(MIMEText(body, 'plain'))
# Add attachments
if attachments:
for file_path in attachments:
if os.path.isfile(file_path):
with open(file_path, "rb") as attachment:
part = MIMEBase('application', 'octet-stream')
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
'Content-Disposition',
f'attachment; filename= '
f'{os.path.basename(file_path)}',
)
message.attach(part)
# Encode message
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode(
'utf-8'
)
return {'raw': raw_message}
def _get_message_details(
self, message_id: str
) -> Optional[Dict[str, Any]]:
r"""Get detailed information about a message."""
try:
message = (
self.gmail_service.users()
.messages()
.get(userId='me', id=message_id)
.execute()
)
headers = message['payload'].get('headers', [])
# Build a name->value map in one pass (case-insensitive)
header_map = {}
for header in headers:
name = header.get('name')
if name:
header_map[name.lower()] = header.get('value', '')
return {
"message_id": message['id'],
"thread_id": message['threadId'],
"snippet": message.get('snippet', ''),
"subject": header_map.get('subject', ''),
"from": header_map.get('from', ''),
"to": header_map.get('to', ''),
"cc": header_map.get('cc', ''),
"bcc": header_map.get('bcc', ''),
"date": header_map.get('date', ''),
"body": self._extract_message_body(message),
"attachments": self._extract_attachments(
message, include_inline=True
),
"label_ids": message.get('labelIds', []),
"size_estimate": message.get('sizeEstimate', 0),
}
except Exception as e:
logger.error("Failed to get message details: %s", e)
return None
def _get_header_value(
self, headers: List[Dict[str, str]], name: str
) -> str:
r"""Get header value by name."""
for header in headers:
if header['name'].lower() == name.lower():
return header['value']
return ""
def _extract_message_body(self, message: Dict[str, Any]) -> str:
r"""Extract message body from message payload.
Recursively traverses the entire message tree and collects all text
content from text/plain and text/html parts. Special handling for
multipart/alternative containers: recursively searches for one format
(preferring plain text) to avoid duplication when both formats contain
the same content. All other text parts are collected to ensure no
information is lost.
Args:
message (Dict[str, Any]): The Gmail message dictionary containing
the payload to extract text from.
Returns:
str: The extracted message body text with multiple parts separated
by double newlines, or an empty string if no text content is
found.
"""
import base64
import re
text_parts = []
def decode_text_data(data: str, mime_type: str) -> Optional[str]:
"""Helper to decode base64 text data.
Args:
data: Base64 encoded text data.
mime_type: MIME type for logging purposes.
Returns:
Decoded text string, or None if decoding fails or text
is empty.
"""
if not data:
return None
try:
text = base64.urlsafe_b64decode(data).decode('utf-8')
return text if text.strip() else None
except Exception as e:
logger.warning(f"Failed to decode {mime_type}: {e}")
return None
def strip_html_tags(html_content: str) -> str:
"""Strip HTML tags and convert to readable plain text.
Uses regex to remove tags and clean up formatting while preserving
basic document structure.
Args:
html_content: HTML content to strip.
Returns:
Plain text version of HTML content.
"""
if not html_content or not html_content.strip():
return ""
text = html_content
# Remove script and style elements completely
text = re.sub(
r'<script[^>]*>.*?</script>',
'',
text,
flags=re.DOTALL | re.IGNORECASE,
)
text = re.sub(
r'<style[^>]*>.*?</style>',
'',
text,
flags=re.DOTALL | re.IGNORECASE,
)
# Convert common HTML entities
text = text.replace('&nbsp;', ' ')
text = text.replace('&amp;', '&')
text = text.replace('&lt;', '<')
text = text.replace('&gt;', '>')
text = text.replace('&quot;', '"')
text = text.replace('&#39;', "'")
text = text.replace('&rsquo;', "'")
text = text.replace('&lsquo;', "'")
text = text.replace('&rdquo;', '"')
text = text.replace('&ldquo;', '"')
text = text.replace('&mdash;', '')
text = text.replace('&ndash;', '-')
# Convert <br> and <br/> to newlines
text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
# Convert block-level closing tags to newlines
text = re.sub(
r'</(p|div|h[1-6]|tr|li)>', '\n', text, flags=re.IGNORECASE
)
# Convert <hr> to separator
text = re.sub(r'<hr\s*/?>', '\n---\n', text, flags=re.IGNORECASE)
# Remove all remaining HTML tags
text = re.sub(r'<[^>]+>', '', text)
# Clean up whitespace
text = re.sub(
r'\n\s*\n\s*\n+', '\n\n', text
) # Multiple blank lines to double newline
text = re.sub(r' +', ' ', text) # Multiple spaces to single space
text = re.sub(r'\n ', '\n', text) # Remove leading spaces on lines
text = re.sub(
r' \n', '\n', text
) # Remove trailing spaces on lines
return text.strip()
def find_text_recursive(
part: Dict[str, Any], target_mime: str
) -> Optional[str]:
"""Recursively search for text content of a specific MIME type.
Args:
part: Message part to search in.
target_mime: Target MIME type ('text/plain' or 'text/html').
Returns:
Decoded text string if found, None otherwise.
"""
mime = part.get('mimeType', '')
# Found the target type at this level
if mime == target_mime:
data = part.get('body', {}).get('data', '')
decoded = decode_text_data(data, target_mime)
# Strip HTML tags if this is HTML content
if decoded and target_mime == 'text/html':
return strip_html_tags(decoded)
return decoded
# Not found, but has nested parts? Search recursively
if 'parts' in part:
for nested_part in part['parts']:
result = find_text_recursive(nested_part, target_mime)
if result:
return result
return None
def extract_from_part(part: Dict[str, Any]):
"""Recursively collect all text from message parts."""
mime_type = part.get('mimeType', '')
# Special handling for multipart/alternative
if mime_type == 'multipart/alternative' and 'parts' in part:
# Recursively search for one format (prefer plain text)
plain_text = None
html_text = None
# Search each alternative branch recursively
for nested_part in part['parts']:
if not plain_text:
plain_text = find_text_recursive(
nested_part, 'text/plain'
)
if not html_text:
html_text = find_text_recursive(
nested_part, 'text/html'
)
# Prefer plain text, fall back to HTML
chosen_text = plain_text if plain_text else html_text
if chosen_text:
text_parts.append(chosen_text)
# If this part has nested parts (but not multipart/alternative)
elif 'parts' in part:
for nested_part in part['parts']:
extract_from_part(nested_part)
# If this is a text leaf, extract and collect it
elif mime_type == 'text/plain':
data = part.get('body', {}).get('data', '')
text = decode_text_data(data, 'plain text body')
if text:
text_parts.append(text)
elif mime_type == 'text/html':
data = part.get('body', {}).get('data', '')
html_text = decode_text_data(data, 'HTML body')
if html_text:
text = strip_html_tags(html_text)
if text:
text_parts.append(text)
# Traverse the entire tree and collect all text parts
payload = message.get('payload', {})
extract_from_part(payload)
if not text_parts:
return ""
# Return all text parts combined
return '\n\n'.join(text_parts)
def _extract_attachments(
self, message: Dict[str, Any], include_inline: bool = False
) -> List[Dict[str, Any]]:
r"""Extract attachment information from message payload.
Recursively traverses the message tree to find all attachments
and extracts their metadata. Distinguishes between regular attachments
and inline images embedded in HTML content.
Args:
message (Dict[str, Any]): The Gmail message dictionary containing
the payload to extract attachments from.
Returns:
List[Dict[str, Any]]: List of attachment dictionaries, each
containing:
- attachment_id: Gmail's unique identifier for the attachment
- filename: Name of the attached file
- mime_type: MIME type of the attachment
- size: Size of the attachment in bytes
- is_inline: Whether this is an inline image (embedded in HTML)
"""
attachments = []
def is_inline_image(part: Dict[str, Any]) -> bool:
"""Check if this part is an inline image."""
headers = part.get('headers', [])
for header in headers:
name = header.get('name', '').lower()
value = header.get('value', '').lower()
# Check for Content-Disposition: inline
if name == 'content-disposition' and 'inline' in value:
return True
# Check for Content-ID (usually indicates inline)
if name == 'content-id':
return True
return False
def find_attachments(part: Dict[str, Any]):
"""Recursively find attachments in message parts."""
# Check if this part has an attachmentId (indicates it's an
# attachment)
if 'body' in part and 'attachmentId' in part['body']:
attachment_info = {
'attachment_id': part['body']['attachmentId'],
'filename': part.get('filename', 'unnamed'),
'mime_type': part.get(
'mimeType', 'application/octet-stream'
),
'size': part['body'].get('size', 0),
'is_inline': is_inline_image(part),
}
attachments.append(attachment_info)
# Recurse into nested parts
if 'parts' in part:
for nested_part in part['parts']:
find_attachments(nested_part)
# Start traversal from the message payload
payload = message.get('payload', {})
if payload:
find_attachments(payload)
# Return based on include_inline toggle
if include_inline:
return attachments
return [att for att in attachments if not att['is_inline']]
def _is_valid_email(self, email: str) -> bool:
r"""Validate email address format.
Supports both formats:
- Plain email: john@example.com
- Named email: John Doe <john@example.com>
"""
# Extract email from "Name <email>" format if present
match = re.search(r'<([^>]+)>$', email.strip())
email_to_check = match.group(1) if match else email.strip()
# Validate the email address
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email_to_check) is not None
def get_tools(self) -> List[FunctionTool]:
r"""Returns a list of FunctionTool objects representing the
functions in the toolkit.
Returns:
List[FunctionTool]: A list of FunctionTool objects
representing the functions in the toolkit.
"""
return [
FunctionTool(self.gmail_send_email),
FunctionTool(self.gmail_reply_to_email),
FunctionTool(self.gmail_forward_email),
FunctionTool(self.gmail_create_draft),
FunctionTool(self.gmail_send_draft),
FunctionTool(self.gmail_fetch_emails),
FunctionTool(self.gmail_fetch_thread_by_id),
FunctionTool(self.gmail_modify_email_labels),
FunctionTool(self.gmail_move_to_trash),
FunctionTool(self.gmail_get_attachment),
FunctionTool(self.gmail_list_threads),
FunctionTool(self.gmail_list_drafts),
FunctionTool(self.gmail_list_labels),
FunctionTool(self.gmail_create_label),
FunctionTool(self.gmail_delete_label),
FunctionTool(self.gmail_modify_thread_labels),
FunctionTool(self.gmail_get_profile),
FunctionTool(self.gmail_get_contacts),
FunctionTool(self.gmail_search_people),
]