update seach source connector schema

This commit is contained in:
CREDO23 2025-08-02 04:39:48 +02:00
parent 44d2338663
commit edf46e4de1
21 changed files with 1213 additions and 19 deletions

View file

@ -2,6 +2,7 @@ import asyncio
import logging
from datetime import UTC, datetime, timedelta
from google.oauth2.credentials import Credentials
from slack_sdk.errors import SlackApiError
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
@ -12,6 +13,7 @@ from app.connectors.clickup_connector import ClickUpConnector
from app.connectors.confluence_connector import ConfluenceConnector
from app.connectors.discord_connector import DiscordConnector
from app.connectors.github_connector import GitHubConnector
from app.connectors.google_calendar_connector import GoogleCalendarConnector
from app.connectors.jira_connector import JiraConnector
from app.connectors.linear_connector import LinearConnector
from app.connectors.notion_history import NotionHistoryConnector
@ -3012,3 +3014,362 @@ async def index_clickup_tasks(
)
logger.error(f"Failed to index ClickUp tasks: {e!s}", exc_info=True)
return 0, f"Failed to index ClickUp tasks: {e!s}"
async def index_google_calendar_events(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None = None,
end_date: str | None = None,
update_last_indexed: bool = True,
) -> tuple[int, str | None]:
"""
Index Google Calendar events.
Args:
session: Database session
connector_id: ID of the Google Calendar connector
search_space_id: ID of the search space to store documents in
user_id: User ID
start_date: Start date for indexing (YYYY-MM-DD format)
end_date: End date for indexing (YYYY-MM-DD format)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="google_calendar_events_indexing",
source="connector_indexing_task",
message=f"Starting Google Calendar events indexing for connector {connector_id}",
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
"start_date": start_date,
"end_date": end_date,
},
)
try:
# Get the connector from the database
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found",
"Connector not found",
{"error_type": "ConnectorNotFound"},
)
return 0, f"Connector with ID {connector_id} not found"
# Get the Google Calendar credentials from the connector config
credentials = Credentials(
token=connector.config.get("token"),
refresh_token=connector.config.get("refresh_token"),
token_uri=connector.config.get("token_uri"),
client_id=connector.config.get("client_id"),
client_secret=connector.config.get("client_secret"),
scopes=connector.config.get("scopes"),
)
if (
not credentials.client_id
or not credentials.client_secret
or not credentials.refresh_token
):
await task_logger.log_task_failure(
log_entry,
f"Google Calendar credentials not found in connector config for connector {connector_id}",
"Missing Google Calendar credentials",
{"error_type": "MissingCredentials"},
)
return 0, "Google Calendar credentials not found in connector config"
# Initialize Google Calendar client
await task_logger.log_task_progress(
log_entry,
f"Initializing Google Calendar client for connector {connector_id}",
{"stage": "client_initialization"},
)
calendar_client = GoogleCalendarConnector(credentials=credentials)
# Calculate date range
if start_date is None or end_date is None:
# Fall back to calculating dates based on last_indexed_at
calculated_end_date = datetime.now()
# Use last_indexed_at as start date if available, otherwise use 30 days ago
if connector.last_indexed_at:
# Convert dates to be comparable (both timezone-naive)
last_indexed_naive = (
connector.last_indexed_at.replace(tzinfo=None)
if connector.last_indexed_at.tzinfo
else connector.last_indexed_at
)
# Check if last_indexed_at is in the future or after end_date
if last_indexed_naive > calculated_end_date:
logger.warning(
f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 30 days ago instead."
)
calculated_start_date = calculated_end_date - timedelta(days=30)
else:
calculated_start_date = last_indexed_naive
logger.info(
f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date"
)
else:
calculated_start_date = calculated_end_date - timedelta(
days=30
) # Use 30 days as default for calendar events
logger.info(
f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (30 days ago) as start date"
)
# Use calculated dates if not provided
start_date_str = (
start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
)
end_date_str = (
end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
)
else:
# Use provided dates
start_date_str = start_date
end_date_str = end_date
await task_logger.log_task_progress(
log_entry,
f"Fetching Google Calendar events from {start_date_str} to {end_date_str}",
{
"stage": "fetching_events",
"start_date": start_date_str,
"end_date": end_date_str,
},
)
# Get events within date range from primary calendar
try:
events, error = calendar_client.get_all_primary_calendar_events(
start_date=start_date_str, end_date=end_date_str
)
if error:
logger.error(f"Failed to get Google Calendar events: {error}")
# Don't treat "No events found" as an error that should stop indexing
if "No events found" in error:
logger.info(
"No events found is not a critical error, continuing with update"
)
if update_last_indexed:
connector.last_indexed_at = datetime.now()
await session.commit()
logger.info(
f"Updated last_indexed_at to {connector.last_indexed_at} despite no events found"
)
await task_logger.log_task_success(
log_entry,
f"No Google Calendar events found in date range {start_date_str} to {end_date_str}",
{"events_found": 0},
)
return 0, None
else:
await task_logger.log_task_failure(
log_entry,
f"Failed to get Google Calendar events: {error}",
"API Error",
{"error_type": "APIError"},
)
return 0, f"Failed to get Google Calendar events: {error}"
logger.info(f"Retrieved {len(events)} events from Google Calendar API")
except Exception as e:
logger.error(f"Error fetching Google Calendar events: {e!s}", exc_info=True)
return 0, f"Error fetching Google Calendar events: {e!s}"
# Process and index each event
documents_indexed = 0
skipped_events = []
documents_skipped = 0
for event in events:
try:
event_id = event.get("id")
event_summary = event.get("summary", "No Title")
calendar_id = event.get("calendarId", "")
if not event_id:
logger.warning(f"Skipping event with missing ID: {event_summary}")
skipped_events.append(f"{event_summary} (missing ID)")
documents_skipped += 1
continue
# Format event as markdown
event_markdown = calendar_client.format_event_to_markdown(event)
if not event_markdown.strip():
logger.warning(f"Skipping event with no content: {event_summary}")
skipped_events.append(f"{event_summary} (no content)")
documents_skipped += 1
continue
# Create a simple summary for the document
start = event.get("start", {})
end = event.get("end", {})
start_time = start.get("dateTime") or start.get("date", "")
end_time = end.get("dateTime") or end.get("date", "")
location = event.get("location", "")
description = event.get("description", "")
summary_content = f"Google Calendar Event: {event_summary}\n\n"
summary_content += f"Calendar: {calendar_id}\n"
summary_content += f"Start: {start_time}\n"
summary_content += f"End: {end_time}\n"
if location:
summary_content += f"Location: {location}\n"
if description:
# Take first 300 characters of description for summary
desc_preview = description[:300]
if len(description) > 300:
desc_preview += "..."
summary_content += f"Description: {desc_preview}\n"
# Generate content hash
content_hash = generate_content_hash(event_markdown, search_space_id)
# Check if document already exists
existing_doc_by_hash_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
existing_document_by_hash = (
existing_doc_by_hash_result.scalars().first()
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for event {event_summary}. Skipping processing."
)
documents_skipped += 1
continue
# Generate embedding for the summary
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks - using the full event markdown
chunks = [
Chunk(
content=chunk.text,
embedding=config.embedding_model_instance.embed(chunk.text),
)
for chunk in config.chunker_instance.chunk(event_markdown)
]
# Create and store new document
logger.info(f"Creating new document for event {event_summary}")
document = Document(
search_space_id=search_space_id,
title=f"Calendar Event - {event_summary}",
document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
document_metadata={
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
},
content=summary_content,
content_hash=content_hash,
embedding=summary_embedding,
chunks=chunks,
)
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new event {event_summary}")
except Exception as e:
logger.error(
f"Error processing event {event.get('summary', 'Unknown')}: {e!s}",
exc_info=True,
)
skipped_events.append(
f"{event.get('summary', 'Unknown')} (processing error)"
)
documents_skipped += 1
continue # Skip this event and continue with others
# Update the last_indexed_at timestamp for the connector only if requested
total_processed = documents_indexed
if update_last_indexed:
connector.last_indexed_at = datetime.now()
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
# Commit all changes
await session.commit()
logger.info(
"Successfully committed all Google Calendar document changes to database"
)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Google Calendar indexing for connector {connector_id}",
{
"events_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_events_count": len(skipped_events),
},
)
logger.info(
f"Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped"
)
return (
total_processed,
None,
) # Return None as the error message to indicate success
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during Google Calendar indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index Google Calendar events for connector {connector_id}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Google Calendar events: {e!s}", exc_info=True)
return 0, f"Failed to index Google Calendar events: {e!s}"