diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index 5d4417f..1f2b897 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -11,6 +11,11 @@ GOOGLE_OAUTH_CLIENT_SECRET=GOCSV GOOGLE_CALENDAR_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/calendar/connector/callback GOOGLE_GMAIL_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/gmail/connector/callback +# Airtable OAuth +AIRTABLE_CLIENT_ID=your_airtable_client_id +AIRTABLE_CLIENT_SECRET=your_airtable_client_secret +AIRTABLE_REDIRECT_URI=http://localhost:8000/api/v1/auth/airtable/connector/callback + # Embedding Model EMBEDDING_MODEL=mixedbread-ai/mxbai-embed-large-v1 diff --git a/surfsense_backend/alembic/versions/19_add_airtable_connector_enums.py b/surfsense_backend/alembic/versions/19_add_airtable_connector_enums.py new file mode 100644 index 0000000..ee0b0b0 --- /dev/null +++ b/surfsense_backend/alembic/versions/19_add_airtable_connector_enums.py @@ -0,0 +1,57 @@ +"""Add AIRTABLE_CONNECTOR to enums + +Revision ID: 19 +Revises: 18 +""" + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "19" +down_revision = "18" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """Upgrade schema - add AIRTABLE_CONNECTOR to enums.""" + # Add to searchsourceconnectortype enum + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'AIRTABLE_CONNECTOR' + ) THEN + ALTER TYPE searchsourceconnectortype ADD VALUE 'AIRTABLE_CONNECTOR'; + END IF; + END + $$; + """ + ) + + # Add to documenttype enum + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'documenttype' AND e.enumlabel = 'AIRTABLE_CONNECTOR' + ) THEN + ALTER TYPE documenttype ADD VALUE 'AIRTABLE_CONNECTOR'; + END IF; + END + $$; + """ + ) + + +def downgrade() -> None: + """Downgrade schema - no-op; PostgreSQL cannot drop enum values.""" + # Removing a value from a PostgreSQL enum type is not supported, + # so the AIRTABLE_CONNECTOR labels are intentionally left in place. + pass diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py index 5f153d6..244dd49 100644 --- a/surfsense_backend/app/agents/researcher/nodes.py +++ b/surfsense_backend/app/agents/researcher/nodes.py @@ -1,5 +1,7 @@ import asyncio import json +import logging +import traceback from typing import Any from langchain_core.messages import HumanMessage, SystemMessage @@ -369,6 +371,30 @@ async def fetch_documents_by_ids( except Exception: title += f" ({start_time})" + elif doc_type == "AIRTABLE_CONNECTOR": + # Extract Airtable-specific metadata + base_name = metadata.get("base_name", "Unknown Base") + table_name = metadata.get("table_name", "Unknown Table") + record_id = metadata.get("record_id", "") + created_time = metadata.get("created_time", "") + + title = f"Airtable: {base_name} - {table_name}" + if record_id: + title += f" (Record: {record_id[:8]}...)" + if created_time: + # Format the created time for display + try: + if "T" in created_time: + from datetime import datetime + + created_dt = datetime.fromisoformat( + created_time.replace("Z", "+00:00") + ) + formatted_time = created_dt.strftime("%Y-%m-%d %H:%M") + title += f" - {formatted_time}" + except Exception: + pass + description = ( doc.content[:100] + "..."
if len(doc.content) > 100 @@ -456,6 +482,11 @@ async def fetch_documents_by_ids( "EXTENSION": "Browser Extension (Selected)", "CRAWLED_URL": "Web Pages (Selected)", "FILE": "Files (Selected)", + "GOOGLE_CALENDAR_CONNECTOR": "Google Calendar (Selected)", + "GOOGLE_GMAIL_CONNECTOR": "Google Gmail (Selected)", + "CONFLUENCE_CONNECTOR": "Confluence (Selected)", + "CLICKUP_CONNECTOR": "ClickUp (Selected)", + "AIRTABLE_CONNECTOR": "Airtable (Selected)", } source_object = { @@ -1061,6 +1092,32 @@ async def fetch_relevant_documents( ) } ) + elif connector == "AIRTABLE_CONNECTOR": + ( + source_object, + airtable_chunks, + ) = await connector_service.search_airtable( + user_query=reformulated_query, + user_id=user_id, + search_space_id=search_space_id, + top_k=top_k, + search_mode=search_mode, + ) + + # Add to sources and raw documents + if source_object: + all_sources.append(source_object) + all_raw_documents.extend(airtable_chunks) + + # Stream found document count + if streaming_service and writer: + writer( + { + "yield_value": streaming_service.format_terminal_info_delta( + f"🗃️ Found {len(airtable_chunks)} Airtable records related to your query" + ) + } + ) elif connector == "GOOGLE_GMAIL_CONNECTOR": ( source_object, @@ -1141,6 +1198,7 @@ async def fetch_relevant_documents( ) except Exception as e: + logging.error("Error searching connector %s: %s", connector, traceback.format_exc()) error_message = f"Error searching connector {connector}: {e!s}" print(error_message) diff --git a/surfsense_backend/app/agents/researcher/qna_agent/prompts.py b/surfsense_backend/app/agents/researcher/qna_agent/prompts.py index 1a4bf27..c0bf991 100644 --- a/surfsense_backend/app/agents/researcher/qna_agent/prompts.py +++ b/surfsense_backend/app/agents/researcher/qna_agent/prompts.py @@ -35,6 +35,7 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel - GOOGLE_CALENDAR_CONNECTOR: "Google Calendar events, meetings, and schedules" (personal calendar and time management) - GOOGLE_GMAIL_CONNECTOR: "Google Gmail emails and conversations" (personal emails and communications) - DISCORD_CONNECTOR: "Discord server conversations and shared content" (personal community communications) +- AIRTABLE_CONNECTOR: "Airtable records, tables, and database content" (personal data management and organization) - TAVILY_API: "Tavily search API results" (personalized search results) - LINKUP_API: "Linkup search API results" (personalized search results) diff --git a/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py b/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py index 577010b..5b205bd 100644 --- a/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py +++ b/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py @@ -35,6 +35,7 @@ You are SurfSense, an advanced AI research assistant that synthesizes informatio - GOOGLE_CALENDAR_CONNECTOR: "Google Calendar events, meetings, and schedules" (personal calendar and time management) - GOOGLE_GMAIL_CONNECTOR: "Google Gmail emails and conversations" (personal emails and communications) - DISCORD_CONNECTOR: "Discord server messages and channels" (personal community interactions) +- AIRTABLE_CONNECTOR: "Airtable records, tables, and database content" (personal data management and organization) - TAVILY_API: "Tavily search API results" (personalized search results) - LINKUP_API: "Linkup search API results" (personalized search results) diff --git a/surfsense_backend/app/agents/researcher/utils.py
b/surfsense_backend/app/agents/researcher/utils.py index f3ccc28..9ff3476 100644 --- a/surfsense_backend/app/agents/researcher/utils.py +++ b/surfsense_backend/app/agents/researcher/utils.py @@ -49,6 +49,7 @@ def get_connector_emoji(connector_name: str) -> str: "TAVILY_API": "🔍", "LINKUP_API": "🔗", "GOOGLE_CALENDAR_CONNECTOR": "📅", + "AIRTABLE_CONNECTOR": "🗃️", } return connector_emojis.get(connector_name, "🔎") @@ -70,6 +71,7 @@ def get_connector_friendly_name(connector_name: str) -> str: "DISCORD_CONNECTOR": "Discord", "TAVILY_API": "Tavily Search", "LINKUP_API": "Linkup Search", + "AIRTABLE_CONNECTOR": "Airtable", } return connector_friendly_names.get(connector_name, connector_name) diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index 38ae616..6a30839 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -54,6 +54,11 @@ class Config: # Google Gmail redirect URI GOOGLE_GMAIL_REDIRECT_URI = os.getenv("GOOGLE_GMAIL_REDIRECT_URI") + # Airtable OAuth + AIRTABLE_CLIENT_ID = os.getenv("AIRTABLE_CLIENT_ID") + AIRTABLE_CLIENT_SECRET = os.getenv("AIRTABLE_CLIENT_SECRET") + AIRTABLE_REDIRECT_URI = os.getenv("AIRTABLE_REDIRECT_URI") + # LLM instances are now managed per-user through the LLMConfig system # Legacy environment variables removed in favor of user-specific configurations diff --git a/surfsense_backend/app/connectors/airtable_connector.py b/surfsense_backend/app/connectors/airtable_connector.py new file mode 100644 index 0000000..840b227 --- /dev/null +++ b/surfsense_backend/app/connectors/airtable_connector.py @@ -0,0 +1,384 @@ +""" +Airtable connector for fetching records from Airtable bases. +""" + +import json +import logging +import time +from typing import Any + +import httpx +from dateutil.parser import isoparse + +from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase + +logger = logging.getLogger(__name__) + + +class AirtableConnector: + """ + Connector for interacting with Airtable API using OAuth 2.0 credentials. + """ + + def __init__(self, credentials: AirtableAuthCredentialsBase): + """ + Initialize the AirtableConnector with OAuth credentials. + + Args: + credentials: Airtable OAuth credentials + """ + self.credentials = credentials + self.base_url = "https://api.airtable.com/v0" + self._client = None + + def _get_client(self) -> httpx.Client: + """ + Get or create an HTTP client with proper authentication headers. + + Returns: + Configured httpx.Client instance + """ + if self._client is None: + headers = { + "Authorization": f"Bearer {self.credentials.access_token}", + "Content-Type": "application/json", + } + self._client = httpx.Client( + headers=headers, + timeout=30.0, + follow_redirects=True, + ) + return self._client + + def _make_request( + self, method: str, url: str, **kwargs + ) -> tuple[dict[str, Any] | None, str | None]: + """ + Make an HTTP request with error handling and retry logic. + + Args: + method: HTTP method (GET, POST, etc.) + url: Request URL + **kwargs: Additional arguments for the request + + Returns: + Tuple of (response_data, error_message) + """ + client = self._get_client() + max_retries = 3 + retry_delay = 1 + + for attempt in range(max_retries): + try: + response = client.request(method, url, **kwargs) + + if response.status_code == 429: + # Rate limited - wait and retry + retry_after = int(response.headers.get("Retry-After", retry_delay)) + logger.warning( + f"Rate limited by Airtable API. Waiting {retry_after} seconds. 
" + f"Attempt {attempt + 1}/{max_retries}" + ) + time.sleep(retry_after) + retry_delay *= 2 + continue + + if response.status_code == 401: + return None, "Authentication failed. Please check your credentials." + + if response.status_code == 403: + return ( + None, + "Access forbidden. Please check your permissions and scopes.", + ) + + if response.status_code >= 400: + error_detail = response.text + try: + error_json = response.json() + error_detail = error_json.get("error", {}).get( + "message", error_detail + ) + except Exception: + pass + return None, f"API error {response.status_code}: {error_detail}" + + return response.json(), None + + except httpx.TimeoutException: + if attempt == max_retries - 1: + return None, "Request timeout. Please try again later." + logger.warning( + f"Request timeout. Retrying... Attempt {attempt + 1}/{max_retries}" + ) + time.sleep(retry_delay) + retry_delay *= 2 + + except Exception as e: + if attempt == max_retries - 1: + return None, f"Request failed: {e!s}" + logger.warning( + f"Request failed: {e!s}. Retrying... Attempt {attempt + 1}/{max_retries}" + ) + time.sleep(retry_delay) + retry_delay *= 2 + + return None, "Max retries exceeded" + + def get_bases(self) -> tuple[list[dict[str, Any]], str | None]: + """ + Get list of accessible bases. + + Returns: + Tuple of (bases_list, error_message) + """ + url = f"{self.base_url}/meta/bases" + response_data, error = self._make_request("GET", url) + + if error: + return [], error + + if not response_data or "bases" not in response_data: + return [], "No bases found in response" + + return response_data["bases"], None + + def get_base_schema(self, base_id: str) -> tuple[dict[str, Any] | None, str | None]: + """ + Get schema information for a specific base. + + Args: + base_id: The base ID + + Returns: + Tuple of (schema_data, error_message) + """ + url = f"{self.base_url}/meta/bases/{base_id}/tables" + return self._make_request("GET", url) + + def get_records( + self, + base_id: str, + table_id: str, + max_records: int = 100, + offset: str | None = None, + filter_by_formula: str | None = None, + sort: list[dict[str, str]] | None = None, + fields: list[str] | None = None, + ) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + Get records from a specific table in a base. 
+ + Args: + base_id: The base ID + table_id: The table ID or name + max_records: Maximum number of records to return per request (Airtable caps a page at 100) + offset: Pagination offset + filter_by_formula: Airtable formula to filter records + sort: List of sort specifications + fields: List of field names to include + + Returns: + Tuple of (records_list, next_offset, error_message) + """ + url = f"{self.base_url}/{base_id}/{table_id}" + + params = {} + if max_records: + params["pageSize"] = min(max_records, 100) # pageSize (not maxRecords) sets the per-request size; Airtable caps it at 100 + if offset: + params["offset"] = offset + if filter_by_formula: + params["filterByFormula"] = filter_by_formula + if sort: + for i, sort_spec in enumerate(sort): + params[f"sort[{i}][field]"] = sort_spec["field"] + params[f"sort[{i}][direction]"] = sort_spec.get("direction", "asc") + if fields: + for i, field in enumerate(fields): + params[f"fields[{i}]"] = field + + response_data, error = self._make_request("GET", url, params=params) + + if error: + return [], None, error + + if not response_data: + return [], None, "No data in response" + + records = response_data.get("records", []) + next_offset = response_data.get("offset") + + return records, next_offset, None + + def get_all_records( + self, + base_id: str, + table_id: str, + max_records: int = 2500, + filter_by_formula: str | None = None, + sort: list[dict[str, str]] | None = None, + fields: list[str] | None = None, + ) -> tuple[list[dict[str, Any]], str | None]: + """ + Get all records from a table with pagination. + + Args: + base_id: The base ID + table_id: The table ID or name + max_records: Maximum total records to fetch + filter_by_formula: Airtable formula to filter records + sort: List of sort specifications + fields: List of field names to include + + Returns: + Tuple of (all_records, error_message) + """ + all_records = [] + offset = None + fetched_count = 0 + + while fetched_count < max_records: + batch_size = min(100, max_records - fetched_count) + + records, next_offset, error = self.get_records( + base_id=base_id, + table_id=table_id, + max_records=batch_size, + offset=offset, + filter_by_formula=filter_by_formula, + sort=sort, + fields=fields, + ) + + if error: + return all_records, error + + if not records: + break + + all_records.extend(records) + fetched_count += len(records) + + if not next_offset: + break + + offset = next_offset + + # Small delay to be respectful to the API + time.sleep(0.1) + + return all_records, None + + def get_records_by_date_range( + self, + base_id: str, + table_id: str, + date_field: str, + start_date: str, + end_date: str, + max_records: int = 2500, + ) -> tuple[list[dict[str, Any]], str | None]: + """ + Get records filtered by a date range.
+ + Args: + base_id: The base ID + table_id: The table ID or name + date_field: Name of the date field to filter on + start_date: Start date (YYYY-MM-DD format) + end_date: End date (YYYY-MM-DD format) + max_records: Maximum total records to fetch + + Returns: + Tuple of (records, error_message) + """ + try: + # Parse and validate dates + start_dt = isoparse(start_date) + end_dt = isoparse(end_date) + + if start_dt >= end_dt: + return ( + [], + f"start_date ({start_date}) must be before end_date ({end_date})", + ) + + # Create Airtable formula for date filtering + # filter_formula = ( + # f"AND(" + # f"IS_AFTER({{date_field}}, '{start_date}'), " + # f"IS_BEFORE({{date_field}}, '{end_date}')" + # f")" + # ).replace("{date_field}", date_field) + # TODO: Investigate how to properly use filter formula + + return self.get_all_records( + base_id=base_id, + table_id=table_id, + max_records=max_records, + # filter_by_formula=filter_formula, + ) + + except Exception as e: + return [], f"Error filtering by date range: {e!s}" + + def format_record_to_markdown( + self, record: dict[str, Any], table_name: str = "" + ) -> str: + """ + Format an Airtable record as markdown. + + Args: + record: The Airtable record + table_name: Name of the table (optional) + + Returns: + Formatted markdown string + """ + record_id = record.get("id", "Unknown") + fields = record.get("fields", {}) + created_time = record.get("createdTime", "") + + markdown_parts = [] + + # Title + title = "Airtable Record" + if table_name: + title += f" from {table_name}" + markdown_parts.append(f"# {title}") + markdown_parts.append("") + + # Metadata + markdown_parts.append("## Record Information") + markdown_parts.append(f"- **Record ID**: {record_id}") + if created_time: + markdown_parts.append(f"- **Created**: {created_time}") + markdown_parts.append("") + + # Fields + if fields: + markdown_parts.append("## Fields") + for field_name, field_value in fields.items(): + markdown_parts.append(f"### {field_name}") + + if isinstance(field_value, list): + for item in field_value: + if isinstance(item, dict): + # Handle attachments, linked records, etc.
+ if "url" in item: + markdown_parts.append(f"- [Attachment]({item['url']})") + else: + markdown_parts.append(f"- {json.dumps(item, indent=2)}") + else: + markdown_parts.append(f"- {item}") + elif isinstance(field_value, dict): + markdown_parts.append( + f"```json\n{json.dumps(field_value, indent=2)}\n```" + ) + else: + markdown_parts.append(str(field_value)) + + markdown_parts.append("") + + return "\n".join(markdown_parts) diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 49e227f..488503e 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -48,6 +48,7 @@ class DocumentType(str, Enum): CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR" GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR" GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR" + AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR" class SearchSourceConnectorType(str, Enum): @@ -64,6 +65,7 @@ class SearchSourceConnectorType(str, Enum): CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR" GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR" GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR" + AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR" class ChatType(str, Enum): diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py index e10db1e..9cf4387 100644 --- a/surfsense_backend/app/routes/__init__.py +++ b/surfsense_backend/app/routes/__init__.py @@ -1,5 +1,8 @@ from fastapi import APIRouter +from .airtable_add_connector_route import ( + router as airtable_add_connector_router, +) from .chats_routes import router as chats_router from .documents_routes import router as documents_router from .google_calendar_add_connector_route import ( @@ -23,5 +26,6 @@ router.include_router(chats_router) router.include_router(search_source_connectors_router) router.include_router(google_calendar_add_connector_router) router.include_router(google_gmail_add_connector_router) +router.include_router(airtable_add_connector_router) router.include_router(llm_config_router) router.include_router(logs_router) diff --git a/surfsense_backend/app/routes/airtable_add_connector_route.py b/surfsense_backend/app/routes/airtable_add_connector_route.py new file mode 100644 index 0000000..2747e3a --- /dev/null +++ b/surfsense_backend/app/routes/airtable_add_connector_route.py @@ -0,0 +1,282 @@ +import base64 +import hashlib +import json +import logging +import secrets +from datetime import UTC, datetime, timedelta +from uuid import UUID + +import httpx +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import RedirectResponse +from pydantic import ValidationError +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.config import config +from app.db import ( + SearchSourceConnector, + SearchSourceConnectorType, + User, + get_async_session, +) +from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase +from app.users import current_active_user + +logger = logging.getLogger(__name__) + +router = APIRouter() + +# Airtable OAuth endpoints +AUTHORIZATION_URL = "https://airtable.com/oauth2/v1/authorize" +TOKEN_URL = "https://airtable.com/oauth2/v1/token" + +# OAuth scopes for Airtable +SCOPES = [ + "data.records:read", + "data.recordComments:read", + "schema.bases:read", + "user.email:read", +] + + +def make_basic_auth_header(client_id: str, client_secret: str) -> str: + credentials = f"{client_id}:{client_secret}".encode() + b64 = base64.b64encode(credentials).decode("ascii") 
+ return f"Basic {b64}" + + +def generate_pkce_pair() -> tuple[str, str]: + """ + Generate PKCE code verifier and code challenge. + + Returns: + Tuple of (code_verifier, code_challenge) + """ + # Generate code verifier (43-128 characters) + code_verifier = ( + base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("utf-8").rstrip("=") + ) + + # Generate code challenge (SHA256 hash of verifier, base64url encoded) + code_challenge = ( + base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest()) + .decode("utf-8") + .rstrip("=") + ) + + return code_verifier, code_challenge + + +@router.get("/auth/airtable/connector/add/") +async def connect_airtable(space_id: int, user: User = Depends(current_active_user)): + """ + Initiate Airtable OAuth flow. + + Args: + space_id: The search space ID + user: Current authenticated user + + Returns: + Authorization URL for redirect + """ + try: + if not space_id: + raise HTTPException(status_code=400, detail="space_id is required") + + if not config.AIRTABLE_CLIENT_ID: + raise HTTPException( + status_code=500, detail="Airtable OAuth not configured." + ) + + # Generate PKCE parameters + code_verifier, code_challenge = generate_pkce_pair() + + # Generate state parameter + state_payload = json.dumps( + { + "space_id": space_id, + "user_id": str(user.id), + "code_verifier": code_verifier, + } + ) + state_encoded = base64.urlsafe_b64encode(state_payload.encode()).decode() + + # Build authorization URL + auth_params = { + "client_id": config.AIRTABLE_CLIENT_ID, + "redirect_uri": config.AIRTABLE_REDIRECT_URI, + "response_type": "code", + "scope": " ".join(SCOPES), + "state": state_encoded, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + } + + # Construct URL manually to ensure proper encoding + from urllib.parse import urlencode + + auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}" + + logger.info( + f"Generated Airtable OAuth URL for user {user.id}, space {space_id}" + ) + return {"auth_url": auth_url} + + except Exception as e: + logger.error(f"Failed to initiate Airtable OAuth: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, detail=f"Failed to initiate Airtable OAuth: {e!s}" + ) from e + + +@router.get("/auth/airtable/connector/callback/") +async def airtable_callback( + request: Request, + code: str, + state: str, + session: AsyncSession = Depends(get_async_session), +): + """ + Handle Airtable OAuth callback. 
+ + Args: + request: FastAPI request object + code: Authorization code from Airtable + state: State parameter containing user/space info + session: Database session + + Returns: + Redirect response to frontend + """ + try: + # Decode and parse the state + try: + decoded_state = base64.urlsafe_b64decode(state.encode()).decode() + data = json.loads(decoded_state) + except Exception as e: + raise HTTPException( + status_code=400, detail=f"Invalid state parameter: {e!s}" + ) from e + + user_id = UUID(data["user_id"]) + space_id = data["space_id"] + code_verifier = data["code_verifier"] + auth_header = make_basic_auth_header( + config.AIRTABLE_CLIENT_ID, config.AIRTABLE_CLIENT_SECRET + ) + + # Exchange authorization code for access token + token_data = { + "client_id": config.AIRTABLE_CLIENT_ID, + "client_secret": config.AIRTABLE_CLIENT_SECRET, + "redirect_uri": config.AIRTABLE_REDIRECT_URI, + "code": code, + "grant_type": "authorization_code", + "code_verifier": code_verifier, + } + + async with httpx.AsyncClient() as client: + token_response = await client.post( + TOKEN_URL, + data=token_data, + headers={ + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": auth_header, + }, + timeout=30.0, + ) + + if token_response.status_code != 200: + error_detail = token_response.text + try: + error_json = token_response.json() + error_detail = error_json.get("error_description", error_detail) + except Exception: + pass + raise HTTPException( + status_code=400, detail=f"Token exchange failed: {error_detail}" + ) + + token_json = token_response.json() + + # Calculate expiration time (UTC, tz-aware) + expires_at = None + if token_json.get("expires_in"): + now_utc = datetime.now(UTC) + expires_at = now_utc + timedelta(seconds=int(token_json["expires_in"])) + + # Create credentials object + credentials = AirtableAuthCredentialsBase( + access_token=token_json["access_token"], + refresh_token=token_json.get("refresh_token"), + token_type=token_json.get("token_type", "Bearer"), + expires_in=token_json.get("expires_in"), + expires_at=expires_at, + scope=token_json.get("scope"), + ) + + # Check if connector already exists for this user + existing_connector_result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.AIRTABLE_CONNECTOR, + ) + ) + existing_connector = existing_connector_result.scalars().first() + + if existing_connector: + # Update existing connector + existing_connector.config = credentials.to_dict() + existing_connector.name = "Airtable Connector" + existing_connector.is_indexable = True + logger.info(f"Updated existing Airtable connector for user {user_id}") + else: + # Create new connector + new_connector = SearchSourceConnector( + name="Airtable Connector", + connector_type=SearchSourceConnectorType.AIRTABLE_CONNECTOR, + is_indexable=True, + config=credentials.to_dict(), + user_id=user_id, + ) + session.add(new_connector) + logger.info(f"Created new Airtable connector for user {user_id}") + + try: + await session.commit() + logger.info(f"Successfully saved Airtable connector for user {user_id}") + + # Redirect to the frontend success page + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/add/airtable-connector?success=true" + ) + + except ValidationError as e: + await session.rollback() + raise HTTPException( + status_code=422, detail=f"Validation error: {e!s}" + ) from e + except IntegrityError as e: + 
await session.rollback() + raise HTTPException( + status_code=409, + detail=f"Integrity error: A connector with this type already exists. {e!s}", + ) from e + except Exception as e: + logger.error(f"Failed to create search source connector: {e!s}") + await session.rollback() + raise HTTPException( + status_code=500, + detail=f"Failed to create search source connector: {e!s}", + ) from e + + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to complete Airtable OAuth: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, detail=f"Failed to complete Airtable OAuth: {e!s}" + ) from e diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index d1f6108..6ea3929 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -36,6 +36,7 @@ from app.schemas import ( SearchSourceConnectorUpdate, ) from app.tasks.connector_indexers import ( + index_airtable_records, index_clickup_tasks, index_confluence_pages, index_discord_messages, @@ -508,6 +509,20 @@ async def index_connector_content( indexing_to, ) response_message = "Google Calendar indexing started in the background." + elif connector.connector_type == SearchSourceConnectorType.AIRTABLE_CONNECTOR: + # Run indexing in background + logger.info( + f"Triggering Airtable indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + background_tasks.add_task( + run_airtable_indexing_with_new_session, + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, + ) + response_message = "Airtable indexing started in the background." elif ( connector.connector_type == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR ): @@ -1072,6 +1087,64 @@ async def run_clickup_indexing( # Optionally update status in DB to indicate failure +# Add new helper functions for Airtable indexing +async def run_airtable_indexing_with_new_session( + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Wrapper to run Airtable indexing with its own database session.""" + logger.info( + f"Background task started: Indexing Airtable connector {connector_id} into space {search_space_id} from {start_date} to {end_date}" + ) + async with async_session_maker() as session: + await run_airtable_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) + logger.info(f"Background task finished: Indexing Airtable connector {connector_id}") + + +async def run_airtable_indexing( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Runs the Airtable indexing task and updates the timestamp.""" + try: + indexed_count, error_message = await index_airtable_records( + session, + connector_id, + search_space_id, + user_id, + start_date, + end_date, + update_last_indexed=False, + ) + if error_message: + logger.error( + f"Airtable indexing failed for connector {connector_id}: {error_message}" + ) + # Optionally update status in DB to indicate failure + else: + logger.info( + f"Airtable indexing successful for connector {connector_id}. Indexed {indexed_count} records." 
+ ) + # Update the last indexed timestamp only on success + await update_connector_last_indexed(session, connector_id) + await session.commit() # Commit timestamp update + except Exception as e: + logger.error( + f"Critical error in run_airtable_indexing for connector {connector_id}: {e}", + exc_info=True, + ) + # Optionally update status in DB to indicate failure + + # Add new helper functions for Google Calendar indexing async def run_google_calendar_indexing_with_new_session( connector_id: int, diff --git a/surfsense_backend/app/schemas/airtable_auth_credentials.py b/surfsense_backend/app/schemas/airtable_auth_credentials.py new file mode 100644 index 0000000..586e99e --- /dev/null +++ b/surfsense_backend/app/schemas/airtable_auth_credentials.py @@ -0,0 +1,66 @@ +from datetime import UTC, datetime + +from pydantic import BaseModel, field_validator + + +class AirtableAuthCredentialsBase(BaseModel): + access_token: str + refresh_token: str | None = None + token_type: str = "Bearer" + expires_in: int | None = None + expires_at: datetime | None = None + scope: str | None = None + + @property + def is_expired(self) -> bool: + """Check if the credentials have expired.""" + if self.expires_at is None: + return False + return self.expires_at <= datetime.now(UTC) + + @property + def is_refreshable(self) -> bool: + """Check if the credentials can be refreshed.""" + return self.refresh_token is not None + + def to_dict(self) -> dict: + """Convert credentials to dictionary for storage.""" + return { + "access_token": self.access_token, + "refresh_token": self.refresh_token, + "token_type": self.token_type, + "expires_in": self.expires_in, + "expires_at": self.expires_at.isoformat() if self.expires_at else None, + "scope": self.scope, + } + + @classmethod + def from_dict(cls, data: dict) -> "AirtableAuthCredentialsBase": + """Create credentials from dictionary.""" + expires_at = None + if data.get("expires_at"): + expires_at = datetime.fromisoformat(data["expires_at"]) + + return cls( + access_token=data["access_token"], + refresh_token=data.get("refresh_token"), + token_type=data.get("token_type", "Bearer"), + expires_in=data.get("expires_in"), + expires_at=expires_at, + scope=data.get("scope"), + ) + + @field_validator("expires_at", mode="before") + @classmethod + def ensure_aware_utc(cls, v): + # Strings like "2025-08-26T14:46:57.367184" + if isinstance(v, str): + # add +00:00 if missing tz info + if v.endswith("Z"): + return datetime.fromisoformat(v.replace("Z", "+00:00")) + dt = datetime.fromisoformat(v) + return dt if dt.tzinfo else dt.replace(tzinfo=UTC) + # datetime objects + if isinstance(v, datetime): + return v if v.tzinfo else v.replace(tzinfo=UTC) + return v diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index c2e3669..a6a75ba 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -1209,6 +1209,94 @@ class ConnectorService: return result_object, calendar_chunks + async def search_airtable( + self, + user_query: str, + user_id: str, + search_space_id: int, + top_k: int = 20, + search_mode: SearchMode = SearchMode.CHUNKS, + ) -> tuple: + """ + Search for Airtable records and return both the source information and langchain documents + + Args: + user_query: The user's query + user_id: The user's ID + search_space_id: The search space ID to search in + top_k: Maximum number of results to return + search_mode: Search mode (CHUNKS or DOCUMENTS) + + 
Returns: + tuple: (sources_info, langchain_documents) + """ + if search_mode == SearchMode.CHUNKS: + airtable_chunks = await self.chunk_retriever.hybrid_search( + query_text=user_query, + top_k=top_k, + user_id=user_id, + search_space_id=search_space_id, + document_type="AIRTABLE_CONNECTOR", + ) + elif search_mode == SearchMode.DOCUMENTS: + airtable_chunks = await self.document_retriever.hybrid_search( + query_text=user_query, + top_k=top_k, + user_id=user_id, + search_space_id=search_space_id, + document_type="AIRTABLE_CONNECTOR", + ) + # Transform document retriever results to match expected format + airtable_chunks = self._transform_document_results(airtable_chunks) + + # Early return if no results + if not airtable_chunks: + return { + "id": 32, + "name": "Airtable Records", + "type": "AIRTABLE_CONNECTOR", + "sources": [], + }, [] + + # Process chunks to create sources + sources_list = [] + async with self.counter_lock: + for _i, chunk in enumerate(airtable_chunks): + # Extract document metadata + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) + + # Extract Airtable-specific metadata + record_id = metadata.get("record_id", "") + created_time = metadata.get("created_time", "") + + # Create a more descriptive title for Airtable records + title = f"Airtable Record: {record_id}" + + # Create a more descriptive description for Airtable records + description = f"Created: {created_time}" + + source = { + "id": chunk.get("chunk_id", self.source_id_counter), + "title": title, + "description": description, + "url": "", # TODO: Add URL to Airtable record + "record_id": record_id, + "created_time": created_time, + } + + self.source_id_counter += 1 + sources_list.append(source) + + result_object = { + "id": 32, + "name": "Airtable Records", + "type": "AIRTABLE_CONNECTOR", + "sources": sources_list, + } + + return result_object, airtable_chunks + async def search_google_gmail( self, user_query: str, diff --git a/surfsense_backend/app/tasks/connector_indexers/__init__.py b/surfsense_backend/app/tasks/connector_indexers/__init__.py index 7befa59..acc3db4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/__init__.py +++ b/surfsense_backend/app/tasks/connector_indexers/__init__.py @@ -19,14 +19,14 @@ Available indexers: """ # Communication platforms +# Databases and tables +from .airtable_indexer import index_airtable_records from .clickup_indexer import index_clickup_tasks from .confluence_indexer import index_confluence_pages from .discord_indexer import index_discord_messages # Development platforms from .github_indexer import index_github_repos - -# Calendar and scheduling from .google_calendar_indexer import index_google_calendar_events from .google_gmail_indexer import index_google_gmail_messages from .jira_indexer import index_jira_issues @@ -39,6 +39,7 @@ from .notion_indexer import index_notion_pages from .slack_indexer import index_slack_messages __all__ = [ # noqa: RUF022 + "index_airtable_records", "index_clickup_tasks", "index_confluence_pages", "index_discord_messages", diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py new file mode 100644 index 0000000..9ba2c2b --- /dev/null +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -0,0 +1,387 @@ +""" +Airtable connector indexer.
+""" + +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import config +from app.connectors.airtable_connector import AirtableConnector +from app.db import Document, DocumentType, SearchSourceConnectorType +from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase +from app.services.llm_service import get_user_long_context_llm +from app.services.task_logging_service import TaskLoggingService +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, +) + +from .base import ( + calculate_date_range, + check_duplicate_document_by_hash, + get_connector_by_id, + logger, + update_connector_last_indexed, +) + + +async def index_airtable_records( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str | None = None, + end_date: str | None = None, + max_records: int = 2500, + update_last_indexed: bool = True, +) -> tuple[int, str | None]: + """ + Index Airtable records for a given connector. + + Args: + session: Database session + connector_id: ID of the Airtable connector + search_space_id: ID of the search space to store documents in + user_id: ID of the user + start_date: Start date for filtering records (YYYY-MM-DD) + end_date: End date for filtering records (YYYY-MM-DD) + max_records: Maximum number of records to fetch per table + update_last_indexed: Whether to update the last_indexed_at timestamp + + Returns: + Tuple of (number_of_documents_processed, error_message) + """ + task_logger = TaskLoggingService(session, search_space_id) + log_entry = await task_logger.log_task_start( + task_name="airtable_indexing", + source="connector_indexing_task", + message=f"Starting Airtable indexing for connector {connector_id}", + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "start_date": start_date, + "end_date": end_date, + "max_records": max_records, + }, + ) + + try: + # Get the connector from the database + connector = await get_connector_by_id( + session, connector_id, SearchSourceConnectorType.AIRTABLE_CONNECTOR + ) + + if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found", + "Connector not found", + {"error_type": "ConnectorNotFound"}, + ) + return 0, f"Connector with ID {connector_id} not found" + + # Create credentials from connector config + config_data = connector.config + try: + credentials = AirtableAuthCredentialsBase.from_dict(config_data) + except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Invalid Airtable credentials in connector {connector_id}", + str(e), + {"error_type": "InvalidCredentials"}, + ) + return 0, f"Invalid Airtable credentials: {e!s}" + + # Check if credentials are expired + if credentials.is_expired: + await task_logger.log_task_failure( + log_entry, + f"Airtable credentials expired for connector {connector_id}", + "Credentials expired", + {"error_type": "ExpiredCredentials"}, + ) + return 0, "Airtable credentials have expired. Please re-authenticate." 
+ + # Calculate date range for indexing + start_date_str, end_date_str = calculate_date_range( + connector, start_date, end_date, default_days_back=365 + ) + + logger.info( + f"Starting Airtable indexing for connector {connector_id} " + f"from {start_date_str} to {end_date_str}" + ) + + # Initialize Airtable connector + airtable_connector = AirtableConnector(credentials) + total_processed = 0 + # Cumulative counters across all bases and tables + documents_indexed = 0 + documents_skipped = 0 + skipped_records: list[str] = [] + + try: + # Get accessible bases + logger.info(f"Fetching Airtable bases for connector {connector_id}") + bases, error = airtable_connector.get_bases() + + if error: + await task_logger.log_task_failure( + log_entry, + f"Failed to fetch Airtable bases: {error}", + "API Error", + {"error_type": "APIError"}, + ) + return 0, f"Failed to fetch Airtable bases: {error}" + + if not bases: + success_msg = "No Airtable bases found or accessible" + await task_logger.log_task_success( + log_entry, success_msg, {"bases_count": 0} + ) + return 0, success_msg + + logger.info(f"Found {len(bases)} Airtable bases to process") + + # Process each base + for base in bases: + base_id = base.get("id") + base_name = base.get("name", "Unknown Base") + + if not base_id: + logger.warning(f"Skipping base without ID: {base}") + continue + + logger.info(f"Processing base: {base_name} ({base_id})") + + # Get base schema to find tables + schema_data, schema_error = airtable_connector.get_base_schema(base_id) + + if schema_error: + logger.warning( + f"Failed to get schema for base {base_id}: {schema_error}" + ) + continue + + if not schema_data or "tables" not in schema_data: + logger.warning(f"No tables found in base {base_id}") + continue + + tables = schema_data["tables"] + logger.info(f"Found {len(tables)} tables in base {base_name}") + + # Process each table + for table in tables: + table_id = table.get("id") + table_name = table.get("name", "Unknown Table") + + if not table_id: + logger.warning(f"Skipping table without ID: {table}") + continue + + logger.info(f"Processing table: {table_name} ({table_id})") + + # Fetch records + if start_date_str and end_date_str: + # Use date filtering if available + records, records_error = ( + airtable_connector.get_records_by_date_range( + base_id=base_id, + table_id=table_id, + date_field="CREATED_TIME()", + start_date=start_date_str, + end_date=end_date_str, + max_records=max_records, + ) + ) + + else: + # Fetch all records + records, records_error = airtable_connector.get_all_records( + base_id=base_id, + table_id=table_id, + max_records=max_records, + ) + + if records_error: + logger.warning( + f"Failed to fetch records from table {table_name}: {records_error}" + ) + continue + + if not records: + logger.info(f"No records found in table {table_name}") + continue + + logger.info(f"Found {len(records)} records in table {table_name}") + + # Process each record + for record in records: + try: + # Generate markdown content + markdown_content = ( + airtable_connector.format_record_to_markdown( + record, f"{base_name} - {table_name}" + ) + ) + + if not markdown_content.strip(): + logger.warning( + f"Skipping record with no content: {record.get('id')}" + ) + skipped_records.append( + f"{record.get('id')} (no content)" + ) + documents_skipped += 1 + continue + + # Generate content hash + content_hash = generate_content_hash( + markdown_content, search_space_id + ) + + # Check if document already exists + existing_document_by_hash = ( + await check_duplicate_document_by_hash( + session, content_hash + ) + ) + + if existing_document_by_hash: + logger.info( + f"Document with content hash {content_hash} already exists for record {record.get('id')}. Skipping processing." + ) + documents_skipped += 1 + continue + + # Generate document summary + user_llm = await get_user_long_context_llm(session, user_id) + + if user_llm: + document_metadata = { + "record_id": record.get("id", "Unknown"), + "base_name": base_name, + "table_name": table_name, + "created_time": record.get("createdTime", ""), + "document_type": "Airtable Record", + "connector_type": "Airtable", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + else: + # Fallback to simple summary if no LLM configured + summary_content = f"Airtable Record: {record.get('id', 'Unknown')}\n\n" + summary_embedding = ( + config.embedding_model_instance.embed( + summary_content + ) + ) + + # Process chunks + chunks = await create_document_chunks(markdown_content) + + # Create and store new document + logger.info( + f"Creating new document for Airtable record: {record.get('id', 'Unknown')}" + ) + document = Document( + search_space_id=search_space_id, + title=f"Airtable Record: {record.get('id', 'Unknown')}", + document_type=DocumentType.AIRTABLE_CONNECTOR, + document_metadata={ + "record_id": record.get("id", "Unknown"), + "base_name": base_name, + "table_name": table_name, + "created_time": record.get("createdTime", ""), + }, + content=summary_content, + content_hash=content_hash, + embedding=summary_embedding, + chunks=chunks, + ) + + session.add(document) + documents_indexed += 1 + logger.info( + f"Successfully indexed new Airtable record {record.get('id', 'Unknown')}" + ) + + except Exception as e: + logger.error( + f"Error processing the Airtable record {record.get('id', 'Unknown')}: {e!s}", + exc_info=True, + ) + skipped_records.append( + f"{record.get('id', 'Unknown')} (processing error)" + ) + documents_skipped += 1 + continue # Skip this record and continue with others + + # Update the last_indexed_at timestamp for the connector only if requested + total_processed = documents_indexed + if total_processed > 0: + await update_connector_last_indexed( + session, connector, update_last_indexed + ) + + # Commit all changes + await session.commit() + logger.info( + "Successfully committed all Airtable document changes to database" + ) + + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully completed Airtable indexing for connector {connector_id}", + { + "records_processed": total_processed, + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "skipped_records_count": len(skipped_records), + }, + ) + + logger.info( + f"Airtable indexing completed: {documents_indexed} new records, {documents_skipped} skipped" + ) + return ( + total_processed, + None, + ) # Return None as the error message to indicate success + + except Exception as e: + logger.error( + f"Fetching Airtable bases for connector {connector_id} failed: {e!s}", + exc_info=True, + ) + return 0, f"Failed to index Airtable records: {e!s}" + + except SQLAlchemyError as db_error: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Airtable indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"}, + ) + logger.error( + f"Database error during Airtable indexing: {db_error!s}", exc_info=True + ) + return 0, f"Database error: {db_error!s}" + except Exception as e: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Airtable records for connector {connector_id}", + str(e), + {"error_type":
type(e).__name__}, + ) + logger.error(f"Error during Airtable indexing: {e!s}", exc_info=True) + return 0, f"Failed to index Airtable records: {e!s}" diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/page.tsx index e420b9e..be481ee 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/page.tsx @@ -48,6 +48,11 @@ const getConnectorTypeDisplay = (type: string): string => { JIRA_CONNECTOR: "Jira Connector", DISCORD_CONNECTOR: "Discord Connector", LINKUP_API: "Linkup", + CONFLUENCE_CONNECTOR: "Confluence Connector", + CLICKUP_CONNECTOR: "ClickUp Connector", + GOOGLE_CALENDAR_CONNECTOR: "Google Calendar Connector", + GOOGLE_GMAIL_CONNECTOR: "Google Gmail Connector", + AIRTABLE_CONNECTOR: "Airtable Connector", // Add other connector types here as needed }; return typeMap[type] || type; diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/add/airtable-connector/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/airtable-connector/page.tsx new file mode 100644 index 0000000..b17965b --- /dev/null +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/airtable-connector/page.tsx @@ -0,0 +1,184 @@ +"use client"; + +import { IconBrandAirtable } from "@tabler/icons-react"; +import { motion } from "framer-motion"; +import { ArrowLeft, Check, ExternalLink, Loader2 } from "lucide-react"; +import Link from "next/link"; +import { useParams, useRouter } from "next/navigation"; +import { useEffect, useState } from "react"; +import { toast } from "sonner"; +import { Button } from "@/components/ui/button"; +import { + Card, + CardContent, + CardDescription, + CardFooter, + CardHeader, + CardTitle, +} from "@/components/ui/card"; +import { EnumConnectorName } from "@/contracts/enums/connector"; +import { + type SearchSourceConnector, + useSearchSourceConnectors, +} from "@/hooks/useSearchSourceConnectors"; + +export default function AirtableConnectorPage() { + const router = useRouter(); + const params = useParams(); + const searchSpaceId = params.search_space_id as string; + const [isConnecting, setIsConnecting] = useState(false); + const [doesConnectorExist, setDoesConnectorExist] = useState(false); + + const { fetchConnectors } = useSearchSourceConnectors(); + + useEffect(() => { + fetchConnectors().then((data) => { + const connector = data.find( + (c: SearchSourceConnector) => c.connector_type === EnumConnectorName.AIRTABLE_CONNECTOR + ); + if (connector) { + setDoesConnectorExist(true); + } + }); + }, []); + + const handleConnectAirtable = async () => { + setIsConnecting(true); + try { + const response = await fetch( + `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/auth/airtable/connector/add/?space_id=${searchSpaceId}`, + { + method: "GET", + headers: { + Authorization: `Bearer ${localStorage.getItem("surfsense_bearer_token")}`, + }, + } + ); + + if (!response.ok) { + throw new Error("Failed to initiate Airtable OAuth"); + } + + const data = await response.json(); + + // Redirect to Airtable for authentication + window.location.href = data.auth_url; + } catch (error) { + console.error("Error connecting to Airtable:", error); + toast.error("Failed to connect to Airtable"); + } finally { + setIsConnecting(false); + } + }; + + return ( +
+		<div className="container mx-auto max-w-3xl px-4 py-8">
+			<motion.div initial={{ opacity: 0, y: 10 }} animate={{ opacity: 1, y: 0 }}>
+				{/* Header */}
+				<Link
+					href={`/dashboard/${searchSpaceId}/connectors/add`}
+					className="inline-flex items-center text-sm text-muted-foreground hover:text-foreground"
+				>
+					<ArrowLeft className="mr-2 h-4 w-4" />
+					Back to connectors
+				</Link>
+				<div className="mt-6 flex items-center gap-3">
+					<IconBrandAirtable className="h-8 w-8" />
+					<div>
+						<h1 className="text-2xl font-semibold">Connect Airtable</h1>
+						<p className="text-muted-foreground">Connect your Airtable to search records.</p>
+					</div>
+				</div>
+
+				{/* OAuth Connection Card */}
+				{!doesConnectorExist ? (
+					<Card className="mt-6">
+						<CardHeader>
+							<CardTitle>Connect Your Airtable Account</CardTitle>
+							<CardDescription>
+								Connect your Airtable account to access your records. We'll only request read-only
+								access to your records.
+							</CardDescription>
+						</CardHeader>
+						<CardContent className="space-y-3">
+							<div className="flex items-center gap-2">
+								<Check className="h-4 w-4" />
+								Read-only access to your records
+							</div>
+							<div className="flex items-center gap-2">
+								<Check className="h-4 w-4" />
+								Access works even when you're offline
+							</div>
+							<div className="flex items-center gap-2">
+								<Check className="h-4 w-4" />
+								You can disconnect anytime
+							</div>
+						</CardContent>
+						<CardFooter>
+							<Button onClick={handleConnectAirtable} disabled={isConnecting} className="w-full">
+								{isConnecting ? (
+									<Loader2 className="mr-2 h-4 w-4 animate-spin" />
+								) : (
+									<ExternalLink className="mr-2 h-4 w-4" />
+								)}
+								Connect Your Airtable Account
+							</Button>
+						</CardFooter>
+					</Card>
+				) : (
+					/* Configuration Form Card */
+					<Card className="mt-6">
+						<CardHeader>
+							<CardTitle>✅ Your Airtable is successfully connected!</CardTitle>
+						</CardHeader>
+					</Card>
+				)}
+
+				{/* Help Section */}
+				{!doesConnectorExist && (
+					<Card className="mt-6">
+						<CardHeader>
+							<CardTitle>How It Works</CardTitle>
+						</CardHeader>
+						<CardContent className="space-y-4">
+							<div>
+								<h3 className="font-medium">1. Connect Your Account</h3>
+								<p className="text-sm text-muted-foreground">
+									Click "Connect Your Airtable Account" to start the secure OAuth process. You'll be
+									redirected to Airtable to sign in.
+								</p>
+							</div>
+							<div>
+								<h3 className="font-medium">2. Grant Permissions</h3>
+								<p className="text-sm text-muted-foreground">
+									Airtable will ask for permission to read your records. We only request read-only
+									access to keep your data safe.
+								</p>
+							</div>
+						</CardContent>
+					</Card>
+				)}
+			</motion.div>
+		</div>
+ ); +} diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx index 2d4f2b9..a78bdfa 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx @@ -15,6 +15,7 @@ import { IconLayoutKanban, IconLinkPlus, IconMail, + IconTable, IconTicket, IconWorldWww, } from "@tabler/icons-react"; @@ -143,6 +144,13 @@ const connectorCategories: ConnectorCategory[] = [ icon: , status: "available", }, + { + id: "airtable-connector", + title: "Airtable", + description: "Connect to Airtable to search records, tables and database content.", + icon: , + status: "available", + }, ], }, { diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx index 5273fa5..403acfe 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx @@ -11,6 +11,7 @@ import { IconChecklist, IconLayoutKanban, IconMail, + IconTable, IconTicket, } from "@tabler/icons-react"; import { File, Globe, Webhook } from "lucide-react"; @@ -33,6 +34,7 @@ const documentTypeIcons: Record = { CLICKUP_CONNECTOR: IconChecklist, GOOGLE_CALENDAR_CONNECTOR: IconCalendar, GOOGLE_GMAIL_CONNECTOR: IconMail, + AIRTABLE_CONNECTOR: IconTable, }; export function getDocumentTypeIcon(type: string): IconComponent { diff --git a/surfsense_web/components/chat/ChatSources.tsx b/surfsense_web/components/chat/ChatSources.tsx index d7a0d32..c74369e 100644 --- a/surfsense_web/components/chat/ChatSources.tsx +++ b/surfsense_web/components/chat/ChatSources.tsx @@ -12,6 +12,7 @@ import { BookOpen, Calendar, CheckSquare, + Database, ExternalLink, FileText, Globe, @@ -86,6 +87,11 @@ function getSourceIcon(type: string) { case "GOOGLE_GMAIL_CONNECTOR": return ; + // Airtable + case "USER_SELECTED_AIRTABLE_CONNECTOR": + case "AIRTABLE_CONNECTOR": + return ; + // YouTube case "USER_SELECTED_YOUTUBE_VIDEO": case "YOUTUBE_VIDEO": diff --git a/surfsense_web/components/chat/ConnectorComponents.tsx b/surfsense_web/components/chat/ConnectorComponents.tsx index d66227e..d016fb4 100644 --- a/surfsense_web/components/chat/ConnectorComponents.tsx +++ b/surfsense_web/components/chat/ConnectorComponents.tsx @@ -8,6 +8,7 @@ import { IconLayoutKanban, IconLinkPlus, IconMail, + IconTable, IconTicket, } from "@tabler/icons-react"; import { @@ -62,6 +63,8 @@ export const getConnectorIcon = (connectorType: string) => { return ; case "GOOGLE_GMAIL_CONNECTOR": return ; + case "AIRTABLE_CONNECTOR": + return ; case "DEEP": return ; case "DEEPER": diff --git a/surfsense_web/components/chat/DocumentsDataTable.tsx b/surfsense_web/components/chat/DocumentsDataTable.tsx index 685178c..d7a240d 100644 --- a/surfsense_web/components/chat/DocumentsDataTable.tsx +++ b/surfsense_web/components/chat/DocumentsDataTable.tsx @@ -55,6 +55,10 @@ const DOCUMENT_TYPES: (DocumentType | "ALL")[] = [ "DISCORD_CONNECTOR", "JIRA_CONNECTOR", "CONFLUENCE_CONNECTOR", + "CLICKUP_CONNECTOR", + "GOOGLE_CALENDAR_CONNECTOR", + "GOOGLE_GMAIL_CONNECTOR", + "AIRTABLE_CONNECTOR", ]; const getDocumentTypeColor = (type: DocumentType) => { diff --git a/surfsense_web/contracts/enums/connector.ts b/surfsense_web/contracts/enums/connector.ts index 
2b58c6a..bc121e1 100644 --- a/surfsense_web/contracts/enums/connector.ts +++ b/surfsense_web/contracts/enums/connector.ts @@ -12,4 +12,5 @@ export enum EnumConnectorName { CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR", GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR", GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR", + AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR", } diff --git a/surfsense_web/hooks/use-documents.ts b/surfsense_web/hooks/use-documents.ts index c888513..4c75366 100644 --- a/surfsense_web/hooks/use-documents.ts +++ b/surfsense_web/hooks/use-documents.ts @@ -23,7 +23,11 @@ export type DocumentType = | "LINEAR_CONNECTOR" | "DISCORD_CONNECTOR" | "JIRA_CONNECTOR" - | "CONFLUENCE_CONNECTOR"; + | "CONFLUENCE_CONNECTOR" + | "CLICKUP_CONNECTOR" + | "GOOGLE_CALENDAR_CONNECTOR" + | "GOOGLE_GMAIL_CONNECTOR" + | "AIRTABLE_CONNECTOR"; export function useDocuments(searchSpaceId: number, lazy: boolean = false) { const [documents, setDocuments] = useState([]); diff --git a/surfsense_web/lib/connectors/utils.ts b/surfsense_web/lib/connectors/utils.ts index e01d31e..c1f2165 100644 --- a/surfsense_web/lib/connectors/utils.ts +++ b/surfsense_web/lib/connectors/utils.ts @@ -14,6 +14,7 @@ export const getConnectorTypeDisplay = (type: string): string => { CLICKUP_CONNECTOR: "ClickUp", GOOGLE_CALENDAR_CONNECTOR: "Google Calendar", GOOGLE_GMAIL_CONNECTOR: "Google Gmail", + AIRTABLE_CONNECTOR: "Airtable", }; return typeMap[type] || type; };
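
Reviewer note: the backend pieces above compose as follows. A minimal sketch (not part of the diff) of driving the new AirtableConnector end to end, walking bases -> tables -> records the same way index_airtable_records does; `config_dict` is assumed to be a SearchSourceConnector.config saved by the OAuth callback:

# Illustrative sketch only: exercise the public AirtableConnector API.
from app.connectors.airtable_connector import AirtableConnector
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase


def dump_all_records_as_markdown(config_dict: dict) -> None:
    """config_dict: a connector config previously stored by the OAuth callback."""
    credentials = AirtableAuthCredentialsBase.from_dict(config_dict)
    connector = AirtableConnector(credentials)

    bases, err = connector.get_bases()
    if err:
        raise RuntimeError(err)

    for base in bases:
        schema, schema_err = connector.get_base_schema(base["id"])
        if schema_err or not schema:
            continue
        for table in schema.get("tables", []):
            records, records_err = connector.get_all_records(
                base_id=base["id"], table_id=table["id"], max_records=200
            )
            if records_err:
                continue
            for record in records:
                # Same markdown the indexer hashes, summarizes, and chunks
                print(connector.format_record_to_markdown(record, table.get("name", "")))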
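
The date-range filter in get_records_by_date_range is left as a TODO, so the date bounds are currently ignored. One plausible filterByFormula for filtering on record creation time — an untested assumption, using Airtable's documented CREATED_TIME(), IS_AFTER(), and IS_BEFORE() formula functions rather than a {field} reference:

# Hypothetical helper for the TODO in get_records_by_date_range (untested).
# Note IS_AFTER/IS_BEFORE are exclusive, so records created exactly at the
# boundaries would be dropped.
def build_created_time_filter(start_date: str, end_date: str) -> str:
    return (
        f"AND("
        f"IS_AFTER(CREATED_TIME(), '{start_date}'), "
        f"IS_BEFORE(CREATED_TIME(), '{end_date}')"
        f")"
    )

# e.g. build_created_time_filter("2024-01-01", "2024-12-31") could be passed
# as filter_by_formula to get_all_records().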
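
The PKCE pair from generate_pkce_pair is what lets the token exchange prove possession of the original code_verifier: the challenge is just BASE64URL(SHA256(verifier)) with padding stripped, which is the relationship Airtable re-checks at the token endpoint.

import base64
import hashlib

from app.routes.airtable_add_connector_route import generate_pkce_pair

verifier, challenge = generate_pkce_pair()
recomputed = (
    base64.urlsafe_b64encode(hashlib.sha256(verifier.encode("utf-8")).digest())
    .decode("utf-8")
    .rstrip("=")
)
assert recomputed == challenge  # S256 method, as sent in the authorize request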
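
AirtableAuthCredentialsBase stores a refresh_token and exposes is_refreshable, but nothing in this diff actually refreshes an expired token — the indexer just asks the user to re-authenticate. A hedged sketch of what a refresh could look like, assuming Airtable supports the standard OAuth 2.0 refresh_token grant at the same token endpoint (an assumption, not verified here):

from datetime import UTC, datetime, timedelta

import httpx

from app.config import config
from app.routes.airtable_add_connector_route import TOKEN_URL, make_basic_auth_header
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase


async def refresh_airtable_credentials(
    credentials: AirtableAuthCredentialsBase,
) -> AirtableAuthCredentialsBase:
    """Sketch: exchange a stored refresh token for fresh credentials."""
    if not credentials.is_refreshable:
        raise ValueError("No refresh token stored; user must re-authenticate.")
    async with httpx.AsyncClient() as client:
        response = await client.post(
            TOKEN_URL,
            data={
                "grant_type": "refresh_token",
                "refresh_token": credentials.refresh_token,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Authorization": make_basic_auth_header(
                    config.AIRTABLE_CLIENT_ID, config.AIRTABLE_CLIENT_SECRET
                ),
            },
            timeout=30.0,
        )
    response.raise_for_status()
    token_json = response.json()
    expires_at = None
    if token_json.get("expires_in"):
        expires_at = datetime.now(UTC) + timedelta(seconds=int(token_json["expires_in"]))
    return AirtableAuthCredentialsBase(
        access_token=token_json["access_token"],
        refresh_token=token_json.get("refresh_token", credentials.refresh_token),
        token_type=token_json.get("token_type", "Bearer"),
        expires_in=token_json.get("expires_in"),
        expires_at=expires_at,
        scope=token_json.get("scope"),
    )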
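
Finally, the credentials round-trip that the callback and indexer rely on: to_dict() is what lands in SearchSourceConnector.config, and from_dict() rehydrates it before indexing (token values below are placeholders):

from datetime import UTC, datetime, timedelta

from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase

creds = AirtableAuthCredentialsBase(
    access_token="access-token-placeholder",
    refresh_token="refresh-token-placeholder",
    expires_in=3600,
    expires_at=datetime.now(UTC) + timedelta(seconds=3600),
    scope="data.records:read schema.bases:read",
)
stored = creds.to_dict()  # JSON-safe dict saved on the connector row
restored = AirtableAuthCredentialsBase.from_dict(stored)
assert not restored.is_expired and restored.is_refreshable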