Merge pull request #257 from CREDO23/feature/google-gmail-connector

[Feature] Add Gmail connector
This commit is contained in:
Rohan Verma 2025-08-17 14:35:13 -07:00 committed by GitHub
commit df3e681ee2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 1319 additions and 4 deletions

View file

@ -9,6 +9,7 @@ AUTH_TYPE=GOOGLE or LOCAL
GOOGLE_OAUTH_CLIENT_ID=924507538m
GOOGLE_OAUTH_CLIENT_SECRET=GOCSV
GOOGLE_CALENDAR_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/calendar/connector/callback
GOOGLE_GMAIL_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/gmail/connector/callback
# Embedding Model
EMBEDDING_MODEL=mixedbread-ai/mxbai-embed-large-v1

View file

@ -0,0 +1,65 @@
"""Add Google Gmail connector enums
Revision ID: 18
Revises: 17
Create Date: 2024-02-01 12:00:00.000000
"""
from collections.abc import Sequence
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "18"
down_revision: str | None = "17"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Safely add 'GOOGLE_GMAIL_CONNECTOR' to enum types if missing."""
# Add to searchsourceconnectortype enum
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'GOOGLE_GMAIL_CONNECTOR'
) THEN
ALTER TYPE searchsourceconnectortype ADD VALUE 'GOOGLE_GMAIL_CONNECTOR';
END IF;
END
$$;
"""
)
# Add to documenttype enum
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = 'documenttype' AND e.enumlabel = 'GOOGLE_GMAIL_CONNECTOR'
) THEN
ALTER TYPE documenttype ADD VALUE 'GOOGLE_GMAIL_CONNECTOR';
END IF;
END
$$;
"""
)
def downgrade() -> None:
"""Remove 'GOOGLE_GMAIL_CONNECTOR' from enum types."""
# Note: PostgreSQL doesn't support removing enum values directly
# This would require recreating the enum type, which is complex
# For now, we'll leave the enum values in place
# In a production environment, you might want to implement a more sophisticated downgrade
pass

View file

@ -988,6 +988,32 @@ async def fetch_relevant_documents(
)
}
)
elif connector == "GOOGLE_GMAIL_CONNECTOR":
(
source_object,
gmail_chunks,
) = await connector_service.search_google_gmail(
user_query=reformulated_query,
user_id=user_id,
search_space_id=search_space_id,
top_k=top_k,
search_mode=search_mode,
)
# Add to sources and raw documents
if source_object:
all_sources.append(source_object)
all_raw_documents.extend(gmail_chunks)
# Stream found document count
if streaming_service and writer:
writer(
{
"yield_value": streaming_service.format_terminal_info_delta(
f"📧 Found {len(gmail_chunks)} Gmail messages related to your query"
)
}
)
elif connector == "CONFLUENCE_CONNECTOR":
(
source_object,

View file

@ -19,6 +19,7 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel
- CONFLUENCE_CONNECTOR: "Confluence pages and comments" (personal project documentation)
- CLICKUP_CONNECTOR: "ClickUp tasks and project data" (personal task management)
- GOOGLE_CALENDAR_CONNECTOR: "Google Calendar events, meetings, and schedules" (personal calendar and time management)
- GOOGLE_GMAIL_CONNECTOR: "Google Gmail emails and conversations" (personal emails and communications)
- DISCORD_CONNECTOR: "Discord server conversations and shared content" (personal community communications)
- TAVILY_API: "Tavily search API results" (personalized search results)
- LINKUP_API: "Linkup search API results" (personalized search results)

View file

@ -19,6 +19,7 @@ You are SurfSense, an advanced AI research assistant that synthesizes informatio
- CONFLUENCE_CONNECTOR: "Confluence pages and comments" (personal project documentation)
- CLICKUP_CONNECTOR: "ClickUp tasks and project data" (personal task management)
- GOOGLE_CALENDAR_CONNECTOR: "Google Calendar events, meetings, and schedules" (personal calendar and time management)
- GOOGLE_GMAIL_CONNECTOR: "Google Gmail emails and conversations" (personal emails and communications)
- DISCORD_CONNECTOR: "Discord server messages and channels" (personal community interactions)
- TAVILY_API: "Tavily search API results" (personalized search results)
- LINKUP_API: "Linkup search API results" (personalized search results)

View file

@ -51,6 +51,9 @@ class Config:
# Google Calendar redirect URI
GOOGLE_CALENDAR_REDIRECT_URI = os.getenv("GOOGLE_CALENDAR_REDIRECT_URI")
# Google Gmail redirect URI
GOOGLE_GMAIL_REDIRECT_URI = os.getenv("GOOGLE_GMAIL_REDIRECT_URI")
# LLM instances are now managed per-user through the LLMConfig system
# Legacy environment variables removed in favor of user-specific configurations

View file

@ -0,0 +1,337 @@
"""
Google Gmail Connector Module | Google OAuth Credentials | Gmail API
A module for retrieving emails from Gmail using Google OAuth credentials.
Allows fetching emails from Gmail mailbox using Google OAuth credentials.
"""
import base64
import re
from typing import Any
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
class GoogleGmailConnector:
    """Class for retrieving emails from Gmail using Google OAuth credentials.

    The Gmail API service object is built lazily on first use and cached on
    the instance; OAuth credentials are refreshed automatically when expired.
    """

    def __init__(
        self,
        credentials: Credentials,
    ):
        """
        Initialize the GoogleGmailConnector class.

        Args:
            credentials: Google OAuth Credentials object
        """
        self._credentials = credentials
        # Cached Gmail API service instance; created on first _get_service() call.
        self.service = None

    def _get_credentials(self) -> Credentials:
        """
        Get valid Google OAuth credentials, rebuilding/refreshing them if needed.

        Returns:
            Google OAuth credentials

        Raises:
            ValueError: If credentials have not been set
            Exception: If credential refresh fails
        """
        if not all(
            [
                self._credentials.client_id,
                self._credentials.client_secret,
                self._credentials.refresh_token,
            ]
        ):
            raise ValueError(
                "Google OAuth credentials (client_id, client_secret, refresh_token) must be set"
            )

        # Fast path: current credentials are still valid, no refresh needed.
        if self._credentials and not self._credentials.expired:
            return self._credentials

        # Create credentials from refresh token
        self._credentials = Credentials(
            token=self._credentials.token,
            refresh_token=self._credentials.refresh_token,
            token_uri=self._credentials.token_uri,
            client_id=self._credentials.client_id,
            client_secret=self._credentials.client_secret,
            scopes=self._credentials.scopes,
        )

        # Refresh the token if needed
        if self._credentials.expired or not self._credentials.valid:
            try:
                self._credentials.refresh(Request())
            except Exception as e:
                raise Exception(
                    f"Failed to refresh Google OAuth credentials: {e!s}"
                ) from e

        return self._credentials

    def _get_service(self):
        """
        Get the Gmail service instance using Google OAuth credentials.

        Returns:
            Gmail service instance

        Raises:
            ValueError: If credentials have not been set
            Exception: If service creation fails
        """
        # Return the cached service if one was already built.
        if self.service:
            return self.service
        try:
            credentials = self._get_credentials()
            self.service = build("gmail", "v1", credentials=credentials)
            return self.service
        except Exception as e:
            raise Exception(f"Failed to create Gmail service: {e!s}") from e

    @staticmethod
    def _decode_base64url(data: str) -> str:
        """Decode Gmail's unpadded base64url payload data to UTF-8 text.

        Gmail returns body data without '=' padding; compute the exact padding
        required (``"=" * (-len % 4)``) instead of appending a fixed ``"==="``
        suffix, which over-pads some input lengths.
        """
        padded = data + "=" * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded).decode("utf-8", errors="ignore")

    def get_user_profile(self) -> tuple[dict[str, Any], str | None]:
        """
        Fetch user's Gmail profile information.

        Returns:
            Tuple containing (profile dict, error message or None)
        """
        try:
            service = self._get_service()
            profile = service.users().getProfile(userId="me").execute()
            return {
                "email_address": profile.get("emailAddress"),
                "messages_total": profile.get("messagesTotal", 0),
                "threads_total": profile.get("threadsTotal", 0),
                "history_id": profile.get("historyId"),
            }, None
        except Exception as e:
            return {}, f"Error fetching user profile: {e!s}"

    def get_messages_list(
        self,
        max_results: int = 100,
        query: str = "",
        label_ids: list[str] | None = None,
        include_spam_trash: bool = False,
    ) -> tuple[list[dict[str, Any]], str | None]:
        """
        Fetch list of messages from Gmail.

        Args:
            max_results: Maximum number of messages to fetch (default: 100)
            query: Gmail search query (e.g., "is:unread", "from:example@gmail.com")
            label_ids: List of label IDs to filter by
            include_spam_trash: Whether to include spam and trash

        Returns:
            Tuple containing (messages list, error message or None)
        """
        try:
            service = self._get_service()

            # Build request parameters; optional filters are only included
            # when provided, matching the Gmail API's expectations.
            request_params = {
                "userId": "me",
                "maxResults": max_results,
                "includeSpamTrash": include_spam_trash,
            }
            if query:
                request_params["q"] = query
            if label_ids:
                request_params["labelIds"] = label_ids

            # Get messages list
            result = service.users().messages().list(**request_params).execute()
            messages = result.get("messages", [])
            return messages, None
        except Exception as e:
            return [], f"Error fetching messages list: {e!s}"

    def get_message_details(self, message_id: str) -> tuple[dict[str, Any], str | None]:
        """
        Fetch detailed information for a specific message.

        Args:
            message_id: The ID of the message to fetch

        Returns:
            Tuple containing (message details dict, error message or None)
        """
        try:
            service = self._get_service()
            # format="full" returns the complete payload including headers and body parts.
            message = (
                service.users()
                .messages()
                .get(userId="me", id=message_id, format="full")
                .execute()
            )
            return message, None
        except Exception as e:
            return {}, f"Error fetching message details: {e!s}"

    def get_recent_messages(
        self,
        max_results: int = 50,
        days_back: int = 30,
    ) -> tuple[list[dict[str, Any]], str | None]:
        """
        Fetch recent messages from Gmail within specified days.

        Args:
            max_results: Maximum number of messages to fetch (default: 50)
            days_back: Number of days to look back (default: 30)

        Returns:
            Tuple containing (messages list with details, error message or None)
        """
        try:
            # Calculate date query (local import keeps module import cost low).
            from datetime import datetime, timedelta

            cutoff_date = datetime.now() - timedelta(days=days_back)
            date_query = cutoff_date.strftime("%Y/%m/%d")
            query = f"after:{date_query}"

            # Get messages list
            messages_list, error = self.get_messages_list(
                max_results=max_results, query=query
            )
            if error:
                return [], error

            # Get detailed information for each message
            detailed_messages = []
            for msg in messages_list:
                message_details, detail_error = self.get_message_details(msg["id"])
                if detail_error:
                    continue  # Skip messages that can't be fetched
                detailed_messages.append(message_details)

            return detailed_messages, None
        except Exception as e:
            return [], f"Error fetching recent messages: {e!s}"

    def extract_message_text(self, message: dict[str, Any]) -> str:
        """
        Extract text content from a Gmail message.

        Prefers text/plain parts (concatenated); falls back to tag-stripped
        text/html only when no plain-text content has been found.

        Args:
            message: Gmail message object

        Returns:
            Extracted text content
        """

        def get_message_parts(payload):
            """Recursively extract message parts."""
            parts = []
            if "parts" in payload:
                for part in payload["parts"]:
                    parts.extend(get_message_parts(part))
            else:
                parts.append(payload)
            return parts

        try:
            payload = message.get("payload", {})
            parts = get_message_parts(payload)
            text_content = ""
            for part in parts:
                mime_type = part.get("mimeType", "")
                body = part.get("body", {})
                data = body.get("data", "")
                if mime_type == "text/plain" and data:
                    # Decode base64url content with exact padding (see helper).
                    text_content += self._decode_base64url(data) + "\n"
                elif mime_type == "text/html" and data and not text_content:
                    # Use HTML as fallback if no plain text
                    decoded_data = self._decode_base64url(data)
                    # Basic HTML tag removal (you might want to use a proper HTML parser)
                    text_content = re.sub(r"<[^>]+>", "", decoded_data)
            return text_content.strip()
        except Exception as e:
            return f"Error extracting message text: {e!s}"

    def format_message_to_markdown(self, message: dict[str, Any]) -> str:
        """
        Format a Gmail message to markdown.

        Args:
            message: Message object from Gmail API

        Returns:
            Formatted markdown string
        """
        try:
            # Extract basic message information
            message_id = message.get("id", "")
            thread_id = message.get("threadId", "")
            label_ids = message.get("labelIds", [])

            # Extract headers
            payload = message.get("payload", {})
            headers = payload.get("headers", [])

            # Parse headers into a dict (header names are case-insensitive,
            # so normalize to lowercase keys).
            header_dict = {}
            for header in headers:
                name = header.get("name", "").lower()
                value = header.get("value", "")
                header_dict[name] = value

            # Extract key information
            subject = header_dict.get("subject", "No Subject")
            from_email = header_dict.get("from", "Unknown Sender")
            to_email = header_dict.get("to", "Unknown Recipient")
            date_str = header_dict.get("date", "Unknown Date")

            # Extract message content
            message_text = self.extract_message_text(message)

            # Build markdown content
            markdown_content = f"# {subject}\n\n"

            # Add message details
            markdown_content += f"**From:** {from_email}\n"
            markdown_content += f"**To:** {to_email}\n"
            markdown_content += f"**Date:** {date_str}\n"
            if label_ids:
                markdown_content += f"**Labels:** {', '.join(label_ids)}\n"
            markdown_content += "\n"

            # Add message content
            if message_text:
                markdown_content += f"## Message Content\n\n{message_text}\n\n"

            # Add message metadata
            markdown_content += "## Message Details\n\n"
            markdown_content += f"- **Message ID:** {message_id}\n"
            markdown_content += f"- **Thread ID:** {thread_id}\n"

            # Add snippet if available
            snippet = message.get("snippet", "")
            if snippet:
                markdown_content += f"- **Snippet:** {snippet}\n"

            return markdown_content
        except Exception as e:
            return f"Error formatting message to markdown: {e!s}"

View file

@ -47,6 +47,7 @@ class DocumentType(str, Enum):
CONFLUENCE_CONNECTOR = "CONFLUENCE_CONNECTOR"
CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR"
GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR"
GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR"
class SearchSourceConnectorType(str, Enum):
@ -62,6 +63,7 @@ class SearchSourceConnectorType(str, Enum):
CONFLUENCE_CONNECTOR = "CONFLUENCE_CONNECTOR"
CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR"
GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR"
GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR"
class ChatType(str, Enum):

View file

@ -5,6 +5,9 @@ from .documents_routes import router as documents_router
from .google_calendar_add_connector_route import (
router as google_calendar_add_connector_router,
)
from .google_gmail_add_connector_route import (
router as google_gmail_add_connector_router,
)
from .llm_config_routes import router as llm_config_router
from .logs_routes import router as logs_router
from .podcasts_routes import router as podcasts_router
@ -19,5 +22,6 @@ router.include_router(podcasts_router)
router.include_router(chats_router)
router.include_router(search_source_connectors_router)
router.include_router(google_calendar_add_connector_router)
router.include_router(google_gmail_add_connector_router)
router.include_router(llm_config_router)
router.include_router(logs_router)

View file

@ -0,0 +1,159 @@
import os
os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1"
import base64
import json
import logging
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import RedirectResponse
from google_auth_oauthlib.flow import Flow
from pydantic import ValidationError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.config import config
from app.db import (
SearchSourceConnector,
SearchSourceConnectorType,
User,
get_async_session,
)
from app.users import current_active_user
logger = logging.getLogger(__name__)
router = APIRouter()
def get_google_flow():
    """Create and return a Google OAuth flow for Gmail API."""
    # Assemble the OAuth client configuration from app settings.
    client_config = {
        "web": {
            "client_id": config.GOOGLE_OAUTH_CLIENT_ID,
            "client_secret": config.GOOGLE_OAUTH_CLIENT_SECRET,
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "redirect_uris": [config.GOOGLE_GMAIL_REDIRECT_URI],
        }
    }
    # Read-only Gmail access plus basic identity scopes.
    requested_scopes = [
        "https://www.googleapis.com/auth/gmail.readonly",
        "https://www.googleapis.com/auth/userinfo.email",
        "https://www.googleapis.com/auth/userinfo.profile",
        "openid",
    ]
    flow = Flow.from_client_config(client_config, scopes=requested_scopes)
    flow.redirect_uri = config.GOOGLE_GMAIL_REDIRECT_URI
    return flow
@router.get("/auth/google/gmail/connector/add/")
async def connect_gmail(space_id: int, user: User = Depends(current_active_user)):
try:
if not space_id:
raise HTTPException(status_code=400, detail="space_id is required")
flow = get_google_flow()
# Encode space_id and user_id in state
state_payload = json.dumps(
{
"space_id": space_id,
"user_id": str(user.id),
}
)
state_encoded = base64.urlsafe_b64encode(state_payload.encode()).decode()
auth_url, _ = flow.authorization_url(
access_type="offline",
prompt="consent",
include_granted_scopes="true",
state=state_encoded,
)
return {"auth_url": auth_url}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to initiate Google OAuth: {e!s}"
) from e
@router.get("/auth/google/gmail/connector/callback/")
async def gmail_callback(
request: Request,
code: str,
state: str,
session: AsyncSession = Depends(get_async_session),
):
try:
# Decode and parse the state
decoded_state = base64.urlsafe_b64decode(state.encode()).decode()
data = json.loads(decoded_state)
user_id = UUID(data["user_id"])
space_id = data["space_id"]
flow = get_google_flow()
flow.fetch_token(code=code)
creds = flow.credentials
creds_dict = json.loads(creds.to_json())
try:
# Check if a connector with the same type already exists for this user
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
)
)
existing_connector = result.scalars().first()
if existing_connector:
raise HTTPException(
status_code=409,
detail="A GOOGLE_GMAIL_CONNECTOR connector already exists. Each user can have only one connector of each type.",
)
db_connector = SearchSourceConnector(
name="Google Gmail Connector",
connector_type=SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
config=creds_dict,
user_id=user_id,
is_indexable=True,
)
session.add(db_connector)
await session.commit()
await session.refresh(db_connector)
logger.info(
f"Successfully created Gmail connector for user {user_id} with ID {db_connector.id}"
)
# Redirect to the frontend success page
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/add/google-gmail-connector?success=true"
)
except IntegrityError as e:
await session.rollback()
logger.error(f"Database integrity error: {e!s}")
raise HTTPException(
status_code=409,
detail="A connector with this configuration already exists.",
) from e
except ValidationError as e:
await session.rollback()
logger.error(f"Validation error: {e!s}")
raise HTTPException(
status_code=400, detail=f"Invalid connector configuration: {e!s}"
) from e
except HTTPException:
# Re-raise HTTP exceptions as-is
raise
except Exception as e:
logger.error(f"Unexpected error in Gmail callback: {e!s}", exc_info=True)

View file

@ -41,6 +41,7 @@ from app.tasks.connector_indexers import (
index_discord_messages,
index_github_repos,
index_google_calendar_events,
index_google_gmail_messages,
index_jira_issues,
index_linear_issues,
index_notion_pages,
@ -507,6 +508,22 @@ async def index_connector_content(
indexing_to,
)
response_message = "Google Calendar indexing started in the background."
elif (
connector.connector_type == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR
):
# Run indexing in background
logger.info(
f"Triggering Google Gmail indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
)
background_tasks.add_task(
run_google_gmail_indexing_with_new_session,
connector_id,
search_space_id,
str(user.id),
indexing_from,
indexing_to,
)
response_message = "Google Gmail indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR:
# Run indexing in background
@ -1113,3 +1130,62 @@ async def run_google_calendar_indexing(
exc_info=True,
)
# Optionally update status in DB to indicate failure
async def run_google_gmail_indexing_with_new_session(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    max_messages: int,
    days_back: int,
):
    """Wrapper to run Google Gmail indexing with its own database session.

    Intended for use as a FastAPI background task, where no request-scoped
    session is available.

    NOTE(review): the background-task caller in this file passes
    ``indexing_from`` and ``indexing_to`` (date values) as the last two
    positional arguments, so ``max_messages``/``days_back`` appear to be
    misnamed here — confirm the intended contract against
    ``index_google_gmail_messages`` (whose corresponding positional
    parameters are ``start_date``/``end_date``) and align the names.
    """
    logger.info(
        f"Background task started: Indexing Google Gmail connector {connector_id} into space {search_space_id} for {max_messages} messages from the last {days_back} days"
    )
    async with async_session_maker() as session:
        await run_google_gmail_indexing(
            session, connector_id, search_space_id, user_id, max_messages, days_back
        )
        logger.info(
            f"Background task finished: Indexing Google Gmail connector {connector_id}"
        )
async def run_google_gmail_indexing(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    max_messages: int,
    days_back: int,
):
    """Runs the Google Gmail indexing task and updates the timestamp.

    On success, commits an update to the connector's last_indexed timestamp;
    on failure, only logs (no status is persisted).

    NOTE(review): ``max_messages`` and ``days_back`` are forwarded
    positionally into ``index_google_gmail_messages``, where those positions
    correspond to ``start_date``/``end_date`` — and the upstream caller
    supplies date values here. Verify and rename for clarity.
    """
    try:
        indexed_count, error_message = await index_google_gmail_messages(
            session,
            connector_id,
            search_space_id,
            user_id,
            max_messages,
            days_back,
            update_last_indexed=False,
        )
        if error_message:
            logger.error(
                f"Google Gmail indexing failed for connector {connector_id}: {error_message}"
            )
            # Optionally update status in DB to indicate failure
        else:
            logger.info(
                f"Google Gmail indexing successful for connector {connector_id}. Indexed {indexed_count} documents."
            )
            # Update the last indexed timestamp only on success
            await update_connector_last_indexed(session, connector_id)
            await session.commit()  # Commit timestamp update
    except Exception as e:
        logger.error(
            f"Critical error in run_google_gmail_indexing for connector {connector_id}: {e}",
            exc_info=True,
        )
        # Optionally update status in DB to indicate failure

View file

@ -188,6 +188,14 @@ class SearchSourceConnectorBase(BaseModel):
if key not in config or config[key] in (None, ""):
raise ValueError(f"{key} is required and cannot be empty")
elif connector_type == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR:
# Required fields for Gmail connector (same as Calendar - uses Google OAuth)
required_keys = list(GoogleAuthCredentialsBase.model_fields.keys())
for key in required_keys:
if key not in config or config[key] in (None, ""):
raise ValueError(f"{key} is required and cannot be empty")
return config

View file

@ -1208,6 +1208,132 @@ class ConnectorService:
return result_object, calendar_chunks
async def search_google_gmail(
self,
user_query: str,
user_id: str,
search_space_id: int,
top_k: int = 20,
search_mode: SearchMode = SearchMode.CHUNKS,
) -> tuple:
"""
Search for Gmail messages and return both the source information and langchain documents
Args:
user_query: The user's query
user_id: The user's ID
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
search_mode: Search mode (CHUNKS or DOCUMENTS)
Returns:
tuple: (sources_info, langchain_documents)
"""
if search_mode == SearchMode.CHUNKS:
gmail_chunks = await self.chunk_retriever.hybrid_search(
query_text=user_query,
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
document_type="GOOGLE_GMAIL_CONNECTOR",
)
elif search_mode == SearchMode.DOCUMENTS:
gmail_chunks = await self.document_retriever.hybrid_search(
query_text=user_query,
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
document_type="GOOGLE_GMAIL_CONNECTOR",
)
# Transform document retriever results to match expected format
gmail_chunks = self._transform_document_results(gmail_chunks)
# Early return if no results
if not gmail_chunks:
return {
"id": 32,
"name": "Gmail Messages",
"type": "GOOGLE_GMAIL_CONNECTOR",
"sources": [],
}, []
# Process each chunk and create sources directly without deduplication
sources_list = []
async with self.counter_lock:
for _i, chunk in enumerate(gmail_chunks):
# Extract document metadata
document = chunk.get("document", {})
metadata = document.get("metadata", {})
# Extract Gmail-specific metadata
message_id = metadata.get("message_id", "")
subject = metadata.get("subject", "No Subject")
sender = metadata.get("sender", "Unknown Sender")
date_str = metadata.get("date", "")
thread_id = metadata.get("thread_id", "")
# Create a more descriptive title for Gmail messages
title = f"Email: {subject}"
if sender:
# Extract just the email address or name from sender
import re
sender_match = re.search(r"<([^>]+)>", sender)
if sender_match:
sender_email = sender_match.group(1)
title += f" (from {sender_email})"
else:
title += f" (from {sender})"
# Create a more descriptive description for Gmail messages
description = chunk.get("content", "")[:150]
if len(description) == 150:
description += "..."
# Add message info to description
info_parts = []
if date_str:
info_parts.append(f"Date: {date_str}")
if thread_id:
info_parts.append(f"Thread: {thread_id}")
if info_parts:
if description:
description += f" | {' | '.join(info_parts)}"
else:
description = " | ".join(info_parts)
# For URL, we could construct a URL to the Gmail message
url = ""
if message_id:
# Gmail message URL format
url = f"https://mail.google.com/mail/u/0/#inbox/{message_id}"
source = {
"id": document.get("id", self.source_id_counter),
"title": title,
"description": description,
"url": url,
"message_id": message_id,
"subject": subject,
"sender": sender,
"date": date_str,
"thread_id": thread_id,
}
self.source_id_counter += 1
sources_list.append(source)
# Create result object
result_object = {
"id": 32, # Assign a unique ID for the Gmail connector
"name": "Gmail Messages",
"type": "GOOGLE_GMAIL_CONNECTOR",
"sources": sources_list,
}
return result_object, gmail_chunks
async def search_confluence(
self,
user_query: str,

View file

@ -14,6 +14,7 @@ Available indexers:
- Confluence: Index pages from Confluence spaces
- Discord: Index messages from Discord servers
- ClickUp: Index tasks from ClickUp workspaces
- Google Gmail: Index messages from Google Gmail
- Google Calendar: Index events from Google Calendar
"""
@ -27,6 +28,7 @@ from .github_indexer import index_github_repos
# Calendar and scheduling
from .google_calendar_indexer import index_google_calendar_events
from .google_gmail_indexer import index_google_gmail_messages
from .jira_indexer import index_jira_issues
# Issue tracking and project management
@ -36,7 +38,7 @@ from .linear_indexer import index_linear_issues
from .notion_indexer import index_notion_pages
from .slack_indexer import index_slack_messages
__all__ = [
__all__ = [ # noqa: RUF022
"index_clickup_tasks",
"index_confluence_pages",
"index_discord_messages",
@ -51,4 +53,5 @@ __all__ = [
"index_notion_pages",
# Communication platforms
"index_slack_messages",
"index_google_gmail_messages",
]

View file

@ -0,0 +1,299 @@
"""
Google Gmail connector indexer.
"""
from datetime import datetime
from google.oauth2.credentials import Credentials
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.google_gmail_connector import GoogleGmailConnector
from app.db import (
Document,
DocumentType,
SearchSourceConnectorType,
)
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import generate_content_hash
from .base import (
check_duplicate_document_by_hash,
create_document_chunks,
get_connector_by_id,
logger,
update_connector_last_indexed,
)
async def index_google_gmail_messages(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str | None = None,
    end_date: str | None = None,
    update_last_indexed: bool = True,
    max_messages: int = 100,
) -> tuple[int, str | None]:
    """
    Index Gmail messages for a specific connector.

    Args:
        session: Database session
        connector_id: ID of the Gmail connector
        search_space_id: ID of the search space
        user_id: ID of the user
        start_date: Start date for filtering messages (YYYY-MM-DD format)
        end_date: End date for filtering messages (YYYY-MM-DD format).
            NOTE(review): currently unused — fetching is driven solely by the
            days-back window derived from start_date; confirm intent.
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
        max_messages: Maximum number of messages to fetch (default: 100)

    Returns:
        Tuple of (number_of_indexed_messages, error message or None on success)
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Calculate days back based on start_date. Default to 30 days when
    # start_date is missing or malformed. (Previously `days_back` was left
    # unbound when start_date was None, raising NameError below.)
    days_back = 30
    if start_date:
        try:
            start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
            days_back = (datetime.now() - start_date_obj).days
        except ValueError:
            days_back = 30  # Default to 30 days if start_date is invalid

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="google_gmail_messages_indexing",
        source="connector_indexing_task",
        message=f"Starting Gmail messages indexing for connector {connector_id}",
        metadata={
            "connector_id": connector_id,
            "user_id": str(user_id),
            "max_messages": max_messages,
            "days_back": days_back,
        },
    )

    try:
        # Get connector by id
        connector = await get_connector_by_id(
            session, connector_id, SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR
        )
        if not connector:
            error_msg = f"Gmail connector with ID {connector_id} not found"
            await task_logger.log_task_failure(
                log_entry, error_msg, {"error_type": "ConnectorNotFound"}
            )
            return 0, error_msg

        # Create credentials from connector config
        config_data = connector.config
        credentials = Credentials(
            token=config_data.get("token"),
            refresh_token=config_data.get("refresh_token"),
            token_uri=config_data.get("token_uri"),
            client_id=config_data.get("client_id"),
            client_secret=config_data.get("client_secret"),
            scopes=config_data.get("scopes", []),
        )

        if (
            not credentials.client_id
            or not credentials.client_secret
            or not credentials.refresh_token
        ):
            await task_logger.log_task_failure(
                log_entry,
                f"Google gmail credentials not found in connector config for connector {connector_id}",
                "Missing Google gmail credentials",
                {"error_type": "MissingCredentials"},
            )
            return 0, "Google gmail credentials not found in connector config"

        # Initialize Google gmail client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing Google gmail client for connector {connector_id}",
            {"stage": "client_initialization"},
        )

        # Initialize Google gmail connector
        gmail_connector = GoogleGmailConnector(credentials)

        # Fetch recent Google gmail messages
        logger.info(f"Fetching recent emails for connector {connector_id}")
        messages, error = gmail_connector.get_recent_messages(
            max_results=max_messages, days_back=days_back
        )
        if error:
            await task_logger.log_task_failure(
                log_entry, f"Failed to fetch messages: {error}", {}
            )
            return 0, f"Failed to fetch Gmail messages: {error}"

        if not messages:
            success_msg = "No Google gmail messages found in the specified date range"
            await task_logger.log_task_success(
                log_entry, success_msg, {"messages_count": 0}
            )
            return 0, success_msg

        logger.info(f"Found {len(messages)} Google gmail messages to index")

        documents_indexed = 0
        skipped_messages = []
        documents_skipped = 0

        for message in messages:
            # Defaults so the except handler below always has values to report,
            # even if extraction fails early.
            message_id = ""
            subject = "No Subject"
            try:
                # Extract message information
                message_id = message.get("id", "")
                thread_id = message.get("threadId", "")

                # Extract headers for subject and sender
                payload = message.get("payload", {})
                headers = payload.get("headers", [])

                sender = "Unknown Sender"
                date_str = "Unknown Date"
                for header in headers:
                    name = header.get("name", "").lower()
                    value = header.get("value", "")
                    if name == "subject":
                        subject = value
                    elif name == "from":
                        sender = value
                    elif name == "date":
                        date_str = value

                if not message_id:
                    logger.warning(f"Skipping message with missing ID: {subject}")
                    skipped_messages.append(f"{subject} (missing ID)")
                    documents_skipped += 1
                    continue

                # Format message to markdown
                markdown_content = gmail_connector.format_message_to_markdown(message)
                if not markdown_content.strip():
                    logger.warning(f"Skipping message with no content: {subject}")
                    skipped_messages.append(f"{subject} (no content)")
                    documents_skipped += 1
                    continue

                # Create a simple summary used for the document embedding.
                summary_content = f"Google Gmail Message: {subject}\n\n"
                summary_content += f"Sender: {sender}\n"
                summary_content += f"Date: {date_str}\n"

                # Generate content hash (scoped to the search space) for dedup.
                content_hash = generate_content_hash(markdown_content, search_space_id)

                # Check if document already exists
                existing_document_by_hash = await check_duplicate_document_by_hash(
                    session, content_hash
                )
                if existing_document_by_hash:
                    logger.info(
                        f"Document with content hash {content_hash} already exists for message {message_id}. Skipping processing."
                    )
                    documents_skipped += 1
                    continue

                # Generate embedding for the summary
                summary_embedding = config.embedding_model_instance.embed(
                    summary_content
                )

                # Process chunks
                chunks = await create_document_chunks(markdown_content)

                # Create and store new document
                logger.info(f"Creating new document for Gmail message: {subject}")
                document = Document(
                    search_space_id=search_space_id,
                    title=f"Gmail: {subject}",
                    document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
                    document_metadata={
                        "message_id": message_id,
                        "thread_id": thread_id,
                        "subject": subject,
                        "sender": sender,
                        "date": date_str,
                        "connector_id": connector_id,
                    },
                    content=markdown_content,
                    content_hash=content_hash,
                    embedding=summary_embedding,
                    chunks=chunks,
                )
                session.add(document)
                documents_indexed += 1
                logger.info(f"Successfully indexed new email {summary_content}")
            except Exception as e:
                logger.error(
                    f"Error processing the email {message_id}: {e!s}",
                    exc_info=True,
                )
                skipped_messages.append(f"{subject} (processing error)")
                documents_skipped += 1
                continue  # Skip this message and continue with others

        # Update the last_indexed_at timestamp for the connector only if requested
        total_processed = documents_indexed
        if total_processed > 0:
            await update_connector_last_indexed(session, connector, update_last_indexed)

        # Commit all changes
        await session.commit()
        logger.info(
            "Successfully committed all Google gmail document changes to database"
        )

        # Log success
        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Google gmail indexing for connector {connector_id}",
            {
                "events_processed": total_processed,
                "documents_indexed": documents_indexed,
                "documents_skipped": documents_skipped,
                "skipped_messages_count": len(skipped_messages),
            },
        )
        logger.info(
            f"Google gmail indexing completed: {documents_indexed} new emails, {documents_skipped} skipped"
        )
        return (
            total_processed,
            None,
        )  # Return None as the error message to indicate success
    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Google gmail indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"},
        )
        logger.error(f"Database error: {db_error!s}", exc_info=True)
        return 0, f"Database error: {db_error!s}"
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Google gmail emails for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__},
        )
        logger.error(f"Failed to index Google gmail emails: {e!s}", exc_info=True)
        return 0, f"Failed to index Google gmail emails: {e!s}"

View file

@ -0,0 +1,199 @@
"use client";
import { zodResolver } from "@hookform/resolvers/zod";
import { IconMail } from "@tabler/icons-react";
import { motion } from "framer-motion";
import { ArrowLeft, Check, ExternalLink, Loader2 } from "lucide-react";
import Link from "next/link";
import { useParams, useRouter, useSearchParams } from "next/navigation";
import { useEffect, useState } from "react";
import { useForm } from "react-hook-form";
import { toast } from "sonner";
import { z } from "zod";
import { Button } from "@/components/ui/button";
import {
Card,
CardContent,
CardDescription,
CardFooter,
CardHeader,
CardTitle,
} from "@/components/ui/card";
import {
type SearchSourceConnector,
useSearchSourceConnectors,
} from "@/hooks/useSearchSourceConnectors";
export default function GoogleGmailConnectorPage() {
const router = useRouter();
const params = useParams();
const searchSpaceId = params.search_space_id as string;
const [isConnecting, setIsConnecting] = useState(false);
const [doesConnectorExist, setDoesConnectorExist] = useState(false);
const { fetchConnectors } = useSearchSourceConnectors();
useEffect(() => {
fetchConnectors().then((data) => {
const connector = data.find(
(c: SearchSourceConnector) => c.connector_type === "GOOGLE_GMAIL_CONNECTOR"
);
if (connector) {
setDoesConnectorExist(true);
}
});
}, []);
// Handle Google OAuth connection
const handleConnectGoogle = async () => {
try {
setIsConnecting(true);
// Call backend to initiate authorization flow
const response = await fetch(
`${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/auth/google/gmail/connector/add/?space_id=${searchSpaceId}`,
{
method: "GET",
headers: {
Authorization: `Bearer ${localStorage.getItem("surfsense_bearer_token")}`,
},
}
);
if (!response.ok) {
throw new Error("Failed to initiate Google OAuth");
}
const data = await response.json();
// Redirect to Google for authentication
window.location.href = data.auth_url;
} catch (error) {
console.error("Error connecting to Google:", error);
toast.error("Failed to connect to Google Gmail");
} finally {
setIsConnecting(false);
}
};
return (
<div className="container mx-auto py-8 max-w-2xl">
<motion.div
initial={{ opacity: 0, y: 20 }}
animate={{ opacity: 1, y: 0 }}
transition={{ duration: 0.5 }}
>
{/* Header */}
<div className="mb-8">
<Link
href={`/dashboard/${searchSpaceId}/connectors/add`}
className="inline-flex items-center text-sm text-muted-foreground hover:text-foreground mb-4"
>
<ArrowLeft className="mr-2 h-4 w-4" />
Back to connectors
</Link>
<div className="flex items-center gap-4">
<div className="flex h-12 w-12 items-center justify-center rounded-lg bg-red-100 dark:bg-red-900">
<IconMail className="h-6 w-6 text-red-600 dark:text-red-400" />
</div>
<div>
<h1 className="text-3xl font-bold tracking-tight">Connect Google Gmail</h1>
<p className="text-muted-foreground">
Connect your Gmail account to search through your emails
</p>
</div>
</div>
</div>
{/* Connection Card */}
{!doesConnectorExist ? (
<Card>
<CardHeader>
<CardTitle>Connect Your Gmail Account</CardTitle>
<CardDescription>
Securely connect your Gmail account to enable email search within SurfSense. We'll
only access your emails with read-only permissions.
</CardDescription>
</CardHeader>
<CardContent className="space-y-4">
<div className="flex items-center gap-3 text-sm text-muted-foreground">
<Check className="h-4 w-4 text-green-500" />
<span>Read-only access to your emails</span>
</div>
<div className="flex items-center gap-3 text-sm text-muted-foreground">
<Check className="h-4 w-4 text-green-500" />
<span>Search through email content and metadata</span>
</div>
<div className="flex items-center gap-3 text-sm text-muted-foreground">
<Check className="h-4 w-4 text-green-500" />
<span>Secure OAuth 2.0 authentication</span>
</div>
<div className="flex items-center gap-3 text-sm text-muted-foreground">
<Check className="h-4 w-4 text-green-500" />
<span>You can disconnect anytime</span>
</div>
</CardContent>
<CardFooter className="flex justify-between">
<Button
type="button"
variant="outline"
onClick={() => router.push(`/dashboard/${searchSpaceId}/connectors/add`)}
>
Cancel
</Button>
<Button onClick={handleConnectGoogle} disabled={isConnecting}>
{isConnecting ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Connecting...
</>
) : (
<>
<ExternalLink className="mr-2 h-4 w-4" />
Connect Your Google Account
</>
)}
</Button>
</CardFooter>
</Card>
) : (
/* Configuration Form Card */
<Card>
<CardHeader>
<CardTitle> Your Gmail is successfully connected!</CardTitle>
</CardHeader>
</Card>
)}
{/* Information Card */}
<Card className="mt-6">
<CardHeader>
<CardTitle>What data will be indexed?</CardTitle>
</CardHeader>
<CardContent className="space-y-4">
<div className="space-y-2">
<h4 className="font-medium">Email Content</h4>
<p className="text-sm text-muted-foreground">
We'll index the content of your emails including subject lines, sender information,
and message body text to make them searchable.
</p>
</div>
<div className="space-y-2">
<h4 className="font-medium">Email Metadata</h4>
<p className="text-sm text-muted-foreground">
Information like sender, recipient, date, and labels will be indexed to provide
better search context and filtering options.
</p>
</div>
<div className="space-y-2">
<h4 className="font-medium">Privacy & Security</h4>
<p className="text-sm text-muted-foreground">
Your emails are processed securely and stored with encryption. We only access emails
with read-only permissions and never modify or send emails on your behalf.
</p>
</div>
</CardContent>
</Card>
</motion.div>
</div>
);
}

View file

@ -157,11 +157,11 @@ const connectorCategories: ConnectorCategory[] = [
status: "available",
},
{
id: "gmail",
id: "google-gmail-connector",
title: "Gmail",
description: "Connect to your Gmail account to access emails.",
description: "Connect to your Gmail account to search through your emails.",
icon: <IconMail className="h-6 w-6" />,
status: "coming-soon",
status: "available",
},
{
id: "zoom",

View file

@ -10,6 +10,7 @@ import {
IconCalendar,
IconChecklist,
IconLayoutKanban,
IconMail,
IconTicket,
} from "@tabler/icons-react";
import { File, Globe, Webhook } from "lucide-react";
@ -31,6 +32,7 @@ const documentTypeIcons: Record<string, IconComponent> = {
CONFLUENCE_CONNECTOR: IconBook,
CLICKUP_CONNECTOR: IconChecklist,
GOOGLE_CALENDAR_CONNECTOR: IconCalendar,
GOOGLE_GMAIL_CONNECTOR: IconMail,
};
export function getDocumentTypeIcon(type: string): IconComponent {

View file

@ -7,6 +7,7 @@ import {
IconCalendar,
IconLayoutKanban,
IconLinkPlus,
IconMail,
IconTicket,
} from "@tabler/icons-react";
import {
@ -59,6 +60,8 @@ export const getConnectorIcon = (connectorType: string) => {
return <IconTicket {...iconProps} />;
case "GOOGLE_CALENDAR_CONNECTOR":
return <IconCalendar {...iconProps} />;
case "GOOGLE_GMAIL_CONNECTOR":
return <IconMail {...iconProps} />;
case "DEEP":
return <Sparkles {...iconProps} />;
case "DEEPER":