refactor: unify all 3 google Composio and non-Composio connector types and pipelines keeping same credential adapters

This commit is contained in:
Anish Sarkar 2026-03-19 05:08:21 +05:30
parent 6c37b563c0
commit 83152e8e7e
24 changed files with 633 additions and 3596 deletions

View file

@ -65,10 +65,10 @@ _CONNECTOR_TYPE_TO_SEARCHABLE: dict[str, str] = {
"BOOKSTACK_CONNECTOR": "BOOKSTACK_CONNECTOR",
"CIRCLEBACK_CONNECTOR": "CIRCLEBACK", # Connector type differs from document type
"OBSIDIAN_CONNECTOR": "OBSIDIAN_CONNECTOR",
# Composio connectors
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"COMPOSIO_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
# Composio connectors (unified to native document types)
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "GOOGLE_DRIVE_FILE",
"COMPOSIO_GMAIL_CONNECTOR": "GOOGLE_GMAIL_CONNECTOR",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "GOOGLE_CALENDAR_CONNECTOR",
}
# Document types that don't come from SearchSourceConnector but should always be searchable

View file

@ -157,14 +157,18 @@ def create_create_google_drive_file_tool(
from app.db import SearchSourceConnector, SearchSourceConnectorType
_DRIVE_TYPES = [
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]
if final_connector_id is not None:
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnector.connector_type.in_(_DRIVE_TYPES),
)
)
connector = result.scalars().first()
@ -179,8 +183,7 @@ def create_create_google_drive_file_tool(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnector.connector_type.in_(_DRIVE_TYPES),
)
)
connector = result.scalars().first()
@ -194,8 +197,19 @@ def create_create_google_drive_file_tool(
logger.info(
f"Creating Google Drive file: name='{final_name}', type='{final_file_type}', connector={actual_connector_id}"
)
pre_built_creds = None
if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR:
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
pre_built_creds = build_composio_credentials(cca_id)
client = GoogleDriveClient(
session=db_session, connector_id=actual_connector_id
session=db_session,
connector_id=actual_connector_id,
credentials=pre_built_creds,
)
try:
created = await client.create_file(

View file

@ -151,13 +151,17 @@ def create_delete_google_drive_file_tool(
from app.db import SearchSourceConnector, SearchSourceConnectorType
_DRIVE_TYPES = [
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnector.connector_type.in_(_DRIVE_TYPES),
)
)
connector = result.scalars().first()
@ -170,7 +174,20 @@ def create_delete_google_drive_file_tool(
logger.info(
f"Deleting Google Drive file: file_id='{final_file_id}', connector={final_connector_id}"
)
client = GoogleDriveClient(session=db_session, connector_id=connector.id)
pre_built_creds = None
if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR:
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
pre_built_creds = build_composio_credentials(cca_id)
client = GoogleDriveClient(
session=db_session,
connector_id=connector.id,
credentials=pre_built_creds,
)
try:
await client.trash_file(file_id=final_file_id)
except HttpError as http_err:

View file

@ -60,7 +60,7 @@ def _is_degenerate_query(query: str) -> bool:
async def _browse_recent_documents(
search_space_id: int,
document_type: str | None,
document_type: str | list[str] | None,
top_k: int,
start_date: datetime | None,
end_date: datetime | None,
@ -83,14 +83,22 @@ async def _browse_recent_documents(
base_conditions = [Document.search_space_id == search_space_id]
if document_type is not None:
if isinstance(document_type, str):
try:
doc_type_enum = DocumentType[document_type]
base_conditions.append(Document.document_type == doc_type_enum)
except KeyError:
return []
type_list = document_type if isinstance(document_type, list) else [document_type]
doc_type_enums = []
for dt in type_list:
if isinstance(dt, str):
try:
doc_type_enums.append(DocumentType[dt])
except KeyError:
pass
else:
doc_type_enums.append(dt)
if not doc_type_enums:
return []
if len(doc_type_enums) == 1:
base_conditions.append(Document.document_type == doc_type_enums[0])
else:
base_conditions.append(Document.document_type == document_type)
base_conditions.append(Document.document_type.in_(doc_type_enums))
if start_date is not None:
base_conditions.append(Document.updated_at >= start_date)
@ -195,10 +203,6 @@ _ALL_CONNECTORS: list[str] = [
"CRAWLED_URL",
"CIRCLEBACK",
"OBSIDIAN_CONNECTOR",
# Composio connectors
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"COMPOSIO_GMAIL_CONNECTOR",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
]
# Human-readable descriptions for each connector type
@ -228,10 +232,6 @@ CONNECTOR_DESCRIPTIONS: dict[str, str] = {
"BOOKSTACK_CONNECTOR": "BookStack pages (personal documentation)",
"CIRCLEBACK": "Circleback meeting notes, transcripts, and action items",
"OBSIDIAN_CONNECTOR": "Obsidian vault notes and markdown files (personal notes)",
# Composio connectors
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "Google Drive files via Composio (personal cloud storage)",
"COMPOSIO_GMAIL_CONNECTOR": "Gmail emails via Composio (personal emails)",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "Google Calendar events via Composio (personal calendar)",
}
@ -654,6 +654,20 @@ async def search_knowledge_base_async(
)
browse_connectors = connectors if connectors else [None] # type: ignore[list-item]
# Expand native Google types to include legacy Composio equivalents
# so old documents remain searchable until re-indexed.
_LEGACY_ALIASES: dict[str, str] = {
"GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
"GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}
expanded_browse = []
for c in browse_connectors:
if c is not None and c in _LEGACY_ALIASES:
expanded_browse.append([c, _LEGACY_ALIASES[c]])
else:
expanded_browse.append(c)
browse_results = await asyncio.gather(
*[
_browse_recent_documents(
@ -663,7 +677,7 @@ async def search_knowledge_base_async(
start_date=resolved_start_date,
end_date=resolved_end_date,
)
for c in browse_connectors
for c in expanded_browse
]
)
for docs in browse_results:

View file

@ -1,719 +0,0 @@
"""
Composio Gmail Connector Module.
Provides Gmail specific methods for data retrieval and indexing via Composio.
"""
import logging
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from typing import Any
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
from app.connectors.composio_connector import ComposioConnector
from app.db import Document, DocumentStatus, DocumentType
from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
calculate_date_range,
check_duplicate_document_by_hash,
safe_set_chunks,
)
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
# Heartbeat configuration
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
HEARTBEAT_INTERVAL_SECONDS = 30
logger = logging.getLogger(__name__)
def get_current_timestamp() -> datetime:
"""Get the current timestamp with timezone for updated_at field."""
return datetime.now(UTC)
async def check_document_by_unique_identifier(
session: AsyncSession, unique_identifier_hash: str
) -> Document | None:
"""Check if a document with the given unique identifier hash already exists."""
existing_doc_result = await session.execute(
select(Document)
.options(selectinload(Document.chunks))
.where(Document.unique_identifier_hash == unique_identifier_hash)
)
return existing_doc_result.scalars().first()
async def update_connector_last_indexed(
session: AsyncSession,
connector,
update_last_indexed: bool = True,
) -> None:
"""Update the last_indexed_at timestamp for a connector."""
if update_last_indexed:
connector.last_indexed_at = datetime.now(UTC)
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
class ComposioGmailConnector(ComposioConnector):
"""
Gmail specific Composio connector.
Provides methods for listing messages, getting message details, and formatting
Gmail messages from Gmail via Composio.
"""
async def list_gmail_messages(
self,
query: str = "",
max_results: int = 50,
page_token: str | None = None,
) -> tuple[list[dict[str, Any]], str | None, int | None, str | None]:
"""
List Gmail messages via Composio with pagination support.
Args:
query: Gmail search query.
max_results: Maximum number of messages per page (default: 50).
page_token: Optional pagination token for next page.
Returns:
Tuple of (messages list, next_page_token, result_size_estimate, error message).
"""
connected_account_id = await self.get_connected_account_id()
if not connected_account_id:
return [], None, None, "No connected account ID found"
entity_id = await self.get_entity_id()
service = await self._get_service()
return await service.get_gmail_messages(
connected_account_id=connected_account_id,
entity_id=entity_id,
query=query,
max_results=max_results,
page_token=page_token,
)
async def get_gmail_message_detail(
self, message_id: str
) -> tuple[dict[str, Any] | None, str | None]:
"""
Get full details of a Gmail message via Composio.
Args:
message_id: Gmail message ID.
Returns:
Tuple of (message details, error message).
"""
connected_account_id = await self.get_connected_account_id()
if not connected_account_id:
return None, "No connected account ID found"
entity_id = await self.get_entity_id()
service = await self._get_service()
return await service.get_gmail_message_detail(
connected_account_id=connected_account_id,
entity_id=entity_id,
message_id=message_id,
)
@staticmethod
def _html_to_markdown(html: str) -> str:
"""Convert HTML (especially email layouts with nested tables) to clean markdown."""
soup = BeautifulSoup(html, "html.parser")
for tag in soup.find_all(["style", "script", "img"]):
tag.decompose()
for tag in soup.find_all(
["table", "thead", "tbody", "tfoot", "tr", "td", "th"]
):
tag.unwrap()
return md(str(soup)).strip()
def format_gmail_message_to_markdown(self, message: dict[str, Any]) -> str:
"""
Format a Gmail message to markdown.
Args:
message: Message object from Composio's GMAIL_FETCH_EMAILS response.
Composio structure: messageId, messageText, messageTimestamp,
payload.headers, labelIds, attachmentList
Returns:
Formatted markdown string.
"""
try:
# Composio uses 'messageId' (camelCase)
message_id = message.get("messageId", "") or message.get("id", "")
label_ids = message.get("labelIds", [])
# Extract headers from payload
payload = message.get("payload", {})
headers = payload.get("headers", [])
# Parse headers into a dict
header_dict = {}
for header in headers:
name = header.get("name", "").lower()
value = header.get("value", "")
header_dict[name] = value
# Extract key information
subject = header_dict.get("subject", "No Subject")
from_email = header_dict.get("from", "Unknown Sender")
to_email = header_dict.get("to", "Unknown Recipient")
# Composio provides messageTimestamp directly
date_str = message.get("messageTimestamp", "") or header_dict.get(
"date", "Unknown Date"
)
# Build markdown content
markdown_content = f"# {subject}\n\n"
markdown_content += f"**From:** {from_email}\n"
markdown_content += f"**To:** {to_email}\n"
markdown_content += f"**Date:** {date_str}\n"
if label_ids:
markdown_content += f"**Labels:** {', '.join(label_ids)}\n"
markdown_content += "\n---\n\n"
# Composio provides full message text in 'messageText' which is often raw HTML
message_text = message.get("messageText", "")
if message_text:
message_text = self._html_to_markdown(message_text)
markdown_content += f"## Content\n\n{message_text}\n\n"
else:
# Fallback to snippet if no messageText
snippet = message.get("snippet", "")
if snippet:
markdown_content += f"## Preview\n\n{snippet}\n\n"
# Add attachment info if present
attachments = message.get("attachmentList", [])
if attachments:
markdown_content += "## Attachments\n\n"
for att in attachments:
att_name = att.get("filename", att.get("name", "Unknown"))
markdown_content += f"- {att_name}\n"
markdown_content += "\n"
# Add message metadata
markdown_content += "## Message Details\n\n"
markdown_content += f"- **Message ID:** {message_id}\n"
return markdown_content
except Exception as e:
return f"Error formatting message to markdown: {e!s}"
# ============ Indexer Functions ============
async def _analyze_gmail_messages_phase1(
session: AsyncSession,
messages: list[dict[str, Any]],
composio_connector: ComposioGmailConnector,
connector_id: int,
search_space_id: int,
user_id: str,
) -> tuple[list[dict[str, Any]], int, int]:
"""
Phase 1: Analyze all messages, create pending documents.
Makes ALL documents visible in the UI immediately with pending status.
Returns:
Tuple of (messages_to_process, documents_skipped, duplicate_content_count)
"""
messages_to_process = []
documents_skipped = 0
duplicate_content_count = 0
for message in messages:
try:
# Composio uses 'messageId' (camelCase), not 'id'
message_id = message.get("messageId", "") or message.get("id", "")
if not message_id:
documents_skipped += 1
continue
# Extract message info from Composio response
payload = message.get("payload", {})
headers = payload.get("headers", [])
subject = "No Subject"
sender = "Unknown Sender"
date_str = message.get("messageTimestamp", "Unknown Date")
for header in headers:
name = header.get("name", "").lower()
value = header.get("value", "")
if name == "subject":
subject = value
elif name == "from":
sender = value
elif name == "date":
date_str = value
# Format to markdown using the full message data
markdown_content = composio_connector.format_gmail_message_to_markdown(
message
)
# Check for empty content
if not markdown_content.strip():
logger.warning(f"Skipping Gmail message with no content: {subject}")
documents_skipped += 1
continue
# Generate unique identifier
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"gmail_{message_id}", search_space_id
)
content_hash = generate_content_hash(markdown_content, search_space_id)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get label IDs and thread_id from Composio response
label_ids = message.get("labelIds", [])
thread_id = message.get("threadId", "") or message.get("thread_id", "")
if existing_document:
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append(
{
"document": existing_document,
"is_new": False,
"markdown_content": markdown_content,
"content_hash": content_hash,
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date_str": date_str,
"label_ids": label_ids,
}
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from standard connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Message {subject} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=subject,
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]),
document_metadata={
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date": date_str,
"labels": label_ids,
"connector_id": connector_id,
"toolkit_id": "gmail",
"source": "composio",
},
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
messages_to_process.append(
{
"document": document,
"is_new": True,
"markdown_content": markdown_content,
"content_hash": content_hash,
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date_str": date_str,
"label_ids": label_ids,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True)
documents_skipped += 1
continue
return messages_to_process, documents_skipped, duplicate_content_count
async def _process_gmail_messages_phase2(
session: AsyncSession,
messages_to_process: list[dict[str, Any]],
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int]:
"""
Phase 2: Process each document one by one.
Each document transitions: pending processing ready/failed
Returns:
Tuple of (documents_indexed, documents_failed)
"""
documents_indexed = 0
documents_failed = 0
last_heartbeat_time = time.time()
for item in messages_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm and enable_summary:
document_metadata_for_summary = {
"message_id": item["message_id"],
"thread_id": item["thread_id"],
"subject": item["subject"],
"sender": item["sender"],
"document_type": "Gmail Message (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
item["markdown_content"], user_llm, document_metadata_for_summary
)
else:
summary_content = f"Gmail: {item['subject']}\n\nFrom: {item['sender']}\nDate: {item['date_str']}\n\n{item['markdown_content']}"
summary_embedding = embed_text(summary_content)
chunks = await create_document_chunks(item["markdown_content"])
# Update document to READY with actual content
document.title = item["subject"]
document.content = summary_content
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"message_id": item["message_id"],
"thread_id": item["thread_id"],
"subject": item["subject"],
"sender": item["sender"],
"date": item["date_str"],
"labels": item["label_ids"],
"connector_id": connector_id,
"source": "composio",
}
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Gmail messages processed so far"
)
await session.commit()
except Exception as e:
logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue
return documents_indexed, documents_failed
async def index_composio_gmail(
session: AsyncSession,
connector,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None,
end_date: str | None,
task_logger: TaskLoggingService,
log_entry,
update_last_indexed: bool = True,
max_items: int = 1000,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str]:
"""Index Gmail messages via Composio with real-time document status updates."""
try:
composio_connector = ComposioGmailConnector(session, connector_id)
# Normalize date values - handle "undefined" strings from frontend
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None
# Use provided dates directly if both are provided, otherwise calculate from last_indexed_at
if start_date is not None and end_date is not None:
start_date_str = start_date
end_date_str = end_date
else:
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Build query with date range
query_parts = []
if start_date_str:
query_parts.append(f"after:{start_date_str.replace('-', '/')}")
if end_date_str:
query_parts.append(f"before:{end_date_str.replace('-', '/')}")
query = " ".join(query_parts) if query_parts else ""
logger.info(
f"Gmail query for connector {connector_id}: '{query}' "
f"(start_date={start_date_str}, end_date={end_date_str})"
)
await task_logger.log_task_progress(
log_entry,
f"Fetching Gmail messages via Composio for connector {connector_id}",
{"stage": "fetching_messages"},
)
# =======================================================================
# FETCH ALL MESSAGES FIRST
# =======================================================================
batch_size = 50
page_token = None
all_messages = []
result_size_estimate = None
last_heartbeat_time = time.time()
while len(all_messages) < max_items:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(len(all_messages))
last_heartbeat_time = current_time
remaining = max_items - len(all_messages)
current_batch_size = min(batch_size, remaining)
(
messages,
next_token,
result_size_estimate_batch,
error,
) = await composio_connector.list_gmail_messages(
query=query,
max_results=current_batch_size,
page_token=page_token,
)
if error:
await task_logger.log_task_failure(
log_entry, f"Failed to fetch Gmail messages: {error}", {}
)
return 0, f"Failed to fetch Gmail messages: {error}"
if not messages:
break
if result_size_estimate is None and result_size_estimate_batch is not None:
result_size_estimate = result_size_estimate_batch
logger.info(
f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'"
)
all_messages.extend(messages)
logger.info(
f"Fetched {len(messages)} messages (total: {len(all_messages)})"
)
if not next_token or len(messages) < current_batch_size:
break
page_token = next_token
if not all_messages:
success_msg = "No Gmail messages found in the specified date range"
await task_logger.log_task_success(
log_entry, success_msg, {"messages_count": 0}
)
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return (
0,
None,
) # Return None (not error) when no items found - this is success with 0 items
logger.info(f"Found {len(all_messages)} Gmail messages to index via Composio")
# =======================================================================
# PHASE 1: Analyze all messages, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
await task_logger.log_task_progress(
log_entry,
f"Phase 1: Creating pending documents for {len(all_messages)} messages",
{"stage": "phase1_pending"},
)
(
messages_to_process,
documents_skipped,
duplicate_content_count,
) = await _analyze_gmail_messages_phase1(
session=session,
messages=all_messages,
composio_connector=composio_connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
# Commit all pending documents - they all appear in UI now
new_documents_count = len([m for m in messages_to_process if m["is_new"]])
if new_documents_count > 0:
logger.info(f"Phase 1: Committing {new_documents_count} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
await task_logger.log_task_progress(
log_entry,
f"Phase 2: Processing {len(messages_to_process)} documents",
{"stage": "phase2_processing"},
)
documents_indexed, documents_failed = await _process_gmail_messages_phase2(
session=session,
messages_to_process=messages_to_process,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=getattr(connector, "enable_summary", False),
on_heartbeat_callback=on_heartbeat_callback,
)
# CRITICAL: Always update timestamp so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit to ensure all documents are persisted
logger.info(f"Final commit: Total {documents_indexed} Gmail messages processed")
try:
await session.commit()
logger.info(
"Successfully committed all Composio Gmail document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
await task_logger.log_task_success(
log_entry,
f"Successfully completed Gmail indexing via Composio for connector {connector_id}",
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"Composio Gmail indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return documents_indexed, warning_message
except Exception as e:
logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True)
return 0, f"Failed to index Gmail via Composio: {e!s}"

View file

@ -1,566 +0,0 @@
"""
Composio Google Calendar Connector Module.
Provides Google Calendar specific methods for data retrieval and indexing via Composio.
"""
import logging
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
from app.connectors.composio_connector import ComposioConnector
from app.db import Document, DocumentStatus, DocumentType
from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
calculate_date_range,
check_duplicate_document_by_hash,
safe_set_chunks,
)
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
# Heartbeat configuration
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
HEARTBEAT_INTERVAL_SECONDS = 30
logger = logging.getLogger(__name__)
def get_current_timestamp() -> datetime:
"""Get the current timestamp with timezone for updated_at field."""
return datetime.now(UTC)
async def check_document_by_unique_identifier(
session: AsyncSession, unique_identifier_hash: str
) -> Document | None:
"""Check if a document with the given unique identifier hash already exists."""
existing_doc_result = await session.execute(
select(Document)
.options(selectinload(Document.chunks))
.where(Document.unique_identifier_hash == unique_identifier_hash)
)
return existing_doc_result.scalars().first()
async def update_connector_last_indexed(
session: AsyncSession,
connector,
update_last_indexed: bool = True,
) -> None:
"""Update the last_indexed_at timestamp for a connector."""
if update_last_indexed:
connector.last_indexed_at = datetime.now(UTC)
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
class ComposioGoogleCalendarConnector(ComposioConnector):
"""
Google Calendar specific Composio connector.
Provides methods for listing calendar events and formatting them from
Google Calendar via Composio.
"""
async def list_calendar_events(
self,
time_min: str | None = None,
time_max: str | None = None,
max_results: int = 250,
) -> tuple[list[dict[str, Any]], str | None]:
"""
List Google Calendar events via Composio.
Args:
time_min: Start time (RFC3339 format).
time_max: End time (RFC3339 format).
max_results: Maximum number of events.
Returns:
Tuple of (events list, error message).
"""
connected_account_id = await self.get_connected_account_id()
if not connected_account_id:
return [], "No connected account ID found"
entity_id = await self.get_entity_id()
service = await self._get_service()
return await service.get_calendar_events(
connected_account_id=connected_account_id,
entity_id=entity_id,
time_min=time_min,
time_max=time_max,
max_results=max_results,
)
def format_calendar_event_to_markdown(self, event: dict[str, Any]) -> str:
"""
Format a Google Calendar event to markdown.
Args:
event: Event object from Google Calendar API.
Returns:
Formatted markdown string.
"""
try:
# Extract basic event information
summary = event.get("summary", "No Title")
description = event.get("description", "")
location = event.get("location", "")
# Extract start and end times
start = event.get("start", {})
end = event.get("end", {})
start_time = start.get("dateTime") or start.get("date", "")
end_time = end.get("dateTime") or end.get("date", "")
# Format times for display
def format_time(time_str: str) -> str:
if not time_str:
return "Unknown"
try:
if "T" in time_str:
dt = datetime.fromisoformat(time_str.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M")
return time_str
except Exception:
return time_str
start_formatted = format_time(start_time)
end_formatted = format_time(end_time)
# Extract attendees
attendees = event.get("attendees", [])
attendee_list = []
for attendee in attendees:
email = attendee.get("email", "")
display_name = attendee.get("displayName", email)
response_status = attendee.get("responseStatus", "")
attendee_list.append(f"- {display_name} ({response_status})")
# Build markdown content
markdown_content = f"# {summary}\n\n"
markdown_content += f"**Start:** {start_formatted}\n"
markdown_content += f"**End:** {end_formatted}\n"
if location:
markdown_content += f"**Location:** {location}\n"
markdown_content += "\n"
if description:
markdown_content += f"## Description\n\n{description}\n\n"
if attendee_list:
markdown_content += "## Attendees\n\n"
markdown_content += "\n".join(attendee_list)
markdown_content += "\n\n"
# Add event metadata
markdown_content += "## Event Details\n\n"
markdown_content += f"- **Event ID:** {event.get('id', 'Unknown')}\n"
markdown_content += f"- **Created:** {event.get('created', 'Unknown')}\n"
markdown_content += f"- **Updated:** {event.get('updated', 'Unknown')}\n"
return markdown_content
except Exception as e:
return f"Error formatting event to markdown: {e!s}"
# ============ Indexer Functions ============
async def index_composio_google_calendar(
session: AsyncSession,
connector,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None,
end_date: str | None,
task_logger: TaskLoggingService,
log_entry,
update_last_indexed: bool = True,
max_items: int = 2500,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str]:
"""Index Google Calendar events via Composio."""
try:
composio_connector = ComposioGoogleCalendarConnector(session, connector_id)
await task_logger.log_task_progress(
log_entry,
f"Fetching Google Calendar events via Composio for connector {connector_id}",
{"stage": "fetching_events"},
)
# Normalize date values - handle "undefined" strings from frontend
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None
# Use provided dates directly if both are provided, otherwise calculate from last_indexed_at
# This ensures user-selected dates are respected (matching non-Composio Calendar connector behavior)
if start_date is not None and end_date is not None:
# User provided both dates - use them directly
start_date_str = start_date
end_date_str = end_date
else:
# Calculate date range with defaults (uses last_indexed_at or 365 days back)
# This ensures indexing works even when user doesn't specify dates
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Build time range for API call
time_min = f"{start_date_str}T00:00:00Z"
time_max = f"{end_date_str}T23:59:59Z"
logger.info(
f"Google Calendar query for connector {connector_id}: "
f"(start_date={start_date_str}, end_date={end_date_str})"
)
events, error = await composio_connector.list_calendar_events(
time_min=time_min,
time_max=time_max,
max_results=max_items,
)
if error:
await task_logger.log_task_failure(
log_entry, f"Failed to fetch Calendar events: {error}", {}
)
return 0, f"Failed to fetch Calendar events: {error}"
if not events:
success_msg = "No Google Calendar events found in the specified date range"
await task_logger.log_task_success(
log_entry, success_msg, {"events_count": 0}
)
# CRITICAL: Update timestamp even when no events found so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return (
0,
None,
) # Return None (not error) when no items found - this is success with 0 items
logger.info(f"Found {len(events)} Google Calendar events to index via Composio")
documents_indexed = 0
documents_skipped = 0
documents_failed = 0 # Track events that failed processing
duplicate_content_count = (
0 # Track events skipped due to duplicate content_hash
)
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Analyze all events, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
events_to_process = [] # List of dicts with document and event data
new_documents_created = False
for event in events:
try:
# Handle both standard Google API and potential Composio variations
event_id = event.get("id", "") or event.get("eventId", "")
summary = (
event.get("summary", "") or event.get("title", "") or "No Title"
)
if not event_id:
documents_skipped += 1
continue
# Format to markdown
markdown_content = composio_connector.format_calendar_event_to_markdown(
event
)
# Generate unique identifier
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"calendar_{event_id}", search_space_id
)
content_hash = generate_content_hash(markdown_content, search_space_id)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Extract event times
start = event.get("start", {})
end = event.get("end", {})
start_time = start.get("dateTime") or start.get("date", "")
end_time = end.get("dateTime") or end.get("date", "")
location = event.get("location", "")
if existing_document:
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
events_to_process.append(
{
"document": existing_document,
"is_new": False,
"markdown_content": markdown_content,
"content_hash": content_hash,
"event_id": event_id,
"summary": summary,
"start_time": start_time,
"end_time": end_time,
"location": location,
}
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from standard connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Event {summary} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=summary,
document_type=DocumentType(
TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"]
),
document_metadata={
"event_id": event_id,
"summary": summary,
"start_time": start_time,
"end_time": end_time,
"location": location,
"connector_id": connector_id,
"toolkit_id": "googlecalendar",
"source": "composio",
},
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
events_to_process.append(
{
"document": document,
"is_new": True,
"markdown_content": markdown_content,
"content_hash": content_hash,
"event_id": event_id,
"summary": summary,
"start_time": start_time,
"end_time": end_time,
"location": location,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(events_to_process)} documents")
for item in events_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm and connector.enable_summary:
document_metadata_for_summary = {
"event_id": item["event_id"],
"summary": item["summary"],
"start_time": item["start_time"],
"document_type": "Google Calendar Event (Composio)",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["markdown_content"],
user_llm,
document_metadata_for_summary,
)
else:
summary_content = (
f"Calendar: {item['summary']}\n\n{item['markdown_content']}"
)
summary_embedding = embed_text(summary_content)
chunks = await create_document_chunks(item["markdown_content"])
# Update document to READY with actual content
document.title = item["summary"]
document.content = summary_content
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"event_id": item["event_id"],
"summary": item["summary"],
"start_time": item["start_time"],
"end_time": item["end_time"],
"location": item["location"],
"connector_id": connector_id,
"source": "composio",
}
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Calendar events processed so far"
)
await session.commit()
except Exception as e:
logger.error(f"Error processing Calendar event: {e!s}", exc_info=True)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit to ensure all documents are persisted (safety net)
# This matches the pattern used in non-Composio Gmail indexer
logger.info(
f"Final commit: Total {documents_indexed} Google Calendar events processed"
)
try:
await session.commit()
logger.info(
"Successfully committed all Composio Google Calendar document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same event was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
await task_logger.log_task_success(
log_entry,
f"Successfully completed Google Calendar indexing via Composio for connector {connector_id}",
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"Composio Google Calendar indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return documents_indexed, warning_message
except Exception as e:
logger.error(
f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True
)
return 0, f"Failed to index Google Calendar via Composio: {e!s}"

View file

@ -52,44 +52,40 @@ class GoogleCalendarConnector:
) -> Credentials:
"""
Get valid Google OAuth credentials.
Returns:
Google OAuth credentials
Raises:
ValueError: If credentials have not been set
Exception: If credential refresh fails
Supports both native OAuth (with refresh_token) and Composio-sourced
credentials (with refresh_handler). For Composio credentials, validation
and DB persistence are skipped since Composio manages its own tokens.
"""
if not all(
[
self._credentials.client_id,
self._credentials.client_secret,
self._credentials.refresh_token,
]
):
raise ValueError(
"Google OAuth credentials (client_id, client_secret, refresh_token) must be set"
)
has_standard_refresh = bool(self._credentials.refresh_token)
if has_standard_refresh:
if not all(
[self._credentials.client_id, self._credentials.client_secret]
):
raise ValueError(
"Google OAuth credentials (client_id, client_secret) must be set"
)
if self._credentials and not self._credentials.expired:
return self._credentials
# Create credentials from refresh token
self._credentials = Credentials(
token=self._credentials.token,
refresh_token=self._credentials.refresh_token,
token_uri=self._credentials.token_uri,
client_id=self._credentials.client_id,
client_secret=self._credentials.client_secret,
scopes=self._credentials.scopes,
expiry=self._credentials.expiry,
)
if has_standard_refresh:
self._credentials = Credentials(
token=self._credentials.token,
refresh_token=self._credentials.refresh_token,
token_uri=self._credentials.token_uri,
client_id=self._credentials.client_id,
client_secret=self._credentials.client_secret,
scopes=self._credentials.scopes,
expiry=self._credentials.expiry,
)
# Refresh the token if needed
if self._credentials.expired or not self._credentials.valid:
try:
self._credentials.refresh(Request())
# Update the connector config in DB
if self._session:
# Use connector_id if available, otherwise fall back to user_id query
# Only persist refreshed token for native OAuth (Composio manages its own)
if has_standard_refresh and self._session:
if self._connector_id:
result = await self._session.execute(
select(SearchSourceConnector).filter(
@ -110,7 +106,6 @@ class GoogleCalendarConnector:
"GOOGLE_CALENDAR_CONNECTOR connector not found; cannot persist refreshed token."
)
# Encrypt sensitive credentials before storing
from app.config import config
from app.utils.oauth_security import TokenEncryption
@ -119,7 +114,6 @@ class GoogleCalendarConnector:
if token_encrypted and config.SECRET_KEY:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Encrypt sensitive fields
if creds_dict.get("token"):
creds_dict["token"] = token_encryption.encrypt_token(
creds_dict["token"]
@ -143,7 +137,6 @@ class GoogleCalendarConnector:
await self._session.commit()
except Exception as e:
error_str = str(e)
# Check if this is an invalid_grant error (token expired/revoked)
if (
"invalid_grant" in error_str.lower()
or "token has been expired or revoked" in error_str.lower()

View file

@ -15,16 +15,24 @@ from .file_types import GOOGLE_DOC, GOOGLE_SHEET
class GoogleDriveClient:
"""Client for Google Drive API operations."""
def __init__(self, session: AsyncSession, connector_id: int):
def __init__(
self,
session: AsyncSession,
connector_id: int,
credentials: "Credentials | None" = None,
):
"""
Initialize Google Drive client.
Args:
session: Database session
connector_id: ID of the Drive connector
credentials: Pre-built credentials (e.g. from Composio). If None,
credentials are loaded from the DB connector config.
"""
self.session = session
self.connector_id = connector_id
self._credentials = credentials
self.service = None
async def get_service(self):
@ -41,7 +49,12 @@ class GoogleDriveClient:
return self.service
try:
credentials = await get_valid_credentials(self.session, self.connector_id)
if self._credentials:
credentials = self._credentials
else:
credentials = await get_valid_credentials(
self.session, self.connector_id
)
self.service = build("drive", "v3", credentials=credentials)
return self.service
except Exception as e:

View file

@ -26,6 +26,7 @@ async def download_and_process_file(
task_logger: TaskLoggingService,
log_entry: Log,
connector_id: int | None = None,
enable_summary: bool = True,
) -> tuple[Any, str | None, dict[str, Any] | None]:
"""
Download Google Drive file and process using Surfsense file processors.
@ -95,6 +96,7 @@ async def download_and_process_file(
},
}
# Include connector_id for de-indexing support
connector_info["enable_summary"] = enable_summary
if connector_id is not None:
connector_info["connector_id"] = connector_id

View file

@ -81,44 +81,40 @@ class GoogleGmailConnector:
) -> Credentials:
"""
Get valid Google OAuth credentials.
Returns:
Google OAuth credentials
Raises:
ValueError: If credentials have not been set
Exception: If credential refresh fails
Supports both native OAuth (with refresh_token) and Composio-sourced
credentials (with refresh_handler). For Composio credentials, validation
and DB persistence are skipped since Composio manages its own tokens.
"""
if not all(
[
self._credentials.client_id,
self._credentials.client_secret,
self._credentials.refresh_token,
]
):
raise ValueError(
"Google OAuth credentials (client_id, client_secret, refresh_token) must be set"
)
has_standard_refresh = bool(self._credentials.refresh_token)
if has_standard_refresh:
if not all(
[self._credentials.client_id, self._credentials.client_secret]
):
raise ValueError(
"Google OAuth credentials (client_id, client_secret) must be set"
)
if self._credentials and not self._credentials.expired:
return self._credentials
# Create credentials from refresh token
self._credentials = Credentials(
token=self._credentials.token,
refresh_token=self._credentials.refresh_token,
token_uri=self._credentials.token_uri,
client_id=self._credentials.client_id,
client_secret=self._credentials.client_secret,
scopes=self._credentials.scopes,
expiry=self._credentials.expiry,
)
if has_standard_refresh:
self._credentials = Credentials(
token=self._credentials.token,
refresh_token=self._credentials.refresh_token,
token_uri=self._credentials.token_uri,
client_id=self._credentials.client_id,
client_secret=self._credentials.client_secret,
scopes=self._credentials.scopes,
expiry=self._credentials.expiry,
)
# Refresh the token if needed
if self._credentials.expired or not self._credentials.valid:
try:
self._credentials.refresh(Request())
# Update the connector config in DB
if self._session:
# Use connector_id if available, otherwise fall back to user_id query
# Only persist refreshed token for native OAuth (Composio manages its own)
if has_standard_refresh and self._session:
if self._connector_id:
result = await self._session.execute(
select(SearchSourceConnector).filter(
@ -143,7 +139,6 @@ class GoogleGmailConnector:
await self._session.commit()
except Exception as e:
error_str = str(e)
# Check if this is an invalid_grant error (token expired/revoked)
if (
"invalid_grant" in error_str.lower()
or "token has been expired or revoked" in error_str.lower()

View file

@ -157,7 +157,7 @@ class ChucksHybridSearchRetriever:
query_text: str,
top_k: int,
search_space_id: int,
document_type: str | None = None,
document_type: str | list[str] | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
query_embedding: list | None = None,
@ -217,18 +217,24 @@ class ChucksHybridSearchRetriever:
func.coalesce(Document.status["state"].astext, "ready") != "deleting",
]
# Add document type filter if provided
# Add document type filter if provided (single string or list of strings)
if document_type is not None:
# Convert string to enum value if needed
if isinstance(document_type, str):
try:
doc_type_enum = DocumentType[document_type]
base_conditions.append(Document.document_type == doc_type_enum)
except KeyError:
# If the document type doesn't exist in the enum, return empty results
return []
type_list = document_type if isinstance(document_type, list) else [document_type]
doc_type_enums = []
for dt in type_list:
if isinstance(dt, str):
try:
doc_type_enums.append(DocumentType[dt])
except KeyError:
pass
else:
doc_type_enums.append(dt)
if not doc_type_enums:
return []
if len(doc_type_enums) == 1:
base_conditions.append(Document.document_type == doc_type_enums[0])
else:
base_conditions.append(Document.document_type == document_type)
base_conditions.append(Document.document_type.in_(doc_type_enums))
# Add time-based filtering if provided
if start_date is not None:

View file

@ -149,7 +149,7 @@ class DocumentHybridSearchRetriever:
query_text: str,
top_k: int,
search_space_id: int,
document_type: str | None = None,
document_type: str | list[str] | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
query_embedding: list | None = None,
@ -197,18 +197,24 @@ class DocumentHybridSearchRetriever:
func.coalesce(Document.status["state"].astext, "ready") != "deleting",
]
# Add document type filter if provided
# Add document type filter if provided (single string or list of strings)
if document_type is not None:
# Convert string to enum value if needed
if isinstance(document_type, str):
try:
doc_type_enum = DocumentType[document_type]
base_conditions.append(Document.document_type == doc_type_enum)
except KeyError:
# If the document type doesn't exist in the enum, return empty results
return []
type_list = document_type if isinstance(document_type, list) else [document_type]
doc_type_enums = []
for dt in type_list:
if isinstance(dt, str):
try:
doc_type_enums.append(DocumentType[dt])
except KeyError:
pass
else:
doc_type_enums.append(dt)
if not doc_type_enums:
return []
if len(doc_type_enums) == 1:
base_conditions.append(Document.document_type == doc_type_enums[0])
else:
base_conditions.append(Document.document_type == document_type)
base_conditions.append(Document.document_type.in_(doc_type_enums))
# Add time-based filtering if provided
if start_date is not None:

View file

@ -1068,7 +1068,7 @@ async def index_connector_content(
== SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR
):
from app.tasks.celery_tasks.connector_tasks import (
index_composio_connector_task,
index_google_drive_files_task,
)
# For Composio Google Drive, if drive_items is provided, update connector config
@ -1102,34 +1102,72 @@ async def index_connector_content(
else:
logger.info(
f"Triggering Composio Google Drive indexing for connector {connector_id} into search space {search_space_id} "
f"using existing config (from {indexing_from} to {indexing_to})"
f"using existing config"
)
index_composio_connector_task.delay(
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
# Extract config and build items_dict for index_google_drive_files_task
config = connector.config or {}
selected_folders = config.get("selected_folders", [])
selected_files = config.get("selected_files", [])
if not selected_folders and not selected_files:
raise HTTPException(
status_code=400,
detail="Composio Google Drive indexing requires folders or files to be configured. "
"Please select folders/files to index.",
)
indexing_options = config.get(
"indexing_options",
{
"max_files_per_folder": 100,
"incremental_sync": True,
"include_subfolders": True,
},
)
items_dict = {
"folders": selected_folders,
"files": selected_files,
"indexing_options": indexing_options,
}
index_google_drive_files_task.delay(
connector_id, search_space_id, str(user.id), items_dict
)
response_message = (
"Composio Google Drive indexing started in the background."
)
elif connector.connector_type in [
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
]:
elif (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
):
from app.tasks.celery_tasks.connector_tasks import (
index_composio_connector_task,
index_google_gmail_messages_task,
)
# For Composio Gmail and Calendar, use the same date calculation logic as normal connectors
# This ensures consistent behavior and uses last_indexed_at to reduce API calls
# (includes special case: if indexed today, go back 1 day to avoid missing data)
logger.info(
f"Triggering Composio connector indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
f"Triggering Composio Gmail indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
)
index_composio_connector_task.delay(
index_google_gmail_messages_task.delay(
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
)
response_message = "Composio connector indexing started in the background."
response_message = "Composio Gmail indexing started in the background."
elif (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
):
from app.tasks.celery_tasks.connector_tasks import (
index_google_calendar_events_task,
)
logger.info(
f"Triggering Composio Google Calendar indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
)
index_google_calendar_events_task.delay(
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
)
response_message = (
"Composio Google Calendar indexing started in the background."
)
else:
raise HTTPException(

View file

@ -36,32 +36,14 @@ TOOLKIT_TO_CONNECTOR_TYPE = {
}
# Mapping of toolkit IDs to document types
TOOLKIT_TO_DOCUMENT_TYPE = {
"googledrive": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"gmail": "COMPOSIO_GMAIL_CONNECTOR",
"googlecalendar": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}
# Google Drive, Gmail, Calendar use unified native indexers - not in this registry
TOOLKIT_TO_DOCUMENT_TYPE: dict[str, str] = {}
# Mapping of toolkit IDs to their indexer functions
# Format: toolkit_id -> (module_path, function_name, supports_date_filter)
# supports_date_filter: True if the indexer accepts start_date/end_date params
TOOLKIT_TO_INDEXER = {
"googledrive": (
"app.connectors.composio_google_drive_connector",
"index_composio_google_drive",
False, # Google Drive doesn't use date filtering
),
"gmail": (
"app.connectors.composio_gmail_connector",
"index_composio_gmail",
True, # Gmail uses date filtering
),
"googlecalendar": (
"app.connectors.composio_google_calendar_connector",
"index_composio_google_calendar",
True, # Calendar uses date filtering
),
}
# Google Drive, Gmail, Calendar use unified native indexers - not in this registry
TOOLKIT_TO_INDEXER: dict[str, tuple[str, str, bool]] = {}
class ComposioService:

View file

@ -215,11 +215,20 @@ class ConnectorService:
return result_object, files_docs
# Composio connectors that were unified into native Google pipelines.
# Old documents may still carry the legacy type until re-indexed; searching
# for a native type should transparently include its legacy equivalent.
_LEGACY_TYPE_ALIASES: dict[str, str] = {
"GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
"GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}
async def _combined_rrf_search(
self,
query_text: str,
search_space_id: int,
document_type: str,
document_type: str | list[str],
top_k: int = 20,
start_date: datetime | None = None,
end_date: datetime | None = None,
@ -241,7 +250,8 @@ class ConnectorService:
Args:
query_text: The search query text
search_space_id: The search space ID to search within
document_type: Document type to filter (e.g., "FILE", "CRAWLED_URL")
document_type: Document type(s) to filter (e.g., "FILE", "CRAWLED_URL",
or a list for multi-type queries)
top_k: Number of results to return
start_date: Optional start date for filtering documents by updated_at
end_date: Optional end date for filtering documents by updated_at
@ -254,6 +264,16 @@ class ConnectorService:
perf = get_perf_logger()
t0 = time.perf_counter()
# Expand native Google types to include legacy Composio equivalents
# so old documents remain searchable until re-indexed.
if isinstance(document_type, str) and document_type in self._LEGACY_TYPE_ALIASES:
resolved_type: str | list[str] = [
document_type,
self._LEGACY_TYPE_ALIASES[document_type],
]
else:
resolved_type = document_type
# RRF constant
k = 60
@ -276,7 +296,7 @@ class ConnectorService:
"query_text": query_text,
"top_k": retriever_top_k,
"search_space_id": search_space_id,
"document_type": document_type,
"document_type": resolved_type,
"start_date": start_date,
"end_date": end_date,
"query_embedding": query_embedding,
@ -2746,299 +2766,6 @@ class ConnectorService:
return result_object, obsidian_docs
# =========================================================================
# Composio Connector Search Methods
# =========================================================================
async def search_composio_google_drive(
self,
user_query: str,
search_space_id: int,
top_k: int = 20,
start_date: datetime | None = None,
end_date: datetime | None = None,
) -> tuple:
"""
Search for Composio Google Drive files and return both the source information
and langchain documents.
Uses combined chunk-level and document-level hybrid search with RRF fusion.
Args:
user_query: The user's query
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
start_date: Optional start date for filtering documents by updated_at
end_date: Optional end date for filtering documents by updated_at
Returns:
tuple: (sources_info, langchain_documents)
"""
composio_drive_docs = await self._combined_rrf_search(
query_text=user_query,
search_space_id=search_space_id,
document_type="COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
top_k=top_k,
start_date=start_date,
end_date=end_date,
)
# Early return if no results
if not composio_drive_docs:
return {
"id": 54,
"name": "Google Drive (Composio)",
"type": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"sources": [],
}, []
def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
return (
doc_info.get("title")
or metadata.get("title")
or metadata.get("file_name")
or "Untitled Document"
)
def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
return metadata.get("url") or metadata.get("web_view_link") or ""
def _description_fn(
chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
) -> str:
description = self._chunk_preview(chunk.get("content", ""), limit=200)
info_parts = []
mime_type = metadata.get("mime_type")
modified_time = metadata.get("modified_time")
if mime_type:
info_parts.append(f"Type: {mime_type}")
if modified_time:
info_parts.append(f"Modified: {modified_time}")
if info_parts:
description = (description + " | " + " | ".join(info_parts)).strip(" |")
return description
def _extra_fields_fn(
_chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
) -> dict[str, Any]:
return {
"mime_type": metadata.get("mime_type", ""),
"file_id": metadata.get("file_id", ""),
"modified_time": metadata.get("modified_time", ""),
}
sources_list = self._build_chunk_sources_from_documents(
composio_drive_docs,
title_fn=_title_fn,
url_fn=_url_fn,
description_fn=_description_fn,
extra_fields_fn=_extra_fields_fn,
)
# Create result object
result_object = {
"id": 54,
"name": "Google Drive (Composio)",
"type": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"sources": sources_list,
}
return result_object, composio_drive_docs
async def search_composio_gmail(
self,
user_query: str,
search_space_id: int,
top_k: int = 20,
start_date: datetime | None = None,
end_date: datetime | None = None,
) -> tuple:
"""
Search for Composio Gmail messages and return both the source information
and langchain documents.
Uses combined chunk-level and document-level hybrid search with RRF fusion.
Args:
user_query: The user's query
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
start_date: Optional start date for filtering documents by updated_at
end_date: Optional end date for filtering documents by updated_at
Returns:
tuple: (sources_info, langchain_documents)
"""
composio_gmail_docs = await self._combined_rrf_search(
query_text=user_query,
search_space_id=search_space_id,
document_type="COMPOSIO_GMAIL_CONNECTOR",
top_k=top_k,
start_date=start_date,
end_date=end_date,
)
# Early return if no results
if not composio_gmail_docs:
return {
"id": 55,
"name": "Gmail (Composio)",
"type": "COMPOSIO_GMAIL_CONNECTOR",
"sources": [],
}, []
def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
return (
doc_info.get("title")
or metadata.get("subject")
or metadata.get("title")
or "Untitled Email"
)
def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
return metadata.get("url") or ""
def _description_fn(
chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
) -> str:
description = self._chunk_preview(chunk.get("content", ""), limit=200)
info_parts = []
sender = metadata.get("from") or metadata.get("sender")
date = metadata.get("date") or metadata.get("received_at")
if sender:
info_parts.append(f"From: {sender}")
if date:
info_parts.append(f"Date: {date}")
if info_parts:
description = (description + " | " + " | ".join(info_parts)).strip(" |")
return description
def _extra_fields_fn(
_chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
) -> dict[str, Any]:
return {
"message_id": metadata.get("message_id", ""),
"thread_id": metadata.get("thread_id", ""),
"from": metadata.get("from", ""),
"to": metadata.get("to", ""),
"date": metadata.get("date", ""),
}
sources_list = self._build_chunk_sources_from_documents(
composio_gmail_docs,
title_fn=_title_fn,
url_fn=_url_fn,
description_fn=_description_fn,
extra_fields_fn=_extra_fields_fn,
)
# Create result object
result_object = {
"id": 55,
"name": "Gmail (Composio)",
"type": "COMPOSIO_GMAIL_CONNECTOR",
"sources": sources_list,
}
return result_object, composio_gmail_docs
async def search_composio_google_calendar(
self,
user_query: str,
search_space_id: int,
top_k: int = 20,
start_date: datetime | None = None,
end_date: datetime | None = None,
) -> tuple:
"""
Search for Composio Google Calendar events and return both the source information
and langchain documents.
Uses combined chunk-level and document-level hybrid search with RRF fusion.
Args:
user_query: The user's query
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
start_date: Optional start date for filtering documents by updated_at
end_date: Optional end date for filtering documents by updated_at
Returns:
tuple: (sources_info, langchain_documents)
"""
composio_calendar_docs = await self._combined_rrf_search(
query_text=user_query,
search_space_id=search_space_id,
document_type="COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
top_k=top_k,
start_date=start_date,
end_date=end_date,
)
# Early return if no results
if not composio_calendar_docs:
return {
"id": 56,
"name": "Google Calendar (Composio)",
"type": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
"sources": [],
}, []
def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
return (
doc_info.get("title")
or metadata.get("summary")
or metadata.get("title")
or "Untitled Event"
)
def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
return metadata.get("url") or metadata.get("html_link") or ""
def _description_fn(
chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
) -> str:
description = self._chunk_preview(chunk.get("content", ""), limit=200)
info_parts = []
start_time = metadata.get("start_time") or metadata.get("start")
end_time = metadata.get("end_time") or metadata.get("end")
if start_time:
info_parts.append(f"Start: {start_time}")
if end_time:
info_parts.append(f"End: {end_time}")
if info_parts:
description = (description + " | " + " | ".join(info_parts)).strip(" |")
return description
def _extra_fields_fn(
_chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
) -> dict[str, Any]:
return {
"event_id": metadata.get("event_id", ""),
"calendar_id": metadata.get("calendar_id", ""),
"start_time": metadata.get("start_time", ""),
"end_time": metadata.get("end_time", ""),
"location": metadata.get("location", ""),
}
sources_list = self._build_chunk_sources_from_documents(
composio_calendar_docs,
title_fn=_title_fn,
url_fn=_url_fn,
description_fn=_description_fn,
extra_fields_fn=_extra_fields_fn,
)
# Create result object
result_object = {
"id": 56,
"name": "Google Calendar (Composio)",
"type": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
"sources": sources_list,
}
return result_object, composio_calendar_docs
# =========================================================================
# Utility Methods for Connector Discovery
# =========================================================================

View file

@ -112,8 +112,10 @@ class GoogleDriveToolMetadataService:
and_(
SearchSourceConnector.id == document.connector_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnector.connector_type.in_([
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]),
)
)
)
@ -139,8 +141,10 @@ class GoogleDriveToolMetadataService:
and_(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnector.connector_type.in_([
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]),
)
)
.order_by(SearchSourceConnector.last_indexed_at.desc())

View file

@ -55,7 +55,6 @@ async def _check_and_trigger_schedules():
from app.tasks.celery_tasks.connector_tasks import (
index_airtable_records_task,
index_clickup_tasks_task,
index_composio_connector_task,
index_confluence_pages_task,
index_crawled_urls_task,
index_discord_messages_task,
@ -88,10 +87,10 @@ async def _check_and_trigger_schedules():
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task,
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task,
# Composio connector types
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: index_composio_connector_task,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR: index_composio_connector_task,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: index_composio_connector_task,
# Composio connector types (unified with native Google tasks)
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR: index_google_gmail_messages_task,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task,
}
# Trigger indexing for each due connector
@ -129,11 +128,11 @@ async def _check_and_trigger_schedules():
f"({connector.connector_type.value})"
)
# Special handling for Google Drive - uses config for folder/file selection
if (
connector.connector_type
== SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR
):
# Special handling for Google Drive (native and Composio) - uses config for folder/file selection
if connector.connector_type in [
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]:
connector_config = connector.config or {}
selected_folders = connector_config.get("selected_folders", [])
selected_files = connector_config.get("selected_files", [])

View file

@ -16,6 +16,15 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.google_calendar_connector import GoogleCalendarConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
)
ACCEPTED_CALENDAR_CONNECTOR_TYPES = {
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
}
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -87,10 +96,12 @@ async def index_google_calendar_events(
)
try:
# Get the connector from the database
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR
)
# Accept both native and Composio Calendar connectors
connector = None
for ct in ACCEPTED_CALENDAR_CONNECTOR_TYPES:
connector = await get_connector_by_id(session, connector_id, ct)
if connector:
break
if not connector:
await task_logger.log_task_failure(
@ -101,69 +112,80 @@ async def index_google_calendar_events(
)
return 0, f"Connector with ID {connector_id} not found"
# Get the Google Calendar credentials from the connector config
config_data = connector.config
# Decrypt sensitive credentials if encrypted (for backward compatibility)
from app.config import config
from app.utils.oauth_security import TokenEncryption
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt sensitive fields
if config_data.get("token"):
config_data["token"] = token_encryption.decrypt_token(
config_data["token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
if config_data.get("client_secret"):
config_data["client_secret"] = token_encryption.decrypt_token(
config_data["client_secret"]
)
logger.info(
f"Decrypted Google Calendar credentials for connector {connector_id}"
)
except Exception as e:
# Build credentials based on connector type
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get(
"composio_connected_account_id"
)
if not connected_account_id:
await task_logger.log_task_failure(
log_entry,
f"Failed to decrypt Google Calendar credentials for connector {connector_id}: {e!s}",
"Credential decryption failed",
{"error_type": "CredentialDecryptionError"},
f"Composio connected_account_id not found for connector {connector_id}",
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, f"Failed to decrypt Google Calendar credentials: {e!s}"
return 0, "Composio connected_account_id not found"
credentials = build_composio_credentials(connected_account_id)
else:
config_data = connector.config
exp = config_data.get("expiry", "").replace("Z", "")
credentials = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes"),
expiry=datetime.fromisoformat(exp) if exp else None,
)
from app.config import config
from app.utils.oauth_security import TokenEncryption
if (
not credentials.client_id
or not credentials.client_secret
or not credentials.refresh_token
):
await task_logger.log_task_failure(
log_entry,
f"Google Calendar credentials not found in connector config for connector {connector_id}",
"Missing Google Calendar credentials",
{"error_type": "MissingCredentials"},
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
if config_data.get("token"):
config_data["token"] = token_encryption.decrypt_token(
config_data["token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
if config_data.get("client_secret"):
config_data["client_secret"] = token_encryption.decrypt_token(
config_data["client_secret"]
)
logger.info(
f"Decrypted Google Calendar credentials for connector {connector_id}"
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to decrypt Google Calendar credentials for connector {connector_id}: {e!s}",
"Credential decryption failed",
{"error_type": "CredentialDecryptionError"},
)
return 0, f"Failed to decrypt Google Calendar credentials: {e!s}"
exp = config_data.get("expiry", "")
if exp:
exp = exp.replace("Z", "")
credentials = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes", []),
expiry=datetime.fromisoformat(exp) if exp else None,
)
return 0, "Google Calendar credentials not found in connector config"
# Initialize Google Calendar client
if (
not credentials.client_id
or not credentials.client_secret
or not credentials.refresh_token
):
await task_logger.log_task_failure(
log_entry,
f"Google Calendar credentials not found in connector config for connector {connector_id}",
"Missing Google Calendar credentials",
{"error_type": "MissingCredentials"},
)
return 0, "Google Calendar credentials not found in connector config"
await task_logger.log_task_progress(
log_entry,
f"Initializing Google Calendar client for connector {connector_id}",

View file

@ -31,6 +31,15 @@ from app.tasks.connector_indexers.base import (
update_connector_last_indexed,
)
from app.utils.document_converters import generate_unique_identifier_hash
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
)
ACCEPTED_DRIVE_CONNECTOR_TYPES = {
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
}
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
@ -89,14 +98,17 @@ async def index_google_drive_files(
)
try:
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR
)
# Accept both native and Composio Drive connectors
connector = None
for ct in ACCEPTED_DRIVE_CONNECTOR_TYPES:
connector = await get_connector_by_id(session, connector_id, ct)
if connector:
break
if not connector:
error_msg = f"Google Drive connector with ID {connector_id} not found"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ConnectorNotFound"}
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, error_msg
@ -106,27 +118,43 @@ async def index_google_drive_files(
{"stage": "client_initialization"},
)
# Check if credentials are encrypted (only when explicitly marked)
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted:
# Credentials are explicitly marked as encrypted, will be decrypted during client initialization
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
logger.info(
f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization"
# Build credentials based on connector type
pre_built_credentials = None
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get(
"composio_connected_account_id"
)
# If _token_encrypted is False or not set, treat credentials as plaintext
if not connected_account_id:
error_msg = f"Composio connected_account_id not found for connector {connector_id}"
await task_logger.log_task_failure(
log_entry, error_msg, "Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, error_msg
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted:
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
logger.info(
f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization"
)
drive_client = GoogleDriveClient(session, connector_id)
connector_enable_summary = getattr(connector, "enable_summary", True)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
if not folder_id:
error_msg = "folder_id is required for Google Drive indexing"
@ -164,6 +192,7 @@ async def index_google_drive_files(
max_files=max_files,
include_subfolders=include_subfolders,
on_heartbeat_callback=on_heartbeat_callback,
enable_summary=connector_enable_summary,
)
else:
logger.info(f"Using full scan for connector {connector_id}")
@ -181,6 +210,7 @@ async def index_google_drive_files(
max_files=max_files,
include_subfolders=include_subfolders,
on_heartbeat_callback=on_heartbeat_callback,
enable_summary=connector_enable_summary,
)
documents_indexed, documents_skipped = result
@ -278,14 +308,17 @@ async def index_google_drive_single_file(
)
try:
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR
)
# Accept both native and Composio Drive connectors
connector = None
for ct in ACCEPTED_DRIVE_CONNECTOR_TYPES:
connector = await get_connector_by_id(session, connector_id, ct)
if connector:
break
if not connector:
error_msg = f"Google Drive connector with ID {connector_id} not found"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ConnectorNotFound"}
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, error_msg
@ -295,27 +328,42 @@ async def index_google_drive_single_file(
{"stage": "client_initialization"},
)
# Check if credentials are encrypted (only when explicitly marked)
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted:
# Credentials are explicitly marked as encrypted, will be decrypted during client initialization
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
logger.info(
f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization"
pre_built_credentials = None
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get(
"composio_connected_account_id"
)
# If _token_encrypted is False or not set, treat credentials as plaintext
if not connected_account_id:
error_msg = f"Composio connected_account_id not found for connector {connector_id}"
await task_logger.log_task_failure(
log_entry, error_msg, "Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, error_msg
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted:
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
logger.info(
f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization"
)
drive_client = GoogleDriveClient(session, connector_id)
connector_enable_summary = getattr(connector, "enable_summary", True)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
# Fetch the file metadata
file, error = await get_file_by_id(drive_client, file_id)
@ -362,6 +410,7 @@ async def index_google_drive_single_file(
task_logger=task_logger,
log_entry=log_entry,
pending_document=pending_doc,
enable_summary=connector_enable_summary,
)
await session.commit()
@ -433,6 +482,7 @@ async def _index_full_scan(
max_files: int,
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Perform full scan indexing of a folder.
@ -562,6 +612,7 @@ async def _index_full_scan(
task_logger=task_logger,
log_entry=log_entry,
pending_document=pending_doc,
enable_summary=enable_summary,
)
documents_indexed += indexed
@ -592,6 +643,7 @@ async def _index_with_delta_sync(
max_files: int,
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Perform delta sync indexing using change tracking.
@ -703,6 +755,7 @@ async def _index_with_delta_sync(
task_logger=task_logger,
log_entry=log_entry,
pending_document=pending_doc,
enable_summary=enable_summary,
)
documents_indexed += indexed
@ -957,6 +1010,7 @@ async def _process_single_file(
task_logger: TaskLoggingService,
log_entry: any,
pending_document: Document | None = None,
enable_summary: bool = True,
) -> tuple[int, int, int]:
"""
Process a single file by downloading and using Surfsense's file processor.
@ -1020,6 +1074,7 @@ async def _process_single_file(
task_logger=task_logger,
log_entry=log_entry,
connector_id=connector_id,
enable_summary=enable_summary,
)
if error:

View file

@ -21,6 +21,15 @@ from app.db import (
DocumentType,
SearchSourceConnectorType,
)
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
)
ACCEPTED_GMAIL_CONNECTOR_TYPES = {
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
}
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -94,90 +103,100 @@ async def index_google_gmail_messages(
)
try:
# Get connector by id
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR
)
# Accept both native and Composio Gmail connectors
connector = None
for ct in ACCEPTED_GMAIL_CONNECTOR_TYPES:
connector = await get_connector_by_id(session, connector_id, ct)
if connector:
break
if not connector:
error_msg = f"Gmail connector with ID {connector_id} not found"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ConnectorNotFound"}
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, error_msg
# Get the Google Gmail credentials from the connector config
config_data = connector.config
# Decrypt sensitive credentials if encrypted (for backward compatibility)
from app.config import config
from app.utils.oauth_security import TokenEncryption
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt sensitive fields
if config_data.get("token"):
config_data["token"] = token_encryption.decrypt_token(
config_data["token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
if config_data.get("client_secret"):
config_data["client_secret"] = token_encryption.decrypt_token(
config_data["client_secret"]
)
logger.info(
f"Decrypted Google Gmail credentials for connector {connector_id}"
)
except Exception as e:
# Build credentials based on connector type
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get(
"composio_connected_account_id"
)
if not connected_account_id:
await task_logger.log_task_failure(
log_entry,
f"Failed to decrypt Google Gmail credentials for connector {connector_id}: {e!s}",
"Credential decryption failed",
{"error_type": "CredentialDecryptionError"},
f"Composio connected_account_id not found for connector {connector_id}",
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, f"Failed to decrypt Google Gmail credentials: {e!s}"
return 0, "Composio connected_account_id not found"
credentials = build_composio_credentials(connected_account_id)
else:
config_data = connector.config
exp = config_data.get("expiry", "")
if exp:
exp = exp.replace("Z", "")
credentials = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes", []),
expiry=datetime.fromisoformat(exp) if exp else None,
)
from app.config import config
from app.utils.oauth_security import TokenEncryption
if (
not credentials.client_id
or not credentials.client_secret
or not credentials.refresh_token
):
await task_logger.log_task_failure(
log_entry,
f"Google gmail credentials not found in connector config for connector {connector_id}",
"Missing Google gmail credentials",
{"error_type": "MissingCredentials"},
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
if config_data.get("token"):
config_data["token"] = token_encryption.decrypt_token(
config_data["token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
if config_data.get("client_secret"):
config_data["client_secret"] = token_encryption.decrypt_token(
config_data["client_secret"]
)
logger.info(
f"Decrypted Google Gmail credentials for connector {connector_id}"
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to decrypt Google Gmail credentials for connector {connector_id}: {e!s}",
"Credential decryption failed",
{"error_type": "CredentialDecryptionError"},
)
return 0, f"Failed to decrypt Google Gmail credentials: {e!s}"
exp = config_data.get("expiry", "")
if exp:
exp = exp.replace("Z", "")
credentials = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes", []),
expiry=datetime.fromisoformat(exp) if exp else None,
)
return 0, "Google gmail credentials not found in connector config"
# Initialize Google gmail client
if (
not credentials.client_id
or not credentials.client_secret
or not credentials.refresh_token
):
await task_logger.log_task_failure(
log_entry,
f"Google gmail credentials not found in connector config for connector {connector_id}",
"Missing Google gmail credentials",
{"error_type": "MissingCredentials"},
)
return 0, "Google gmail credentials not found in connector config"
await task_logger.log_task_progress(
log_entry,
f"Initializing Google gmail client for connector {connector_id}",
{"stage": "client_initialization"},
)
# Initialize Google gmail connector
gmail_connector = GoogleGmailConnector(
credentials, session, user_id, connector_id
)

View file

@ -411,6 +411,7 @@ async def add_received_file_document_using_unstructured(
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""
Process and store a file document using Unstructured service.
@ -471,9 +472,13 @@ async def add_received_file_document_using_unstructured(
"etl_service": "UNSTRUCTURED",
"document_type": "File Document",
}
summary_content, summary_embedding = await generate_document_summary(
file_in_markdown, user_llm, document_metadata
)
if enable_summary:
summary_content, summary_embedding = await generate_document_summary(
file_in_markdown, user_llm, document_metadata
)
else:
summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}"
summary_embedding = embed_text(summary_content)
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
@ -493,14 +498,13 @@ async def add_received_file_document_using_unstructured(
existing_document.source_markdown = file_in_markdown
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
existing_document.status = DocumentStatus.ready() # Mark as ready
existing_document.status = DocumentStatus.ready()
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
@ -523,7 +527,7 @@ async def add_received_file_document_using_unstructured(
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
status=DocumentStatus.ready(), # Mark as ready
status=DocumentStatus.ready(),
)
session.add(document)
@ -546,6 +550,7 @@ async def add_received_file_document_using_llamacloud(
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""
Process and store document content parsed by LlamaCloud.
@ -605,16 +610,19 @@ async def add_received_file_document_using_llamacloud(
"etl_service": "LLAMACLOUD",
"document_type": "File Document",
}
summary_content, summary_embedding = await generate_document_summary(
file_in_markdown, user_llm, document_metadata
)
if enable_summary:
summary_content, summary_embedding = await generate_document_summary(
file_in_markdown, user_llm, document_metadata
)
else:
summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}"
summary_embedding = embed_text(summary_content)
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
# Update or create document
if existing_document:
# Update existing document
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
@ -627,14 +635,12 @@ async def add_received_file_document_using_llamacloud(
existing_document.source_markdown = file_in_markdown
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
existing_document.status = DocumentStatus.ready() # Mark as ready
existing_document.status = DocumentStatus.ready()
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
@ -657,7 +663,7 @@ async def add_received_file_document_using_llamacloud(
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
status=DocumentStatus.ready(), # Mark as ready
status=DocumentStatus.ready(),
)
session.add(document)
@ -682,6 +688,7 @@ async def add_received_file_document_using_docling(
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""
Process and store document content parsed by Docling.
@ -734,33 +741,32 @@ async def add_received_file_document_using_docling(
f"No long context LLM configured for user {user_id} in search_space {search_space_id}"
)
# Generate summary using chunked processing for large documents
from app.services.docling_service import create_docling_service
if enable_summary:
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
docling_service = create_docling_service()
summary_content = await docling_service.process_large_document_summary(
content=file_in_markdown, llm=user_llm, document_title=file_name
)
summary_content = await docling_service.process_large_document_summary(
content=file_in_markdown, llm=user_llm, document_title=file_name
)
# Enhance summary with metadata
document_metadata = {
"file_name": file_name,
"etl_service": "DOCLING",
"document_type": "File Document",
}
metadata_parts = []
metadata_parts.append("# DOCUMENT METADATA")
document_metadata = {
"file_name": file_name,
"etl_service": "DOCLING",
"document_type": "File Document",
}
metadata_parts = ["# DOCUMENT METADATA"]
for key, value in document_metadata.items():
if value:
formatted_key = key.replace("_", " ").title()
metadata_parts.append(f"**{formatted_key}:** {value}")
for key, value in document_metadata.items():
if value: # Only include non-empty values
formatted_key = key.replace("_", " ").title()
metadata_parts.append(f"**{formatted_key}:** {value}")
metadata_section = "\n".join(metadata_parts)
enhanced_summary_content = (
f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}"
)
metadata_section = "\n".join(metadata_parts)
enhanced_summary_content = (
f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}"
)
else:
enhanced_summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}"
summary_embedding = embed_text(enhanced_summary_content)
@ -1219,9 +1225,10 @@ async def process_file_in_background(
print("Error deleting temp file", e)
pass
# Pass the documents to the existing background task
enable_summary = connector.get("enable_summary", True) if connector else True
result = await add_received_file_document_using_unstructured(
session, filename, docs, search_space_id, user_id, connector
session, filename, docs, search_space_id, user_id, connector,
enable_summary=enable_summary,
)
if connector:
@ -1362,7 +1369,7 @@ async def process_file_in_background(
# Extract text content from the markdown documents
markdown_content = doc.text
# Process the documents using our LlamaCloud background task
enable_summary = connector.get("enable_summary", True) if connector else True
doc_result = await add_received_file_document_using_llamacloud(
session,
filename,
@ -1370,6 +1377,7 @@ async def process_file_in_background(
search_space_id=search_space_id,
user_id=user_id,
connector=connector,
enable_summary=enable_summary,
)
# Track if this document was successfully created
@ -1516,7 +1524,7 @@ async def process_file_in_background(
session, notification, stage="chunking"
)
# Process the document using our Docling background task
enable_summary = connector.get("enable_summary", True) if connector else True
doc_result = await add_received_file_document_using_docling(
session,
filename,
@ -1524,6 +1532,7 @@ async def process_file_in_background(
search_space_id=search_space_id,
user_id=user_id,
connector=connector,
enable_summary=enable_summary,
)
if doc_result:

View file

@ -0,0 +1,41 @@
"""Shared Google OAuth credential utilities for native and Composio connectors."""
import logging
from datetime import datetime, timedelta, timezone
from google.oauth2.credentials import Credentials
from app.db import SearchSourceConnectorType
logger = logging.getLogger(__name__)
COMPOSIO_GOOGLE_CONNECTOR_TYPES = {
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
}
def build_composio_credentials(connected_account_id: str) -> Credentials:
"""
Build Google OAuth Credentials backed by Composio's token management.
The returned Credentials object uses a refresh_handler that fetches
fresh access tokens from Composio on demand, so it works seamlessly
with googleapiclient.discovery.build().
"""
from app.services.composio_service import ComposioService
service = ComposioService()
access_token = service.get_access_token(connected_account_id)
def composio_refresh_handler(request, scopes):
fresh_token = service.get_access_token(connected_account_id)
expiry = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=55)
return fresh_token, expiry
return Credentials(
token=access_token,
expiry=datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=55),
refresh_handler=composio_refresh_handler,
)

View file

@ -31,10 +31,10 @@ export const CONNECTOR_TO_DOCUMENT_TYPE: Record<string, string> = {
// Special mappings (connector type differs from document type)
GOOGLE_DRIVE_CONNECTOR: "GOOGLE_DRIVE_FILE",
WEBCRAWLER_CONNECTOR: "CRAWLED_URL",
// Composio connectors map to their own document types
COMPOSIO_GOOGLE_DRIVE_CONNECTOR: "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
COMPOSIO_GMAIL_CONNECTOR: "COMPOSIO_GMAIL_CONNECTOR",
COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
// Composio connectors map to unified Google document types
COMPOSIO_GOOGLE_DRIVE_CONNECTOR: "GOOGLE_DRIVE_FILE",
COMPOSIO_GMAIL_CONNECTOR: "GOOGLE_GMAIL_CONNECTOR",
COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: "GOOGLE_CALENDAR_CONNECTOR",
};
/**