mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-24 22:04:09 +00:00
1839 lines
68 KiB
Python
1839 lines
68 KiB
Python
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
|
|
|
|
import os
|
|
import re
|
|
from typing import Any, Dict, List, Literal, Optional, Union
|
|
|
|
from camel.logger import get_logger
|
|
from camel.toolkits import FunctionTool
|
|
from camel.toolkits.base import BaseToolkit
|
|
from camel.utils import MCPServer
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
SCOPES = [
|
|
'https://www.googleapis.com/auth/gmail.readonly',
|
|
'https://www.googleapis.com/auth/gmail.send',
|
|
'https://www.googleapis.com/auth/gmail.modify',
|
|
'https://www.googleapis.com/auth/gmail.compose',
|
|
'https://www.googleapis.com/auth/gmail.labels',
|
|
'https://www.googleapis.com/auth/contacts.readonly',
|
|
'https://www.googleapis.com/auth/userinfo.profile',
|
|
]
|
|
|
|
|
|
@MCPServer()
|
|
class GmailToolkit(BaseToolkit):
|
|
r"""A comprehensive toolkit for Gmail operations.
|
|
|
|
This class provides methods for Gmail operations including sending emails,
|
|
managing drafts, fetching messages, managing labels, and handling contacts.
|
|
API keys can be accessed in google cloud console (https://console.cloud.google.com/)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
timeout: Optional[float] = None,
|
|
):
|
|
r"""Initializes a new instance of the GmailToolkit class.
|
|
|
|
Args:
|
|
timeout (Optional[float]): The timeout value for API requests
|
|
in seconds. If None, no timeout is applied.
|
|
(default: :obj:`None`)
|
|
"""
|
|
super().__init__(timeout=timeout)
|
|
|
|
self._credentials = self._authenticate()
|
|
|
|
self.gmail_service: Any = self._get_gmail_service()
|
|
self._people_service: Any = None
|
|
|
|
@property
|
|
def people_service(self) -> Any:
|
|
r"""Lazily initialize and return the Google People service."""
|
|
if self._people_service is None:
|
|
self._people_service = self._get_people_service()
|
|
return self._people_service
|
|
|
|
@people_service.setter
|
|
def people_service(self, service: Any) -> None:
|
|
r"""Allow overriding/injecting the People service (e.g., in tests)."""
|
|
self._people_service = service
|
|
|
|
def gmail_send_email(
|
|
self,
|
|
to: Union[str, List[str]],
|
|
subject: str,
|
|
body: str,
|
|
cc: Optional[Union[str, List[str]]] = None,
|
|
bcc: Optional[Union[str, List[str]]] = None,
|
|
attachments: Optional[List[str]] = None,
|
|
is_html: bool = False,
|
|
) -> Dict[str, Any]:
|
|
r"""Send an email through Gmail.
|
|
|
|
Args:
|
|
to (Union[str, List[str]]): Recipient email address(es).
|
|
subject (str): Email subject.
|
|
body (str): Email body content.
|
|
cc (Optional[Union[str, List[str]]]): CC recipient email
|
|
address(es).
|
|
bcc (Optional[Union[str, List[str]]]): BCC recipient email
|
|
address(es).
|
|
attachments (Optional[List[str]]): List of file paths to attach.
|
|
is_html (bool): Whether the body is HTML format. Set to True when
|
|
sending formatted emails with HTML tags (e.g., bold,
|
|
links, images). Use False (default) for plain text emails.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the result of the
|
|
operation.
|
|
"""
|
|
try:
|
|
# Normalize recipients to lists
|
|
to_list = [to] if isinstance(to, str) else to
|
|
cc_list = [cc] if isinstance(cc, str) else (cc or [])
|
|
bcc_list = [bcc] if isinstance(bcc, str) else (bcc or [])
|
|
|
|
# Validate email addresses
|
|
all_recipients = to_list + cc_list + bcc_list
|
|
for email in all_recipients:
|
|
if not self._is_valid_email(email):
|
|
return {"error": f"Invalid email address: {email}"}
|
|
|
|
# Create message
|
|
message = self._create_message(
|
|
to_list, subject, body, cc_list, bcc_list, attachments, is_html
|
|
)
|
|
|
|
# Send message
|
|
sent_message = (
|
|
self.gmail_service.users()
|
|
.messages()
|
|
.send(userId='me', body=message)
|
|
.execute()
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message_id": sent_message.get('id'),
|
|
"thread_id": sent_message.get('threadId'),
|
|
"message": "Email sent successfully",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to send email: %s", e)
|
|
return {"error": f"Failed to send email: {e!s}"}
|
|
|
|
def gmail_reply_to_email(
|
|
self,
|
|
message_id: str,
|
|
reply_body: str,
|
|
reply_all: bool = False,
|
|
is_html: bool = False,
|
|
) -> Dict[str, Any]:
|
|
r"""Reply to an email message.
|
|
|
|
Args:
|
|
message_id (str): The unique identifier of the message to reply to.
|
|
To get a message ID, first use fetch_emails() to list messages,
|
|
or use the 'message_id' returned from send_email() or
|
|
create_email_draft().
|
|
reply_body (str): The reply message body.
|
|
reply_all (bool): Whether to reply to all recipients.
|
|
is_html (bool): Whether the body is HTML format. Set to True when
|
|
sending formatted emails with HTML tags (e.g., bold,
|
|
links, images). Use False (default) for plain text emails.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the result of the
|
|
operation.
|
|
"""
|
|
try:
|
|
# Get the original message
|
|
original_message = (
|
|
self.gmail_service.users()
|
|
.messages()
|
|
.get(userId='me', id=message_id)
|
|
.execute()
|
|
)
|
|
|
|
# Extract headers (single pass, case-insensitive)
|
|
headers = original_message['payload'].get('headers', [])
|
|
subject = from_email = to_emails = cc_emails = None
|
|
missing = {'subject', 'from', 'to', 'cc'}
|
|
|
|
for header in headers:
|
|
name = (header.get('name') or '').lower()
|
|
if name not in missing:
|
|
continue
|
|
value = header.get('value')
|
|
if name == 'subject':
|
|
subject = value
|
|
elif name == 'from':
|
|
from_email = value
|
|
elif name == 'to':
|
|
to_emails = value
|
|
elif name == 'cc':
|
|
cc_emails = value
|
|
missing.discard(name)
|
|
if not missing:
|
|
break
|
|
|
|
# Extract identifiers for reply context
|
|
message_id_header = self._get_header_value(
|
|
headers, 'Message-Id'
|
|
) or self._get_header_value(headers, 'Message-ID')
|
|
thread_id = original_message.get('threadId')
|
|
|
|
# Prepare reply subject
|
|
if subject and not subject.startswith('Re: '):
|
|
subject = f"Re: {subject}"
|
|
elif not subject:
|
|
subject = "Re: (No Subject)"
|
|
|
|
# Validate from_email
|
|
if not from_email:
|
|
return {"error": "Original message has no sender address"}
|
|
|
|
# Prepare recipients
|
|
if reply_all:
|
|
recipients = [from_email]
|
|
if to_emails:
|
|
recipients.extend(
|
|
[email.strip() for email in to_emails.split(',')]
|
|
)
|
|
if cc_emails:
|
|
recipients.extend(
|
|
[email.strip() for email in cc_emails.split(',')]
|
|
)
|
|
# Remove duplicates and None values
|
|
recipients = [r for r in list(set(recipients)) if r]
|
|
|
|
# Get current user's email and remove it from recipients
|
|
try:
|
|
profile_result = self.gmail_get_profile()
|
|
if profile_result.get('success'):
|
|
current_user_email = profile_result['profile'][
|
|
'email_address'
|
|
]
|
|
# Remove current user from recipients (handle both
|
|
# plain email and "Name <email>" format)
|
|
filtered_recipients = []
|
|
for email in recipients:
|
|
# Extract email from "Name <email>" format
|
|
match = re.search(r'<([^>]+)>$', email.strip())
|
|
email_addr = (
|
|
match.group(1) if match else email.strip()
|
|
)
|
|
if email_addr != current_user_email:
|
|
filtered_recipients.append(email)
|
|
recipients = filtered_recipients
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Could not get current user email to filter from "
|
|
"recipients: %s",
|
|
e,
|
|
)
|
|
else:
|
|
recipients = [from_email]
|
|
|
|
# Create reply message with reply headers
|
|
message = self._create_message(
|
|
recipients,
|
|
subject,
|
|
reply_body,
|
|
is_html=is_html,
|
|
in_reply_to=message_id_header or original_message.get('id'),
|
|
references=[message_id_header] if message_id_header else None,
|
|
)
|
|
|
|
# Send reply in the same thread
|
|
sent_message = (
|
|
self.gmail_service.users()
|
|
.messages()
|
|
.send(userId='me', body={**message, 'threadId': thread_id})
|
|
.execute()
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message_id": sent_message.get('id'),
|
|
"thread_id": sent_message.get('threadId'),
|
|
"message": "Reply sent successfully",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to reply to email: %s", e)
|
|
return {"error": f"Failed to reply to email: {e!s}"}
|
|
|
|
def gmail_forward_email(
|
|
self,
|
|
message_id: str,
|
|
to: Union[str, List[str]],
|
|
forward_body: Optional[str] = None,
|
|
cc: Optional[Union[str, List[str]]] = None,
|
|
bcc: Optional[Union[str, List[str]]] = None,
|
|
include_attachments: bool = True,
|
|
) -> Dict[str, Any]:
|
|
r"""Forward an email message.
|
|
|
|
Args:
|
|
message_id (str): The unique identifier of the message to forward.
|
|
To get a message ID, first use fetch_emails() to list messages,
|
|
or use the 'message_id' returned from send_email() or
|
|
create_email_draft().
|
|
to (Union[str, List[str]]): Recipient email address(es).
|
|
forward_body (Optional[str]): Additional message to include at
|
|
the top of the forwarded email, before the original message
|
|
content. If not provided, only the original message will be
|
|
forwarded.
|
|
cc (Optional[Union[str, List[str]]]): CC recipient email
|
|
address(es).
|
|
bcc (Optional[Union[str, List[str]]]): BCC recipient email
|
|
address(es).
|
|
include_attachments (bool): Whether to include original
|
|
attachments. Defaults to True. Only includes real
|
|
attachments, not inline images.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the result of the
|
|
operation, including the number of attachments forwarded.
|
|
"""
|
|
try:
|
|
import tempfile
|
|
|
|
# Get the original message
|
|
original_message = (
|
|
self.gmail_service.users()
|
|
.messages()
|
|
.get(userId='me', id=message_id)
|
|
.execute()
|
|
)
|
|
|
|
# Extract headers (single pass, case-insensitive)
|
|
headers = original_message['payload'].get('headers', [])
|
|
subject = from_email = date = None
|
|
missing = {'subject', 'from', 'date'}
|
|
|
|
for header in headers:
|
|
name = (header.get('name') or '').lower()
|
|
if name not in missing:
|
|
continue
|
|
value = header.get('value')
|
|
if name == 'subject':
|
|
subject = value
|
|
elif name == 'from':
|
|
from_email = value
|
|
elif name == 'date':
|
|
date = value
|
|
missing.discard(name)
|
|
if not missing:
|
|
break
|
|
|
|
# Prepare forward subject
|
|
if subject and not subject.startswith('Fwd: '):
|
|
subject = f"Fwd: {subject}"
|
|
elif not subject:
|
|
subject = "Fwd: (No Subject)"
|
|
|
|
# Prepare forward body
|
|
if forward_body:
|
|
body = f"{forward_body}\n\n--- Forwarded message ---\n"
|
|
else:
|
|
body = "--- Forwarded message ---\n"
|
|
|
|
body += f"From: {from_email}\n"
|
|
body += f"Date: {date}\n"
|
|
body += f"Subject: {subject.replace('Fwd: ', '')}\n\n"
|
|
|
|
# Add original message body
|
|
body += self._extract_message_body(original_message)
|
|
|
|
# Normalize recipients
|
|
to_list = [to] if isinstance(to, str) else to
|
|
cc_list = [cc] if isinstance(cc, str) else (cc or [])
|
|
bcc_list = [bcc] if isinstance(bcc, str) else (bcc or [])
|
|
|
|
# Handle attachments
|
|
attachment_paths = []
|
|
temp_files: List[str] = []
|
|
|
|
try:
|
|
if include_attachments:
|
|
# Extract attachment metadata
|
|
attachments = self._extract_attachments(original_message)
|
|
for att in attachments:
|
|
try:
|
|
# Create temp file
|
|
temp_file = tempfile.NamedTemporaryFile(
|
|
delete=False, suffix=f"_{att['filename']}"
|
|
)
|
|
temp_files.append(temp_file.name)
|
|
|
|
# Download attachment
|
|
result = self.gmail_get_attachment(
|
|
message_id=message_id,
|
|
attachment_id=att['attachment_id'],
|
|
save_path=temp_file.name,
|
|
)
|
|
|
|
if result.get('success'):
|
|
attachment_paths.append(temp_file.name)
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to download attachment "
|
|
f"{att['filename']}: {e}"
|
|
)
|
|
|
|
# Create forward message (now with attachments!)
|
|
message = self._create_message(
|
|
to_list,
|
|
subject,
|
|
body,
|
|
cc_list,
|
|
bcc_list,
|
|
attachments=attachment_paths if attachment_paths else None,
|
|
)
|
|
|
|
# Send forward
|
|
sent_message = (
|
|
self.gmail_service.users()
|
|
.messages()
|
|
.send(userId='me', body=message)
|
|
.execute()
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message_id": sent_message.get('id'),
|
|
"thread_id": sent_message.get('threadId'),
|
|
"message": "Email forwarded successfully",
|
|
"attachments_forwarded": len(attachment_paths),
|
|
}
|
|
|
|
finally:
|
|
# Clean up temp files
|
|
for temp_file_path in temp_files:
|
|
try:
|
|
os.unlink(temp_file_path)
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to delete temp file {temp_file_path}: {e}"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to forward email: %s", e)
|
|
return {"error": f"Failed to forward email: {e!s}"}
|
|
|
|
def gmail_create_draft(
|
|
self,
|
|
to: Union[str, List[str]],
|
|
subject: str,
|
|
body: str,
|
|
cc: Optional[Union[str, List[str]]] = None,
|
|
bcc: Optional[Union[str, List[str]]] = None,
|
|
attachments: Optional[List[str]] = None,
|
|
is_html: bool = False,
|
|
) -> Dict[str, Any]:
|
|
r"""Create an email draft.
|
|
|
|
Args:
|
|
to (Union[str, List[str]]): Recipient email address(es).
|
|
subject (str): Email subject.
|
|
body (str): Email body content.
|
|
cc (Optional[Union[str, List[str]]]): CC recipient email
|
|
address(es).
|
|
bcc (Optional[Union[str, List[str]]]): BCC recipient email
|
|
address(es).
|
|
attachments (Optional[List[str]]): List of file paths to attach.
|
|
is_html (bool): Whether the body is HTML format. Set to True when
|
|
sending formatted emails with HTML tags (e.g., bold,
|
|
links, images). Use False (default) for plain text emails.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the result of the
|
|
operation.
|
|
"""
|
|
try:
|
|
# Normalize recipients to lists
|
|
to_list = [to] if isinstance(to, str) else to
|
|
cc_list = [cc] if isinstance(cc, str) else (cc or [])
|
|
bcc_list = [bcc] if isinstance(bcc, str) else (bcc or [])
|
|
|
|
# Validate email addresses
|
|
all_recipients = to_list + cc_list + bcc_list
|
|
for email in all_recipients:
|
|
if not self._is_valid_email(email):
|
|
return {"error": f"Invalid email address: {email}"}
|
|
|
|
# Create message
|
|
message = self._create_message(
|
|
to_list, subject, body, cc_list, bcc_list, attachments, is_html
|
|
)
|
|
|
|
# Create draft
|
|
draft = (
|
|
self.gmail_service.users()
|
|
.drafts()
|
|
.create(userId='me', body={'message': message})
|
|
.execute()
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"draft_id": draft.get('id'),
|
|
"message_id": draft.get('message', {}).get('id'),
|
|
"message": "Draft created successfully",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to create draft: %s", e)
|
|
return {"error": f"Failed to create draft: {e!s}"}
|
|
|
|
def gmail_send_draft(self, draft_id: str) -> Dict[str, Any]:
|
|
r"""Send a draft email.
|
|
|
|
Args:
|
|
draft_id (str): The unique identifier of the draft to send.
|
|
To get a draft ID, first use list_drafts() to list drafts,
|
|
or use the 'draft_id' returned from create_email_draft().
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the result of the
|
|
operation.
|
|
"""
|
|
try:
|
|
# Send draft
|
|
sent_message = (
|
|
self.gmail_service.users()
|
|
.drafts()
|
|
.send(userId='me', body={'id': draft_id})
|
|
.execute()
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message_id": sent_message.get('id'),
|
|
"thread_id": sent_message.get('threadId'),
|
|
"message": "Draft sent successfully",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to send draft: %s", e)
|
|
return {"error": f"Failed to send draft: {e!s}"}
|
|
|
|
def gmail_fetch_emails(
|
|
self,
|
|
query: str = "",
|
|
max_results: int = 10,
|
|
include_spam_trash: bool = False,
|
|
label_ids: Optional[List[str]] = None,
|
|
page_token: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
r"""Fetch emails with filters and pagination.
|
|
|
|
Args:
|
|
query (str): Gmail search query string. Use Gmail's search syntax:
|
|
- 'from:example@domain.com' - emails from specific sender
|
|
- 'subject:meeting' - emails with specific subject text
|
|
- 'has:attachment' - emails with attachments
|
|
- 'is:unread' - unread emails
|
|
- 'in:sent' - emails in sent folder
|
|
- 'after:2024/01/01 before:2024/12/31' - date range
|
|
Examples: 'from:john@example.com subject:project', 'is:unread
|
|
has:attachment'
|
|
max_results (int): Maximum number of emails to fetch.
|
|
include_spam_trash (bool): Whether to include spam and trash.
|
|
label_ids (Optional[List[str]]): List of label IDs to filter
|
|
emails by. Only emails with ALL of the specified
|
|
labels will be returned.
|
|
Label IDs can be:
|
|
- System labels: 'INBOX', 'SENT', 'DRAFT', 'SPAM', 'TRASH',
|
|
'UNREAD', 'STARRED', 'IMPORTANT', 'CATEGORY_PERSONAL', etc.
|
|
- Custom label IDs: Retrieved from list_gmail_labels() method.
|
|
page_token (Optional[str]): Pagination token from a previous
|
|
response. If provided, fetches the next page of results.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the fetched emails.
|
|
"""
|
|
try:
|
|
# Build request parameters
|
|
request_params = {
|
|
'userId': 'me',
|
|
'maxResults': max_results,
|
|
'includeSpamTrash': include_spam_trash,
|
|
}
|
|
|
|
if query:
|
|
request_params['q'] = query
|
|
if label_ids:
|
|
request_params['labelIds'] = label_ids
|
|
|
|
# List messages
|
|
if page_token:
|
|
request_params['pageToken'] = page_token
|
|
|
|
messages_result = (
|
|
self.gmail_service.users()
|
|
.messages()
|
|
.list(**request_params)
|
|
.execute()
|
|
)
|
|
|
|
messages = messages_result.get('messages', [])
|
|
emails = []
|
|
|
|
# Fetch detailed information for each message
|
|
for msg in messages:
|
|
email_detail = self._get_message_details(msg['id'])
|
|
if email_detail:
|
|
emails.append(email_detail)
|
|
|
|
return {
|
|
"success": True,
|
|
"emails": emails,
|
|
"total_count": len(emails),
|
|
"next_page_token": messages_result.get('nextPageToken'),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to fetch emails: %s", e)
|
|
return {"error": f"Failed to fetch emails: {e!s}"}
|
|
|
|
def gmail_fetch_thread_by_id(self, thread_id: str) -> Dict[str, Any]:
|
|
r"""Fetch a thread by ID.
|
|
|
|
Args:
|
|
thread_id (str): The unique identifier of the thread to fetch.
|
|
To get a thread ID, first use list_threads() to list threads,
|
|
or use the 'thread_id' returned from send_email() or
|
|
reply_to_email().
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the thread details.
|
|
"""
|
|
try:
|
|
thread = (
|
|
self.gmail_service.users()
|
|
.threads()
|
|
.get(userId='me', id=thread_id)
|
|
.execute()
|
|
)
|
|
|
|
messages = []
|
|
for message in thread.get('messages', []):
|
|
message_detail = self._get_message_details(message['id'])
|
|
if message_detail:
|
|
messages.append(message_detail)
|
|
|
|
return {
|
|
"success": True,
|
|
"thread_id": thread_id,
|
|
"messages": messages,
|
|
"message_count": len(messages),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to fetch thread: %s", e)
|
|
return {"error": f"Failed to fetch thread: {e!s}"}
|
|
|
|
def gmail_modify_email_labels(
|
|
self,
|
|
message_id: str,
|
|
add_labels: Optional[List[str]] = None,
|
|
remove_labels: Optional[List[str]] = None,
|
|
) -> Dict[str, Any]:
|
|
r"""Modify labels on an email message.
|
|
|
|
Args:
|
|
message_id (str): The unique identifier of the message to modify.
|
|
To get a message ID, first use fetch_emails() to list messages,
|
|
or use the 'message_id' returned from send_email() or
|
|
create_email_draft().
|
|
add_labels (Optional[List[str]]): List of label IDs to add to
|
|
the message.
|
|
Label IDs can be:
|
|
- System labels: 'INBOX', 'STARRED', 'IMPORTANT',
|
|
'UNREAD', etc.
|
|
- Custom label IDs: Retrieved from list_gmail_labels() method.
|
|
Example: ['STARRED', 'IMPORTANT'] marks email as starred
|
|
and important.
|
|
remove_labels (Optional[List[str]]): List of label IDs to
|
|
remove from the message. Uses the same format as add_labels.
|
|
Example: ['UNREAD'] marks the email as read.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the result of the
|
|
operation.
|
|
"""
|
|
try:
|
|
body = {}
|
|
if add_labels:
|
|
body['addLabelIds'] = add_labels
|
|
if remove_labels:
|
|
body['removeLabelIds'] = remove_labels
|
|
|
|
if not body:
|
|
return {"error": "No labels to add or remove"}
|
|
|
|
modified_message = (
|
|
self.gmail_service.users()
|
|
.messages()
|
|
.modify(userId='me', id=message_id, body=body)
|
|
.execute()
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message_id": message_id,
|
|
"label_ids": modified_message.get('labelIds', []),
|
|
"message": "Labels modified successfully",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to modify labels: %s", e)
|
|
return {"error": f"Failed to modify labels: {e!s}"}
|
|
|
|
def gmail_move_to_trash(self, message_id: str) -> Dict[str, Any]:
|
|
r"""Move a message to trash.
|
|
|
|
Args:
|
|
message_id (str): The unique identifier of the message to move to
|
|
trash. To get a message ID, first use fetch_emails() to list
|
|
messages, or use the 'message_id' returned from send_email()
|
|
or create_email_draft().
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the result of the
|
|
operation.
|
|
"""
|
|
try:
|
|
trashed_message = (
|
|
self.gmail_service.users()
|
|
.messages()
|
|
.trash(userId='me', id=message_id)
|
|
.execute()
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message_id": message_id,
|
|
"label_ids": trashed_message.get('labelIds', []),
|
|
"message": "Message moved to trash successfully",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to move message to trash: %s", e)
|
|
return {"error": f"Failed to move message to trash: {e!s}"}
|
|
|
|
def gmail_get_attachment(
|
|
self,
|
|
message_id: str,
|
|
attachment_id: str,
|
|
save_path: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
r"""Get an attachment from a message.
|
|
|
|
Args:
|
|
message_id (str): The unique identifier of the message containing
|
|
the attachment. To get a message ID, first use fetch_emails()
|
|
to list messages, or use the 'message_id' returned from
|
|
send_email() or create_email_draft().
|
|
attachment_id (str): The unique identifier of the attachment to
|
|
download. To get an attachment ID, first use fetch_emails() to
|
|
get message details, then look for 'attachment_id' in the
|
|
'attachments' list of each message.
|
|
save_path (Optional[str]): Local file path where the attachment
|
|
should be saved. If provided, the attachment will be saved to
|
|
this location and the response will include a success message.
|
|
If not provided, the attachment data will be returned as
|
|
base64-encoded content in the response.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the attachment data or
|
|
save result.
|
|
"""
|
|
try:
|
|
import base64
|
|
|
|
attachment = (
|
|
self.gmail_service.users()
|
|
.messages()
|
|
.attachments()
|
|
.get(userId='me', messageId=message_id, id=attachment_id)
|
|
.execute()
|
|
)
|
|
|
|
# Decode the attachment data
|
|
file_data = base64.urlsafe_b64decode(attachment['data'])
|
|
|
|
if save_path:
|
|
with open(save_path, 'wb') as f:
|
|
f.write(file_data)
|
|
return {
|
|
"success": True,
|
|
"message": f"Attachment saved to {save_path}",
|
|
"file_size": len(file_data),
|
|
}
|
|
else:
|
|
return {
|
|
"success": True,
|
|
"data": base64.b64encode(file_data).decode('utf-8'),
|
|
"file_size": len(file_data),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get attachment: %s", e)
|
|
return {"error": f"Failed to get attachment: {e!s}"}
|
|
|
|
def gmail_list_threads(
|
|
self,
|
|
query: str = "",
|
|
max_results: int = 10,
|
|
include_spam_trash: bool = False,
|
|
label_ids: Optional[List[str]] = None,
|
|
page_token: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
r"""List email threads.
|
|
|
|
Args:
|
|
query (str): Gmail search query string. Use Gmail's search syntax:
|
|
- 'from:example@domain.com' - threads from specific sender
|
|
- 'subject:meeting' - threads with specific subject text
|
|
- 'has:attachment' - threads with attachments
|
|
- 'is:unread' - unread threads
|
|
- 'in:sent' - threads in sent folder
|
|
- 'after:2024/01/01 before:2024/12/31' - date range
|
|
Examples: 'from:john@example.com subject:project', 'is:unread
|
|
has:attachment'
|
|
max_results (int): Maximum number of threads to fetch.
|
|
include_spam_trash (bool): Whether to include spam and trash.
|
|
label_ids (Optional[List[str]]): List of label IDs to filter
|
|
threads by. Only threads with ALL of the specified labels
|
|
will be returned.
|
|
Label IDs can be:
|
|
- System labels: 'INBOX', 'SENT', 'DRAFT', 'SPAM', 'TRASH',
|
|
'UNREAD', 'STARRED', 'IMPORTANT', 'CATEGORY_PERSONAL', etc.
|
|
- Custom label IDs: Retrieved from list_gmail_labels() method.
|
|
page_token (Optional[str]): Pagination token from a previous
|
|
response. If provided, fetches the next page of results.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the thread list.
|
|
"""
|
|
try:
|
|
# Build request parameters
|
|
request_params = {
|
|
'userId': 'me',
|
|
'maxResults': max_results,
|
|
'includeSpamTrash': include_spam_trash,
|
|
}
|
|
|
|
if query:
|
|
request_params['q'] = query
|
|
if label_ids:
|
|
request_params['labelIds'] = label_ids
|
|
|
|
# List threads
|
|
if page_token:
|
|
request_params['pageToken'] = page_token
|
|
|
|
threads_result = (
|
|
self.gmail_service.users()
|
|
.threads()
|
|
.list(**request_params)
|
|
.execute()
|
|
)
|
|
|
|
threads = threads_result.get('threads', [])
|
|
thread_list = []
|
|
|
|
for thread in threads:
|
|
thread_list.append(
|
|
{
|
|
"thread_id": thread['id'],
|
|
"snippet": thread.get('snippet', ''),
|
|
"history_id": thread.get('historyId', ''),
|
|
}
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"threads": thread_list,
|
|
"total_count": len(thread_list),
|
|
"next_page_token": threads_result.get('nextPageToken'),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to list threads: %s", e)
|
|
return {"error": f"Failed to list threads: {e!s}"}
|
|
|
|
def gmail_list_drafts(
|
|
self, max_results: int = 10, page_token: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
r"""List email drafts.
|
|
|
|
Args:
|
|
max_results (int): Maximum number of drafts to fetch.
|
|
page_token (Optional[str]): Pagination token from a previous
|
|
response. If provided, fetches the next page of results.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the draft list.
|
|
"""
|
|
try:
|
|
drafts_result = (
|
|
self.gmail_service.users()
|
|
.drafts()
|
|
.list(
|
|
userId='me',
|
|
maxResults=max_results,
|
|
**({"pageToken": page_token} if page_token else {}),
|
|
)
|
|
.execute()
|
|
)
|
|
|
|
drafts = drafts_result.get('drafts', [])
|
|
draft_list = []
|
|
|
|
for draft in drafts:
|
|
draft_info = {
|
|
"draft_id": draft['id'],
|
|
"message_id": draft.get('message', {}).get('id', ''),
|
|
"thread_id": draft.get('message', {}).get('threadId', ''),
|
|
"snippet": draft.get('message', {}).get('snippet', ''),
|
|
}
|
|
draft_list.append(draft_info)
|
|
|
|
return {
|
|
"success": True,
|
|
"drafts": draft_list,
|
|
"total_count": len(draft_list),
|
|
"next_page_token": drafts_result.get('nextPageToken'),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to list drafts: %s", e)
|
|
return {"error": f"Failed to list drafts: {e!s}"}
|
|
|
|
def gmail_list_labels(self) -> Dict[str, Any]:
|
|
r"""List all Gmail labels.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the label list.
|
|
"""
|
|
try:
|
|
labels_result = (
|
|
self.gmail_service.users().labels().list(userId='me').execute()
|
|
)
|
|
|
|
labels = labels_result.get('labels', [])
|
|
label_list = []
|
|
|
|
for label in labels:
|
|
label_info = {
|
|
"id": label['id'],
|
|
"name": label['name'],
|
|
"type": label.get('type', 'user'),
|
|
"messages_total": label.get('messagesTotal', 0),
|
|
"messages_unread": label.get('messagesUnread', 0),
|
|
"threads_total": label.get('threadsTotal', 0),
|
|
"threads_unread": label.get('threadsUnread', 0),
|
|
}
|
|
label_list.append(label_info)
|
|
|
|
return {
|
|
"success": True,
|
|
"labels": label_list,
|
|
"total_count": len(label_list),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to list labels: %s", e)
|
|
return {"error": f"Failed to list labels: {e!s}"}
|
|
|
|
def gmail_create_label(
|
|
self,
|
|
name: str,
|
|
label_list_visibility: Literal["labelShow", "labelHide"] = "labelShow",
|
|
message_list_visibility: Literal["show", "hide"] = "show",
|
|
) -> Dict[str, Any]:
|
|
r"""Create a new Gmail label.
|
|
|
|
Args:
|
|
name (str): The name of the label to create.
|
|
label_list_visibility (str): How the label appears in Gmail's
|
|
label list. - 'labelShow': Label is visible in the label list
|
|
sidebar (default) - 'labelHide': Label is hidden from the
|
|
label list sidebar
|
|
message_list_visibility (str): How the label appears in message
|
|
lists. - 'show': Label is visible on messages in inbox/lists
|
|
(default) - 'hide': Label is hidden from message displays
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the result of the
|
|
operation.
|
|
"""
|
|
try:
|
|
label_object = {
|
|
'name': name,
|
|
'labelListVisibility': label_list_visibility,
|
|
'messageListVisibility': message_list_visibility,
|
|
}
|
|
|
|
created_label = (
|
|
self.gmail_service.users()
|
|
.labels()
|
|
.create(userId='me', body=label_object)
|
|
.execute()
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"label_id": created_label['id'],
|
|
"label_name": created_label['name'],
|
|
"message": "Label created successfully",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to create label: %s", e)
|
|
return {"error": f"Failed to create label: {e!s}"}
|
|
|
|
def gmail_delete_label(self, label_id: str) -> Dict[str, Any]:
|
|
r"""Delete a Gmail label.
|
|
|
|
Args:
|
|
label_id (str): The unique identifier of the user-created label to
|
|
delete. To get a label ID, first use list_gmail_labels() to
|
|
list all labels. Note: System labels (e.g., 'INBOX', 'SENT',
|
|
'DRAFT', 'SPAM', 'TRASH', 'UNREAD', 'STARRED', 'IMPORTANT',
|
|
'CATEGORY_PERSONAL', etc.) cannot be deleted.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the result of the
|
|
operation.
|
|
"""
|
|
try:
|
|
self.gmail_service.users().labels().delete(
|
|
userId='me', id=label_id
|
|
).execute()
|
|
|
|
return {
|
|
"success": True,
|
|
"label_id": label_id,
|
|
"message": "Label deleted successfully",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to delete label: %s", e)
|
|
return {"error": f"Failed to delete label: {e!s}"}
|
|
|
|
def gmail_modify_thread_labels(
|
|
self,
|
|
thread_id: str,
|
|
add_labels: Optional[List[str]] = None,
|
|
remove_labels: Optional[List[str]] = None,
|
|
) -> Dict[str, Any]:
|
|
r"""Modify labels on a thread.
|
|
|
|
Args:
|
|
thread_id (str): The unique identifier of the thread to modify.
|
|
To get a thread ID, first use list_threads() to list threads,
|
|
or use the 'thread_id' returned from send_email() or
|
|
reply_to_email().
|
|
add_labels (Optional[List[str]]): List of label IDs to add to all
|
|
messages in the thread.
|
|
Label IDs can be:
|
|
- System labels: 'INBOX', 'STARRED', 'IMPORTANT',
|
|
'UNREAD', etc.
|
|
- Custom label IDs: Retrieved from list_gmail_labels().
|
|
Example: ['STARRED', 'IMPORTANT'] marks thread as
|
|
starred and important.
|
|
remove_labels (Optional[List[str]]): List of label IDs to
|
|
remove from all messages in the thread. Uses the same
|
|
format as add_labels.
|
|
Example: ['UNREAD'] marks the entire thread as read.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the result of the
|
|
operation.
|
|
"""
|
|
try:
|
|
body = {}
|
|
if add_labels:
|
|
body['addLabelIds'] = add_labels
|
|
if remove_labels:
|
|
body['removeLabelIds'] = remove_labels
|
|
|
|
if not body:
|
|
return {"error": "No labels to add or remove"}
|
|
|
|
modified_thread = (
|
|
self.gmail_service.users()
|
|
.threads()
|
|
.modify(userId='me', id=thread_id, body=body)
|
|
.execute()
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"thread_id": thread_id,
|
|
"label_ids": modified_thread.get('labelIds', []),
|
|
"message": "Thread labels modified successfully",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to modify thread labels: %s", e)
|
|
return {"error": f"Failed to modify thread labels: {e!s}"}
|
|
|
|
def gmail_get_profile(self) -> Dict[str, Any]:
|
|
r"""Get Gmail profile information.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the profile information.
|
|
"""
|
|
try:
|
|
profile = (
|
|
self.gmail_service.users().getProfile(userId='me').execute()
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"profile": {
|
|
"email_address": profile.get('emailAddress', ''),
|
|
"messages_total": profile.get('messagesTotal', 0),
|
|
"threads_total": profile.get('threadsTotal', 0),
|
|
"history_id": profile.get('historyId', ''),
|
|
},
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get profile: %s", e)
|
|
return {"error": f"Failed to get profile: {e!s}"}
|
|
|
|
def gmail_get_contacts(
|
|
self,
|
|
max_results: int = 100,
|
|
page_token: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
r"""List connections from Google People API.
|
|
|
|
Args:
|
|
max_results (int): Maximum number of contacts to fetch.
|
|
page_token (Optional[str]): Pagination token from a previous
|
|
response. If provided, fetches the next page of results.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the contacts.
|
|
"""
|
|
try:
|
|
# Build request parameters
|
|
request_params = {
|
|
'resourceName': 'people/me',
|
|
'personFields': (
|
|
'names,emailAddresses,phoneNumbers,organizations'
|
|
),
|
|
'pageSize': max_results,
|
|
}
|
|
|
|
# Search contacts
|
|
if page_token:
|
|
request_params['pageToken'] = page_token
|
|
|
|
contacts_result = (
|
|
self.people_service.people()
|
|
.connections()
|
|
.list(**request_params)
|
|
.execute()
|
|
)
|
|
|
|
connections = contacts_result.get('connections', [])
|
|
contact_list = []
|
|
|
|
for person in connections:
|
|
contact_info = {
|
|
"resource_name": person.get('resourceName', ''),
|
|
"names": person.get('names', []),
|
|
"email_addresses": person.get('emailAddresses', []),
|
|
"phone_numbers": person.get('phoneNumbers', []),
|
|
"organizations": person.get('organizations', []),
|
|
}
|
|
contact_list.append(contact_info)
|
|
|
|
return {
|
|
"success": True,
|
|
"contacts": contact_list,
|
|
"total_count": len(contact_list),
|
|
"next_page_token": contacts_result.get('nextPageToken'),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get contacts: %s", e)
|
|
return {"error": f"Failed to get contacts: {e!s}"}
|
|
|
|
def gmail_search_people(
|
|
self,
|
|
query: str,
|
|
max_results: int = 10,
|
|
) -> Dict[str, Any]:
|
|
r"""Search for people in contacts.
|
|
|
|
Args:
|
|
query (str): Search query for people in contacts. Can search by:
|
|
- Name: 'John Smith' or partial names like 'John'
|
|
- Email: 'john@example.com'
|
|
- Organization: 'Google' or 'Acme Corp'
|
|
- Phone number: '+1234567890'
|
|
Examples: 'John Smith', 'john@example.com', 'Google'
|
|
max_results (int): Maximum number of results to fetch.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing the search results.
|
|
"""
|
|
try:
|
|
# Search people
|
|
search_result = (
|
|
self.people_service.people()
|
|
.searchContacts(
|
|
query=query,
|
|
readMask='names,emailAddresses,phoneNumbers,organizations',
|
|
pageSize=max_results,
|
|
)
|
|
.execute()
|
|
)
|
|
|
|
results = search_result.get('results', [])
|
|
people_list = []
|
|
|
|
for result in results:
|
|
person = result.get('person', {})
|
|
person_info = {
|
|
"resource_name": person.get('resourceName', ''),
|
|
"names": person.get('names', []),
|
|
"email_addresses": person.get('emailAddresses', []),
|
|
"phone_numbers": person.get('phoneNumbers', []),
|
|
"organizations": person.get('organizations', []),
|
|
}
|
|
people_list.append(person_info)
|
|
|
|
return {
|
|
"success": True,
|
|
"people": people_list,
|
|
"total_count": len(people_list),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to search people: %s", e)
|
|
return {"error": f"Failed to search people: {e!s}"}
|
|
|
|
# Helper methods
|
|
def _get_gmail_service(self):
|
|
r"""Get Gmail service object."""
|
|
from googleapiclient.discovery import build
|
|
|
|
try:
|
|
# Build service with optional timeout
|
|
if self.timeout is not None:
|
|
import httplib2
|
|
|
|
http = httplib2.Http(timeout=self.timeout)
|
|
http = self._credentials.authorize(http)
|
|
service = build('gmail', 'v1', http=http)
|
|
else:
|
|
service = build('gmail', 'v1', credentials=self._credentials)
|
|
return service
|
|
except Exception as e:
|
|
raise ValueError(f"Failed to build Gmail service: {e}") from e
|
|
|
|
def _get_people_service(self):
|
|
r"""Get People service object."""
|
|
from googleapiclient.discovery import build
|
|
|
|
try:
|
|
# Build service with optional timeout
|
|
if self.timeout is not None:
|
|
import httplib2
|
|
|
|
http = httplib2.Http(timeout=self.timeout)
|
|
http = self._credentials.authorize(http)
|
|
service = build('people', 'v1', http=http)
|
|
else:
|
|
service = build('people', 'v1', credentials=self._credentials)
|
|
return service
|
|
except Exception as e:
|
|
raise ValueError(f"Failed to build People service: {e}") from e
|
|
|
|
def _authenticate(self):
|
|
r"""Authenticate with Google APIs using OAuth2.
|
|
|
|
Automatically saves and loads credentials from
|
|
~/.camel/gmail_token.json to avoid repeated
|
|
browser logins.
|
|
"""
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from dotenv import load_dotenv
|
|
from google.auth.transport.requests import Request
|
|
from google.oauth2.credentials import Credentials
|
|
from google_auth_oauthlib.flow import InstalledAppFlow
|
|
|
|
# Look for .env file in the project root (camel/)
|
|
env_file = Path(__file__).parent.parent.parent / '.env'
|
|
load_dotenv(env_file)
|
|
|
|
client_id = os.environ.get('GOOGLE_CLIENT_ID')
|
|
client_secret = os.environ.get('GOOGLE_CLIENT_SECRET')
|
|
|
|
if not client_id or not client_secret:
|
|
missing_vars = []
|
|
if not client_id:
|
|
missing_vars.append('GOOGLE_CLIENT_ID')
|
|
if not client_secret:
|
|
missing_vars.append('GOOGLE_CLIENT_SECRET')
|
|
raise ValueError(
|
|
f"Missing required environment variables: "
|
|
f"{', '.join(missing_vars)}. "
|
|
"Please set these in your .env file or environment variables."
|
|
)
|
|
|
|
token_file = Path.home() / '.camel' / 'gmail_token.json'
|
|
creds = None
|
|
|
|
# COMPONENT 1: Load saved credentials
|
|
if token_file.exists():
|
|
try:
|
|
with open(token_file, 'r') as f:
|
|
data = json.load(f)
|
|
creds = Credentials(
|
|
token=data.get('token'),
|
|
refresh_token=data.get('refresh_token'),
|
|
token_uri=data.get(
|
|
'token_uri', 'https://oauth2.googleapis.com/token'
|
|
),
|
|
client_id=client_id,
|
|
client_secret=client_secret,
|
|
scopes=SCOPES,
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load saved token: {e}")
|
|
creds = None
|
|
|
|
# COMPONENT 2: Refresh if expired
|
|
if creds and creds.expired and creds.refresh_token:
|
|
try:
|
|
creds.refresh(Request())
|
|
logger.info("Access token refreshed")
|
|
|
|
# Save refreshed credentials to disk
|
|
token_file.parent.mkdir(parents=True, exist_ok=True)
|
|
try:
|
|
with open(token_file, 'w') as f:
|
|
json.dump(
|
|
{
|
|
'token': creds.token,
|
|
'refresh_token': creds.refresh_token,
|
|
'token_uri': creds.token_uri,
|
|
'scopes': creds.scopes,
|
|
},
|
|
f,
|
|
)
|
|
os.chmod(token_file, 0o600)
|
|
logger.info(f"Refreshed credentials saved to {token_file}")
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to save refreshed credentials to "
|
|
f"{token_file}: {e}. "
|
|
"Token refreshed but not persisted."
|
|
)
|
|
|
|
return creds
|
|
except Exception as e:
|
|
logger.warning(f"Token refresh failed: {e}")
|
|
creds = None
|
|
|
|
# COMPONENT 3: Return if valid
|
|
if creds and creds.valid:
|
|
return creds
|
|
|
|
# COMPONENT 4: Browser OAuth (first-time or invalid credentials)
|
|
client_config = {
|
|
"installed": {
|
|
"client_id": client_id,
|
|
"client_secret": client_secret,
|
|
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
|
|
"token_uri": "https://oauth2.googleapis.com/token",
|
|
"redirect_uris": ["http://localhost"],
|
|
}
|
|
}
|
|
|
|
flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
|
|
creds = flow.run_local_server(port=0)
|
|
|
|
# Save new credentials
|
|
token_file.parent.mkdir(parents=True, exist_ok=True)
|
|
try:
|
|
with open(token_file, 'w') as f:
|
|
json.dump(
|
|
{
|
|
'token': creds.token,
|
|
'refresh_token': creds.refresh_token,
|
|
'token_uri': creds.token_uri,
|
|
'scopes': creds.scopes,
|
|
},
|
|
f,
|
|
)
|
|
os.chmod(token_file, 0o600)
|
|
logger.info(f"Credentials saved to {token_file}")
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to save credentials to {token_file}: {e}. "
|
|
"You may need to re-authenticate next time."
|
|
)
|
|
|
|
return creds
|
|
|
|
def _create_message(
|
|
self,
|
|
to_list: List[str],
|
|
subject: str,
|
|
body: str,
|
|
cc_list: Optional[List[str]] = None,
|
|
bcc_list: Optional[List[str]] = None,
|
|
attachments: Optional[List[str]] = None,
|
|
is_html: bool = False,
|
|
in_reply_to: Optional[str] = None,
|
|
references: Optional[List[str]] = None,
|
|
) -> Dict[str, str]:
|
|
r"""Create a message object for sending."""
|
|
|
|
import base64
|
|
from email import encoders
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
|
|
message = MIMEMultipart()
|
|
message['to'] = ', '.join(to_list)
|
|
message['subject'] = subject
|
|
|
|
if cc_list:
|
|
message['cc'] = ', '.join(cc_list)
|
|
if bcc_list:
|
|
message['bcc'] = ', '.join(bcc_list)
|
|
|
|
# Set reply headers when provided
|
|
if in_reply_to:
|
|
message['In-Reply-To'] = in_reply_to
|
|
if references:
|
|
message['References'] = ' '.join(references)
|
|
|
|
# Add body
|
|
if is_html:
|
|
message.attach(MIMEText(body, 'html'))
|
|
else:
|
|
message.attach(MIMEText(body, 'plain'))
|
|
|
|
# Add attachments
|
|
if attachments:
|
|
for file_path in attachments:
|
|
if os.path.isfile(file_path):
|
|
with open(file_path, "rb") as attachment:
|
|
part = MIMEBase('application', 'octet-stream')
|
|
part.set_payload(attachment.read())
|
|
encoders.encode_base64(part)
|
|
part.add_header(
|
|
'Content-Disposition',
|
|
f'attachment; filename= '
|
|
f'{os.path.basename(file_path)}',
|
|
)
|
|
message.attach(part)
|
|
|
|
# Encode message
|
|
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode(
|
|
'utf-8'
|
|
)
|
|
return {'raw': raw_message}
|
|
|
|
def _get_message_details(
|
|
self, message_id: str
|
|
) -> Optional[Dict[str, Any]]:
|
|
r"""Get detailed information about a message."""
|
|
try:
|
|
message = (
|
|
self.gmail_service.users()
|
|
.messages()
|
|
.get(userId='me', id=message_id)
|
|
.execute()
|
|
)
|
|
|
|
headers = message['payload'].get('headers', [])
|
|
# Build a name->value map in one pass (case-insensitive)
|
|
header_map = {}
|
|
for header in headers:
|
|
name = header.get('name')
|
|
if name:
|
|
header_map[name.lower()] = header.get('value', '')
|
|
|
|
return {
|
|
"message_id": message['id'],
|
|
"thread_id": message['threadId'],
|
|
"snippet": message.get('snippet', ''),
|
|
"subject": header_map.get('subject', ''),
|
|
"from": header_map.get('from', ''),
|
|
"to": header_map.get('to', ''),
|
|
"cc": header_map.get('cc', ''),
|
|
"bcc": header_map.get('bcc', ''),
|
|
"date": header_map.get('date', ''),
|
|
"body": self._extract_message_body(message),
|
|
"attachments": self._extract_attachments(
|
|
message, include_inline=True
|
|
),
|
|
"label_ids": message.get('labelIds', []),
|
|
"size_estimate": message.get('sizeEstimate', 0),
|
|
}
|
|
except Exception as e:
|
|
logger.error("Failed to get message details: %s", e)
|
|
return None
|
|
|
|
def _get_header_value(
|
|
self, headers: List[Dict[str, str]], name: str
|
|
) -> str:
|
|
r"""Get header value by name."""
|
|
for header in headers:
|
|
if header['name'].lower() == name.lower():
|
|
return header['value']
|
|
return ""
|
|
|
|
def _extract_message_body(self, message: Dict[str, Any]) -> str:
|
|
r"""Extract message body from message payload.
|
|
|
|
Recursively traverses the entire message tree and collects all text
|
|
content from text/plain and text/html parts. Special handling for
|
|
multipart/alternative containers: recursively searches for one format
|
|
(preferring plain text) to avoid duplication when both formats contain
|
|
the same content. All other text parts are collected to ensure no
|
|
information is lost.
|
|
|
|
Args:
|
|
message (Dict[str, Any]): The Gmail message dictionary containing
|
|
the payload to extract text from.
|
|
|
|
Returns:
|
|
str: The extracted message body text with multiple parts separated
|
|
by double newlines, or an empty string if no text content is
|
|
found.
|
|
"""
|
|
import base64
|
|
import re
|
|
|
|
text_parts = []
|
|
|
|
def decode_text_data(data: str, mime_type: str) -> Optional[str]:
|
|
"""Helper to decode base64 text data.
|
|
|
|
Args:
|
|
data: Base64 encoded text data.
|
|
mime_type: MIME type for logging purposes.
|
|
|
|
Returns:
|
|
Decoded text string, or None if decoding fails or text
|
|
is empty.
|
|
"""
|
|
if not data:
|
|
return None
|
|
try:
|
|
text = base64.urlsafe_b64decode(data).decode('utf-8')
|
|
return text if text.strip() else None
|
|
except Exception as e:
|
|
logger.warning(f"Failed to decode {mime_type}: {e}")
|
|
return None
|
|
|
|
def strip_html_tags(html_content: str) -> str:
|
|
"""Strip HTML tags and convert to readable plain text.
|
|
|
|
Uses regex to remove tags and clean up formatting while preserving
|
|
basic document structure.
|
|
|
|
Args:
|
|
html_content: HTML content to strip.
|
|
|
|
Returns:
|
|
Plain text version of HTML content.
|
|
"""
|
|
if not html_content or not html_content.strip():
|
|
return ""
|
|
|
|
text = html_content
|
|
|
|
# Remove script and style elements completely
|
|
text = re.sub(
|
|
r'<script[^>]*>.*?</script>',
|
|
'',
|
|
text,
|
|
flags=re.DOTALL | re.IGNORECASE,
|
|
)
|
|
text = re.sub(
|
|
r'<style[^>]*>.*?</style>',
|
|
'',
|
|
text,
|
|
flags=re.DOTALL | re.IGNORECASE,
|
|
)
|
|
|
|
# Convert common HTML entities
|
|
text = text.replace(' ', ' ')
|
|
text = text.replace('&', '&')
|
|
text = text.replace('<', '<')
|
|
text = text.replace('>', '>')
|
|
text = text.replace('"', '"')
|
|
text = text.replace(''', "'")
|
|
text = text.replace('’', "'")
|
|
text = text.replace('‘', "'")
|
|
text = text.replace('”', '"')
|
|
text = text.replace('“', '"')
|
|
text = text.replace('—', '—')
|
|
text = text.replace('–', '-')
|
|
|
|
# Convert <br> and <br/> to newlines
|
|
text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
|
|
|
|
# Convert block-level closing tags to newlines
|
|
text = re.sub(
|
|
r'</(p|div|h[1-6]|tr|li)>', '\n', text, flags=re.IGNORECASE
|
|
)
|
|
|
|
# Convert <hr> to separator
|
|
text = re.sub(r'<hr\s*/?>', '\n---\n', text, flags=re.IGNORECASE)
|
|
|
|
# Remove all remaining HTML tags
|
|
text = re.sub(r'<[^>]+>', '', text)
|
|
|
|
# Clean up whitespace
|
|
text = re.sub(
|
|
r'\n\s*\n\s*\n+', '\n\n', text
|
|
) # Multiple blank lines to double newline
|
|
text = re.sub(r' +', ' ', text) # Multiple spaces to single space
|
|
text = re.sub(r'\n ', '\n', text) # Remove leading spaces on lines
|
|
text = re.sub(
|
|
r' \n', '\n', text
|
|
) # Remove trailing spaces on lines
|
|
|
|
return text.strip()
|
|
|
|
def find_text_recursive(
|
|
part: Dict[str, Any], target_mime: str
|
|
) -> Optional[str]:
|
|
"""Recursively search for text content of a specific MIME type.
|
|
|
|
Args:
|
|
part: Message part to search in.
|
|
target_mime: Target MIME type ('text/plain' or 'text/html').
|
|
|
|
Returns:
|
|
Decoded text string if found, None otherwise.
|
|
"""
|
|
mime = part.get('mimeType', '')
|
|
|
|
# Found the target type at this level
|
|
if mime == target_mime:
|
|
data = part.get('body', {}).get('data', '')
|
|
decoded = decode_text_data(data, target_mime)
|
|
# Strip HTML tags if this is HTML content
|
|
if decoded and target_mime == 'text/html':
|
|
return strip_html_tags(decoded)
|
|
return decoded
|
|
|
|
# Not found, but has nested parts? Search recursively
|
|
if 'parts' in part:
|
|
for nested_part in part['parts']:
|
|
result = find_text_recursive(nested_part, target_mime)
|
|
if result:
|
|
return result
|
|
|
|
return None
|
|
|
|
def extract_from_part(part: Dict[str, Any]):
|
|
"""Recursively collect all text from message parts."""
|
|
mime_type = part.get('mimeType', '')
|
|
|
|
# Special handling for multipart/alternative
|
|
if mime_type == 'multipart/alternative' and 'parts' in part:
|
|
# Recursively search for one format (prefer plain text)
|
|
plain_text = None
|
|
html_text = None
|
|
|
|
# Search each alternative branch recursively
|
|
for nested_part in part['parts']:
|
|
if not plain_text:
|
|
plain_text = find_text_recursive(
|
|
nested_part, 'text/plain'
|
|
)
|
|
if not html_text:
|
|
html_text = find_text_recursive(
|
|
nested_part, 'text/html'
|
|
)
|
|
|
|
# Prefer plain text, fall back to HTML
|
|
chosen_text = plain_text if plain_text else html_text
|
|
if chosen_text:
|
|
text_parts.append(chosen_text)
|
|
|
|
# If this part has nested parts (but not multipart/alternative)
|
|
elif 'parts' in part:
|
|
for nested_part in part['parts']:
|
|
extract_from_part(nested_part)
|
|
|
|
# If this is a text leaf, extract and collect it
|
|
elif mime_type == 'text/plain':
|
|
data = part.get('body', {}).get('data', '')
|
|
text = decode_text_data(data, 'plain text body')
|
|
if text:
|
|
text_parts.append(text)
|
|
|
|
elif mime_type == 'text/html':
|
|
data = part.get('body', {}).get('data', '')
|
|
html_text = decode_text_data(data, 'HTML body')
|
|
if html_text:
|
|
text = strip_html_tags(html_text)
|
|
if text:
|
|
text_parts.append(text)
|
|
|
|
# Traverse the entire tree and collect all text parts
|
|
payload = message.get('payload', {})
|
|
extract_from_part(payload)
|
|
|
|
if not text_parts:
|
|
return ""
|
|
|
|
# Return all text parts combined
|
|
return '\n\n'.join(text_parts)
|
|
|
|
def _extract_attachments(
|
|
self, message: Dict[str, Any], include_inline: bool = False
|
|
) -> List[Dict[str, Any]]:
|
|
r"""Extract attachment information from message payload.
|
|
|
|
Recursively traverses the message tree to find all attachments
|
|
and extracts their metadata. Distinguishes between regular attachments
|
|
and inline images embedded in HTML content.
|
|
|
|
Args:
|
|
message (Dict[str, Any]): The Gmail message dictionary containing
|
|
the payload to extract attachments from.
|
|
|
|
Returns:
|
|
List[Dict[str, Any]]: List of attachment dictionaries, each
|
|
containing:
|
|
- attachment_id: Gmail's unique identifier for the attachment
|
|
- filename: Name of the attached file
|
|
- mime_type: MIME type of the attachment
|
|
- size: Size of the attachment in bytes
|
|
- is_inline: Whether this is an inline image (embedded in HTML)
|
|
"""
|
|
attachments = []
|
|
|
|
def is_inline_image(part: Dict[str, Any]) -> bool:
|
|
"""Check if this part is an inline image."""
|
|
headers = part.get('headers', [])
|
|
for header in headers:
|
|
name = header.get('name', '').lower()
|
|
value = header.get('value', '').lower()
|
|
# Check for Content-Disposition: inline
|
|
if name == 'content-disposition' and 'inline' in value:
|
|
return True
|
|
# Check for Content-ID (usually indicates inline)
|
|
if name == 'content-id':
|
|
return True
|
|
return False
|
|
|
|
def find_attachments(part: Dict[str, Any]):
|
|
"""Recursively find attachments in message parts."""
|
|
# Check if this part has an attachmentId (indicates it's an
|
|
# attachment)
|
|
if 'body' in part and 'attachmentId' in part['body']:
|
|
attachment_info = {
|
|
'attachment_id': part['body']['attachmentId'],
|
|
'filename': part.get('filename', 'unnamed'),
|
|
'mime_type': part.get(
|
|
'mimeType', 'application/octet-stream'
|
|
),
|
|
'size': part['body'].get('size', 0),
|
|
'is_inline': is_inline_image(part),
|
|
}
|
|
attachments.append(attachment_info)
|
|
|
|
# Recurse into nested parts
|
|
if 'parts' in part:
|
|
for nested_part in part['parts']:
|
|
find_attachments(nested_part)
|
|
|
|
# Start traversal from the message payload
|
|
payload = message.get('payload', {})
|
|
if payload:
|
|
find_attachments(payload)
|
|
|
|
# Return based on include_inline toggle
|
|
if include_inline:
|
|
return attachments
|
|
return [att for att in attachments if not att['is_inline']]
|
|
|
|
def _is_valid_email(self, email: str) -> bool:
|
|
r"""Validate email address format.
|
|
|
|
Supports both formats:
|
|
- Plain email: john@example.com
|
|
- Named email: John Doe <john@example.com>
|
|
"""
|
|
# Extract email from "Name <email>" format if present
|
|
match = re.search(r'<([^>]+)>$', email.strip())
|
|
email_to_check = match.group(1) if match else email.strip()
|
|
|
|
# Validate the email address
|
|
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
|
|
return re.match(pattern, email_to_check) is not None
|
|
|
|
def get_tools(self) -> List[FunctionTool]:
|
|
r"""Returns a list of FunctionTool objects representing the
|
|
functions in the toolkit.
|
|
|
|
Returns:
|
|
List[FunctionTool]: A list of FunctionTool objects
|
|
representing the functions in the toolkit.
|
|
"""
|
|
return [
|
|
FunctionTool(self.gmail_send_email),
|
|
FunctionTool(self.gmail_reply_to_email),
|
|
FunctionTool(self.gmail_forward_email),
|
|
FunctionTool(self.gmail_create_draft),
|
|
FunctionTool(self.gmail_send_draft),
|
|
FunctionTool(self.gmail_fetch_emails),
|
|
FunctionTool(self.gmail_fetch_thread_by_id),
|
|
FunctionTool(self.gmail_modify_email_labels),
|
|
FunctionTool(self.gmail_move_to_trash),
|
|
FunctionTool(self.gmail_get_attachment),
|
|
FunctionTool(self.gmail_list_threads),
|
|
FunctionTool(self.gmail_list_drafts),
|
|
FunctionTool(self.gmail_list_labels),
|
|
FunctionTool(self.gmail_create_label),
|
|
FunctionTool(self.gmail_delete_label),
|
|
FunctionTool(self.gmail_modify_thread_labels),
|
|
FunctionTool(self.gmail_get_profile),
|
|
FunctionTool(self.gmail_get_contacts),
|
|
FunctionTool(self.gmail_search_people),
|
|
]
|