diff --git a/api/routers/embedding_rebuild.py b/api/routers/embedding_rebuild.py index 3697891..155d8c0 100644 --- a/api/routers/embedding_rebuild.py +++ b/api/routers/embedding_rebuild.py @@ -149,21 +149,21 @@ async def get_rebuild_status(command_id: str): result = status.result # Build progress info - if "total_items" in result and "processed_items" in result: + if "total_items" in result and "jobs_submitted" in result: total = result["total_items"] - processed = result["processed_items"] + submitted = result["jobs_submitted"] response.progress = RebuildProgress( - processed=processed, + processed=submitted, total=total, - percentage=round((processed / total * 100) if total > 0 else 0, 2), + percentage=round((submitted / total * 100) if total > 0 else 0, 2), ) # Build stats response.stats = RebuildStats( - sources=result.get("sources_processed", 0), - notes=result.get("notes_processed", 0), - insights=result.get("insights_processed", 0), - failed=result.get("failed_items", 0), + sources=result.get("sources_submitted", 0), + notes=result.get("notes_submitted", 0), + insights=result.get("insights_submitted", 0), + failed=result.get("failed_submissions", 0), ) # Add timestamps diff --git a/commands/CLAUDE.md b/commands/CLAUDE.md index 5fe6f2f..676a70a 100644 --- a/commands/CLAUDE.md +++ b/commands/CLAUDE.md @@ -23,8 +23,8 @@ ## Important Patterns - **Pydantic I/O**: All commands use `CommandInput`/`CommandOutput` subclasses for type safety and serialization. -- **Error handling**: Permanent errors return failure output; `RuntimeError` exceptions auto-retry via surreal-commands. -- **Retry configuration**: Embedding commands use moderate retry settings (5 attempts, 1-60s backoff). Retries handle transient failures (RuntimeError, ConnectionError, TimeoutError). +- **Error handling**: Permanent errors (ValueError) return failure output; all other exceptions auto-retry via surreal-commands. 
+- **Retry configuration**: Uses `stop_on: [ValueError]` (blocklist approach) - retries all exceptions EXCEPT ValueError. This is more resilient than an allowlist, as new exception types auto-retry. - **Fire-and-forget embedding**: Domain models submit embed_* commands via `submit_command()` without waiting. Commands process asynchronously. - **Content-type aware chunking**: `embed_source_command` uses `chunk_text()` with automatic content type detection (HTML, Markdown, plain text) for optimal text splitting. Default: 1500 char chunks with 225 char overlap. - **Batch embedding**: `embed_source_command` uses `generate_embeddings()` for single API call efficiency instead of per-chunk calls. @@ -40,8 +40,8 @@ ## Quirks & Edge Cases -- **source_commands**: `ensure_record_id()` wraps command IDs for DB storage; transaction conflicts trigger exponential backoff retry. Non-`RuntimeError` exceptions are permanent. -- **embedding_commands**: Content type detection uses file extension as primary source, heuristics as fallback. Chunks >1800 chars trigger secondary splitting. Empty/whitespace-only content returns early. +- **source_commands**: `ensure_record_id()` wraps command IDs for DB storage; transaction conflicts trigger exponential backoff retry. ValueError exceptions are permanent (not retried). +- **embedding_commands**: Content type detection uses file extension as primary source, heuristics as fallback. Chunks >1800 chars trigger secondary splitting. Empty/whitespace-only content raises ValueError (not retried). - **rebuild_embeddings_command**: Returns "jobs_submitted" not "processed_items" - embedding is async. Individual commands handle failures with their own retries. - **podcast_commands**: Profiles loaded from SurrealDB by name (must exist); briefing can be extended with suffix. Episode records created mid-execution. - **Example commands**: Accept optional `delay_seconds` for testing async behavior; not for production. 
@@ -49,7 +49,11 @@ ## Code Example ```python -@command("process_source", app="open_notebook", retry={...}) +@command("process_source", app="open_notebook", retry={ + "max_attempts": 5, + "wait_strategy": "exponential_jitter", + "stop_on": [ValueError], # Don't retry validation errors +}) async def process_source_command(input_data: SourceProcessingInput) -> SourceProcessingOutput: start_time = time.time() try: @@ -57,8 +61,8 @@ async def process_source_command(input_data: SourceProcessingInput) -> SourcePro source = await Source.get(input_data.source_id) result = await source_graph.ainvoke({...}) return SourceProcessingOutput(success=True, ...) - except RuntimeError as e: - raise # Retry this + except ValueError as e: + return SourceProcessingOutput(success=False, error_message=str(e)) # No retry except Exception as e: - return SourceProcessingOutput(success=False, error_message=str(e)) + raise # Retry all other exceptions ``` diff --git a/commands/embedding_commands.py b/commands/embedding_commands.py index 1087066..6d47cd4 100644 --- a/commands/embedding_commands.py +++ b/commands/embedding_commands.py @@ -23,6 +23,13 @@ def full_model_dump(model): return model +def get_command_id(input_data: CommandInput) -> str: + """Extract command_id from input_data's execution context, or return 'unknown'.""" + if input_data.execution_context: + return str(input_data.execution_context.command_id) + return "unknown" + + class RebuildEmbeddingsInput(CommandInput): mode: Literal["existing", "all"] include_sources: bool = True @@ -118,7 +125,7 @@ class EmbedSourceOutput(CommandOutput): "wait_strategy": "exponential_jitter", "wait_min": 1, "wait_max": 60, - "retry_on": [RuntimeError, ConnectionError, TimeoutError], + "stop_on": [ValueError], # Don't retry validation errors "retry_log_level": "debug", }, ) @@ -135,9 +142,9 @@ async def embed_note_command(input_data: EmbedNoteInput) -> EmbedNoteOutput: 3. 
UPSERT note embedding in database Retry Strategy: - - Retries up to 5 times for transient failures (RuntimeError, ConnectionError, TimeoutError) + - Retries up to 5 times for transient failures (network, timeout, etc.) - Uses exponential-jitter backoff (1-60s) - - Does NOT retry permanent failures (ValueError, authentication errors) + - Does NOT retry permanent failures (ValueError for validation errors) """ start_time = time.time() @@ -154,8 +161,9 @@ async def embed_note_command(input_data: EmbedNoteInput) -> EmbedNoteOutput: # 2. Generate embedding (auto-chunks + mean pools if needed) # Notes are typically markdown content + cmd_id = get_command_id(input_data) embedding = await generate_embedding( - note.content, content_type=ContentType.MARKDOWN + note.content, content_type=ContentType.MARKDOWN, command_id=cmd_id ) # 3. UPSERT embedding into note record @@ -178,27 +186,27 @@ async def embed_note_command(input_data: EmbedNoteInput) -> EmbedNoteOutput: processing_time=processing_time, ) - except RuntimeError: - logger.debug( - f"Transaction conflict for note {input_data.note_id} - will be retried" - ) - raise - except (ConnectionError, TimeoutError) as e: - logger.debug( - f"Network/timeout error for note {input_data.note_id} ({type(e).__name__}: {e}) - will be retried" - ) - raise - except Exception as e: + except ValueError as e: + # Permanent failure - don't retry processing_time = time.time() - start_time - logger.error(f"Failed to embed note {input_data.note_id}: {e}") - logger.exception(e) - + cmd_id = get_command_id(input_data) + logger.error( + f"Failed to embed note {input_data.note_id} (command: {cmd_id}): {e}" + ) return EmbedNoteOutput( success=False, note_id=input_data.note_id, processing_time=processing_time, error_message=str(e), ) + except Exception as e: + # Transient failure - will be retried (surreal-commands logs final failure) + cmd_id = get_command_id(input_data) + logger.debug( + f"Transient error embedding note {input_data.note_id} " + 
f"(command: {cmd_id}): {e}" + ) + raise @command( @@ -209,7 +217,7 @@ async def embed_note_command(input_data: EmbedNoteInput) -> EmbedNoteOutput: "wait_strategy": "exponential_jitter", "wait_min": 1, "wait_max": 60, - "retry_on": [RuntimeError, ConnectionError, TimeoutError], + "stop_on": [ValueError], # Don't retry validation errors "retry_log_level": "debug", }, ) @@ -226,9 +234,9 @@ async def embed_insight_command(input_data: EmbedInsightInput) -> EmbedInsightOu 3. UPSERT insight embedding in database Retry Strategy: - - Retries up to 5 times for transient failures (RuntimeError, ConnectionError, TimeoutError) + - Retries up to 5 times for transient failures (network, timeout, etc.) - Uses exponential-jitter backoff (1-60s) - - Does NOT retry permanent failures (ValueError, authentication errors) + - Does NOT retry permanent failures (ValueError for validation errors) """ start_time = time.time() @@ -247,8 +255,9 @@ async def embed_insight_command(input_data: EmbedInsightInput) -> EmbedInsightOu # 2. Generate embedding (auto-chunks + mean pools if needed) # Insights are typically markdown content (generated by LLM) + cmd_id = get_command_id(input_data) embedding = await generate_embedding( - insight.content, content_type=ContentType.MARKDOWN + insight.content, content_type=ContentType.MARKDOWN, command_id=cmd_id ) # 3. 
UPSERT embedding into insight record @@ -271,27 +280,27 @@ async def embed_insight_command(input_data: EmbedInsightInput) -> EmbedInsightOu processing_time=processing_time, ) - except RuntimeError: - logger.debug( - f"Transaction conflict for insight {input_data.insight_id} - will be retried" - ) - raise - except (ConnectionError, TimeoutError) as e: - logger.debug( - f"Network/timeout error for insight {input_data.insight_id} ({type(e).__name__}: {e}) - will be retried" - ) - raise - except Exception as e: + except ValueError as e: + # Permanent failure - don't retry processing_time = time.time() - start_time - logger.error(f"Failed to embed insight {input_data.insight_id}: {e}") - logger.exception(e) - + cmd_id = get_command_id(input_data) + logger.error( + f"Failed to embed insight {input_data.insight_id} (command: {cmd_id}): {e}" + ) return EmbedInsightOutput( success=False, insight_id=input_data.insight_id, processing_time=processing_time, error_message=str(e), ) + except Exception as e: + # Transient failure - will be retried (surreal-commands logs final failure) + cmd_id = get_command_id(input_data) + logger.debug( + f"Transient error embedding insight {input_data.insight_id} " + f"(command: {cmd_id}): {e}" + ) + raise @command( @@ -302,7 +311,7 @@ async def embed_insight_command(input_data: EmbedInsightInput) -> EmbedInsightOu "wait_strategy": "exponential_jitter", "wait_min": 1, "wait_max": 60, - "retry_on": [RuntimeError, ConnectionError, TimeoutError], + "stop_on": [ValueError], # Don't retry validation errors "retry_log_level": "debug", }, ) @@ -322,9 +331,9 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu 6. Bulk INSERT source_embedding records Retry Strategy: - - Retries up to 5 times for transient failures (RuntimeError, ConnectionError, TimeoutError) + - Retries up to 5 times for transient failures (network, timeout, etc.) 
- Uses exponential-jitter backoff (1-60s) - - Does NOT retry permanent failures (ValueError, authentication errors) + - Does NOT retry permanent failures (ValueError for validation errors) """ start_time = time.time() @@ -368,8 +377,9 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu raise ValueError("No chunks created after splitting text") # 5. Generate embeddings for all chunks in single API call + cmd_id = get_command_id(input_data) logger.debug(f"Generating embeddings for {total_chunks} chunks") - embeddings = await generate_embeddings(chunks) + embeddings = await generate_embeddings(chunks, command_id=cmd_id) # Verify we got embeddings for all chunks if len(embeddings) != len(chunks): @@ -405,21 +415,13 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu processing_time=processing_time, ) - except RuntimeError: - logger.debug( - f"Transaction conflict for source {input_data.source_id} - will be retried" - ) - raise - except (ConnectionError, TimeoutError) as e: - logger.debug( - f"Network/timeout error for source {input_data.source_id} ({type(e).__name__}: {e}) - will be retried" - ) - raise - except Exception as e: + except ValueError as e: + # Permanent failure - don't retry processing_time = time.time() - start_time - logger.error(f"Failed to embed source {input_data.source_id}: {e}") - logger.exception(e) - + cmd_id = get_command_id(input_data) + logger.error( + f"Failed to embed source {input_data.source_id} (command: {cmd_id}): {e}" + ) return EmbedSourceOutput( success=False, source_id=input_data.source_id, @@ -427,6 +429,14 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu processing_time=processing_time, error_message=str(e), ) + except Exception as e: + # Transient failure - will be retried (surreal-commands logs final failure) + cmd_id = get_command_id(input_data) + logger.debug( + f"Transient error embedding source {input_data.source_id} " + 
f"(command: {cmd_id}): {e}" + ) + raise @command( @@ -437,7 +447,7 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu "wait_strategy": "exponential_jitter", "wait_min": 1, "wait_max": 60, - "retry_on": [RuntimeError, ConnectionError, TimeoutError], + "stop_on": [ValueError], # Don't retry validation errors "retry_log_level": "debug", }, ) @@ -457,9 +467,9 @@ async def create_insight_command( 3. Return the insight_id Retry Strategy: - - Retries up to 5 times for transient failures (RuntimeError, ConnectionError, TimeoutError) + - Retries up to 5 times for transient failures (network, timeout, etc.) - Uses exponential-jitter backoff (1-60s) - - Does NOT retry permanent failures (ValueError, authentication errors) + - Does NOT retry permanent failures (ValueError for validation errors) """ start_time = time.time() @@ -512,30 +522,27 @@ async def create_insight_command( processing_time=processing_time, ) - except RuntimeError: - logger.debug( - f"Transaction conflict creating insight for source " - f"{input_data.source_id} - will retry" - ) - raise - except (ConnectionError, TimeoutError) as e: - logger.debug( - f"Network/timeout error creating insight for source " - f"{input_data.source_id} ({type(e).__name__}: {e}) - will retry" - ) - raise - except Exception as e: + except ValueError as e: + # Permanent failure - don't retry processing_time = time.time() - start_time + cmd_id = get_command_id(input_data) logger.error( - f"Failed to create insight for source {input_data.source_id}: {e}" + f"Failed to create insight for source {input_data.source_id} " + f"(command: {cmd_id}): {e}" ) - logger.exception(e) - return CreateInsightOutput( success=False, processing_time=processing_time, error_message=str(e), ) + except Exception as e: + # Transient failure - will be retried (surreal-commands logs final failure) + cmd_id = get_command_id(input_data) + logger.debug( + f"Transient error creating insight for source {input_data.source_id} " + 
f"(command: {cmd_id}): {e}" + ) + raise async def collect_items_for_rebuild( @@ -570,8 +577,10 @@ async def collect_items_for_rebuild( else: items["sources"] = [] else: # mode == "all" - # Query all sources with content - result = await repo_query("SELECT id FROM source WHERE full_text != none") + # Query all sources with non-empty content + result = await repo_query( + "SELECT id FROM source WHERE full_text != none AND string::trim(full_text) != ''" + ) items["sources"] = [str(item["id"]) for item in result] if result else [] logger.info(f"Collected {len(items['sources'])} sources for rebuild") @@ -583,8 +592,10 @@ async def collect_items_for_rebuild( "SELECT id FROM note WHERE embedding != none AND array::len(embedding) > 0" ) else: # mode == "all" - # Query all notes (with content) - result = await repo_query("SELECT id FROM note WHERE content != none") + # Query all notes with non-empty content + result = await repo_query( + "SELECT id FROM note WHERE content != none AND string::trim(content) != ''" + ) items["notes"] = [str(item["id"]) for item in result] if result else [] logger.info(f"Collected {len(items['notes'])} notes for rebuild") @@ -596,8 +607,10 @@ async def collect_items_for_rebuild( "SELECT id FROM source_insight WHERE embedding != none AND array::len(embedding) > 0" ) else: # mode == "all" - # Query all insights - result = await repo_query("SELECT id FROM source_insight") + # Query all insights with non-empty content + result = await repo_query( + "SELECT id FROM source_insight WHERE content != none AND string::trim(content) != ''" + ) items["insights"] = [str(item["id"]) for item in result] if result else [] logger.info(f"Collected {len(items['insights'])} insights for rebuild") diff --git a/commands/source_commands.py b/commands/source_commands.py index b5fde95..02df7de 100644 --- a/commands/source_commands.py +++ b/commands/source_commands.py @@ -49,12 +49,12 @@ class SourceProcessingOutput(CommandOutput): "process_source", app="open_notebook", 
retry={ - "max_attempts": 15, # Increased from 5 to handle deep queues (workaround for SurrealDB v2 transaction conflicts) + "max_attempts": 15, # Handle deep queues (workaround for SurrealDB v2 transaction conflicts) "wait_strategy": "exponential_jitter", "wait_min": 1, - "wait_max": 120, # Increased from 30s to 120s to allow queue to drain - "retry_on": [RuntimeError], - "retry_log_level": "debug", # Use debug level to avoid log noise during transaction conflicts + "wait_max": 120, # Allow queue to drain + "stop_on": [ValueError], # Don't retry validation errors + "retry_log_level": "debug", # Avoid log noise during transaction conflicts }, ) async def process_source_command( @@ -136,22 +136,22 @@ async def process_source_command( processing_time=processing_time, ) - except RuntimeError as e: - # Transaction conflicts should be retried by surreal-commands - logger.debug(f"Transaction conflict, will retry: {e}") - raise - - except Exception as e: - # Other errors are permanent failures + except ValueError as e: + # Validation errors are permanent failures - don't retry processing_time = time.time() - start_time logger.error(f"Source processing failed: {e}") - return SourceProcessingOutput( success=False, source_id=input_data.source_id, processing_time=processing_time, error_message=str(e), ) + except Exception as e: + # Transient failure - will be retried (surreal-commands logs final failure) + logger.debug( + f"Transient error processing source {input_data.source_id}: {e}" + ) + raise # ============================================================================= @@ -184,7 +184,7 @@ class RunTransformationOutput(CommandOutput): "wait_strategy": "exponential_jitter", "wait_min": 1, "wait_max": 60, - "retry_on": [RuntimeError, ConnectionError, TimeoutError], + "stop_on": [ValueError], # Don't retry validation errors "retry_log_level": "debug", }, ) @@ -203,8 +203,9 @@ async def run_transformation_command( the HTTP request while the LLM processes. 
Retry Strategy: - - Retries up to 5 times for transient failures + - Retries up to 5 times for transient failures (network, timeout, etc.) - Uses exponential-jitter backoff (1-60s) + - Does NOT retry permanent failures (ValueError for validation errors) """ start_time = time.time() @@ -244,25 +245,13 @@ async def run_transformation_command( processing_time=processing_time, ) - except RuntimeError: - logger.debug( - f"Transaction conflict running transformation - will retry" - ) - raise - except (ConnectionError, TimeoutError) as e: - logger.debug( - f"Network/timeout error running transformation " - f"({type(e).__name__}: {e}) - will retry" - ) - raise - except Exception as e: + except ValueError as e: + # Validation errors are permanent failures - don't retry processing_time = time.time() - start_time logger.error( f"Failed to run transformation {input_data.transformation_id} " f"on source {input_data.source_id}: {e}" ) - logger.exception(e) - return RunTransformationOutput( success=False, source_id=input_data.source_id, @@ -270,3 +259,10 @@ async def run_transformation_command( processing_time=processing_time, error_message=str(e), ) + except Exception as e: + # Transient failure - will be retried (surreal-commands logs final failure) + logger.debug( + f"Transient error running transformation {input_data.transformation_id} " + f"on source {input_data.source_id}: {e}" + ) + raise diff --git a/frontend/src/lib/locales/en-US/index.ts b/frontend/src/lib/locales/en-US/index.ts index ca0bf49..9402b91 100644 --- a/frontend/src/lib/locales/en-US/index.ts +++ b/frontend/src/lib/locales/en-US/index.ts @@ -798,13 +798,13 @@ export const enUS = { starting: "Starting Rebuild...", startBtn: "🚀 Start Rebuild", queued: "Queued", - running: "Running...", - completed: "Completed!", + running: "Submitting jobs...", + completed: "Jobs Submitted!", failed: "Failed", leavePageHint: "You can leave this page as this will run in the background", startNew: "Start New Rebuild", - 
itemsProcessed: "{processed}/{total} items ({percent}%)", - failedItems: "{count} items failed to process", + itemsProcessed: "{processed}/{total} jobs submitted ({percent}%)", + failedItems: "{count} jobs failed to submit", time: "Time", whenToRebuild: "When should I rebuild embeddings?", whenToRebuildAns: "You should rebuild when switching models, upgrading versions, fixing corruption, or after bulk imports.", diff --git a/frontend/src/lib/locales/ja-JP/index.ts b/frontend/src/lib/locales/ja-JP/index.ts index 49e13ea..7d9e6c3 100644 --- a/frontend/src/lib/locales/ja-JP/index.ts +++ b/frontend/src/lib/locales/ja-JP/index.ts @@ -798,13 +798,13 @@ export const jaJP = { starting: "再構築を開始中...", startBtn: "🚀 再構築を開始", queued: "キュー待ち", - running: "実行中...", - completed: "完了!", + running: "ジョブ送信中...", + completed: "ジョブ送信完了!", failed: "失敗", leavePageHint: "バックグラウンドで実行されるため、このページを離れても構いません", startNew: "新しい再構築を開始", - itemsProcessed: "{processed}/{total}件処理済み({percent}%)", - failedItems: "{count}件のアイテムの処理に失敗しました", + itemsProcessed: "{processed}/{total}件送信済み({percent}%)", + failedItems: "{count}件のジョブの送信に失敗しました", time: "経過時間", whenToRebuild: "いつEmbeddingを再構築すべき?", whenToRebuildAns: "モデルの切り替え時、バージョンアップ時、破損の修復時、または一括インポート後に再構築してください。", diff --git a/frontend/src/lib/locales/pt-BR/index.ts b/frontend/src/lib/locales/pt-BR/index.ts index 803581d..6bd401b 100644 --- a/frontend/src/lib/locales/pt-BR/index.ts +++ b/frontend/src/lib/locales/pt-BR/index.ts @@ -798,13 +798,13 @@ export const ptBR = { starting: "Iniciando Reconstrução...", startBtn: "🚀 Iniciar Reconstrução", queued: "Na Fila", - running: "Executando...", - completed: "Concluído!", + running: "Enviando jobs...", + completed: "Jobs Enviados!", failed: "Falhou", leavePageHint: "Você pode sair desta página pois isso será executado em segundo plano", startNew: "Iniciar Nova Reconstrução", - itemsProcessed: "{processed}/{total} itens ({percent}%)", - failedItems: "{count} itens falharam ao processar", + itemsProcessed: 
"{processed}/{total} jobs enviados ({percent}%)", + failedItems: "{count} jobs falharam ao enviar", time: "Tempo", whenToRebuild: "Quando devo reconstruir embeddings?", whenToRebuildAns: "Você deve reconstruir ao trocar modelos, atualizar versões, corrigir corrupção ou após importações em massa.", diff --git a/frontend/src/lib/locales/zh-CN/index.ts b/frontend/src/lib/locales/zh-CN/index.ts index 23979e9..23ef420 100644 --- a/frontend/src/lib/locales/zh-CN/index.ts +++ b/frontend/src/lib/locales/zh-CN/index.ts @@ -798,13 +798,13 @@ export const zhCN = { starting: "正在启动重建...", startBtn: "开始重建", queued: "排队中", - running: "正在运行...", - completed: "已完成!", + running: "正在提交任务...", + completed: "任务已提交!", failed: "失败", leavePageHint: "您可以离开此页面,后台将继续运行", startNew: "开始新的重建", - itemsProcessed: "{processed}/{total} 项 ({percent}%)", - failedItems: "{count} 项处理失败", + itemsProcessed: "{processed}/{total} 任务已提交 ({percent}%)", + failedItems: "{count} 任务提交失败", time: "耗时", whenToRebuild: "我该何时重建索引?", whenToRebuildAns: "当您切换嵌入模型、升级模型版本、怀疑数据损坏或进行了大批量内容导入后,建议执行重建。", diff --git a/frontend/src/lib/locales/zh-TW/index.ts b/frontend/src/lib/locales/zh-TW/index.ts index 9346ee4..e2b16b7 100644 --- a/frontend/src/lib/locales/zh-TW/index.ts +++ b/frontend/src/lib/locales/zh-TW/index.ts @@ -798,13 +798,13 @@ export const zhTW = { starting: "正在啟動重建...", startBtn: "開始重建", queued: "排隊中", - running: "正在執行...", - completed: "已完成!", + running: "正在提交任務...", + completed: "任務已提交!", failed: "失敗", leavePageHint: "您可以離開此頁面,後台將繼續運行", startNew: "開始新的重建", - itemsProcessed: "{processed}/{total} 項 ({percent}%)", - failedItems: "{count} 項處理失敗", + itemsProcessed: "{processed}/{total} 任務已提交 ({percent}%)", + failedItems: "{count} 任務提交失敗", time: "耗時", whenToRebuild: "我該何時重建索引?", whenToRebuildAns: "當您切換嵌入模型、升級模型版本、懷疑資料損壞或進行了大批次內容導入後,建議執行重建。", diff --git a/open_notebook/database/repository.py b/open_notebook/database/repository.py index a150977..3fb9e04 100644 --- a/open_notebook/database/repository.py +++ 
b/open_notebook/database/repository.py @@ -180,7 +180,12 @@ async def repo_insert( except RuntimeError as e: if ignore_duplicates and "already contains" in str(e): return [] - logger.error(str(e)) + # Log transaction conflicts at debug level (they are expected during concurrent operations) + error_str = str(e).lower() + if "transaction" in error_str or "conflict" in error_str: + logger.debug(str(e)) + else: + logger.error(str(e)) raise except Exception as e: if ignore_duplicates and "already contains" in str(e): diff --git a/open_notebook/utils/embedding.py b/open_notebook/utils/embedding.py index 743962e..107eb02 100644 --- a/open_notebook/utils/embedding.py +++ b/open_notebook/utils/embedding.py @@ -76,7 +76,9 @@ async def mean_pool_embeddings(embeddings: List[List[float]]) -> List[float]: return mean.tolist() -async def generate_embeddings(texts: List[str]) -> List[List[float]]: +async def generate_embeddings( + texts: List[str], command_id: Optional[str] = None +) -> List[List[float]]: """ Generate embeddings for multiple texts in a single API call. @@ -85,6 +87,7 @@ async def generate_embeddings(texts: List[str]) -> List[List[float]]: Args: texts: List of text strings to embed + command_id: Optional command ID for error logging context Returns: List of embedding vectors, one per input text @@ -102,6 +105,8 @@ async def generate_embeddings(texts: List[str]) -> List[List[float]]: "No embedding model configured. Please configure one in the Models section." 
) + model_name = getattr(embedding_model, "model_name", "unknown") + # Log text sizes for debugging text_sizes = [len(t) for t in texts] logger.debug( @@ -116,17 +121,24 @@ async def generate_embeddings(texts: List[str]) -> List[List[float]]: logger.debug(f"Generated {len(embeddings)} embeddings") return embeddings except Exception as e: - logger.error( - f"Failed to generate embeddings: {e} " - f"(tried {len(texts)} texts, max size: {max(text_sizes)} chars)" + # Log at debug level - the calling command will log at appropriate level + # based on whether retries are exhausted + cmd_context = f" (command: {command_id})" if command_id else "" + logger.debug( + f"Embedding API error using model '{model_name}' " + f"for {len(texts)} texts (sizes: {min(text_sizes)}-{max(text_sizes)} chars)" + f"{cmd_context}: {e}" ) - raise RuntimeError(f"Failed to generate embeddings: {e}") from e + raise RuntimeError( + f"Failed to generate embeddings using model '{model_name}': {e}" + ) from e async def generate_embedding( text: str, content_type: Optional[ContentType] = None, file_path: Optional[str] = None, + command_id: Optional[str] = None, ) -> List[float]: """ Generate a single embedding for text, handling large content via chunking and mean pooling. 
@@ -143,6 +155,7 @@ async def generate_embedding( text: The text to embed content_type: Optional explicit content type for chunking file_path: Optional file path for content type detection + command_id: Optional command ID for error logging context Returns: Single embedding vector (list of floats) @@ -160,7 +173,7 @@ async def generate_embedding( if len(text) <= CHUNK_SIZE: # Short text - embed directly logger.debug(f"Embedding short text ({len(text)} chars) directly") - embeddings = await generate_embeddings([text]) + embeddings = await generate_embeddings([text], command_id=command_id) return embeddings[0] # Long text - chunk and mean pool @@ -173,13 +186,13 @@ async def generate_embedding( if len(chunks) == 1: # Single chunk after splitting - embeddings = await generate_embeddings(chunks) + embeddings = await generate_embeddings(chunks, command_id=command_id) return embeddings[0] logger.debug(f"Embedding {len(chunks)} chunks and mean pooling") # Embed all chunks in single API call - embeddings = await generate_embeddings(chunks) + embeddings = await generate_embeddings(chunks, command_id=command_id) # Mean pool to get single embedding pooled = await mean_pool_embeddings(embeddings)