Support Google Mail (Gmail) message indexing

This commit is contained in:
CREDO23 2025-08-04 00:57:37 +02:00
parent 93e5887a87
commit b5d5e40d51

View file

@ -12,6 +12,7 @@ from app.connectors.clickup_connector import ClickUpConnector
from app.connectors.confluence_connector import ConfluenceConnector
from app.connectors.discord_connector import DiscordConnector
from app.connectors.github_connector import GitHubConnector
from app.connectors.google_gmail_connector import GoogleGmailConnector
from app.connectors.jira_connector import JiraConnector
from app.connectors.linear_connector import LinearConnector
from app.connectors.notion_history import NotionHistoryConnector
@ -3012,3 +3013,205 @@ async def index_clickup_tasks(
)
logger.error(f"Failed to index ClickUp tasks: {e!s}", exc_info=True)
return 0, f"Failed to index ClickUp tasks: {e!s}"
async def index_google_gmail_messages(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    max_messages: int = 100,
    days_back: int = 30,
) -> tuple[int, str]:
    """
    Index Gmail messages for a specific connector.

    Args:
        session: Database session
        connector_id: ID of the Gmail connector
        search_space_id: ID of the search space
        user_id: ID of the user
        max_messages: Maximum number of messages to fetch (default: 100)
        days_back: Number of days to look back (default: 30)

    Returns:
        Tuple of (number_of_indexed_messages, status_message)
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="google_gmail_messages_indexing",
        source="connector_indexing_task",
        message=f"Starting Gmail messages indexing for connector {connector_id}",
        metadata={
            "connector_id": connector_id,
            "user_id": str(user_id),
            "max_messages": max_messages,
            "days_back": days_back,
        },
    )

    try:
        # Get the connector from the database
        result = await session.execute(
            select(SearchSourceConnector).filter(
                SearchSourceConnector.id == connector_id,
                SearchSourceConnector.connector_type
                == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
            )
        )
        connector = result.scalars().first()

        if not connector:
            error_msg = f"Gmail connector with ID {connector_id} not found"
            await task_logger.log_task_completion(
                log_entry.id, "FAILED", error_msg, {"error_type": "ConnectorNotFound"}
            )
            return 0, error_msg

        # Build OAuth2 credentials from the stored connector config
        config_data = connector.config
        credentials = Credentials(
            token=config_data.get("token"),
            refresh_token=config_data.get("refresh_token"),
            token_uri=config_data.get("token_uri"),
            client_id=config_data.get("client_id"),
            client_secret=config_data.get("client_secret"),
            scopes=config_data.get("scopes", []),
        )

        # Initialize Gmail connector
        gmail_connector = GoogleGmailConnector(credentials)

        # Fetch recent messages
        logger.info(f"Fetching recent Gmail messages for connector {connector_id}")
        messages, error = gmail_connector.get_recent_messages(
            max_results=max_messages, days_back=days_back
        )

        if error:
            await task_logger.log_task_completion(
                log_entry.id, "FAILED", f"Failed to fetch messages: {error}", {}
            )
            return 0, f"Failed to fetch Gmail messages: {error}"

        if not messages:
            success_msg = "No Gmail messages found in the specified date range"
            await task_logger.log_task_completion(
                log_entry.id, "SUCCESS", success_msg, {"messages_count": 0}
            )
            return 0, success_msg

        logger.info(f"Found {len(messages)} Gmail messages to index")

        indexed_count = 0
        for message in messages:
            # Hoisted above the try so the except clause below can always log it
            # (previously it could be unbound if extraction failed early).
            message_id = message.get("id", "")
            try:
                thread_id = message.get("threadId", "")

                # Extract subject/sender/date from the message headers
                payload = message.get("payload", {})
                headers = payload.get("headers", [])
                subject = "No Subject"
                sender = "Unknown Sender"
                date_str = "Unknown Date"

                for header in headers:
                    name = header.get("name", "").lower()
                    value = header.get("value", "")
                    if name == "subject":
                        subject = value
                    elif name == "from":
                        sender = value
                    elif name == "date":
                        date_str = value

                # Skip messages already indexed in this search space
                existing_doc_result = await session.execute(
                    select(Document).filter(
                        Document.search_space_id == search_space_id,
                        Document.document_type == DocumentType.GOOGLE_GMAIL_CONNECTOR,
                        Document.document_metadata["message_id"].astext == message_id,
                    )
                )
                existing_doc = existing_doc_result.scalars().first()
                if existing_doc:
                    logger.info(f"Gmail message {message_id} already indexed, skipping")
                    continue

                # Format message to markdown
                markdown_content = gmail_connector.format_message_to_markdown(message)

                # Create and store new document
                logger.info(f"Creating new document for Gmail message: {subject}")
                document = Document(
                    search_space_id=search_space_id,
                    title=f"Gmail: {subject}",
                    document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
                    document_metadata={
                        "message_id": message_id,
                        "thread_id": thread_id,
                        "subject": subject,
                        "sender": sender,
                        "date": date_str,
                        "connector_id": connector_id,
                    },
                    content=markdown_content,
                )
                session.add(document)
                # Flush so document.id is populated for the chunks below
                await session.flush()

                # Create chunks for the document
                chunks = config.chunker_instance.chunk(markdown_content)
                for i, chunk_text in enumerate(chunks):
                    chunk = Chunk(
                        document_id=document.id,
                        content=chunk_text,
                        chunk_index=i,
                        embedding=config.embedding_model_instance.embed_query(
                            chunk_text
                        ),
                    )
                    session.add(chunk)

                indexed_count += 1
                logger.info(f"Successfully indexed Gmail message: {subject}")

            except Exception as e:
                # Best-effort: one bad message must not abort the whole batch
                logger.error(
                    f"Error indexing Gmail message {message_id}: {e!s}", exc_info=True
                )
                continue

        # Update the connector's last_indexed_at timestamp and commit everything
        # in a single transaction (previously two back-to-back commits).
        connector.last_indexed_at = datetime.now(UTC)
        await session.commit()

        success_msg = f"Successfully indexed {indexed_count} Gmail messages"
        await task_logger.log_task_completion(
            log_entry.id,
            "SUCCESS",
            success_msg,
            {"indexed_count": indexed_count, "total_messages": len(messages)},
        )
        logger.info(success_msg)
        return indexed_count, success_msg

    except Exception as e:
        # BUGFIX: this call previously passed str(e) as a fifth positional argument
        # (every other call site uses four: id, status, message, metadata), which
        # would raise TypeError inside the error path and drop the error text.
        # The error is now folded into the message instead.
        await task_logger.log_task_completion(
            log_entry.id,
            "FAILED",
            f"Failed to index Gmail messages for connector {connector_id}: {e!s}",
            {"error_type": type(e).__name__},
        )
        logger.error(f"Failed to index Gmail messages: {e!s}", exc_info=True)
        return 0, f"Failed to index Gmail messages: {e!s}"