open-notebook/open_notebook/utils/embedding.py
Luis Novo 5b2c97cab7
Fix re-embedding issues and improve retry strategy (#515)
* fix: filter empty content in rebuild embeddings queries

Update collect_items_for_rebuild() to properly filter out items with
empty or whitespace-only content before submitting embedding jobs.

Changes:
- Sources: add string::trim(full_text) != '' filter
- Notes: add string::trim(content) != '' filter
- Insights: add content != none AND string::trim(content) != '' filter
  (previously had no content filter at all)

This prevents unnecessary job submissions that would fail validation
in the individual embed commands.

Ref #513

* feat: add command_id to embedding error logs

Add get_command_id() helper to extract command_id from execution context.
Include command_id in error logs for all embedding commands:
- embed_note_command
- embed_insight_command
- embed_source_command
- create_insight_command

This makes it easier to trace failed embedding jobs back to specific
command records in the database.

Ref #513

* fix: improve logging for embedding commands

Log improvements:
- Add command_id to all embedding error logs for traceability
- Transaction conflicts in repo_insert now log at DEBUG (not ERROR)
- Embedding API errors log at DEBUG, only ERROR when retries exhausted
- Friendlier retry messages: "This will be retried automatically"
- Include model name and command_id in generate_embeddings errors

Files changed:
- commands/embedding_commands.py: command_id in logs, friendlier messages
- open_notebook/database/repository.py: DEBUG for transaction conflicts
- open_notebook/utils/embedding.py: DEBUG logging, pass-through command_id

Ref #513

* fix: correct field names in rebuild embeddings status endpoint

The API status endpoint was looking for wrong field names:
- sources_processed → sources_submitted
- notes_processed → notes_submitted
- insights_processed → insights_submitted
- processed_items → jobs_submitted
- failed_items → failed_submissions

The command outputs "_submitted" because embedding happens async
(we count jobs submitted, not items processed).

Ref #513

* fix: update rebuild UI text to reflect async job submission

Changed terminology from "Completed/processed" to "Jobs Submitted"
since the rebuild command submits embedding jobs for async processing
rather than completing them synchronously.

Updated in all locales: en-US, pt-BR, zh-CN, zh-TW, ja-JP

Ref #513

* refactor: migrate retry strategy from allowlist to blocklist

- Change from `retry_on: [RuntimeError, ...]` to `stop_on: [ValueError]`
- This is more resilient: new exception types auto-retry by default
- Simplified exception handling: ValueError = permanent, else = retry
- Transient errors logged at DEBUG (surreal-commands logs final failure)
- Permanent errors (ValueError) logged at ERROR

Ref #513
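
The allowlist-to-blocklist switch described above can be sketched in plain Python. This is a minimal illustration of the decision logic only, not the surreal-commands API: `should_retry_allowlist`, `should_retry_blocklist`, and `ProviderTimeout` are hypothetical names chosen for this example.

```python
from typing import Tuple, Type

ExcTypes = Tuple[Type[BaseException], ...]


def should_retry_allowlist(exc: BaseException, retry_on: ExcTypes) -> bool:
    """Allowlist: retry only exception types that were listed in advance."""
    return isinstance(exc, retry_on)


def should_retry_blocklist(exc: BaseException, stop_on: ExcTypes) -> bool:
    """Blocklist: retry everything except the listed permanent failures."""
    return not isinstance(exc, stop_on)


# A new, unanticipated error type (hypothetical, for illustration only).
class ProviderTimeout(Exception):
    pass


allow = (RuntimeError, ConnectionError)
block = (ValueError,)

new_error = ProviderTimeout("embedding provider timed out")
bad_input = ValueError("Cannot generate embedding for empty text")
```

Under the allowlist, `new_error` would not be retried until someone adds its type to the list. Under the blocklist it is retried by default, and only `ValueError` (treated as a permanent failure) stops immediately.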
2026-01-31 18:55:01 -03:00

201 lines
6.5 KiB
Python

"""
Unified embedding utilities for Open Notebook.

Provides centralized embedding generation with support for:
- Single text embedding (with automatic chunking and mean pooling for large texts)
- Batch text embedding (multiple texts in a single API call)
- Mean pooling for combining multiple embeddings into one

All embedding operations in the application should use these functions
to ensure consistent behavior and proper handling of large content.
"""

from typing import List, Optional

import numpy as np
from loguru import logger

from open_notebook.ai.models import model_manager

from .chunking import CHUNK_SIZE, ContentType, chunk_text


async def mean_pool_embeddings(embeddings: List[List[float]]) -> List[float]:
    """
    Combine multiple embeddings into a single embedding using mean pooling.

    Algorithm:
    1. Normalize each embedding to unit length
    2. Compute element-wise mean
    3. Normalize the result to unit length

    This approach ensures the final embedding has the same properties as
    individual embeddings (unit length) regardless of input count.

    Args:
        embeddings: List of embedding vectors (each is a list of floats)

    Returns:
        Single embedding vector (mean pooled and normalized)

    Raises:
        ValueError: If embeddings list is empty or embeddings have different dimensions
    """
    if not embeddings:
        raise ValueError("Cannot mean pool empty list of embeddings")

    if len(embeddings) == 1:
        # Single embedding - just normalize and return
        arr = np.array(embeddings[0], dtype=np.float64)
        norm = np.linalg.norm(arr)
        if norm > 0:
            arr = arr / norm
        return arr.tolist()

    # Convert to numpy array
    arr = np.array(embeddings, dtype=np.float64)

    # Verify all embeddings have same dimension
    if arr.ndim != 2:
        raise ValueError(f"Expected 2D array, got shape {arr.shape}")

    # Normalize each embedding to unit length
    norms = np.linalg.norm(arr, axis=1, keepdims=True)
    # Avoid division by zero
    norms = np.where(norms > 0, norms, 1.0)
    normalized = arr / norms

    # Compute mean
    mean = np.mean(normalized, axis=0)

    # Normalize the result
    mean_norm = np.linalg.norm(mean)
    if mean_norm > 0:
        mean = mean / mean_norm

    return mean.tolist()


async def generate_embeddings(
    texts: List[str], command_id: Optional[str] = None
) -> List[List[float]]:
    """
    Generate embeddings for multiple texts in a single API call.

    This is more efficient than calling generate_embedding() multiple times
    when you have multiple texts to embed (e.g., source chunks).

    Args:
        texts: List of text strings to embed
        command_id: Optional command ID for error logging context

    Returns:
        List of embedding vectors, one per input text

    Raises:
        ValueError: If no embedding model is configured
        RuntimeError: If embedding generation fails
    """
    if not texts:
        return []

    embedding_model = await model_manager.get_embedding_model()
    if not embedding_model:
        raise ValueError(
            "No embedding model configured. Please configure one in the Models section."
        )

    model_name = getattr(embedding_model, "model_name", "unknown")

    # Log text sizes for debugging
    text_sizes = [len(t) for t in texts]
    logger.debug(
        f"Generating embeddings for {len(texts)} texts "
        f"(sizes: min={min(text_sizes)}, max={max(text_sizes)}, "
        f"total={sum(text_sizes)} chars)"
    )

    try:
        # Single API call for all texts
        embeddings = await embedding_model.aembed(texts)
        logger.debug(f"Generated {len(embeddings)} embeddings")
        return embeddings
    except Exception as e:
        # Log at debug level - the calling command will log at appropriate level
        # based on whether retries are exhausted
        cmd_context = f" (command: {command_id})" if command_id else ""
        logger.debug(
            f"Embedding API error using model '{model_name}' "
            f"for {len(texts)} texts (sizes: {min(text_sizes)}-{max(text_sizes)} chars)"
            f"{cmd_context}: {e}"
        )
        raise RuntimeError(
            f"Failed to generate embeddings using model '{model_name}': {e}"
        ) from e


async def generate_embedding(
    text: str,
    content_type: Optional[ContentType] = None,
    file_path: Optional[str] = None,
    command_id: Optional[str] = None,
) -> List[float]:
    """
    Generate a single embedding for text, handling large content via chunking and mean pooling.

    For short text (<= CHUNK_SIZE):
    - Embeds directly and returns the embedding

    For long text (> CHUNK_SIZE):
    - Chunks the text using appropriate splitter for content type
    - Embeds all chunks in a single API call
    - Combines embeddings via mean pooling

    Args:
        text: The text to embed
        content_type: Optional explicit content type for chunking
        file_path: Optional file path for content type detection
        command_id: Optional command ID for error logging context

    Returns:
        Single embedding vector (list of floats)

    Raises:
        ValueError: If text is empty or no embedding model configured
        RuntimeError: If embedding generation fails
    """
    if not text or not text.strip():
        raise ValueError("Cannot generate embedding for empty text")

    text = text.strip()

    # Check if chunking is needed
    if len(text) <= CHUNK_SIZE:
        # Short text - embed directly
        logger.debug(f"Embedding short text ({len(text)} chars) directly")
        embeddings = await generate_embeddings([text], command_id=command_id)
        return embeddings[0]

    # Long text - chunk and mean pool
    logger.debug(f"Text exceeds chunk size ({len(text)} chars), chunking...")
    chunks = chunk_text(text, content_type=content_type, file_path=file_path)

    if not chunks:
        raise ValueError("Text chunking produced no chunks")

    if len(chunks) == 1:
        # Single chunk after splitting
        embeddings = await generate_embeddings(chunks, command_id=command_id)
        return embeddings[0]

    logger.debug(f"Embedding {len(chunks)} chunks and mean pooling")

    # Embed all chunks in single API call
    embeddings = await generate_embeddings(chunks, command_id=command_id)

    # Mean pool to get single embedding
    pooled = await mean_pool_embeddings(embeddings)

    logger.debug(f"Mean pooled {len(embeddings)} embeddings into single vector")
    return pooled
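
The three-step mean pooling algorithm documented in `mean_pool_embeddings` (normalize, average, normalize again) can be checked by hand with a small dependency-free sketch. This is not the module's implementation, which uses NumPy and is declared `async`. `l2_normalize` and `mean_pool` are hypothetical names used only here, and the dimension check is written explicitly instead of relying on array shape.

```python
import math
from typing import List


def l2_normalize(vec: List[float]) -> List[float]:
    """Scale a vector to unit length; zero vectors are returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm > 0 else list(vec)


def mean_pool(embeddings: List[List[float]]) -> List[float]:
    """Normalize each vector, average element-wise, then normalize again."""
    if not embeddings:
        raise ValueError("Cannot mean pool empty list of embeddings")
    dim = len(embeddings[0])
    if any(len(e) != dim for e in embeddings):
        raise ValueError("Embeddings have different dimensions")
    unit = [l2_normalize(e) for e in embeddings]
    mean = [sum(col) / len(unit) for col in zip(*unit)]
    return l2_normalize(mean)


# Two chunk embeddings of very different magnitude, one along each axis.
pooled = mean_pool([[3.0, 0.0], [0.0, 0.5]])
```

Because each input is normalized before averaging, the long and the short vector contribute equally to the pooled direction, and the result has unit length.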