""" Field-level encryption for sensitive data using API keys. This module provides encryption/decryption for API keys stored in the database. Fernet uses AES-128-CBC with HMAC-SHA256 for authenticated encryption. OPEN_NOTEBOOK_ENCRYPTION_KEY accepts **any string**. A Fernet key is derived from it via SHA-256, so users can set a simple passphrase like ``OPEN_NOTEBOOK_ENCRYPTION_KEY=my-secret`` and it will work. Usage: # Encrypt before storing encrypted = encrypt_value(api_key) # Decrypt when reading decrypted = decrypt_value(encrypted) """ import base64 import hashlib import os from pathlib import Path from typing import Optional from cryptography.fernet import Fernet, InvalidToken from loguru import logger def get_secret_from_env(var_name: str) -> Optional[str]: """ Get a secret from environment, supporting Docker secrets pattern. Checks for VAR_FILE first (Docker secrets), then falls back to VAR. Args: var_name: Base name of the environment variable (e.g., "OPEN_NOTEBOOK_ENCRYPTION_KEY") Returns: The secret value, or None if not configured. """ # Check for _FILE variant first (Docker secrets) file_path = os.environ.get(f"{var_name}_FILE") if file_path: try: path = Path(file_path) if path.exists() and path.is_file(): secret = path.read_text().strip() if secret: logger.debug(f"Loaded {var_name} from file: {file_path}") return secret else: logger.warning(f"{var_name}_FILE points to empty file: {file_path}") else: logger.warning(f"{var_name}_FILE path does not exist: {file_path}") except Exception as e: logger.error(f"Failed to read {var_name} from file {file_path}: {e}") # Fall back to direct environment variable return os.environ.get(var_name) def _get_or_create_encryption_key() -> str: """ Get encryption key from environment, requires explicit configuration. Priority: 1. OPEN_NOTEBOOK_ENCRYPTION_KEY_FILE (Docker secrets) 2. OPEN_NOTEBOOK_ENCRYPTION_KEY (environment variable) For production deployments, you MUST set OPEN_NOTEBOOK_ENCRYPTION_KEY explicitly! Returns: Encryption key string. Raises: ValueError: If no encryption key is configured. """ # First check environment/Docker secrets key = get_secret_from_env("OPEN_NOTEBOOK_ENCRYPTION_KEY") if key: return key raise ValueError( "OPEN_NOTEBOOK_ENCRYPTION_KEY is not set. " "Set this environment variable to any secret string to enable " "encrypted storage of API keys in the database." ) # Lazy-loaded encryption key: initialized on first use, not at import time. # This prevents the entire app from crashing if the key is not yet configured # when other modules import from this file. _ENCRYPTION_KEY: Optional[str] = None def _get_encryption_key() -> str: """Get the encryption key, initializing lazily on first call.""" global _ENCRYPTION_KEY if _ENCRYPTION_KEY is None: _ENCRYPTION_KEY = _get_or_create_encryption_key() return _ENCRYPTION_KEY def _ensure_fernet_key(key: str) -> str: """ Derive a valid Fernet key from an arbitrary string via SHA-256. Any string is accepted as input. The key is derived by hashing it with SHA-256 and encoding the result as URL-safe base64. """ derived = hashlib.sha256(key.encode()).digest() return base64.urlsafe_b64encode(derived).decode() def get_fernet() -> Fernet: """ Get Fernet instance with the configured encryption key. Returns: Fernet instance. Raises: ValueError: If encryption key is not configured. """ return Fernet(_ensure_fernet_key(_get_encryption_key()).encode()) def encrypt_value(value: str) -> str: """ Encrypt a string value using Fernet symmetric encryption. Args: value: The plain text string to encrypt. Returns: Base64-encoded encrypted string. Raises: ValueError: If encryption is not configured. """ fernet = get_fernet() return fernet.encrypt(value.encode()).decode() def looks_like_fernet_token(s: str) -> bool: """ Check if string looks like a Fernet encrypted token. Fernet tokens are versioned (1 byte) + timestamp (8 bytes) + IV (16 bytes) + ciphertext (variable, multiple of 16 with PKCS7 padding) + HMAC (32 bytes). Minimum decoded size is 73 bytes (1+8+16+16+32) for the smallest payload. """ if len(s) < 100: # Base64 of 73 bytes = ~100 chars minimum return False try: decoded = base64.urlsafe_b64decode(s) # Fernet: version(1) + timestamp(8) + IV(16) + ciphertext(>=16) + HMAC(32) # Minimum 73 bytes, ciphertext must be multiple of 16 (AES block size) if len(decoded) < 73: return False ciphertext_len = len(decoded) - 1 - 8 - 16 - 32 return ciphertext_len > 0 and ciphertext_len % 16 == 0 except Exception: return False def decrypt_value(value: str) -> str: """ Decrypt a Fernet-encrypted string value. Handles graceful fallback for legacy unencrypted data. Args: value: The encrypted string (or plain text for legacy data). Returns: Decrypted plain text string, or original value if not encrypted. Raises: ValueError: If encryption is not configured or if decryption fails for what appears to be encrypted data (wrong key). """ fernet = get_fernet() try: return fernet.decrypt(value.encode()).decode() except InvalidToken: if looks_like_fernet_token(value): # Looks like encrypted data but failed to decrypt - likely wrong key raise ValueError( "Decryption failed: data appears to be encrypted but key is incorrect. " "Check OPEN_NOTEBOOK_ENCRYPTION_KEY configuration." ) # Not a valid token - treat as legacy plaintext return value except Exception as e: logger.error(f"Decryption failed: {e}") raise ValueError(f"Decryption failed: {str(e)}")