mirror of
https://github.com/lfnovo/open-notebook.git
synced 2026-04-29 03:50:04 +00:00
Release 1.2 (#242)
* chore: improve podcast transcripts * fix: remove date from insight - fixes #241 * fix: improve scrolling on source and insights - fixes #237 * chore: update esperanto - fixes #234 * chore: update esperanto - fixes #226 * fix: process vectorization as subcommands to handle larger documents more gracefully - fixes #229 * feat: enable background job retry capabilities * feat: re-enable content types that were disabled during the alpha version * fix: remove unnecessary model caching that was causing many issues * feat: support multiple Azure endpoints and keys, just like the OpenAI-compatible provider - fixes #215 * docs: update Azure variables * chore: bump and update dependencies
This commit is contained in:
parent
bc35a95117
commit
f79a9040ae
20 changed files with 1077 additions and 435 deletions
|
|
@ -3,6 +3,7 @@ from typing import Any, ClassVar, Dict, List, Literal, Optional, Tuple, Union
|
|||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from surreal_commands import submit_command
|
||||
from surrealdb import RecordID
|
||||
|
||||
from open_notebook.database.repository import ensure_record_id, repo_query
|
||||
|
|
@ -262,83 +263,49 @@ class Source(ObjectModel):
|
|||
raise InvalidInputError("Notebook ID must be provided")
|
||||
return await self.relate("reference", notebook_id)
|
||||
|
||||
async def vectorize(self) -> None:
|
||||
logger.info(f"Starting vectorization for source {self.id}")
|
||||
EMBEDDING_MODEL = await model_manager.get_embedding_model()
|
||||
async def vectorize(self) -> str:
|
||||
"""
|
||||
Submit vectorization as a background job using the vectorize_source command.
|
||||
|
||||
This method now leverages the job-based architecture to prevent HTTP connection
|
||||
pool exhaustion when processing large documents. The actual chunk processing
|
||||
happens in the background worker pool, with natural concurrency control.
|
||||
|
||||
Returns:
|
||||
str: The command/job ID that can be used to track progress via the commands API
|
||||
|
||||
Raises:
|
||||
ValueError: If source has no text to vectorize
|
||||
DatabaseOperationError: If job submission fails
|
||||
"""
|
||||
logger.info(f"Submitting vectorization job for source {self.id}")
|
||||
|
||||
try:
|
||||
# DELETE EXISTING EMBEDDINGS FIRST - Makes vectorize() idempotent
|
||||
delete_result = await repo_query(
|
||||
"DELETE source_embedding WHERE source = $source_id",
|
||||
{"source_id": ensure_record_id(self.id)}
|
||||
)
|
||||
deleted_count = len(delete_result) if delete_result else 0
|
||||
if deleted_count > 0:
|
||||
logger.info(f"Deleted {deleted_count} existing embeddings for source {self.id}")
|
||||
else:
|
||||
logger.debug(f"No existing embeddings found for source {self.id}")
|
||||
|
||||
if not self.full_text:
|
||||
logger.warning(f"No text to vectorize for source {self.id}")
|
||||
return
|
||||
raise ValueError(f"Source {self.id} has no text to vectorize")
|
||||
|
||||
chunks = split_text(
|
||||
self.full_text,
|
||||
# Submit the vectorize_source command which will:
|
||||
# 1. Delete existing embeddings (idempotency)
|
||||
# 2. Split text into chunks
|
||||
# 3. Submit each chunk as an embed_chunk job
|
||||
command_id = submit_command(
|
||||
"open_notebook", # app name
|
||||
"vectorize_source", # command name
|
||||
{
|
||||
"source_id": str(self.id),
|
||||
}
|
||||
)
|
||||
chunk_count = len(chunks)
|
||||
logger.info(f"Split into {chunk_count} chunks for source {self.id}")
|
||||
|
||||
if chunk_count == 0:
|
||||
logger.warning("No chunks created after splitting")
|
||||
return
|
||||
command_id_str = str(command_id)
|
||||
logger.info(
|
||||
f"Vectorization job submitted for source {self.id}: "
|
||||
f"command_id={command_id_str}"
|
||||
)
|
||||
|
||||
# Process chunks concurrently using async gather
|
||||
logger.info("Starting concurrent processing of chunks")
|
||||
|
||||
async def process_chunk(
|
||||
idx: int, chunk: str
|
||||
) -> Tuple[int, List[float], str]:
|
||||
logger.debug(f"Processing chunk {idx}/{chunk_count}")
|
||||
try:
|
||||
if EMBEDDING_MODEL is None:
|
||||
raise ValueError("EMBEDDING_MODEL is not configured")
|
||||
embedding = (await EMBEDDING_MODEL.aembed([chunk]))[0]
|
||||
cleaned_content = chunk
|
||||
logger.debug(f"Successfully processed chunk {idx}")
|
||||
return (idx, embedding, cleaned_content)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing chunk {idx}: {str(e)}")
|
||||
raise
|
||||
|
||||
# Create tasks for all chunks and process them concurrently
|
||||
tasks = [process_chunk(idx, chunk) for idx, chunk in enumerate(chunks)]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
logger.info(f"Parallel processing complete. Got {len(results)} results")
|
||||
|
||||
# Insert results in order (they're already ordered by index)
|
||||
for idx, embedding, content in results:
|
||||
logger.debug(f"Inserting chunk {idx} into database")
|
||||
await repo_query(
|
||||
"""
|
||||
CREATE source_embedding CONTENT {
|
||||
"source": $source_id,
|
||||
"order": $order,
|
||||
"content": $content,
|
||||
"embedding": $embedding,
|
||||
};""",
|
||||
{
|
||||
"source_id": ensure_record_id(self.id),
|
||||
"order": idx,
|
||||
"content": content,
|
||||
"embedding": embedding,
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(f"Vectorization complete for source {self.id}")
|
||||
return command_id_str
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error vectorizing source {self.id}: {str(e)}")
|
||||
logger.error(f"Failed to submit vectorization job for source {self.id}: {e}")
|
||||
logger.exception(e)
|
||||
raise DatabaseOperationError(e)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue