update airtable indexer

CREDO23 2025-08-26 19:17:46 +02:00
parent 1e0f3a1067
commit 45d2c18c16
7 changed files with 318 additions and 77 deletions

View file

@@ -1,5 +1,7 @@
import asyncio
import json
import logging
import traceback
from typing import Any
from langchain_core.messages import HumanMessage, SystemMessage
@@ -1196,6 +1198,7 @@ async def fetch_relevant_documents(
)
except Exception as e:
logging.error("Error in search_airtable: %s", traceback.format_exc())
error_message = f"Error searching connector {connector}: {e!s}"
print(error_message)
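The added log line captures the full stack trace when a connector search fails. A roughly equivalent, slightly more idiomatic form is logging.exception, which logs at ERROR level and appends the active traceback automatically when called inside an except block; this is only an illustrative aside, not what the commit does:

import logging

try:
    raise RuntimeError("airtable search failed")  # stand-in for the failing connector call
except Exception:
    # Equivalent to logging.error("...: %s", traceback.format_exc()) inside an except block
    logging.exception("Error in search_airtable")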

View file

@@ -305,19 +305,19 @@ class AirtableConnector:
)
# Create Airtable formula for date filtering
-filter_formula = (
-f"AND("
-f"IS_AFTER({{date_field}}, '{start_date}'), "
-f"IS_BEFORE({{date_field}}, '{end_date}')"
-f")"
-).replace("{date_field}", date_field)
# filter_formula = (
# f"AND("
# f"IS_AFTER({{date_field}}, '{start_date}'), "
# f"IS_BEFORE({{date_field}}, '{end_date}')"
# f")"
# ).replace("{date_field}", date_field)
# TODO: Investigate how to properly use filter formula
return self.get_all_records(
base_id=base_id,
table_id=table_id,
max_records=max_records,
-filter_by_formula=filter_formula,
# filter_by_formula=filter_formula,
sort=[{"field": date_field, "direction": "desc"}],
)
except Exception as e:
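The commented-out formula and the TODO above leave server-side date filtering disabled for now; one plausible reason is that substituting CREATED_TIME() into the {date_field} placeholder yields {CREATED_TIME()}, which Airtable reads as a field literally named "CREATED_TIME()". As an illustrative sketch only (plain requests against the public Airtable REST API, not this connector's own client), a working date filter can be passed through the filterByFormula query parameter:

import requests

def list_records_in_range(token: str, base_id: str, table_id: str,
                          start_date: str, end_date: str, max_records: int = 100) -> list[dict]:
    # CREATED_TIME() is evaluated by Airtable for each record, so no field reference is needed.
    formula = (
        f"AND("
        f"IS_AFTER(CREATED_TIME(), '{start_date}'), "
        f"IS_BEFORE(CREATED_TIME(), '{end_date}')"
        f")"
    )
    response = requests.get(
        f"https://api.airtable.com/v0/{base_id}/{table_id}",
        headers={"Authorization": f"Bearer {token}"},
        params={"filterByFormula": formula, "maxRecords": max_records},
        timeout=30,
    )
    response.raise_for_status()
    return response.json().get("records", [])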
@@ -338,7 +338,7 @@ class AirtableConnector:
"""
record_id = record.get("id", "Unknown")
fields = record.get("fields", {})
-created_time = record.get("createdTime", "")
created_time = record.get("CREATED_TIME()", "")
markdown_parts = []

View file

@@ -36,6 +36,8 @@ TOKEN_URL = "https://airtable.com/oauth2/v1/token"
SCOPES = [
"data.records:read",
"data.recordComments:read",
"schema.bases:read",
"user.email:read",
]
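For context on the two added scopes: schema.bases:read is what Airtable's metadata endpoints require to enumerate bases and tables (which the indexer below iterates over), and user.email:read exposes the email field on the "whoami" endpoint. A rough sketch with plain requests (function names assumed; these calls are presumably wrapped by the project's own AirtableConnector):

import requests

META_API = "https://api.airtable.com/v0/meta"

def list_bases(token: str) -> list[dict]:
    # Requires the schema.bases:read scope.
    response = requests.get(
        f"{META_API}/bases",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json().get("bases", [])

def whoami(token: str) -> dict:
    # Returns the user id and granted scopes; "email" is included when user.email:read was granted.
    response = requests.get(
        f"{META_API}/whoami",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()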

View file

@@ -36,6 +36,7 @@ from app.schemas import (
SearchSourceConnectorUpdate,
)
from app.tasks.connector_indexers import (
index_airtable_records,
index_clickup_tasks,
index_confluence_pages,
index_discord_messages,
@@ -508,6 +509,20 @@ async def index_connector_content(
indexing_to,
)
response_message = "Google Calendar indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.AIRTABLE_CONNECTOR:
# Run indexing in background
logger.info(
f"Triggering Airtable indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
)
background_tasks.add_task(
run_airtable_indexing_with_new_session,
connector_id,
search_space_id,
str(user.id),
indexing_from,
indexing_to,
)
response_message = "Airtable indexing started in the background."
elif (
connector.connector_type == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR
):
@@ -1072,6 +1087,64 @@ async def run_clickup_indexing(
# Optionally update status in DB to indicate failure
# Add new helper functions for Airtable indexing
async def run_airtable_indexing_with_new_session(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Wrapper to run Airtable indexing with its own database session."""
logger.info(
f"Background task started: Indexing Airtable connector {connector_id} into space {search_space_id} from {start_date} to {end_date}"
)
async with async_session_maker() as session:
await run_airtable_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
logger.info(f"Background task finished: Indexing Airtable connector {connector_id}")
async def run_airtable_indexing(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Runs the Airtable indexing task and updates the timestamp."""
try:
indexed_count, error_message = await index_airtable_records(
session,
connector_id,
search_space_id,
user_id,
start_date,
end_date,
update_last_indexed=False,
)
if error_message:
logger.error(
f"Airtable indexing failed for connector {connector_id}: {error_message}"
)
# Optionally update status in DB to indicate failure
else:
logger.info(
f"Airtable indexing successful for connector {connector_id}. Indexed {indexed_count} records."
)
# Update the last indexed timestamp only on success
await update_connector_last_indexed(session, connector_id)
await session.commit() # Commit timestamp update
except Exception as e:
logger.error(
f"Critical error in run_airtable_indexing for connector {connector_id}: {e}",
exc_info=True,
)
# Optionally update status in DB to indicate failure
# Add new helper functions for Google Calendar indexing
async def run_google_calendar_indexing_with_new_session(
connector_id: int,

View file

@@ -1,6 +1,6 @@
from datetime import UTC, datetime
-from pydantic import BaseModel
from pydantic import BaseModel, field_validator
class AirtableAuthCredentialsBase(BaseModel):
@@ -49,3 +49,18 @@ class AirtableAuthCredentialsBase(BaseModel):
expires_at=expires_at,
scope=data.get("scope"),
)
@field_validator("expires_at", mode="before")
@classmethod
def ensure_aware_utc(cls, v):
# Strings like "2025-08-26T14:46:57.367184"
if isinstance(v, str):
# add +00:00 if missing tz info
if v.endswith("Z"):
return datetime.fromisoformat(v.replace("Z", "+00:00"))
dt = datetime.fromisoformat(v)
return dt if dt.tzinfo else dt.replace(tzinfo=UTC)
# datetime objects
if isinstance(v, datetime):
return v if v.tzinfo else v.replace(tzinfo=UTC)
return v
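A small illustration (timestamps invented) of what the new ensure_aware_utc validator normalizes: naive ISO strings get UTC attached, and a trailing "Z" is rewritten to "+00:00" because datetime.fromisoformat only accepts the "Z" suffix natively on Python 3.11 and later:

from datetime import UTC, datetime

naive = datetime.fromisoformat("2025-08-26T14:46:57.367184")
print(naive.tzinfo)               # None -> the validator attaches UTC
print(naive.replace(tzinfo=UTC))  # 2025-08-26 14:46:57.367184+00:00

aware = datetime.fromisoformat("2025-08-26T14:46:57Z".replace("Z", "+00:00"))
print(aware)                      # 2025-08-26 14:46:57+00:00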

View file

@@ -1209,6 +1209,94 @@ class ConnectorService:
return result_object, calendar_chunks
async def search_airtable(
self,
user_query: str,
user_id: str,
search_space_id: int,
top_k: int = 20,
search_mode: SearchMode = SearchMode.CHUNKS,
) -> tuple:
"""
Search for Airtable records and return both the source information and langchain documents
Args:
user_query: The user's query
user_id: The user's ID
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
search_mode: Search mode (CHUNKS or DOCUMENTS)
Returns:
tuple: (sources_info, langchain_documents)
"""
if search_mode == SearchMode.CHUNKS:
airtable_chunks = await self.chunk_retriever.hybrid_search(
query_text=user_query,
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
document_type="AIRTABLE_CONNECTOR",
)
elif search_mode == SearchMode.DOCUMENTS:
airtable_chunks = await self.document_retriever.hybrid_search(
query_text=user_query,
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
document_type="AIRTABLE_CONNECTOR",
)
# Transform document retriever results to match expected format
airtable_chunks = self._transform_document_results(airtable_chunks)
# Early return if no results
if not airtable_chunks:
return {
"id": 32,
"name": "Airtable Records",
"type": "AIRTABLE_CONNECTOR",
"sources": [],
}, []
# Process chunks to create sources
sources_list = []
async with self.counter_lock:
for _i, chunk in enumerate(airtable_chunks):
# Extract document metadata
document = chunk.get("document", {})
metadata = document.get("metadata", {})
# Extract Airtable-specific metadata
record_id = metadata.get("record_id", "")
created_time = metadata.get("created_time", "")
# Create a more descriptive title for Airtable records
title = f"Airtable Record: {record_id}"
# Create a more descriptive description for Airtable records
description = f"Created: {created_time}"
source = {
"id": chunk.get("chunk_id", self.source_id_counter),
"title": title,
"description": description,
"url": "", # TODO: Add URL to Airtable record
"record_id": record_id,
"created_time": created_time,
}
self.source_id_counter += 1
sources_list.append(source)
result_object = {
"id": 32,
"name": "Airtable Records",
"type": "AIRTABLE_CONNECTOR",
"sources": sources_list,
}
return result_object, airtable_chunks
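A hypothetical call site for the new method (the service instance, IDs, and query are invented), shown mainly to illustrate the shape of the returned tuple:

async def demo_airtable_search(connector_service, user_id: str) -> None:
    # connector_service is assumed to be an already-constructed ConnectorService.
    sources, chunks = await connector_service.search_airtable(
        user_query="overdue invoices",
        user_id=user_id,
        search_space_id=42,
        top_k=10,
    )
    # sources -> {"id": 32, "name": "Airtable Records", "type": "AIRTABLE_CONNECTOR", "sources": [...]}
    # each entry in sources["sources"] carries title, description, record_id, and created_time
    print(len(sources["sources"]), len(chunks))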
async def search_google_gmail(
self,
user_query: str,

View file

@@ -5,6 +5,7 @@ Airtable connector indexer.
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.airtable_connector import AirtableConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
@@ -28,6 +29,8 @@ from .base import (
async def index_airtable_records(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None = None,
end_date: str | None = None,
max_records: int = 2500,
@@ -39,6 +42,8 @@ async def index_airtable_records(
Args:
session: Database session
connector_id: ID of the Airtable connector
search_space_id: ID of the search space to store documents in
user_id: ID of the user
start_date: Start date for filtering records (YYYY-MM-DD)
end_date: End date for filtering records (YYYY-MM-DD)
max_records: Maximum number of records to fetch per table
@@ -47,11 +52,14 @@ async def index_airtable_records(
Returns:
Tuple of (number_of_documents_processed, error_message)
"""
-task_logger = TaskLoggingService(session)
-log_entry = await task_logger.create_task_log(
-task_name="index_airtable_records",
-task_params={
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="airtable_indexing",
source="connector_indexing_task",
message=f"Starting Airtable indexing for connector {connector_id}",
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
"start_date": start_date,
"end_date": end_date,
"max_records": max_records,
@@ -178,12 +186,13 @@ async def index_airtable_records(
airtable_connector.get_records_by_date_range(
base_id=base_id,
table_id=table_id,
-date_field="createdTime",
date_field="CREATED_TIME()",
start_date=start_date_str,
end_date=end_date_str,
max_records=max_records,
)
)
else:
# Fetch all records
records, records_error = airtable_connector.get_all_records(
@@ -204,6 +213,9 @@ async def index_airtable_records(
logger.info(f"Found {len(records)} records in table {table_name}")
documents_indexed = 0
skipped_messages = []
documents_skipped = 0
# Process each record
for record in records:
try:
@@ -214,89 +226,137 @@ async def index_airtable_records(
)
)
-# Generate content hash
-content_hash = generate_content_hash(markdown_content)
-# Check for duplicates
-existing_doc = await check_duplicate_document_by_hash(
-session, content_hash
-)
-if existing_doc:
-logger.debug(
-f"Skipping duplicate record {record.get('id')}"
-)
if not markdown_content.strip():
logger.warning(
f"Skipping message with no content: {record.get('id')}"
)
skipped_messages.append(
f"{record.get('id')} (no content)"
)
documents_skipped += 1
continue
# Generate content hash
content_hash = generate_content_hash(
markdown_content, search_space_id
)
# Check if document already exists
existing_document_by_hash = (
await check_duplicate_document_by_hash(
session, content_hash
)
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for message {record.get('id')}. Skipping processing."
)
documents_skipped += 1
continue
# Generate document summary
-llm = get_user_long_context_llm(connector.user_id)
-summary = await generate_document_summary(
-markdown_content, llm
-)
-# Create document
user_llm = await get_user_long_context_llm(session, user_id)
if user_llm:
document_metadata = {
"record_id": record.get("id", "Unknown"),
"created_time": record.get("CREATED_TIME()", ""),
"document_type": "Airtable Record",
"connector_type": "Airtable",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Airtable Record: {record.get('id', 'Unknown')}\n\n"
summary_embedding = (
config.embedding_model_instance.embed(
summary_content
)
)
# Process chunks
chunks = await create_document_chunks(markdown_content)
# Create and store new document
logger.info(
f"Creating new document for Airtable record: {record.get('id', 'Unknown')}"
)
document = Document(
-title=f"{base_name} - {table_name} - Record {record.get('id', 'Unknown')}",
-content=markdown_content,
-summary=summary,
-source_url=f"https://airtable.com/{base_id}/{table_id}",
-metadata={
-"base_id": base_id,
-"base_name": base_name,
-"table_id": table_id,
-"table_name": table_name,
-"record_id": record.get("id"),
-"created_time": record.get("createdTime"),
-"connector_id": connector_id,
-},
-user_id=connector.user_id,
search_space_id=search_space_id,
title=f"Airtable Record: {record.get('id', 'Unknown')}",
document_type=DocumentType.AIRTABLE_CONNECTOR,
document_metadata={
"record_id": record.get("id", "Unknown"),
"created_time": record.get("CREATED_TIME()", ""),
},
content=summary_content,
content_hash=content_hash,
embedding=summary_embedding,
chunks=chunks,
)
session.add(document)
-await session.flush()
documents_indexed += 1
-# Create document chunks
-await create_document_chunks(
-session, document, markdown_content, llm
-)
-total_processed += 1
-logger.debug(
-f"Processed record {record.get('id')} from {table_name}"
-)
logger.info(
f"Successfully indexed new Airtable record {summary_content}"
)
except Exception as e:
logger.error(
-f"Error processing record {record.get('id')}: {e!s}"
f"Error processing the Airtable record {record.get('id', 'Unknown')}: {e!s}",
exc_info=True,
)
-continue
skipped_messages.append(
f"{record.get('id', 'Unknown')} (processing error)"
)
documents_skipped += 1
continue  # Skip this message and continue with others
-# Update last indexed timestamp
-if update_last_indexed:
# Update the last_indexed_at timestamp for the connector only if requested
total_processed = documents_indexed
if total_processed > 0:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
# Commit all changes
await session.commit()
logger.info(
"Successfully committed all Airtable document changes to database"
)
-success_msg = f"Successfully indexed {total_processed} Airtable records"
# Log success
await task_logger.log_task_success(
log_entry,
-success_msg,
f"Successfully completed Airtable indexing for connector {connector_id}",
{
-"records_processed": total_processed,
-"bases_count": len(bases),
-"date_range": f"{start_date_str} to {end_date_str}",
"events_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_messages_count": len(skipped_messages),
},
)
-logger.info(success_msg)
-return total_processed, None
logger.info(
f"Airtable indexing completed: {documents_indexed} new records, {documents_skipped} skipped"
)
return (
total_processed,
None,
)  # Return None as the error message to indicate success
except Exception as e:
logger.error(
f"Fetching Airtable bases for connector {connector_id} failed: {e!s}",
exc_info=True,
)
-finally:
-airtable_connector.close()
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(