feat: implement local folder indexing and document versioning capabilities

Anish Sarkar 2026-04-02 11:11:57 +05:30
parent fe6c2e9b52
commit 96a58d0d30
4 changed files with 1127 additions and 1 deletion


@@ -926,6 +926,52 @@ async def _index_obsidian_vault(
)
@celery_app.task(name="index_local_folder", bind=True)
def index_local_folder_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
    start_date: str | None = None,
    end_date: str | None = None,
    target_file_path: str | None = None,
):
"""Celery task to index a local folder."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_local_folder(
connector_id, search_space_id, user_id, start_date, end_date, target_file_path
)
)
finally:
loop.close()
async def _index_local_folder(
connector_id: int,
search_space_id: int,
user_id: str,
    start_date: str | None = None,
    end_date: str | None = None,
    target_file_path: str | None = None,
):
"""Index local folder with new session."""
from app.routes.search_source_connectors_routes import (
run_local_folder_indexing,
)
async with get_celery_session_maker()() as session:
await run_local_folder_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date,
target_file_path=target_file_path,
)
@celery_app.task(name="index_composio_connector", bind=True)
def index_composio_connector_task(
self,


@@ -42,9 +42,10 @@ from .jira_indexer import index_jira_issues
# Issue tracking and project management
from .linear_indexer import index_linear_issues
# Documentation and knowledge management
from .local_folder_indexer import index_local_folder
from .luma_indexer import index_luma_events
from .notion_indexer import index_notion_pages
from .obsidian_indexer import index_obsidian_vault
from .slack_indexer import index_slack_messages
@@ -74,4 +75,5 @@ __all__ = [ # noqa: RUF022
# Communication platforms
"index_slack_messages",
"index_google_gmail_messages",
"index_local_folder",
]


@@ -0,0 +1,971 @@
"""
Local folder connector indexer.
Indexes files from a local folder on disk. Supports:
- Full-scan mode (startup reconciliation / manual trigger)
- Single-file mode (chokidar real-time trigger)
- Filesystem folder structure mirroring into DB Folder rows
- Document versioning via create_version_snapshot
- ETL-based file parsing for binary formats (PDF, DOCX, images, audio, etc.)
Electron-only: all change detection is driven by chokidar in the desktop app.
"""
import os
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from pathlib import Path
from sqlalchemy import delete, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.config import config
from app.db import (
Document,
DocumentStatus,
DocumentType,
Folder,
SearchSourceConnectorType,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.document_versioning import create_version_snapshot
from .base import (
build_document_metadata_string,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
PLAINTEXT_EXTENSIONS = frozenset({
".md", ".markdown", ".txt", ".text", ".csv", ".tsv",
".json", ".jsonl", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf",
".xml", ".html", ".htm", ".css", ".scss", ".less", ".sass",
".py", ".pyw", ".pyi", ".pyx",
".js", ".jsx", ".ts", ".tsx", ".mjs", ".cjs",
".java", ".kt", ".kts", ".scala", ".groovy",
".c", ".h", ".cpp", ".cxx", ".cc", ".hpp", ".hxx",
".cs", ".fs", ".fsx",
".go", ".rs", ".rb", ".php", ".pl", ".pm", ".lua",
".swift", ".m", ".mm",
".r", ".R", ".jl",
".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1",
".sql", ".graphql", ".gql",
".env", ".gitignore", ".dockerignore", ".editorconfig",
".makefile", ".cmake",
".log", ".rst", ".tex", ".bib", ".org", ".adoc", ".asciidoc",
".vue", ".svelte", ".astro",
".tf", ".hcl", ".proto",
})
AUDIO_EXTENSIONS = frozenset({
".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm",
})
def _is_plaintext_file(filename: str) -> bool:
return Path(filename).suffix.lower() in PLAINTEXT_EXTENSIONS
def _is_audio_file(filename: str) -> bool:
return Path(filename).suffix.lower() in AUDIO_EXTENSIONS
def _needs_etl(filename: str) -> bool:
"""File is not plaintext and not audio — requires ETL service to parse."""
return not _is_plaintext_file(filename) and not _is_audio_file(filename)
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
HEARTBEAT_INTERVAL_SECONDS = 30
DEFAULT_EXCLUDE_PATTERNS = [
".git",
"node_modules",
"__pycache__",
".DS_Store",
".obsidian",
".trash",
]
def scan_folder(
folder_path: str,
file_extensions: list[str] | None = None,
exclude_patterns: list[str] | None = None,
) -> list[dict]:
"""Walk a directory and return a list of file entries.
Args:
folder_path: Absolute path to the folder to scan.
file_extensions: If provided, only include files with these extensions
(e.g. [".md", ".txt"]). ``None`` means include all files.
exclude_patterns: Directory/file names to exclude. Any path component
matching one of these strings is skipped.
Returns:
List of dicts with keys: path, relative_path, name, modified_at, size.
"""
root = Path(folder_path)
if not root.exists():
raise ValueError(f"Folder path does not exist: {folder_path}")
if exclude_patterns is None:
exclude_patterns = []
files: list[dict] = []
for dirpath, dirnames, filenames in os.walk(root):
rel_dir = Path(dirpath).relative_to(root)
# Prune excluded directories in-place so os.walk skips them
dirnames[:] = [
d for d in dirnames if d not in exclude_patterns
]
# Check if the current directory itself is excluded
if any(part in exclude_patterns for part in rel_dir.parts):
continue
for fname in filenames:
if fname in exclude_patterns:
continue
full = Path(dirpath) / fname
if file_extensions is not None:
if full.suffix.lower() not in file_extensions:
continue
try:
stat = full.stat()
rel_path = full.relative_to(root)
files.append(
{
"path": str(full),
"relative_path": str(rel_path),
"name": full.name,
"modified_at": datetime.fromtimestamp(stat.st_mtime, tz=UTC),
"size": stat.st_size,
}
)
except OSError as e:
logger.warning(f"Could not stat file {full}: {e}")
return files
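# Example (hypothetical vault path): collect only markdown and text files,
# skipping the default excludes; each entry carries path, size, and mtime.
#
#   entries = scan_folder(
#       "/home/user/vault",
#       file_extensions=[".md", ".txt"],
#       exclude_patterns=DEFAULT_EXCLUDE_PATTERNS,
#   )
#   for entry in entries:
#       print(entry["relative_path"], entry["size"], entry["modified_at"])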
def _read_plaintext_file(file_path: str) -> str:
"""Read a plaintext/text-based file as UTF-8."""
with open(file_path, encoding="utf-8", errors="replace") as f:
content = f.read()
if "\x00" in content:
raise ValueError(
f"File contains null bytes — likely a binary file opened as text: {file_path}"
)
return content
async def _read_file_content(file_path: str, filename: str) -> str:
"""Read file content, using ETL for binary formats.
Plaintext files are read directly. Audio and document files (PDF, DOCX, etc.)
are routed through the configured ETL service (same as Google Drive / OneDrive).
Raises ValueError if the file cannot be parsed (e.g. no ETL service configured
for a binary file).
"""
if _is_plaintext_file(filename):
return _read_plaintext_file(file_path)
    if _is_audio_file(filename):
        stt_service_val = getattr(config, "STT_SERVICE", None)
        etl_service = getattr(config, "ETL_SERVICE", None)
        if not stt_service_val and not etl_service:
            raise ValueError(
                f"No STT_SERVICE configured — cannot transcribe audio file: {filename}"
            )
    elif not getattr(config, "ETL_SERVICE", None):
        raise ValueError(
            f"No ETL_SERVICE configured — cannot parse binary file: {filename}. "
            f"Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env"
        )
    from app.connectors.onedrive.content_extractor import (
        _parse_file_to_markdown,
    )
    # Audio and binary documents are both routed through the shared extractor,
    # matching the Drive/OneDrive connectors (see docstring above).
    return await _parse_file_to_markdown(file_path, filename)
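# Routing illustration (assumed filenames):
#   await _read_file_content("/v/readme.md", "readme.md")  # plaintext: read as UTF-8
#   await _read_file_content("/v/memo.m4a", "memo.m4a")    # audio: STT/ETL transcription
#   await _read_file_content("/v/spec.pdf", "spec.pdf")    # binary: ETL parse to markdown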
async def _compute_file_content_hash(
file_path: str, filename: str, search_space_id: int,
) -> tuple[str, str]:
"""Read a file (via ETL if needed) and compute its content hash.
Returns (content_text, content_hash).
"""
content = await _read_file_content(file_path, filename)
content_hash = generate_content_hash(content, search_space_id)
return content, content_hash
async def _mirror_folder_structure(
session: AsyncSession,
folder_path: str,
folder_name: str,
search_space_id: int,
user_id: str,
connector_config: dict,
connector,
exclude_patterns: list[str] | None = None,
) -> dict[str, int]:
"""Mirror the local filesystem directory structure into DB Folder rows.
Returns a mapping of relative_dir_path -> folder_id.
The empty string key ("") maps to the root folder.
"""
root = Path(folder_path)
if exclude_patterns is None:
exclude_patterns = []
# Collect all subdirectory paths relative to root
subdirs: list[str] = []
for dirpath, dirnames, _ in os.walk(root):
dirnames[:] = [d for d in dirnames if d not in exclude_patterns]
rel = Path(dirpath).relative_to(root)
if any(part in exclude_patterns for part in rel.parts):
continue
rel_str = str(rel) if str(rel) != "." else ""
if rel_str:
subdirs.append(rel_str)
# Sort by depth so parents are created before children
subdirs.sort(key=lambda p: p.count(os.sep))
mapping: dict[str, int] = {}
# Get or create root folder
root_folder_id = connector_config.get("root_folder_id")
if root_folder_id:
existing = (
await session.execute(
select(Folder).where(Folder.id == root_folder_id)
)
).scalar_one_or_none()
if existing:
mapping[""] = existing.id
else:
root_folder_id = None
if not root_folder_id:
root_folder = Folder(
name=folder_name,
search_space_id=search_space_id,
created_by_id=user_id,
position="a0",
)
session.add(root_folder)
await session.flush()
mapping[""] = root_folder.id
# Persist root_folder_id into connector config
connector_config["root_folder_id"] = root_folder.id
connector.config = {**connector.config, "root_folder_id": root_folder.id}
flag_modified(connector, "config")
# Create/reuse subdirectory Folder rows
for rel_dir in subdirs:
dir_parts = Path(rel_dir).parts
dir_name = dir_parts[-1]
parent_rel = str(Path(*dir_parts[:-1])) if len(dir_parts) > 1 else ""
parent_id = mapping.get(parent_rel, mapping[""])
existing_folder = (
await session.execute(
select(Folder).where(
Folder.name == dir_name,
Folder.parent_id == parent_id,
Folder.search_space_id == search_space_id,
)
)
).scalar_one_or_none()
if existing_folder:
mapping[rel_dir] = existing_folder.id
else:
new_folder = Folder(
name=dir_name,
parent_id=parent_id,
search_space_id=search_space_id,
created_by_id=user_id,
position="a0",
)
session.add(new_folder)
await session.flush()
mapping[rel_dir] = new_folder.id
await session.flush()
return mapping
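# Sketch of how the returned mapping is consumed in both indexing phases below
# (hypothetical helper, shown for clarity; the inline code repeats this logic):
def _folder_id_for(relative_path: str, folder_mapping: dict[str, int]) -> int | None:
    """Resolve a file's DB folder id from its parent directory, falling back to root."""
    parent_dir = str(Path(relative_path).parent)
    if parent_dir == ".":
        parent_dir = ""
    return folder_mapping.get(parent_dir, folder_mapping.get(""))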
async def _cleanup_empty_folders(
session: AsyncSession,
root_folder_id: int,
search_space_id: int,
existing_dirs_on_disk: set[str],
folder_mapping: dict[str, int],
) -> None:
"""Delete Folder rows that are empty (no docs, no children) and no longer on disk.
Queries ALL folders under this search space (not just the current mapping)
so that stale folders from previous syncs are also cleaned up.
"""
# Build a reverse mapping from folder_id → rel_dir for known dirs
id_to_rel: dict[int, str] = {fid: rel for rel, fid in folder_mapping.items() if rel}
# Also find any folders in the DB that are children of the root but NOT
# in the current mapping (stale from a previous sync).
all_folders = (
await session.execute(
select(Folder).where(
Folder.search_space_id == search_space_id,
Folder.id != root_folder_id,
)
)
).scalars().all()
# Build candidates: folders not on disk that we might delete
candidates: list[Folder] = []
for folder in all_folders:
rel = id_to_rel.get(folder.id)
if rel and rel in existing_dirs_on_disk:
continue
candidates.append(folder)
    # No explicit depth sort is needed: the loop below repeats until a full
    # pass makes no deletions, so cascading empty parents are handled.
changed = True
while changed:
changed = False
remaining: list[Folder] = []
for folder in candidates:
doc_exists = (
await session.execute(
select(Document.id).where(Document.folder_id == folder.id).limit(1)
)
).scalar_one_or_none()
if doc_exists is not None:
remaining.append(folder)
continue
child_exists = (
await session.execute(
select(Folder.id).where(Folder.parent_id == folder.id).limit(1)
)
).scalar_one_or_none()
if child_exists is not None:
remaining.append(folder)
continue
await session.execute(delete(Folder).where(Folder.id == folder.id))
changed = True
candidates = remaining
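# Cascade illustration: if "a/b/c" disappears from disk and holds no documents,
# the first pass deletes it; if that leaves "a/b" childless and empty, the next
# pass deletes "a/b" as well, until a full pass makes no deletions.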
async def index_local_folder(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None = None,
end_date: str | None = None,
update_last_indexed: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
target_file_path: str | None = None,
) -> tuple[int, int, str | None]:
"""Index files from a local folder.
Supports two modes:
    - Full scan (target_file_path=None): walks the entire folder and handles new, changed, and deleted files.
- Single-file (target_file_path set): processes only that file.
Returns (indexed_count, skipped_count, error_or_warning_message).
"""
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="local_folder_indexing",
source="connector_indexing_task",
message=f"Starting local folder indexing for connector {connector_id}",
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
"target_file_path": target_file_path,
},
)
try:
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR
)
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector {connector_id} not found",
"Connector not found",
{},
)
return 0, 0, f"Connector {connector_id} not found"
folder_path = connector.config.get("folder_path")
if not folder_path or not os.path.exists(folder_path):
await task_logger.log_task_failure(
log_entry,
f"Folder path missing or does not exist: {folder_path}",
"Folder not found",
{},
)
return 0, 0, f"Folder path missing or does not exist: {folder_path}"
folder_name = connector.config.get("folder_name") or os.path.basename(folder_path)
exclude_patterns = connector.config.get("exclude_patterns", DEFAULT_EXCLUDE_PATTERNS)
file_extensions = connector.config.get("file_extensions") # None = all
# ====================================================================
# SINGLE-FILE MODE
# ====================================================================
if target_file_path:
return await _index_single_file(
session=session,
connector=connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
target_file_path=target_file_path,
task_logger=task_logger,
log_entry=log_entry,
update_last_indexed=update_last_indexed,
)
# ====================================================================
# FULL-SCAN MODE
# ====================================================================
# Phase 0: Mirror folder structure
await task_logger.log_task_progress(
log_entry, "Mirroring folder structure", {"stage": "folder_mirror"}
)
folder_mapping = await _mirror_folder_structure(
session=session,
folder_path=folder_path,
folder_name=folder_name,
search_space_id=search_space_id,
user_id=user_id,
connector_config=connector.config,
connector=connector,
exclude_patterns=exclude_patterns,
)
await session.flush()
# Scan files on disk
try:
files = scan_folder(folder_path, file_extensions, exclude_patterns)
except Exception as e:
await task_logger.log_task_failure(
log_entry, f"Failed to scan folder: {e}", "Scan error", {}
)
return 0, 0, f"Failed to scan folder: {e}"
logger.info(f"Found {len(files)} files in folder")
indexed_count = 0
skipped_count = 0
failed_count = 0
duplicate_count = 0
last_heartbeat_time = time.time()
# ================================================================
# PHASE 1: Analyze all files, create pending documents
# ================================================================
files_to_process: list[dict] = []
new_documents_created = False
seen_unique_hashes: set[str] = set()
for file_info in files:
try:
relative_path = file_info["relative_path"]
file_path_abs = file_info["path"]
unique_identifier = f"{folder_name}:{relative_path}"
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.LOCAL_FOLDER_FILE,
unique_identifier,
search_space_id,
)
seen_unique_hashes.add(unique_identifier_hash)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Check mtime first (cheap)
stored_mtime = (existing_document.document_metadata or {}).get("mtime")
current_mtime = file_info["modified_at"].timestamp()
if stored_mtime and abs(current_mtime - stored_mtime) < 1.0:
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
skipped_count += 1
continue
# mtime differs — read file and check content hash
try:
content, content_hash = await _compute_file_content_hash(
file_path_abs, file_info["relative_path"], search_space_id
)
except Exception as read_err:
logger.warning(f"Could not read {file_path_abs}: {read_err}")
skipped_count += 1
continue
if existing_document.content_hash == content_hash:
# Content same, just update mtime in metadata
meta = dict(existing_document.document_metadata or {})
meta["mtime"] = current_mtime
existing_document.document_metadata = meta
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
skipped_count += 1
continue
# Content actually changed — snapshot version, queue for re-index
await create_version_snapshot(session, existing_document)
files_to_process.append(
{
"document": existing_document,
"is_new": False,
"file_info": file_info,
"content": content,
"content_hash": content_hash,
"unique_identifier_hash": unique_identifier_hash,
"relative_path": relative_path,
"title": file_info["name"],
}
)
continue
# New document — read content
try:
content, content_hash = await _compute_file_content_hash(
file_path_abs, file_info["relative_path"], search_space_id
)
except Exception as read_err:
logger.warning(f"Could not read {file_path_abs}: {read_err}")
skipped_count += 1
continue
if not content.strip():
skipped_count += 1
continue
# Check for duplicate content from another connector
with session.no_autoflush:
dup = await check_duplicate_document_by_hash(session, content_hash)
if dup:
duplicate_count += 1
skipped_count += 1
continue
# Determine folder_id for this file
parent_dir = str(Path(relative_path).parent)
if parent_dir == ".":
parent_dir = ""
folder_id = folder_mapping.get(parent_dir, folder_mapping.get(""))
document = Document(
search_space_id=search_space_id,
title=file_info["name"],
document_type=DocumentType.LOCAL_FOLDER_FILE,
document_metadata={
"folder_name": folder_name,
"file_path": relative_path,
"connector_id": connector_id,
"mtime": file_info["modified_at"].timestamp(),
},
content="Pending...",
content_hash=unique_identifier_hash, # Temp unique — updated in phase 2
unique_identifier_hash=unique_identifier_hash,
embedding=None,
status=DocumentStatus.pending(),
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
folder_id=folder_id,
)
session.add(document)
new_documents_created = True
files_to_process.append(
{
"document": document,
"is_new": True,
"file_info": file_info,
"content": content,
"content_hash": content_hash,
"unique_identifier_hash": unique_identifier_hash,
"relative_path": relative_path,
"title": file_info["name"],
}
)
except Exception as e:
logger.exception(f"Phase 1 error for {file_info.get('path')}: {e}")
failed_count += 1
if new_documents_created:
await session.commit()
# ================================================================
# PHASE 1.5: Delete documents no longer on disk
# ================================================================
all_connector_docs = (
await session.execute(
select(Document).where(
Document.connector_id == connector_id,
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
)
)
).scalars().all()
for doc in all_connector_docs:
if doc.unique_identifier_hash not in seen_unique_hashes:
await session.delete(doc)
await session.flush()
# ================================================================
# PHASE 2: Process each document
# ================================================================
long_context_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
for item in files_to_process:
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(indexed_count)
last_heartbeat_time = current_time
document = item["document"]
try:
document.status = DocumentStatus.processing()
await session.commit()
title = item["title"]
relative_path = item["relative_path"]
content = item["content"]
content_hash = item["content_hash"]
file_info = item["file_info"]
metadata_sections = [
(
"METADATA",
[
f"Title: {title}",
f"Folder: {folder_name}",
f"Path: {relative_path}",
],
),
("CONTENT", [content]),
]
document_string = build_document_metadata_string(metadata_sections)
summary_content = ""
if long_context_llm and connector.enable_summary:
doc_meta = {
"folder_name": folder_name,
"file_path": relative_path,
}
summary_content, _ = await generate_document_summary(
document_string, long_context_llm, doc_meta
)
embedding = embed_text(document_string)
chunks = await create_document_chunks(document_string)
# Determine folder_id
parent_dir = str(Path(relative_path).parent)
if parent_dir == ".":
parent_dir = ""
folder_id = folder_mapping.get(parent_dir, folder_mapping.get(""))
document.title = title
document.content = document_string
document.content_hash = content_hash
document.source_markdown = content
document.embedding = embedding
document.document_metadata = {
"folder_name": folder_name,
"file_path": relative_path,
"connector_id": connector_id,
"summary": summary_content,
"mtime": file_info["modified_at"].timestamp(),
}
document.folder_id = folder_id
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
indexed_count += 1
if indexed_count % 10 == 0:
await session.commit()
except Exception as e:
logger.exception(f"Phase 2 error for {item.get('relative_path')}: {e}")
try:
await session.rollback()
except Exception:
pass
try:
document.status = DocumentStatus.failed(str(e)[:500])
document.updated_at = get_current_timestamp()
await session.commit()
except Exception:
try:
await session.rollback()
except Exception:
pass
failed_count += 1
# Cleanup empty folders
existing_dirs = set()
for dirpath, dirnames, _ in os.walk(folder_path):
dirnames[:] = [d for d in dirnames if d not in exclude_patterns]
rel = str(Path(dirpath).relative_to(folder_path))
if rel == ".":
rel = ""
if rel and not any(part in exclude_patterns for part in Path(rel).parts):
existing_dirs.add(rel)
root_fid = folder_mapping.get("")
if root_fid:
await _cleanup_empty_folders(
session, root_fid, search_space_id, existing_dirs, folder_mapping
)
await update_connector_last_indexed(session, connector, update_last_indexed)
try:
await session.commit()
except Exception as e:
if "duplicate key value violates unique constraint" in str(e).lower():
logger.warning(f"Duplicate key during commit: {e}")
await session.rollback()
else:
raise
warning_parts = []
if duplicate_count > 0:
warning_parts.append(f"{duplicate_count} duplicate")
if failed_count > 0:
warning_parts.append(f"{failed_count} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
await task_logger.log_task_success(
log_entry,
f"Completed local folder indexing for connector {connector_id}",
{
"indexed": indexed_count,
"skipped": skipped_count,
"failed": failed_count,
"duplicates": duplicate_count,
},
)
return indexed_count, skipped_count, warning_message
except SQLAlchemyError as e:
logger.exception(f"Database error during local folder indexing: {e}")
await session.rollback()
await task_logger.log_task_failure(
log_entry, f"DB error: {e}", "Database error", {}
)
return 0, 0, f"Database error: {e}"
except Exception as e:
logger.exception(f"Error during local folder indexing: {e}")
await task_logger.log_task_failure(
log_entry, f"Error: {e}", "Unexpected error", {}
)
return 0, 0, str(e)
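# Caller-side sketch (hypothetical IDs): the 3-tuple separates hard failures
# (error string with zero counts) from partial-success warnings.
#
#   indexed, skipped, warning = await index_local_folder(session, 1, 2, "u1")
#   if warning:
#       logger.warning(f"Local folder sync finished with issues: {warning}")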
async def _index_single_file(
session: AsyncSession,
connector,
connector_id: int,
search_space_id: int,
user_id: str,
folder_path: str,
folder_name: str,
target_file_path: str,
task_logger,
log_entry,
update_last_indexed: bool = True,
) -> tuple[int, int, str | None]:
"""Process a single file (chokidar real-time trigger)."""
try:
full_path = Path(target_file_path)
if not full_path.exists():
# File was deleted — find and remove the document
rel = str(full_path.relative_to(folder_path))
unique_id = f"{folder_name}:{rel}"
uid_hash = generate_unique_identifier_hash(
DocumentType.LOCAL_FOLDER_FILE, unique_id, search_space_id
)
existing = await check_document_by_unique_identifier(session, uid_hash)
if existing:
await session.delete(existing)
await session.commit()
return 0, 0, None
return 0, 0, None
rel_path = str(full_path.relative_to(folder_path))
unique_id = f"{folder_name}:{rel_path}"
uid_hash = generate_unique_identifier_hash(
DocumentType.LOCAL_FOLDER_FILE, unique_id, search_space_id
)
try:
content, content_hash = await _compute_file_content_hash(
str(full_path), full_path.name, search_space_id
)
except Exception as e:
return 0, 1, f"Could not read file: {e}"
if not content.strip():
return 0, 1, None
existing = await check_document_by_unique_identifier(session, uid_hash)
if existing:
if existing.content_hash == content_hash:
# Update mtime
mtime = full_path.stat().st_mtime
meta = dict(existing.document_metadata or {})
meta["mtime"] = mtime
existing.document_metadata = meta
await session.commit()
return 0, 1, None
# Content changed — snapshot + re-index
await create_version_snapshot(session, existing)
# Get LLM
long_context_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
title = full_path.name
mtime = full_path.stat().st_mtime
metadata_sections = [
("METADATA", [f"Title: {title}", f"Folder: {folder_name}", f"Path: {rel_path}"]),
("CONTENT", [content]),
]
document_string = build_document_metadata_string(metadata_sections)
summary_content = ""
if long_context_llm and connector.enable_summary:
summary_content, _ = await generate_document_summary(
document_string, long_context_llm, {"folder_name": folder_name, "file_path": rel_path}
)
embedding = embed_text(document_string)
chunks = await create_document_chunks(document_string)
doc_metadata = {
"folder_name": folder_name,
"file_path": rel_path,
"connector_id": connector_id,
"summary": summary_content,
"mtime": mtime,
}
if existing:
existing.title = title
existing.content = document_string
existing.content_hash = content_hash
existing.source_markdown = content
existing.embedding = embedding
existing.document_metadata = doc_metadata
await safe_set_chunks(session, existing, chunks)
existing.updated_at = get_current_timestamp()
existing.status = DocumentStatus.ready()
else:
document = Document(
search_space_id=search_space_id,
title=title,
document_type=DocumentType.LOCAL_FOLDER_FILE,
document_metadata=doc_metadata,
content=document_string,
content_hash=content_hash,
unique_identifier_hash=uid_hash,
source_markdown=content,
embedding=embedding,
status=DocumentStatus.ready(),
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
# Set chunks
await session.flush()
for chunk in chunks:
chunk.document_id = document.id
session.add_all(chunks)
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
await task_logger.log_task_success(
log_entry,
f"Single file indexed: {rel_path}",
{"file": rel_path},
)
return 1, 0, None
except Exception as e:
logger.exception(f"Error indexing single file {target_file_path}: {e}")
await session.rollback()
return 0, 0, str(e)
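# Note on deletions: chokidar "unlink" events reuse this same entry point; the
# full_path.exists() check above detects the missing file and removes the
# matching Document row instead of indexing.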


@@ -0,0 +1,107 @@
"""Document versioning: snapshot creation and cleanup.
Rules:
- 30-minute debounce window: if the latest version was created < 30 min ago,
overwrite it instead of creating a new row.
- Maximum 20 versions per document.
- Versions older than 90 days are cleaned up.
"""
from datetime import UTC, datetime, timedelta
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentVersion
MAX_VERSIONS_PER_DOCUMENT = 20
DEBOUNCE_MINUTES = 30
RETENTION_DAYS = 90
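# Worked timeline for the debounce rule (times relative to the first save):
#   t=0      save -> version 1 created
#   t=10 min save -> version 1 overwritten in place (age < DEBOUNCE_MINUTES)
#   t=50 min save -> version 2 created (40 min since the overwrite, past the window)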
def _now() -> datetime:
return datetime.now(UTC)
async def create_version_snapshot(
session: AsyncSession,
document: Document,
) -> DocumentVersion | None:
"""Snapshot the document's current state into a DocumentVersion row.
Returns the created/updated DocumentVersion, or None if nothing was done.
"""
now = _now()
latest = (
await session.execute(
select(DocumentVersion)
.where(DocumentVersion.document_id == document.id)
.order_by(DocumentVersion.version_number.desc())
.limit(1)
)
).scalar_one_or_none()
if latest is not None:
age = now - latest.created_at.replace(tzinfo=UTC)
if age < timedelta(minutes=DEBOUNCE_MINUTES):
latest.source_markdown = document.source_markdown
latest.content_hash = document.content_hash
latest.title = document.title
latest.created_at = now
await session.flush()
return latest
max_num = (
await session.execute(
select(func.coalesce(func.max(DocumentVersion.version_number), 0)).where(
DocumentVersion.document_id == document.id
)
)
).scalar_one()
version = DocumentVersion(
document_id=document.id,
version_number=max_num + 1,
source_markdown=document.source_markdown,
content_hash=document.content_hash,
title=document.title,
created_at=now,
)
session.add(version)
await session.flush()
# Cleanup: remove versions older than 90 days
cutoff = now - timedelta(days=RETENTION_DAYS)
await session.execute(
delete(DocumentVersion).where(
DocumentVersion.document_id == document.id,
DocumentVersion.created_at < cutoff,
)
)
# Cleanup: cap at MAX_VERSIONS_PER_DOCUMENT
count = (
await session.execute(
select(func.count()).select_from(DocumentVersion).where(
DocumentVersion.document_id == document.id
)
)
).scalar_one()
if count > MAX_VERSIONS_PER_DOCUMENT:
excess = count - MAX_VERSIONS_PER_DOCUMENT
oldest_ids_result = await session.execute(
select(DocumentVersion.id)
.where(DocumentVersion.document_id == document.id)
.order_by(DocumentVersion.version_number.asc())
.limit(excess)
)
oldest_ids = [row[0] for row in oldest_ids_result.all()]
if oldest_ids:
await session.execute(
delete(DocumentVersion).where(DocumentVersion.id.in_(oldest_ids))
)
await session.flush()
return version
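# Usage sketch mirroring the indexer call sites: snapshot only when the content
# hash actually changed, then let the caller commit (hypothetical helper, not
# part of this module's public API):
async def _snapshot_if_changed(
    session: AsyncSession, document: Document, new_hash: str
) -> DocumentVersion | None:
    if document.content_hash == new_hash:
        return None
    return await create_version_snapshot(session, document)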