use new indexer files structure

CREDO23 2025-08-15 10:11:50 +02:00
parent 869f848179
commit 089c9d1625
3 changed files with 303 additions and 3664 deletions


@@ -14,6 +14,7 @@ Available indexers:
- Confluence: Index pages from Confluence spaces
- Discord: Index messages from Discord servers
- ClickUp: Index tasks from ClickUp workspaces
- Google Gmail: Index messages from Google Gmail
- Google Calendar: Index events from Google Calendar
"""
@@ -27,6 +28,7 @@ from .github_indexer import index_github_repos
# Calendar and scheduling
from .google_calendar_indexer import index_google_calendar_events
from .google_gmail_indexer import index_google_gmail_messages
from .jira_indexer import index_jira_issues
# Issue tracking and project management
@@ -36,7 +38,7 @@ from .linear_indexer import index_linear_issues
from .notion_indexer import index_notion_pages
from .slack_indexer import index_slack_messages
__all__ = [
__all__ = [ # noqa: RUF022
"index_clickup_tasks",
"index_confluence_pages",
"index_discord_messages",
@@ -51,4 +53,5 @@ __all__ = [
"index_notion_pages",
# Communication platforms
"index_slack_messages",
"index_google_gmail_messages",
]
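
With the Gmail indexer re-exported here (and listed in `__all__`), callers can import it from the indexers package root instead of reaching into `google_gmail_indexer` directly. A minimal import sketch; the package path `app.indexers` is an assumption for illustration and may differ in the actual tree:

# Hypothetical package path; adjust to wherever this indexers package lives.
from app.indexers import index_google_gmail_messages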


@@ -0,0 +1,299 @@
"""
Google Gmail connector indexer.
"""
from datetime import datetime
from google.oauth2.credentials import Credentials
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.google_gmail_connector import GoogleGmailConnector
from app.db import (
Document,
DocumentType,
SearchSourceConnectorType,
)
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import generate_content_hash
from .base import (
check_duplicate_document_by_hash,
create_document_chunks,
get_connector_by_id,
logger,
update_connector_last_indexed,
)


async def index_google_gmail_messages(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str | None = None,
    end_date: str | None = None,
    update_last_indexed: bool = True,
    max_messages: int = 100,
) -> tuple[int, str | None]:
    """
    Index Gmail messages for a specific connector.

    Args:
        session: Database session
        connector_id: ID of the Gmail connector
        search_space_id: ID of the search space
        user_id: ID of the user
        start_date: Start date for filtering messages (YYYY-MM-DD format)
        end_date: End date for filtering messages (YYYY-MM-DD format)
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
        max_messages: Maximum number of messages to fetch (default: 100)

    Returns:
        Tuple of (number_of_indexed_messages, status_message); the status message
        is None on success and a descriptive string otherwise
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Calculate days back based on start_date; default to 30 days when it is
    # missing or invalid. Note: end_date is currently not used by this indexer.
    days_back = 30
    if start_date:
        try:
            start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
            days_back = (datetime.now() - start_date_obj).days
        except ValueError:
            days_back = 30  # Default to 30 days if start_date is invalid

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="google_gmail_messages_indexing",
        source="connector_indexing_task",
        message=f"Starting Gmail messages indexing for connector {connector_id}",
        metadata={
            "connector_id": connector_id,
            "user_id": str(user_id),
            "max_messages": max_messages,
            "days_back": days_back,
        },
    )
    try:
        # Get connector by id
        connector = await get_connector_by_id(
            session, connector_id, SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR
        )
        if not connector:
            error_msg = f"Gmail connector with ID {connector_id} not found"
            await task_logger.log_task_failure(
                log_entry, error_msg, {"error_type": "ConnectorNotFound"}
            )
            return 0, error_msg

        # Create credentials from connector config
        config_data = connector.config
        credentials = Credentials(
            token=config_data.get("token"),
            refresh_token=config_data.get("refresh_token"),
            token_uri=config_data.get("token_uri"),
            client_id=config_data.get("client_id"),
            client_secret=config_data.get("client_secret"),
            scopes=config_data.get("scopes", []),
        )

        if (
            not credentials.client_id
            or not credentials.client_secret
            or not credentials.refresh_token
        ):
            await task_logger.log_task_failure(
                log_entry,
                f"Gmail credentials not found in connector config for connector {connector_id}",
                "Missing Gmail credentials",
                {"error_type": "MissingCredentials"},
            )
            return 0, "Gmail credentials not found in connector config"

        # Initialize Gmail client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing Gmail client for connector {connector_id}",
            {"stage": "client_initialization"},
        )

        # Initialize Gmail connector
        gmail_connector = GoogleGmailConnector(credentials)

        # Fetch recent Gmail messages
        logger.info(f"Fetching recent emails for connector {connector_id}")
        messages, error = gmail_connector.get_recent_messages(
            max_results=max_messages, days_back=days_back
        )

        if error:
            await task_logger.log_task_failure(
                log_entry, f"Failed to fetch messages: {error}", {}
            )
            return 0, f"Failed to fetch Gmail messages: {error}"

        if not messages:
            success_msg = "No Gmail messages found in the specified date range"
            await task_logger.log_task_success(
                log_entry, success_msg, {"messages_count": 0}
            )
            return 0, success_msg

        logger.info(f"Found {len(messages)} Gmail messages to index")

        documents_indexed = 0
        skipped_messages = []
        documents_skipped = 0
        for message in messages:
            try:
                # Extract message information
                message_id = message.get("id", "")
                thread_id = message.get("threadId", "")

                # Extract headers for subject and sender
                payload = message.get("payload", {})
                headers = payload.get("headers", [])

                subject = "No Subject"
                sender = "Unknown Sender"
                date_str = "Unknown Date"

                for header in headers:
                    name = header.get("name", "").lower()
                    value = header.get("value", "")
                    if name == "subject":
                        subject = value
                    elif name == "from":
                        sender = value
                    elif name == "date":
                        date_str = value

                if not message_id:
                    logger.warning(f"Skipping message with missing ID: {subject}")
                    skipped_messages.append(f"{subject} (missing ID)")
                    documents_skipped += 1
                    continue

                # Format message to markdown
                markdown_content = gmail_connector.format_message_to_markdown(message)

                if not markdown_content.strip():
                    logger.warning(f"Skipping message with no content: {subject}")
                    skipped_messages.append(f"{subject} (no content)")
                    documents_skipped += 1
                    continue

                # Create a simple summary
                summary_content = f"Google Gmail Message: {subject}\n\n"
                summary_content += f"Sender: {sender}\n"
                summary_content += f"Date: {date_str}\n"

                # Generate content hash
                content_hash = generate_content_hash(markdown_content, search_space_id)

                # Check if document already exists
                existing_document_by_hash = await check_duplicate_document_by_hash(
                    session, content_hash
                )
                if existing_document_by_hash:
                    logger.info(
                        f"Document with content hash {content_hash} already exists for message {message_id}. Skipping processing."
                    )
                    documents_skipped += 1
                    continue

                # Generate embedding for the summary
                summary_embedding = config.embedding_model_instance.embed(
                    summary_content
                )

                # Process chunks
                chunks = await create_document_chunks(markdown_content)

                # Create and store new document
                logger.info(f"Creating new document for Gmail message: {subject}")
                document = Document(
                    search_space_id=search_space_id,
                    title=f"Gmail: {subject}",
                    document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
                    document_metadata={
                        "message_id": message_id,
                        "thread_id": thread_id,
                        "subject": subject,
                        "sender": sender,
                        "date": date_str,
                        "connector_id": connector_id,
                    },
                    content=markdown_content,
                    content_hash=content_hash,
                    embedding=summary_embedding,
                    chunks=chunks,
                )

                session.add(document)
                documents_indexed += 1
                logger.info(f"Successfully indexed new email: {subject}")

            except Exception as e:
                logger.error(
                    f"Error processing the email {message_id}: {e!s}",
                    exc_info=True,
                )
                skipped_messages.append(f"{subject} (processing error)")
                documents_skipped += 1
                continue  # Skip this message and continue with others
        # Update the last_indexed_at timestamp for the connector only if requested
        total_processed = documents_indexed
        if total_processed > 0:
            await update_connector_last_indexed(session, connector, update_last_indexed)

        # Commit all changes
        await session.commit()
        logger.info(
            "Successfully committed all Gmail document changes to database"
        )

        # Log success
        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Gmail indexing for connector {connector_id}",
            {
                "messages_processed": total_processed,
                "documents_indexed": documents_indexed,
                "documents_skipped": documents_skipped,
                "skipped_messages_count": len(skipped_messages),
            },
        )

        logger.info(
            f"Gmail indexing completed: {documents_indexed} new emails, {documents_skipped} skipped"
        )
        return (
            total_processed,
            None,
        )  # Return None as the error message to indicate success

    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Gmail indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"},
        )
        logger.error(f"Database error: {db_error!s}", exc_info=True)
        return 0, f"Database error: {db_error!s}"

    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Gmail emails for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__},
        )
        logger.error(f"Failed to index Gmail emails: {e!s}", exc_info=True)
        return 0, f"Failed to index Gmail emails: {e!s}"
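
A hedged usage sketch of the new indexer, based only on the signature and docstring above. The import paths, session factory, and IDs are hypothetical placeholders, not part of this commit; the connector config keys in the comment mirror what the indexer reads via `config_data.get(...)`.

# Usage sketch only. The import paths, session factory, and IDs below are
# hypothetical placeholders; only the function signature and the expected
# connector config keys come from the indexer code above.
import asyncio

from app.db import async_session_maker  # assumed async session factory
from app.indexers import index_google_gmail_messages  # assumed package path

# Shape of the connector config the indexer expects (read via config_data.get):
# {
#     "token": "...", "refresh_token": "...", "token_uri": "...",
#     "client_id": "...", "client_secret": "...", "scopes": ["..."],
# }


async def run_gmail_indexing() -> None:
    async with async_session_maker() as session:
        indexed, status = await index_google_gmail_messages(
            session=session,
            connector_id=42,            # hypothetical connector ID
            search_space_id=7,          # hypothetical search space ID
            user_id="user-123",         # hypothetical user ID
            start_date="2025-08-01",    # YYYY-MM-DD; drives days_back
            max_messages=50,
        )
        # status is None on success, otherwise an error/status message
        print(f"Indexed {indexed} Gmail messages; status: {status}")


if __name__ == "__main__":
    asyncio.run(run_gmail_indexing())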

File diff suppressed because it is too large