diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py
index 288c7a4..244dd49 100644
--- a/surfsense_backend/app/agents/researcher/nodes.py
+++ b/surfsense_backend/app/agents/researcher/nodes.py
@@ -1,5 +1,7 @@
 import asyncio
 import json
+import logging
+import traceback
 from typing import Any
 
 from langchain_core.messages import HumanMessage, SystemMessage
@@ -1196,6 +1198,7 @@ async def fetch_relevant_documents(
                 )
 
             except Exception as e:
+                logging.error("Error in search_airtable: %s", traceback.format_exc())
                 error_message = f"Error searching connector {connector}: {e!s}"
                 print(error_message)
diff --git a/surfsense_backend/app/connectors/airtable_connector.py b/surfsense_backend/app/connectors/airtable_connector.py
index 48c2f9a..840b227 100644
--- a/surfsense_backend/app/connectors/airtable_connector.py
+++ b/surfsense_backend/app/connectors/airtable_connector.py
@@ -305,19 +305,19 @@ class AirtableConnector:
             )
 
             # Create Airtable formula for date filtering
-            filter_formula = (
-                f"AND("
-                f"IS_AFTER({{date_field}}, '{start_date}'), "
-                f"IS_BEFORE({{date_field}}, '{end_date}')"
-                f")"
-            ).replace("{date_field}", date_field)
+            # filter_formula = (
+            #     f"AND("
+            #     f"IS_AFTER({{date_field}}, '{start_date}'), "
+            #     f"IS_BEFORE({{date_field}}, '{end_date}')"
+            #     f")"
+            # ).replace("{date_field}", date_field)
+            # TODO: Investigate how to properly use filter formula
 
             return self.get_all_records(
                 base_id=base_id,
                 table_id=table_id,
                 max_records=max_records,
-                filter_by_formula=filter_formula,
-                sort=[{"field": date_field, "direction": "desc"}],
+                # filter_by_formula=filter_formula,
             )
 
         except Exception as e:
@@ -338,7 +338,7 @@ class AirtableConnector:
         """
         record_id = record.get("id", "Unknown")
         fields = record.get("fields", {})
-        created_time = record.get("createdTime", "")
+        created_time = record.get("CREATED_TIME()", "")
 
         markdown_parts = []
diff --git a/surfsense_backend/app/routes/airtable_add_connector_route.py b/surfsense_backend/app/routes/airtable_add_connector_route.py
index ee446e9..a31b5e5 100644
--- a/surfsense_backend/app/routes/airtable_add_connector_route.py
+++ b/surfsense_backend/app/routes/airtable_add_connector_route.py
@@ -36,6 +36,8 @@ TOKEN_URL = "https://airtable.com/oauth2/v1/token"
 SCOPES = [
     "data.records:read",
     "data.recordComments:read",
+    "schema.bases:read",
+    "user.email:read",
 ]
diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py
index d1f6108..6ea3929 100644
--- a/surfsense_backend/app/routes/search_source_connectors_routes.py
+++ b/surfsense_backend/app/routes/search_source_connectors_routes.py
@@ -36,6 +36,7 @@ from app.schemas import (
     SearchSourceConnectorUpdate,
 )
 from app.tasks.connector_indexers import (
+    index_airtable_records,
     index_clickup_tasks,
     index_confluence_pages,
     index_discord_messages,
@@ -508,6 +509,20 @@ async def index_connector_content(
             indexing_to,
         )
         response_message = "Google Calendar indexing started in the background."
+    elif connector.connector_type == SearchSourceConnectorType.AIRTABLE_CONNECTOR:
+        # Run indexing in background
+        logger.info(
+            f"Triggering Airtable indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
+        )
+        background_tasks.add_task(
+            run_airtable_indexing_with_new_session,
+            connector_id,
+            search_space_id,
+            str(user.id),
+            indexing_from,
+            indexing_to,
+        )
+        response_message = "Airtable indexing started in the background."
     elif (
         connector.connector_type == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR
     ):
@@ -1072,6 +1087,64 @@ async def run_clickup_indexing(
         # Optionally update status in DB to indicate failure
 
 
+# Add new helper functions for Airtable indexing
+async def run_airtable_indexing_with_new_session(
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+    start_date: str,
+    end_date: str,
+):
+    """Wrapper to run Airtable indexing with its own database session."""
+    logger.info(
+        f"Background task started: Indexing Airtable connector {connector_id} into space {search_space_id} from {start_date} to {end_date}"
+    )
+    async with async_session_maker() as session:
+        await run_airtable_indexing(
+            session, connector_id, search_space_id, user_id, start_date, end_date
+        )
+    logger.info(f"Background task finished: Indexing Airtable connector {connector_id}")
+
+
+async def run_airtable_indexing(
+    session: AsyncSession,
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+    start_date: str,
+    end_date: str,
+):
+    """Runs the Airtable indexing task and updates the timestamp."""
+    try:
+        indexed_count, error_message = await index_airtable_records(
+            session,
+            connector_id,
+            search_space_id,
+            user_id,
+            start_date,
+            end_date,
+            update_last_indexed=False,
+        )
+        if error_message:
+            logger.error(
+                f"Airtable indexing failed for connector {connector_id}: {error_message}"
+            )
+            # Optionally update status in DB to indicate failure
+        else:
+            logger.info(
+                f"Airtable indexing successful for connector {connector_id}. Indexed {indexed_count} records."
+            )
+            # Update the last indexed timestamp only on success
+            await update_connector_last_indexed(session, connector_id)
+            await session.commit()  # Commit timestamp update
+    except Exception as e:
+        logger.error(
+            f"Critical error in run_airtable_indexing for connector {connector_id}: {e}",
+            exc_info=True,
+        )
+        # Optionally update status in DB to indicate failure
+
+
 # Add new helper functions for Google Calendar indexing
 async def run_google_calendar_indexing_with_new_session(
     connector_id: int,
diff --git a/surfsense_backend/app/schemas/airtable_auth_credentials.py b/surfsense_backend/app/schemas/airtable_auth_credentials.py
index 9fcf2c9..586e99e 100644
--- a/surfsense_backend/app/schemas/airtable_auth_credentials.py
+++ b/surfsense_backend/app/schemas/airtable_auth_credentials.py
@@ -1,6 +1,6 @@
 from datetime import UTC, datetime
 
-from pydantic import BaseModel
+from pydantic import BaseModel, field_validator
 
 
 class AirtableAuthCredentialsBase(BaseModel):
@@ -49,3 +49,18 @@
             expires_at=expires_at,
             scope=data.get("scope"),
         )
+
+    @field_validator("expires_at", mode="before")
+    @classmethod
+    def ensure_aware_utc(cls, v):
+        # Strings like "2025-08-26T14:46:57.367184"
+        if isinstance(v, str):
+            # add +00:00 if missing tz info
+            if v.endswith("Z"):
+                return datetime.fromisoformat(v.replace("Z", "+00:00"))
+            dt = datetime.fromisoformat(v)
+            return dt if dt.tzinfo else dt.replace(tzinfo=UTC)
+        # datetime objects
+        if isinstance(v, datetime):
+            return v if v.tzinfo else v.replace(tzinfo=UTC)
+        return v
diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py
index c2e3669..a6a75ba 100644
--- a/surfsense_backend/app/services/connector_service.py
+++ b/surfsense_backend/app/services/connector_service.py
@@ -1209,6 +1209,94 @@ class ConnectorService:
 
         return result_object, calendar_chunks
 
+    async def search_airtable(
+        self,
+        user_query: str,
+        user_id: str,
+        search_space_id: int,
+        top_k: int = 20,
+        search_mode: SearchMode = SearchMode.CHUNKS,
+    ) -> tuple:
+        """
+        Search for Airtable records and return both the source information and langchain documents
+
+        Args:
+            user_query: The user's query
+            user_id: The user's ID
+            search_space_id: The search space ID to search in
+            top_k: Maximum number of results to return
+            search_mode: Search mode (CHUNKS or DOCUMENTS)
+
+        Returns:
+            tuple: (sources_info, langchain_documents)
+        """
+        if search_mode == SearchMode.CHUNKS:
+            airtable_chunks = await self.chunk_retriever.hybrid_search(
+                query_text=user_query,
+                top_k=top_k,
+                user_id=user_id,
+                search_space_id=search_space_id,
+                document_type="AIRTABLE_CONNECTOR",
+            )
+        elif search_mode == SearchMode.DOCUMENTS:
+            airtable_chunks = await self.document_retriever.hybrid_search(
+                query_text=user_query,
+                top_k=top_k,
+                user_id=user_id,
+                search_space_id=search_space_id,
+                document_type="AIRTABLE_CONNECTOR",
+            )
+            # Transform document retriever results to match expected format
+            airtable_chunks = self._transform_document_results(airtable_chunks)
+
+        # Early return if no results
+        if not airtable_chunks:
+            return {
+                "id": 32,
+                "name": "Airtable Records",
+                "type": "AIRTABLE_CONNECTOR",
+                "sources": [],
+            }, []
+
+        # Process chunks to create sources
+        sources_list = []
+        async with self.counter_lock:
+            for _i, chunk in enumerate(airtable_chunks):
+                # Extract document metadata
+                document = chunk.get("document", {})
+                metadata = document.get("metadata", {})
+
+                # Extract Airtable-specific metadata
+                record_id = metadata.get("record_id", "")
+                created_time = metadata.get("created_time", "")
+
+                # Create a more descriptive title for Airtable records
+                title = f"Airtable Record: {record_id}"
+
+                # Create a more descriptive description for Airtable records
+                description = f"Created: {created_time}"
+
+                source = {
+                    "id": chunk.get("chunk_id", self.source_id_counter),
+                    "title": title,
+                    "description": description,
+                    "url": "",  # TODO: Add URL to Airtable record
+                    "record_id": record_id,
+                    "created_time": created_time,
+                }
+
+                self.source_id_counter += 1
+                sources_list.append(source)
+
+        result_object = {
+            "id": 32,
+            "name": "Airtable Records",
+            "type": "AIRTABLE_CONNECTOR",
+            "sources": sources_list,
+        }
+
+        return result_object, airtable_chunks
+
     async def search_google_gmail(
         self,
         user_query: str,
diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py
index 1d58797..9ba2c2b 100644
--- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py
@@ -5,6 +5,7 @@ Airtable connector indexer.
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession
 
+from app.config import config
 from app.connectors.airtable_connector import AirtableConnector
 from app.db import Document, DocumentType, SearchSourceConnectorType
 from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
@@ -28,6 +29,8 @@ from .base import (
 async def index_airtable_records(
     session: AsyncSession,
     connector_id: int,
+    search_space_id: int,
+    user_id: str,
     start_date: str | None = None,
     end_date: str | None = None,
     max_records: int = 2500,
@@ -39,6 +42,8 @@
     Args:
         session: Database session
         connector_id: ID of the Airtable connector
+        search_space_id: ID of the search space to store documents in
+        user_id: ID of the user
         start_date: Start date for filtering records (YYYY-MM-DD)
        end_date: End date for filtering records (YYYY-MM-DD)
         max_records: Maximum number of records to fetch per table
@@ -47,11 +52,14 @@
         update_last_indexed: Whether to update the last indexed timestamp
 
     Returns:
         Tuple of (number_of_documents_processed, error_message)
     """
-    task_logger = TaskLoggingService(session)
-    log_entry = await task_logger.create_task_log(
-        task_name="index_airtable_records",
-        task_params={
+    task_logger = TaskLoggingService(session, search_space_id)
+    log_entry = await task_logger.log_task_start(
+        task_name="airtable_indexing",
+        source="connector_indexing_task",
+        message=f"Starting Airtable indexing for connector {connector_id}",
+        metadata={
             "connector_id": connector_id,
+            "user_id": str(user_id),
             "start_date": start_date,
             "end_date": end_date,
             "max_records": max_records,
@@ -178,12 +186,13 @@
                         airtable_connector.get_records_by_date_range(
                             base_id=base_id,
                             table_id=table_id,
-                            date_field="createdTime",
+                            date_field="CREATED_TIME()",
                             start_date=start_date_str,
                             end_date=end_date_str,
                             max_records=max_records,
                         )
                     )
+                else:
                     # Fetch all records
                     records, records_error = airtable_connector.get_all_records(
@@ -204,6 +213,9 @@
                 logger.info(f"Found {len(records)} records in table {table_name}")
 
+                documents_indexed = 0
+                skipped_messages = []
+                documents_skipped = 0
                 # Process each record
                 for record in records:
                     try:
@@ -214,89 +226,137 @@
                                )
                            )
 
-                            # Generate content hash
-                            content_hash = generate_content_hash(markdown_content)
-
-                            # Check for duplicates
-                            existing_doc = await check_duplicate_document_by_hash(
-                                session, content_hash
-                            )
-                            if existing_doc:
-                                logger.debug(
-                                    f"Skipping duplicate record {record.get('id')}"
+                        if not markdown_content.strip():
+                            logger.warning(
+                                f"Skipping message with no content: {record.get('id')}"
                             )
+                            skipped_messages.append(
+                                f"{record.get('id')} (no content)"
+                            )
+                            documents_skipped += 1
+                            continue
+
+                        # Generate content hash
+                        content_hash = generate_content_hash(
+                            markdown_content, search_space_id
+                        )
+
+                        # Check if document already exists
+                        existing_document_by_hash = (
+                            await check_duplicate_document_by_hash(
+                                session, content_hash
+                            )
+                        )
+
+                        if existing_document_by_hash:
+                            logger.info(
+                                f"Document with content hash {content_hash} already exists for message {record.get('id')}. Skipping processing."
+                            )
+                            documents_skipped += 1
                             continue
 
                         # Generate document summary
-                        llm = get_user_long_context_llm(connector.user_id)
-                        summary = await generate_document_summary(
-                            markdown_content, llm
-                        )
+                        user_llm = await get_user_long_context_llm(session, user_id)
 
-                        # Create document
+                        if user_llm:
+                            document_metadata = {
+                                "record_id": record.get("id", "Unknown"),
+                                "created_time": record.get("CREATED_TIME()", ""),
+                                "document_type": "Airtable Record",
+                                "connector_type": "Airtable",
+                            }
+                            (
+                                summary_content,
+                                summary_embedding,
+                            ) = await generate_document_summary(
+                                markdown_content, user_llm, document_metadata
+                            )
+                        else:
+                            # Fallback to simple summary if no LLM configured
+                            summary_content = f"Airtable Record: {record.get('id', 'Unknown')}\n\n"
+                            summary_embedding = (
+                                config.embedding_model_instance.embed(
+                                    summary_content
+                                )
+                            )
+
+                        # Process chunks
+                        chunks = await create_document_chunks(markdown_content)
+
+                        # Create and store new document
+                        logger.info(
+                            f"Creating new document for Airtable record: {record.get('id', 'Unknown')}"
+                        )
                         document = Document(
-                            title=f"{base_name} - {table_name} - Record {record.get('id', 'Unknown')}",
-                            content=markdown_content,
-                            content_hash=content_hash,
-                            summary=summary,
+                            search_space_id=search_space_id,
+                            title=f"Airtable Record: {record.get('id', 'Unknown')}",
                             document_type=DocumentType.AIRTABLE_CONNECTOR,
-                            source_url=f"https://airtable.com/{base_id}/{table_id}",
-                            metadata={
-                                "base_id": base_id,
-                                "base_name": base_name,
-                                "table_id": table_id,
-                                "table_name": table_name,
-                                "record_id": record.get("id"),
-                                "created_time": record.get("createdTime"),
-                                "connector_id": connector_id,
+                            document_metadata={
+                                "record_id": record.get("id", "Unknown"),
+                                "created_time": record.get("CREATED_TIME()", ""),
                             },
-                            user_id=connector.user_id,
+                            content=summary_content,
+                            content_hash=content_hash,
+                            embedding=summary_embedding,
+                            chunks=chunks,
                         )
                         session.add(document)
-                        await session.flush()
-
-                        # Create document chunks
-                        await create_document_chunks(
-                            session, document, markdown_content, llm
-                        )
-
-                        total_processed += 1
-                        logger.debug(
-                            f"Processed record {record.get('id')} from {table_name}"
+                        documents_indexed += 1
+                        logger.info(
+                            f"Successfully indexed new Airtable record {summary_content}"
                         )
 
                     except Exception as e:
                         logger.error(
-                            f"Error processing record {record.get('id')}: {e!s}"
+                            f"Error processing the Airtable record {record.get('id', 'Unknown')}: {e!s}",
+                            exc_info=True,
                         )
-                        continue
+                        skipped_messages.append(
+                            f"{record.get('id', 'Unknown')} (processing error)"
+                        )
+                        documents_skipped += 1
+                        continue  # Skip this message and continue with others
 
-            # Update last indexed timestamp
-            if update_last_indexed:
-                await update_connector_last_indexed(
-                    session, connector, update_last_indexed
-                )
+            # Update the last_indexed_at timestamp for the connector only if requested
+            total_processed = documents_indexed
+            if total_processed > 0:
+                await update_connector_last_indexed(
+                    session, connector, update_last_indexed
+                )
-            await session.commit()
+            # Commit all changes
+            await session.commit()
+            logger.info(
+                "Successfully committed all Airtable document changes to database"
+            )
-            success_msg = f"Successfully indexed {total_processed} Airtable records"
-            await task_logger.log_task_success(
-                log_entry,
-                success_msg,
-                {
-                    "records_processed": total_processed,
-                    "bases_count": len(bases),
-                    "date_range": f"{start_date_str} to {end_date_str}",
-                },
+            # Log success
+            await task_logger.log_task_success(
+                log_entry,
+                f"Successfully completed Airtable indexing for connector {connector_id}",
+                {
+                    "events_processed": total_processed,
+                    "documents_indexed": documents_indexed,
+                    "documents_skipped": documents_skipped,
+                    "skipped_messages_count": len(skipped_messages),
+                },
+            )
+
+            logger.info(
+                f"Airtable indexing completed: {documents_indexed} new records, {documents_skipped} skipped"
+            )
+            return (
+                total_processed,
+                None,
+            )  # Return None as the error message to indicate success
+
+        except Exception as e:
+            logger.error(
+                f"Fetching Airtable bases for connector {connector_id} failed: {e!s}",
+                exc_info=True,
             )
-            logger.info(success_msg)
-            return total_processed, None
-
-        finally:
-            airtable_connector.close()
-
     except SQLAlchemyError as db_error:
         await session.rollback()
         await task_logger.log_task_failure(