fix scope issues for Google services

CREDO23 2025-08-04 20:26:04 +02:00
parent 2655692db3
commit 69f6a0a278
2 changed files with 225 additions and 64 deletions


@@ -41,6 +41,7 @@ from app.tasks.connector_indexers import (
     index_discord_messages,
     index_github_repos,
     index_google_calendar_events,
+    index_google_gmail_messages,
     index_jira_issues,
     index_linear_issues,
     index_notion_pages,
@@ -507,6 +508,22 @@ async def index_connector_content(
             indexing_to,
         )
         response_message = "Google Calendar indexing started in the background."
+    elif (
+        connector.connector_type == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR
+    ):
+        # Run indexing in background
+        logger.info(
+            f"Triggering Google Gmail indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
+        )
+        background_tasks.add_task(
+            run_google_gmail_indexing_with_new_session,
+            connector_id,
+            search_space_id,
+            str(user.id),
+            indexing_from,
+            indexing_to,
+        )
+        response_message = "Google Gmail indexing started in the background."
     elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR:
         # Run indexing in background
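The hunk above wires the new Gmail branch into the shared dispatch. A condensed sketch of that pattern, assuming a FastAPI route (the path, auth, and user handling here are illustrative stand-ins; only the `add_task` call mirrors the diff, and `run_google_gmail_indexing_with_new_session` is the wrapper added below):

```python
from fastapi import APIRouter, BackgroundTasks

router = APIRouter()


@router.post("/search-spaces/{search_space_id}/connectors/{connector_id}/index")
async def index_connector_content(
    search_space_id: int,
    connector_id: int,
    indexing_from: str,  # "YYYY-MM-DD"
    indexing_to: str,  # "YYYY-MM-DD"
    background_tasks: BackgroundTasks,
) -> dict:
    # add_task queues the coroutine; FastAPI runs it after the response is
    # sent, so the HTTP call returns immediately while indexing continues.
    background_tasks.add_task(
        run_google_gmail_indexing_with_new_session,
        connector_id,
        search_space_id,
        "user-id",  # str(user.id) in the real route
        indexing_from,
        indexing_to,
    )
    return {"message": "Google Gmail indexing started in the background."}
```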
@@ -1113,3 +1130,62 @@ async def run_google_calendar_indexing(
             exc_info=True,
         )
         # Optionally update status in DB to indicate failure
+
+
+async def run_google_gmail_indexing_with_new_session(
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+    start_date: str,
+    end_date: str,
+):
+    """Wrapper to run Google Gmail indexing with its own database session."""
+    logger.info(
+        f"Background task started: Indexing Google Gmail connector {connector_id} into space {search_space_id} from {start_date} to {end_date}"
+    )
+    async with async_session_maker() as session:
+        await run_google_gmail_indexing(
+            session, connector_id, search_space_id, user_id, start_date, end_date
+        )
+        logger.info(
+            f"Background task finished: Indexing Google Gmail connector {connector_id}"
+        )
+
+
+async def run_google_gmail_indexing(
+    session: AsyncSession,
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+    start_date: str,
+    end_date: str,
+):
+    """Runs the Google Gmail indexing task and updates the timestamp."""
+    try:
+        indexed_count, error_message = await index_google_gmail_messages(
+            session,
+            connector_id,
+            search_space_id,
+            user_id,
+            start_date,
+            end_date,
+            update_last_indexed=False,
+        )
+        if error_message:
+            logger.error(
+                f"Google Gmail indexing failed for connector {connector_id}: {error_message}"
+            )
+            # Optionally update status in DB to indicate failure
+        else:
+            logger.info(
+                f"Google Gmail indexing successful for connector {connector_id}. Indexed {indexed_count} documents."
+            )
+            # Update the last indexed timestamp only on success
+            await update_connector_last_indexed(session, connector_id)
+            await session.commit()  # Commit timestamp update
+    except Exception as e:
+        logger.error(
+            f"Critical error in run_google_gmail_indexing for connector {connector_id}: {e}",
+            exc_info=True,
+        )
+        # Optionally update status in DB to indicate failure
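The wrapper exists because a background task outlives the request-scoped session, so each run must open its own. One plausible definition of the `async_session_maker` it relies on, using SQLAlchemy's async API (the engine URL and options are assumptions; the real values live in the app's config):

```python
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

# A fresh session per background task, independent of the HTTP request cycle.
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/surfsense")
async_session_maker = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)
```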


@@ -3381,8 +3381,10 @@ async def index_google_gmail_messages(
     connector_id: int,
     search_space_id: int,
     user_id: str,
+    start_date: str | None = None,
+    end_date: str | None = None,
     update_last_indexed: bool = True,
     max_messages: int = 100,
     days_back: int = 30,
 ) -> tuple[int, str | None]:
     """
     Index Gmail messages for a specific connector.
@@ -3392,14 +3394,24 @@ async def index_google_gmail_messages(
         connector_id: ID of the Gmail connector
         search_space_id: ID of the search space
         user_id: ID of the user
+        start_date: Start date for filtering messages (YYYY-MM-DD format)
+        end_date: End date for filtering messages (YYYY-MM-DD format)
         update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
         max_messages: Maximum number of messages to fetch (default: 100)
         days_back: Number of days to look back (default: 30)

     Returns:
         Tuple of (number_of_indexed_messages, error_message); error_message is None on success
     """
     task_logger = TaskLoggingService(session, search_space_id)

+    # Calculate days back based on start_date
+    if start_date:
+        try:
+            start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
+            days_back = (datetime.now() - start_date_obj).days
+        except ValueError:
+            days_back = 30  # Default to 30 days if start_date is invalid
+
     # Log task start
     log_entry = await task_logger.log_task_start(
         task_name="google_gmail_messages_indexing",
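A worked example of the fallback above: taking the commit date (2025-08-04) as "today", a `start_date` of "2025-07-05" maps to a 30-day lookback window:

```python
from datetime import datetime

# The same arithmetic the hunk above performs, with fixed dates for clarity.
start_date_obj = datetime.strptime("2025-07-05", "%Y-%m-%d")
days_back = (datetime(2025, 8, 4) - start_date_obj).days
assert days_back == 30
```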
@@ -3426,8 +3438,8 @@ async def index_google_gmail_messages(
         if not connector:
             error_msg = f"Gmail connector with ID {connector_id} not found"
-            await task_logger.log_task_completion(
-                log_entry.id, "FAILED", error_msg, {"error_type": "ConnectorNotFound"}
+            await task_logger.log_task_failure(
+                log_entry, error_msg, {"error_type": "ConnectorNotFound"}
             )
             return 0, error_msg
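This hunk (and the ones below) replace generic `log_task_completion(log_entry.id, "FAILED", ...)` calls with dedicated `log_task_failure` / `log_task_success` / `log_task_progress` helpers. The `TaskLoggingService` surface implied by the call sites in this diff, written out as a sketch (inferred from usage, not the project's actual class; the optional `error` argument reflects that some calls pass a separate error string and others do not):

```python
class TaskLoggingService:
    def __init__(self, session, search_space_id: int):
        self.session = session
        self.search_space_id = search_space_id

    async def log_task_start(self, task_name: str, source: str, message: str, metadata: dict):
        """Create and return a log entry row for this task run."""

    async def log_task_progress(self, log_entry, message: str, metadata: dict):
        """Attach an intermediate status update to the entry."""

    async def log_task_success(self, log_entry, message: str, metadata: dict):
        """Mark the entry successful with summary metadata."""

    async def log_task_failure(self, log_entry, message: str, error: str | None = None, metadata: dict | None = None):
        """Mark the entry failed, recording the error detail when provided."""
```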
@@ -3442,31 +3454,53 @@ async def index_google_gmail_messages(
             scopes=config_data.get("scopes", []),
         )

-        # Initialize Gmail connector
+        if (
+            not credentials.client_id
+            or not credentials.client_secret
+            or not credentials.refresh_token
+        ):
+            await task_logger.log_task_failure(
+                log_entry,
+                f"Google Gmail credentials not found in connector config for connector {connector_id}",
+                "Missing Google Gmail credentials",
+                {"error_type": "MissingCredentials"},
+            )
+            return 0, "Google Gmail credentials not found in connector config"
+
+        # Initialize Google Gmail client
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Initializing Google Gmail client for connector {connector_id}",
+            {"stage": "client_initialization"},
+        )
+
+        # Initialize Google Gmail connector
         gmail_connector = GoogleGmailConnector(credentials)

-        # Fetch recent messages
-        logger.info(f"Fetching recent Gmail messages for connector {connector_id}")
+        # Fetch recent Google Gmail messages
+        logger.info(f"Fetching recent emails for connector {connector_id}")
         messages, error = gmail_connector.get_recent_messages(
             max_results=max_messages, days_back=days_back
         )

         if error:
-            await task_logger.log_task_completion(
-                log_entry.id, "FAILED", f"Failed to fetch messages: {error}", {}
+            await task_logger.log_task_failure(
+                log_entry, f"Failed to fetch messages: {error}", {}
             )
             return 0, f"Failed to fetch Gmail messages: {error}"

         if not messages:
-            success_msg = "No Gmail messages found in the specified date range"
-            await task_logger.log_task_completion(
-                log_entry.id, "SUCCESS", success_msg, {"messages_count": 0}
+            success_msg = "No Google Gmail messages found in the specified date range"
+            await task_logger.log_task_success(
+                log_entry, success_msg, {"messages_count": 0}
             )
             return 0, success_msg

-        logger.info(f"Found {len(messages)} Gmail messages to index")
+        logger.info(f"Found {len(messages)} Google Gmail messages to index")

-        indexed_count = 0
+        documents_indexed = 0
+        skipped_messages = []
+        documents_skipped = 0

         for message in messages:
             try:
                 # Extract message information
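The new guard above fails fast when any OAuth field is missing, before a client is ever constructed. A sketch of what `GoogleGmailConnector` might do with these credentials, using the official Google client libraries (the connector's internals are assumed, not copied from the project); the detail this commit cares about is that the scopes come from the stored connector config rather than a hardcoded list:

```python
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build


def build_gmail_service(config_data: dict):
    creds = Credentials(
        token=None,  # no cached access token; refreshed on first request
        refresh_token=config_data["refresh_token"],
        token_uri="https://oauth2.googleapis.com/token",
        client_id=config_data["client_id"],
        client_secret=config_data["client_secret"],
        scopes=config_data.get("scopes", []),  # e.g. gmail.readonly
    )
    # Gmail v1 API client; calls will refresh the token via the scopes above.
    return build("gmail", "v1", credentials=creds)
```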
@@ -3491,23 +3525,58 @@ async def index_google_gmail_messages(
                     elif name == "date":
                         date_str = value

-                # Check if document already exists
-                existing_doc_result = await session.execute(
-                    select(Document).filter(
-                        Document.search_space_id == search_space_id,
-                        Document.document_type == DocumentType.GOOGLE_GMAIL_CONNECTOR,
-                        Document.document_metadata["message_id"].astext == message_id,
-                    )
-                )
-                existing_doc = existing_doc_result.scalars().first()
-
-                if existing_doc:
-                    logger.info(f"Gmail message {message_id} already indexed, skipping")
+                if not message_id:
+                    logger.warning(f"Skipping message with missing ID: {subject}")
+                    skipped_messages.append(f"{subject} (missing ID)")
+                    documents_skipped += 1
                     continue

                 # Format message to markdown
                 markdown_content = gmail_connector.format_message_to_markdown(message)

+                if not markdown_content.strip():
+                    logger.warning(f"Skipping message with no content: {subject}")
+                    skipped_messages.append(f"{subject} (no content)")
+                    documents_skipped += 1
+                    continue
+
                 # Create a simple summary
                 summary_content = f"Google Gmail Message: {subject}\n\n"
                 summary_content += f"Sender: {sender}\n"
                 summary_content += f"Date: {date_str}\n"

+                # Generate content hash
+                content_hash = generate_content_hash(markdown_content, search_space_id)
+
+                # Check if document already exists
+                existing_doc_by_hash_result = await session.execute(
+                    select(Document).where(Document.content_hash == content_hash)
+                )
+                existing_document_by_hash = (
+                    existing_doc_by_hash_result.scalars().first()
+                )
+
+                if existing_document_by_hash:
+                    logger.info(
+                        f"Document with content hash {content_hash} already exists for message {message_id}. Skipping processing."
+                    )
+                    documents_skipped += 1
+                    continue
+
                 # Generate embedding for the summary
                 summary_embedding = config.embedding_model_instance.embed(
                     summary_content
                 )

+                # Process chunks
+                chunks = [
+                    Chunk(
+                        content=chunk.text,
+                        embedding=config.embedding_model_instance.embed(chunk.text),
+                    )
+                    for chunk in config.chunker_instance.chunk(markdown_content)
+                ]
+
                 # Create and store new document
                 logger.info(f"Creating new document for Gmail message: {subject}")
                 document = Document(
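Deduplication now keys on a hash of the rendered content instead of a metadata lookup by Gmail message ID. One plausible shape for the `generate_content_hash` helper used above (a sketch; the project's real helper may differ): scoping the hash to the search space lets the same email be indexed once per space while still deduplicating within it.

```python
import hashlib


def generate_content_hash(content: str, search_space_id: int) -> str:
    # Same content in a different search space yields a different hash,
    # so cross-space duplicates are still indexed independently.
    payload = f"{search_space_id}:{content}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()
```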
@@ -3523,56 +3592,72 @@ async def index_google_gmail_messages(
                         "connector_id": connector_id,
                     },
                     content=markdown_content,
+                    content_hash=content_hash,
                     embedding=summary_embedding,
+                    chunks=chunks,
                 )

                 session.add(document)
-                await session.flush()
-
-                # Create chunks for the document
-                chunks = config.chunker_instance.chunk(markdown_content)
-                for i, chunk_text in enumerate(chunks):
-                    chunk = Chunk(
-                        document_id=document.id,
-                        content=chunk_text,
-                        chunk_index=i,
-                        embedding=config.embedding_model_instance.embed_query(
-                            chunk_text
-                        ),
-                    )
-                    session.add(chunk)
-
-                indexed_count += 1
-                logger.info(f"Successfully indexed Gmail message: {subject}")
+                documents_indexed += 1
+                logger.info(f"Successfully indexed new email {summary_content}")

             except Exception as e:
                 logger.error(
-                    f"Error indexing Gmail message {message_id}: {e!s}", exc_info=True
+                    f"Error processing the email {message_id}: {e!s}",
+                    exc_info=True,
                 )
-                continue
+                skipped_messages.append(f"{subject} (processing error)")
+                documents_skipped += 1
+                continue  # Skip this message and continue with others

         # Update the last_indexed_at timestamp for the connector only if requested
+        total_processed = documents_indexed
         if update_last_indexed:
             connector.last_indexed_at = datetime.now()
             logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")

         # Commit all changes
         await session.commit()
-
-        # Update connector's last_indexed_at timestamp
-        connector.last_indexed_at = datetime.now(UTC)
-        await session.commit()
-
-        success_msg = f"Successfully indexed {indexed_count} Gmail messages"
-        await task_logger.log_task_completion(
-            log_entry.id,
-            "SUCCESS",
-            success_msg,
-            {"indexed_count": indexed_count, "total_messages": len(messages)},
-        )
-        logger.info(success_msg)
-        return indexed_count, success_msg
+        logger.info(
+            "Successfully committed all Google Gmail document changes to database"
+        )

+        # Log success
+        await task_logger.log_task_success(
+            log_entry,
+            f"Successfully completed Google Gmail indexing for connector {connector_id}",
+            {
+                "events_processed": total_processed,
+                "documents_indexed": documents_indexed,
+                "documents_skipped": documents_skipped,
+                "skipped_messages_count": len(skipped_messages),
+            },
+        )
+        logger.info(
+            f"Google Gmail indexing completed: {documents_indexed} new emails, {documents_skipped} skipped"
+        )
+        return (
+            total_processed,
+            None,
+        )  # Return None as the error message to indicate success
+
+    except SQLAlchemyError as db_error:
+        await session.rollback()
+        await task_logger.log_task_failure(
+            log_entry,
+            f"Database error during Google Gmail indexing for connector {connector_id}",
+            str(db_error),
+            {"error_type": "SQLAlchemyError"},
+        )
+        logger.error(f"Database error: {db_error!s}", exc_info=True)
+        return 0, f"Database error: {db_error!s}"
+
     except Exception as e:
-        await task_logger.log_task_completion(
-            log_entry.id,
-            "FAILED",
-            f"Failed to index Gmail messages for connector {connector_id}",
+        await session.rollback()
+        await task_logger.log_task_failure(
+            log_entry,
+            f"Failed to index Google Gmail emails for connector {connector_id}",
             str(e),
             {"error_type": type(e).__name__},
         )
-        logger.error(f"Failed to index Gmail messages: {e!s}", exc_info=True)
-        return 0, f"Failed to index Gmail messages: {e!s}"
+        logger.error(f"Failed to index Google Gmail emails: {e!s}", exc_info=True)
+        return 0, f"Failed to index Google Gmail emails: {e!s}"