mirror of
https://github.com/MODSetter/SurfSense.git
synced 2025-09-06 12:39:06 +00:00
381 lines
16 KiB
Python
381 lines
16 KiB
Python
"""
|
|
Airtable connector indexer.
|
|
"""
|
|
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import config
|
|
from app.connectors.airtable_connector import AirtableConnector
|
|
from app.db import Document, DocumentType, SearchSourceConnectorType
|
|
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
|
|
from app.services.llm_service import get_user_long_context_llm
|
|
from app.services.task_logging_service import TaskLoggingService
|
|
from app.utils.document_converters import (
|
|
create_document_chunks,
|
|
generate_content_hash,
|
|
generate_document_summary,
|
|
)
|
|
|
|
from .base import (
|
|
calculate_date_range,
|
|
check_duplicate_document_by_hash,
|
|
get_connector_by_id,
|
|
logger,
|
|
update_connector_last_indexed,
|
|
)
|
|
|
|
|
|
async def index_airtable_records(
|
|
session: AsyncSession,
|
|
connector_id: int,
|
|
search_space_id: int,
|
|
user_id: str,
|
|
start_date: str | None = None,
|
|
end_date: str | None = None,
|
|
max_records: int = 2500,
|
|
update_last_indexed: bool = True,
|
|
) -> tuple[int, str | None]:
|
|
"""
|
|
Index Airtable records for a given connector.
|
|
|
|
Args:
|
|
session: Database session
|
|
connector_id: ID of the Airtable connector
|
|
search_space_id: ID of the search space to store documents in
|
|
user_id: ID of the user
|
|
start_date: Start date for filtering records (YYYY-MM-DD)
|
|
end_date: End date for filtering records (YYYY-MM-DD)
|
|
max_records: Maximum number of records to fetch per table
|
|
update_last_indexed: Whether to update the last_indexed_at timestamp
|
|
|
|
Returns:
|
|
Tuple of (number_of_documents_processed, error_message)
|
|
"""
|
|
task_logger = TaskLoggingService(session, search_space_id)
|
|
log_entry = await task_logger.log_task_start(
|
|
task_name="airtable_indexing",
|
|
source="connector_indexing_task",
|
|
message=f"Starting Airtable indexing for connector {connector_id}",
|
|
metadata={
|
|
"connector_id": connector_id,
|
|
"user_id": str(user_id),
|
|
"start_date": start_date,
|
|
"end_date": end_date,
|
|
"max_records": max_records,
|
|
},
|
|
)
|
|
|
|
try:
|
|
# Get the connector from the database
|
|
connector = await get_connector_by_id(
|
|
session, connector_id, SearchSourceConnectorType.AIRTABLE_CONNECTOR
|
|
)
|
|
|
|
if not connector:
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
f"Connector with ID {connector_id} not found",
|
|
"Connector not found",
|
|
{"error_type": "ConnectorNotFound"},
|
|
)
|
|
return 0, f"Connector with ID {connector_id} not found"
|
|
|
|
# Create credentials from connector config
|
|
config_data = connector.config
|
|
try:
|
|
credentials = AirtableAuthCredentialsBase.from_dict(config_data)
|
|
except Exception as e:
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
f"Invalid Airtable credentials in connector {connector_id}",
|
|
str(e),
|
|
{"error_type": "InvalidCredentials"},
|
|
)
|
|
return 0, f"Invalid Airtable credentials: {e!s}"
|
|
|
|
# Check if credentials are expired
|
|
if credentials.is_expired:
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
f"Airtable credentials expired for connector {connector_id}",
|
|
"Credentials expired",
|
|
{"error_type": "ExpiredCredentials"},
|
|
)
|
|
return 0, "Airtable credentials have expired. Please re-authenticate."
|
|
|
|
# Calculate date range for indexing
|
|
start_date_str, end_date_str = calculate_date_range(
|
|
connector, start_date, end_date, default_days_back=365
|
|
)
|
|
|
|
logger.info(
|
|
f"Starting Airtable indexing for connector {connector_id} "
|
|
f"from {start_date_str} to {end_date_str}"
|
|
)
|
|
|
|
# Initialize Airtable connector
|
|
airtable_connector = AirtableConnector(credentials)
|
|
total_processed = 0
|
|
|
|
try:
|
|
# Get accessible bases
|
|
logger.info(f"Fetching Airtable bases for connector {connector_id}")
|
|
bases, error = airtable_connector.get_bases()
|
|
|
|
if error:
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
f"Failed to fetch Airtable bases: {error}",
|
|
"API Error",
|
|
{"error_type": "APIError"},
|
|
)
|
|
return 0, f"Failed to fetch Airtable bases: {error}"
|
|
|
|
if not bases:
|
|
success_msg = "No Airtable bases found or accessible"
|
|
await task_logger.log_task_success(
|
|
log_entry, success_msg, {"bases_count": 0}
|
|
)
|
|
return 0, success_msg
|
|
|
|
logger.info(f"Found {len(bases)} Airtable bases to process")
|
|
|
|
# Process each base
|
|
for base in bases:
|
|
base_id = base.get("id")
|
|
base_name = base.get("name", "Unknown Base")
|
|
|
|
if not base_id:
|
|
logger.warning(f"Skipping base without ID: {base}")
|
|
continue
|
|
|
|
logger.info(f"Processing base: {base_name} ({base_id})")
|
|
|
|
# Get base schema to find tables
|
|
schema_data, schema_error = airtable_connector.get_base_schema(base_id)
|
|
|
|
if schema_error:
|
|
logger.warning(
|
|
f"Failed to get schema for base {base_id}: {schema_error}"
|
|
)
|
|
continue
|
|
|
|
if not schema_data or "tables" not in schema_data:
|
|
logger.warning(f"No tables found in base {base_id}")
|
|
continue
|
|
|
|
tables = schema_data["tables"]
|
|
logger.info(f"Found {len(tables)} tables in base {base_name}")
|
|
|
|
# Process each table
|
|
for table in tables:
|
|
table_id = table.get("id")
|
|
table_name = table.get("name", "Unknown Table")
|
|
|
|
if not table_id:
|
|
logger.warning(f"Skipping table without ID: {table}")
|
|
continue
|
|
|
|
logger.info(f"Processing table: {table_name} ({table_id})")
|
|
|
|
# Fetch records
|
|
if start_date_str and end_date_str:
|
|
# Use date filtering if available
|
|
records, records_error = (
|
|
airtable_connector.get_records_by_date_range(
|
|
base_id=base_id,
|
|
table_id=table_id,
|
|
date_field="CREATED_TIME()",
|
|
start_date=start_date_str,
|
|
end_date=end_date_str,
|
|
max_records=max_records,
|
|
)
|
|
)
|
|
|
|
else:
|
|
# Fetch all records
|
|
records, records_error = airtable_connector.get_all_records(
|
|
base_id=base_id,
|
|
table_id=table_id,
|
|
max_records=max_records,
|
|
)
|
|
|
|
if records_error:
|
|
logger.warning(
|
|
f"Failed to fetch records from table {table_name}: {records_error}"
|
|
)
|
|
continue
|
|
|
|
if not records:
|
|
logger.info(f"No records found in table {table_name}")
|
|
continue
|
|
|
|
logger.info(f"Found {len(records)} records in table {table_name}")
|
|
|
|
documents_indexed = 0
|
|
skipped_messages = []
|
|
documents_skipped = 0
|
|
# Process each record
|
|
for record in records:
|
|
try:
|
|
# Generate markdown content
|
|
markdown_content = (
|
|
airtable_connector.format_record_to_markdown(
|
|
record, f"{base_name} - {table_name}"
|
|
)
|
|
)
|
|
|
|
if not markdown_content.strip():
|
|
logger.warning(
|
|
f"Skipping message with no content: {record.get('id')}"
|
|
)
|
|
skipped_messages.append(
|
|
f"{record.get('id')} (no content)"
|
|
)
|
|
documents_skipped += 1
|
|
continue
|
|
|
|
# Generate content hash
|
|
content_hash = generate_content_hash(
|
|
markdown_content, search_space_id
|
|
)
|
|
|
|
# Check if document already exists
|
|
existing_document_by_hash = (
|
|
await check_duplicate_document_by_hash(
|
|
session, content_hash
|
|
)
|
|
)
|
|
|
|
if existing_document_by_hash:
|
|
logger.info(
|
|
f"Document with content hash {content_hash} already exists for message {record.get('id')}. Skipping processing."
|
|
)
|
|
documents_skipped += 1
|
|
continue
|
|
|
|
# Generate document summary
|
|
user_llm = await get_user_long_context_llm(session, user_id)
|
|
|
|
if user_llm:
|
|
document_metadata = {
|
|
"record_id": record.get("id", "Unknown"),
|
|
"created_time": record.get("CREATED_TIME()", ""),
|
|
"document_type": "Airtable Record",
|
|
"connector_type": "Airtable",
|
|
}
|
|
(
|
|
summary_content,
|
|
summary_embedding,
|
|
) = await generate_document_summary(
|
|
markdown_content, user_llm, document_metadata
|
|
)
|
|
else:
|
|
# Fallback to simple summary if no LLM configured
|
|
summary_content = f"Airtable Record: {record.get('id', 'Unknown')}\n\n"
|
|
summary_embedding = (
|
|
config.embedding_model_instance.embed(
|
|
summary_content
|
|
)
|
|
)
|
|
|
|
# Process chunks
|
|
chunks = await create_document_chunks(markdown_content)
|
|
|
|
# Create and store new document
|
|
logger.info(
|
|
f"Creating new document for Airtable record: {record.get('id', 'Unknown')}"
|
|
)
|
|
document = Document(
|
|
search_space_id=search_space_id,
|
|
title=f"Airtable Record: {record.get('id', 'Unknown')}",
|
|
document_type=DocumentType.AIRTABLE_CONNECTOR,
|
|
document_metadata={
|
|
"record_id": record.get("id", "Unknown"),
|
|
"created_time": record.get("CREATED_TIME()", ""),
|
|
},
|
|
content=summary_content,
|
|
content_hash=content_hash,
|
|
embedding=summary_embedding,
|
|
chunks=chunks,
|
|
)
|
|
|
|
session.add(document)
|
|
documents_indexed += 1
|
|
logger.info(
|
|
f"Successfully indexed new Airtable record {summary_content}"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error processing the Airtable record {record.get('id', 'Unknown')}: {e!s}",
|
|
exc_info=True,
|
|
)
|
|
skipped_messages.append(
|
|
f"{record.get('id', 'Unknown')} (processing error)"
|
|
)
|
|
documents_skipped += 1
|
|
continue # Skip this message and continue with others
|
|
|
|
# Update the last_indexed_at timestamp for the connector only if requested
|
|
total_processed = documents_indexed
|
|
if total_processed > 0:
|
|
await update_connector_last_indexed(
|
|
session, connector, update_last_indexed
|
|
)
|
|
|
|
# Commit all changes
|
|
await session.commit()
|
|
logger.info(
|
|
"Successfully committed all Airtable document changes to database"
|
|
)
|
|
|
|
# Log success
|
|
await task_logger.log_task_success(
|
|
log_entry,
|
|
f"Successfully completed Airtable indexing for connector {connector_id}",
|
|
{
|
|
"events_processed": total_processed,
|
|
"documents_indexed": documents_indexed,
|
|
"documents_skipped": documents_skipped,
|
|
"skipped_messages_count": len(skipped_messages),
|
|
},
|
|
)
|
|
|
|
logger.info(
|
|
f"Airtable indexing completed: {documents_indexed} new records, {documents_skipped} skipped"
|
|
)
|
|
return (
|
|
total_processed,
|
|
None,
|
|
) # Return None as the error message to indicate success
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Fetching Airtable bases for connector {connector_id} failed: {e!s}",
|
|
exc_info=True,
|
|
)
|
|
|
|
except SQLAlchemyError as db_error:
|
|
await session.rollback()
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
f"Database error during Airtable indexing for connector {connector_id}",
|
|
str(db_error),
|
|
{"error_type": "SQLAlchemyError"},
|
|
)
|
|
logger.error(
|
|
f"Database error during Airtable indexing: {db_error!s}", exc_info=True
|
|
)
|
|
return 0, f"Database error: {db_error!s}"
|
|
except Exception as e:
|
|
await session.rollback()
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
f"Failed to index Airtable records for connector {connector_id}",
|
|
str(e),
|
|
{"error_type": type(e).__name__},
|
|
)
|
|
logger.error(f"Error during Airtable indexing: {e!s}", exc_info=True)
|
|
return 0, f"Failed to index Airtable records: {e!s}"
|