Add airtable connector auth flow routes

This commit is contained in:
CREDO23 2025-08-26 13:56:31 +02:00
parent bc89959d2f
commit 27b914f822
7 changed files with 402 additions and 0 deletions

View file

@ -11,6 +11,11 @@ GOOGLE_OAUTH_CLIENT_SECRET=GOCSV
GOOGLE_CALENDAR_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/calendar/connector/callback
GOOGLE_GMAIL_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/gmail/connector/callback
# Airtable OAuth
AIRTABLE_CLIENT_ID=your_airtable_client_id
AIRTABLE_CLIENT_SECRET=your_airtable_client_secret
AIRTABLE_REDIRECT_URI=http://localhost:8000/api/v1/auth/airtable/connector/callback
# Embedding Model
EMBEDDING_MODEL=mixedbread-ai/mxbai-embed-large-v1

View file

@ -0,0 +1,55 @@
"""Add AIRTABLE_CONNECTOR to enums
Revision ID: 19
Revises: 18
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "19"
down_revision = "18"
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Upgrade schema - add AIRTABLE_CONNECTOR to enums."""
# Add to searchsourceconnectortype enum
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'AIRTABLE_CONNECTOR'
) THEN
ALTER TYPE searchsourceconnectortype ADD VALUE 'AIRTABLE_CONNECTOR';
END IF;
END
$$;
"""
)
# Add to documenttype enum
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = 'documenttype' AND e.enumlabel = 'AIRTABLE_CONNECTOR'
) THEN
ALTER TYPE documenttype ADD VALUE 'AIRTABLE_CONNECTOR';
END IF;
END
$$;
"""
)
def downgrade() -> None:
"""Downgrade schema - remove AIRTABLE_CONNECTOR from enums."""
pass

View file

@ -54,6 +54,11 @@ class Config:
# Google Gmail redirect URI
GOOGLE_GMAIL_REDIRECT_URI = os.getenv("GOOGLE_GMAIL_REDIRECT_URI")
# Airtable OAuth
AIRTABLE_CLIENT_ID = os.getenv("AIRTABLE_CLIENT_ID")
AIRTABLE_CLIENT_SECRET = os.getenv("AIRTABLE_CLIENT_SECRET")
AIRTABLE_REDIRECT_URI = os.getenv("AIRTABLE_REDIRECT_URI")
# LLM instances are now managed per-user through the LLMConfig system
# Legacy environment variables removed in favor of user-specific configurations

View file

@ -48,6 +48,7 @@ class DocumentType(str, Enum):
CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR"
GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR"
GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR"
AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR"
class SearchSourceConnectorType(str, Enum):
@ -64,6 +65,7 @@ class SearchSourceConnectorType(str, Enum):
CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR"
GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR"
GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR"
AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR"
class ChatType(str, Enum):

View file

@ -1,5 +1,8 @@
from fastapi import APIRouter
from .airtable_add_connector_route import (
router as airtable_add_connector_router,
)
from .chats_routes import router as chats_router
from .documents_routes import router as documents_router
from .google_calendar_add_connector_route import (
@ -23,5 +26,6 @@ router.include_router(chats_router)
router.include_router(search_source_connectors_router)
router.include_router(google_calendar_add_connector_router)
router.include_router(google_gmail_add_connector_router)
router.include_router(airtable_add_connector_router)
router.include_router(llm_config_router)
router.include_router(logs_router)

View file

@ -0,0 +1,280 @@
import base64
import hashlib
import json
import logging
import secrets
from datetime import datetime, timedelta
from uuid import UUID
import httpx
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import RedirectResponse
from pydantic import ValidationError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.config import config
from app.db import (
SearchSourceConnector,
SearchSourceConnectorType,
User,
get_async_session,
)
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
from app.users import current_active_user
logger = logging.getLogger(__name__)
router = APIRouter()
# Airtable OAuth endpoints
AUTHORIZATION_URL = "https://airtable.com/oauth2/v1/authorize"
TOKEN_URL = "https://airtable.com/oauth2/v1/token"
# OAuth scopes for Airtable
SCOPES = [
"data.records:read",
"data.recordComments:read",
]
def make_basic_auth_header(client_id: str, client_secret: str) -> str:
credentials = f"{client_id}:{client_secret}".encode()
b64 = base64.b64encode(credentials).decode("ascii")
return f"Basic {b64}"
def generate_pkce_pair() -> tuple[str, str]:
"""
Generate PKCE code verifier and code challenge.
Returns:
Tuple of (code_verifier, code_challenge)
"""
# Generate code verifier (43-128 characters)
code_verifier = (
base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("utf-8").rstrip("=")
)
# Generate code challenge (SHA256 hash of verifier, base64url encoded)
code_challenge = (
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest())
.decode("utf-8")
.rstrip("=")
)
return code_verifier, code_challenge
@router.get("/auth/airtable/connector/add/")
async def connect_airtable(space_id: int, user: User = Depends(current_active_user)):
"""
Initiate Airtable OAuth flow.
Args:
space_id: The search space ID
user: Current authenticated user
Returns:
Authorization URL for redirect
"""
try:
if not space_id:
raise HTTPException(status_code=400, detail="space_id is required")
if not config.AIRTABLE_CLIENT_ID:
raise HTTPException(
status_code=500, detail="Airtable OAuth not configured."
)
# Generate PKCE parameters
code_verifier, code_challenge = generate_pkce_pair()
# Generate state parameter
state_payload = json.dumps(
{
"space_id": space_id,
"user_id": str(user.id),
"code_verifier": code_verifier,
}
)
state_encoded = base64.urlsafe_b64encode(state_payload.encode()).decode()
# Build authorization URL
auth_params = {
"client_id": config.AIRTABLE_CLIENT_ID,
"redirect_uri": config.AIRTABLE_REDIRECT_URI,
"response_type": "code",
"scope": " ".join(SCOPES),
"state": state_encoded,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
# Construct URL manually to ensure proper encoding
from urllib.parse import urlencode
auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}"
logger.info(
f"Generated Airtable OAuth URL for user {user.id}, space {space_id}"
)
return {"auth_url": auth_url}
except Exception as e:
logger.error(f"Failed to initiate Airtable OAuth: {e!s}", exc_info=True)
raise HTTPException(
status_code=500, detail=f"Failed to initiate Airtable OAuth: {e!s}"
) from e
@router.get("/auth/airtable/connector/callback/")
async def airtable_callback(
request: Request,
code: str,
state: str,
session: AsyncSession = Depends(get_async_session),
):
"""
Handle Airtable OAuth callback.
Args:
request: FastAPI request object
code: Authorization code from Airtable
state: State parameter containing user/space info
session: Database session
Returns:
Redirect response to frontend
"""
try:
# Decode and parse the state
try:
decoded_state = base64.urlsafe_b64decode(state.encode()).decode()
data = json.loads(decoded_state)
except Exception as e:
raise HTTPException(
status_code=400, detail=f"Invalid state parameter: {e!s}"
) from e
user_id = UUID(data["user_id"])
space_id = data["space_id"]
code_verifier = data["code_verifier"]
auth_header = make_basic_auth_header(
config.AIRTABLE_CLIENT_ID, config.AIRTABLE_CLIENT_SECRET
)
# Exchange authorization code for access token
token_data = {
"client_id": config.AIRTABLE_CLIENT_ID,
"client_secret": config.AIRTABLE_CLIENT_SECRET,
"redirect_uri": config.AIRTABLE_REDIRECT_URI,
"code": code,
"grant_type": "authorization_code",
"code_verifier": code_verifier,
}
async with httpx.AsyncClient() as client:
token_response = await client.post(
TOKEN_URL,
data=token_data,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_header,
},
timeout=30.0,
)
logger.info(f"Token response: {token_response.json()}")
if token_response.status_code != 200:
error_detail = token_response.text
try:
error_json = token_response.json()
error_detail = error_json.get("error_description", error_detail)
except Exception:
pass
raise HTTPException(
status_code=400, detail=f"Token exchange failed: {error_detail}"
)
token_json = token_response.json()
# Calculate expiration time
expires_at = None
if token_json.get("expires_in"):
expires_at = datetime.now() + timedelta(seconds=token_json["expires_in"])
# Create credentials object
credentials = AirtableAuthCredentialsBase(
access_token=token_json["access_token"],
refresh_token=token_json.get("refresh_token"),
token_type=token_json.get("token_type", "Bearer"),
expires_in=token_json.get("expires_in"),
expires_at=expires_at,
scope=token_json.get("scope"),
)
# Check if connector already exists for this user
existing_connector_result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.AIRTABLE_CONNECTOR,
)
)
existing_connector = existing_connector_result.scalars().first()
if existing_connector:
# Update existing connector
existing_connector.config = credentials.to_dict()
existing_connector.name = "Airtable Connector"
existing_connector.is_indexable = True
logger.info(f"Updated existing Airtable connector for user {user_id}")
else:
# Create new connector
new_connector = SearchSourceConnector(
name="Airtable Connector",
connector_type=SearchSourceConnectorType.AIRTABLE_CONNECTOR,
is_indexable=True,
config=credentials.to_dict(),
user_id=user_id,
)
session.add(new_connector)
logger.info(f"Created new Airtable connector for user {user_id}")
try:
await session.commit()
logger.info(f"Successfully saved Airtable connector for user {user_id}")
# Redirect to frontend success page
frontend_url = f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors"
return RedirectResponse(url=frontend_url)
except ValidationError as e:
await session.rollback()
raise HTTPException(
status_code=422, detail=f"Validation error: {e!s}"
) from e
except IntegrityError as e:
await session.rollback()
raise HTTPException(
status_code=409,
detail=f"Integrity error: A connector with this type already exists. {e!s}",
) from e
except Exception as e:
logger.error(f"Failed to create search source connector: {e!s}")
await session.rollback()
raise HTTPException(
status_code=500,
detail=f"Failed to create search source connector: {e!s}",
) from e
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to complete Airtable OAuth: {e!s}", exc_info=True)
raise HTTPException(
status_code=500, detail=f"Failed to complete Airtable OAuth: {e!s}"
) from e

View file

@ -0,0 +1,51 @@
from datetime import UTC, datetime
from pydantic import BaseModel
class AirtableAuthCredentialsBase(BaseModel):
access_token: str
refresh_token: str | None = None
token_type: str = "Bearer"
expires_in: int | None = None
expires_at: datetime | None = None
scope: str | None = None
@property
def is_expired(self) -> bool:
"""Check if the credentials have expired."""
if self.expires_at is None:
return False
return self.expires_at <= datetime.now(UTC)
@property
def is_refreshable(self) -> bool:
"""Check if the credentials can be refreshed."""
return self.refresh_token is not None
def to_dict(self) -> dict:
"""Convert credentials to dictionary for storage."""
return {
"access_token": self.access_token,
"refresh_token": self.refresh_token,
"token_type": self.token_type,
"expires_in": self.expires_in,
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
"scope": self.scope,
}
@classmethod
def from_dict(cls, data: dict) -> "AirtableAuthCredentialsBase":
"""Create credentials from dictionary."""
expires_at = None
if data.get("expires_at"):
expires_at = datetime.fromisoformat(data["expires_at"])
return cls(
access_token=data["access_token"],
refresh_token=data.get("refresh_token"),
token_type=data.get("token_type", "Bearer"),
expires_in=data.get("expires_in"),
expires_at=expires_at,
scope=data.get("scope"),
)