fix(document_query): isolate LiteParse parsing

Run LiteParse in a subprocess so native parser crashes cannot take down the Web UI process. Bound parser concurrency and LiteParse workers for multi-chat stability, seed Q&A context with leading document chunks for title/abstract grounding, and keep a small-document fallback when vector search returns no chunks.
This commit is contained in:
Alessandro 2026-05-29 15:51:59 +02:00
parent d039af512a
commit b2ead06a4e
7 changed files with 307 additions and 22 deletions

View file

@ -8,7 +8,7 @@ timeouts and thread-safe parsers.
- **Strategy-pattern parsers** - MIME-type routing to dedicated parser classes
- **Centralized fetching** - local and HTTP(S) resources are fetched once, size-checked, then passed to parsers
- **LiteParse first path** - fast local parsing for PDFs and supported document/image formats, with legacy fallbacks
- **Thread-safe execution** - all sync parsers offloaded to asyncio.to_thread
- **Bounded parser execution** - sync parsers are offloaded to asyncio.to_thread and globally capped across chats
- **Configurable timeouts** - per-document and gather-level timeouts
- **Expanded format support** - PDF, HTML, text, YAML, XML, TOML, JS, TS, images, and catch-all Unstructured
@ -23,10 +23,14 @@ See default_config.yaml for all options. Key settings:
| max_remote_bytes | 52428800 | Max remote document size |
| per_document_timeout | 60 | Max time for a single document parse |
| gather_timeout | 120 | Max time for all documents combined |
| parser_concurrency | 1 | Max parser jobs running across all chats in one process |
| context_intro_chunks | 2 | Leading chunks included per document for title/abstract grounding |
| chunk_size | 1000 | Text splitter chunk size |
| chunk_overlap | 100 | Text splitter overlap |
| search_threshold | 0.5 | Similarity search threshold |
| liteparse_enabled | true | Prefer LiteParse before legacy parser fallbacks |
| liteparse_num_workers | 1 | Max LiteParse OCR workers per parser job |
| liteparse_subprocess | true | Run LiteParse in a child process so native crashes fall back safely |
| thread_offload | true | Offload sync parsers to thread pool |
LiteParse is installed into the Agent Zero framework runtime from hooks.py during

View file

@ -9,6 +9,8 @@ per_document_timeout: 60 # max time for a single document parse
gather_timeout: 120 # max time for all documents combined in one call
# --- Parser settings ---
parser_concurrency: 1 # max parser jobs running across all chats in this process
context_intro_chunks: 2 # always include leading chunks per document for title/abstract grounding
chunk_size: 1000
chunk_overlap: 100
search_threshold: 0.5
@ -26,6 +28,7 @@ liteparse_target_pages:
liteparse_dpi: 150
liteparse_preserve_very_small_text: false
liteparse_output_format: text
liteparse_num_workers:
liteparse_num_workers: 1 # LiteParse defaults to CPU cores - 1; cap it for web runtime stability
liteparse_subprocess: true # isolate LiteParse native runtime crashes from the Web UI process
pdf_ocr_fallback: true # enable legacy Tesseract fallback after PyMuPDF
thread_offload: true # offload sync parsers to thread pool

View file

@ -7,8 +7,9 @@ a thread pool and bounded by configurable timeouts.
import asyncio
import json
import threading
from datetime import datetime
from typing import Callable, List, Optional, Sequence, Tuple
from typing import Any, Callable, List, Optional, Sequence, Tuple
from urllib.parse import urlparse
from langchain.schema import SystemMessage, HumanMessage
@ -25,6 +26,33 @@ from plugins._document_query.helpers.parsers import BaseParser, get_parsers_for_
DEFAULT_SEARCH_THRESHOLD = 0.5
DEFAULT_PARSER_CONCURRENCY = 1
SMALL_DOCUMENT_FALLBACK_MAX_CHARS = 12000
_PARSER_SEMAPHORES: dict[tuple[int, int], asyncio.Semaphore] = {}
_PARSER_SEMAPHORES_LOCK = threading.Lock()
def _positive_int(value: Any, default: int) -> int:
    """Coerce a config value to a strictly positive integer.

    Args:
        value: Raw setting (for example an int, a numeric string, ``None``
            or an empty string).
        default: Value returned whenever ``value`` cannot be used.

    Returns:
        ``int(value)`` when the conversion succeeds and the result is greater
        than zero; otherwise ``default``.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / unsupported types; ValueError: "", "abc", NaN;
        # OverflowError: infinite floats such as a YAML ``.inf`` value.
        return default
    return parsed if parsed > 0 else default
def _parser_semaphore(config: dict) -> asyncio.Semaphore:
    """Return the parser semaphore for the running loop and configured limit.

    The limit is read from ``config["parser_concurrency"]`` and falls back to
    ``DEFAULT_PARSER_CONCURRENCY`` when it is missing or not a positive int.
    Semaphores are cached in ``_PARSER_SEMAPHORES`` under
    ``(id(running_loop), limit)``, so callers that share a loop and a limit
    share one semaphore. This function never removes cache entries.

    Must be called while an event loop is running
    (``asyncio.get_running_loop`` raises ``RuntimeError`` otherwise).
    """
    limit = _positive_int(
        config.get("parser_concurrency"),
        DEFAULT_PARSER_CONCURRENCY,
    )
    cache_key = (id(asyncio.get_running_loop()), limit)
    # The lock makes lookup-or-create atomic across threads.
    with _PARSER_SEMAPHORES_LOCK:
        try:
            return _PARSER_SEMAPHORES[cache_key]
        except KeyError:
            shared = asyncio.Semaphore(limit)
            _PARSER_SEMAPHORES[cache_key] = shared
            return shared
def _load_config(agent: Agent) -> dict:
@ -207,7 +235,7 @@ class DocumentQueryHelper:
gather_timeout = self.config.get("gather_timeout", 120)
try:
await asyncio.wait_for(
document_contents = await asyncio.wait_for(
asyncio.gather(
*[self.document_get_content(uri, True) for uri in document_uris]
),
@ -220,6 +248,14 @@ class DocumentQueryHelper:
search_threshold = self.config.get("search_threshold", DEFAULT_SEARCH_THRESHOLD)
search_limit = self.config.get("search_limit", 100)
selected_chunks = {}
normalized_uris = [self.store.normalize_uri(uri) for uri in document_uris]
intro_chunk_count = _positive_int(
self.config.get("context_intro_chunks"),
2,
)
for uri in normalized_uris:
for chunk in await self._get_document_intro_chunks(uri, intro_chunk_count):
selected_chunks[chunk.metadata["id"]] = chunk
for question in questions:
self.progress_callback(f"Optimizing query: {question}")
@ -233,7 +269,6 @@ class DocumentQueryHelper:
await self.agent.handle_intervention()
self.progress_callback(f"Searching documents with query: {optimized_query}")
normalized_uris = [self.store.normalize_uri(uri) for uri in document_uris]
doc_filter = " or ".join(
[f"document_uri == '{uri}'" for uri in normalized_uris]
)
@ -246,18 +281,48 @@ class DocumentQueryHelper:
selected_chunks[chunk.metadata["id"]] = chunk
if not selected_chunks:
fallback_content = self._small_document_fallback_content(
document_uris,
document_contents,
)
if fallback_content:
self.progress_callback(
"No matching chunks found; using extracted document content"
)
ai_response = await self._answer_questions_from_content(
fallback_content,
questions,
"extracted document content",
)
self.progress_callback(f"Q&A process completed")
return True, ai_response
self.progress_callback("No relevant content found in the documents")
content = f"!!! No content found for documents: {json.dumps(document_uris)} matching queries: {json.dumps(questions)}"
return False, content
self.progress_callback(
f"Processing {len(questions)} questions in context of {len(selected_chunks)} chunks"
)
await self.agent.handle_intervention()
questions_str = "\n".join([f" * {question}" for question in questions])
content = "\n\n----\n\n".join(
[chunk.page_content for chunk in selected_chunks.values()]
)
ai_response = await self._answer_questions_from_content(
content,
questions,
f"{len(selected_chunks)} chunks",
)
self.progress_callback(f"Q&A process completed")
return True, ai_response
async def _answer_questions_from_content(
self,
content: str,
questions: Sequence[str],
context_label: str,
) -> str:
self.progress_callback(
f"Processing {len(questions)} questions in context of {context_label}"
)
await self.agent.handle_intervention()
questions_str = "\n".join([f" * {question}" for question in questions])
qa_system_message = self.agent.parse_prompt("fw.document_query.system_prompt.md")
qa_user_message = f"# Document:\n{content}\n\n# Queries:\n{questions_str}"
ai_response, _reasoning = await self.agent.call_chat_model(
@ -267,8 +332,41 @@ class DocumentQueryHelper:
],
explicit_caching=False,
)
self.progress_callback(f"Q&A process completed")
return True, str(ai_response)
return str(ai_response)
@staticmethod
def _small_document_fallback_content(
    document_uris: Sequence[str],
    document_contents: Sequence[str],
    max_chars: int = SMALL_DOCUMENT_FALLBACK_MAX_CHARS,
) -> str:
    """Join the extracted text of small documents into one Q&A context.

    URIs are paired positionally with their extracted content. Entries whose
    content is empty, ``None`` or whitespace-only are dropped; the rest are
    rendered as ``# Source: <uri>`` sections separated by ``----`` rules.

    Returns:
        The combined text, or ``""`` when no section remains or the combined
        text is longer than ``max_chars``.
    """
    cleaned = (
        (uri, (raw or "").strip())
        for uri, raw in zip(document_uris, document_contents)
    )
    sections = [f"# Source: {uri}\n\n{body}" for uri, body in cleaned if body]
    combined = "\n\n----\n\n".join(sections)
    # Oversized input is rejected outright rather than truncated.
    if not sections or len(combined) > max_chars:
        return ""
    return combined
async def _get_document_intro_chunks(
    self,
    document_uri: str,
    limit: int,
) -> list[Document]:
    """Return the first ``limit`` chunks of a document, ordered by position.

    Chunks are fetched through ``self.store._get_document_chunks`` and sorted
    by their ``chunk_index`` metadata (a missing index counts as 0).

    Returns:
        At most ``limit`` chunks; an empty list when ``limit`` is not positive
        or the store does not provide ``_get_document_chunks``.
    """
    if limit <= 0 or not hasattr(self.store, "_get_document_chunks"):
        return []

    def position(chunk):
        return chunk.metadata.get("chunk_index", 0)

    ordered = list(await self.store._get_document_chunks(document_uri))
    ordered.sort(key=position)
    return ordered[:limit]
async def document_get_content(
self, document_uri: str, add_to_db: bool = False
@ -328,15 +426,17 @@ class DocumentQueryHelper:
thread_offload: bool,
) -> str:
errors_seen = []
semaphore = _parser_semaphore(self.config)
for parser in parsers:
try:
self.progress_callback("Parsing document content")
content = await parser.parse(
document=document,
config=self.config,
timeout=timeout,
thread_offload=thread_offload,
)
async with semaphore:
self.progress_callback("Parsing document content")
content = await parser.parse(
document=document,
config=self.config,
timeout=timeout,
thread_offload=thread_offload,
)
if content:
return content
errors_seen.append(f"{parser.__class__.__name__}: no content")

View file

@ -2,7 +2,10 @@
from __future__ import annotations
import json
import os
import subprocess
import sys
from pathlib import Path
from plugins._document_query.helpers.fetch import FetchedDocument
@ -12,6 +15,8 @@ from .base import BaseParser
class LiteParseParser(BaseParser):
"""Fast parser powered by run-llama/liteparse when available."""
DEFAULT_NUM_WORKERS = 1
mimetypes = [
"application/pdf",
"application/msword",
@ -30,6 +35,11 @@ class LiteParseParser(BaseParser):
return bool(config.get("liteparse_enabled", True))
def _parse_sync(self, document: FetchedDocument, config: dict) -> str:
    """Parse ``document`` with LiteParse, in a child process or in-process.

    The ``liteparse_subprocess`` setting (treated as enabled when absent)
    selects ``_parse_subprocess``; a falsy value selects ``_parse_in_process``.
    """
    isolate = config.get("liteparse_subprocess", True)
    run_parser = self._parse_subprocess if isolate else self._parse_in_process
    return run_parser(document, config)
def _parse_in_process(self, document: FetchedDocument, config: dict) -> str:
try:
from liteparse import LiteParse
except Exception as e:
@ -44,6 +54,56 @@ class LiteParseParser(BaseParser):
raise ValueError("LiteParse returned no text")
return text
def _parse_subprocess(self, document: FetchedDocument, config: dict) -> str:
"""Parse ``document`` by running the LiteParse worker module in a child process.

The worker is started as ``sys.executable -m
plugins._document_query.helpers.parsers.liteparse_worker``. It receives a
JSON request on stdin and is expected to print a JSON object with a
``"text"`` key on stdout.

Args:
    document: Fetched document; ``document.local_file()`` supplies the
        path handed to the worker.
    config: Parser settings. ``per_document_timeout`` (default 60) bounds
        the child process; LiteParse options come from
        ``self._liteparse_kwargs(config)``.

Returns:
    The ``"text"`` value from the worker's JSON response.

Raises:
    RuntimeError: The worker exited with a non-zero code, or its stdout
        was not valid JSON.
    ValueError: The worker returned empty or whitespace-only text.
    subprocess.TimeoutExpired: Raised by ``subprocess.run`` when the
        timeout elapses; it is not caught in this method.
"""
with document.local_file() as file_path:
# Request sent to the worker as JSON on stdin.
payload = {
"file_path": file_path,
"kwargs": self._liteparse_kwargs(config),
}
env = os.environ.copy()
# Ancestor directory used as cwd and put first on PYTHONPATH so the child
# can import the ``plugins.…`` worker module.
# NOTE(review): presumably the project root — confirm if this file moves.
project_root = str(Path(__file__).resolve().parents[4])
python_path = env.get("PYTHONPATH", "")
env["PYTHONPATH"] = (
f"{project_root}{os.pathsep}{python_path}"
if python_path
else project_root
)
timeout = float(config.get("per_document_timeout", 60))
# check=False: a non-zero exit is handled below so the error message can
# include the worker's own output.
result = subprocess.run(
[
sys.executable,
"-m",
"plugins._document_query.helpers.parsers.liteparse_worker",
],
input=json.dumps(payload),
text=True,
capture_output=True,
cwd=project_root,
env=env,
timeout=timeout,
check=False,
)
if result.returncode != 0:
# Prefer stderr, fall back to stdout; only the last 2000 characters are kept.
detail = (result.stderr or result.stdout or "").strip()
raise RuntimeError(
"LiteParse subprocess failed"
f" with exit code {result.returncode}: {detail[-2000:]}"
)
try:
response = json.loads(result.stdout)
except json.JSONDecodeError as e:
raise RuntimeError(
"LiteParse subprocess returned invalid output: "
f"{result.stdout[-2000:]}"
) from e
# A missing or null "text" is treated the same as empty text.
text = response.get("text", "") or ""
if not text.strip():
raise ValueError("LiteParse returned no text")
return text
def _liteparse_kwargs(self, config: dict) -> dict:
kwargs = {
"ocr_enabled": bool(config.get("liteparse_ocr_enabled", True)),
@ -67,9 +127,10 @@ class LiteParseParser(BaseParser):
if value not in (None, ""):
kwargs[liteparse_key] = value
num_workers = config.get("liteparse_num_workers")
if num_workers not in (None, ""):
kwargs["num_workers"] = int(num_workers)
kwargs["num_workers"] = _positive_int(
config.get("liteparse_num_workers"),
self.DEFAULT_NUM_WORKERS,
)
tessdata_path = config.get("liteparse_tessdata_path") or _detect_tessdata_path()
if tessdata_path:
@ -91,3 +152,11 @@ def _detect_tessdata_path() -> str:
if candidate and (Path(candidate) / "eng.traineddata").is_file():
return candidate
return ""
def _positive_int(value, default: int) -> int:
    """Coerce a config value to a strictly positive integer.

    Args:
        value: Raw setting (for example an int, a numeric string, ``None``
            or an empty string).
        default: Value returned whenever ``value`` cannot be used.

    Returns:
        ``int(value)`` when the conversion succeeds and the result is greater
        than zero; otherwise ``default``.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / unsupported types; ValueError: "", "abc", NaN;
        # OverflowError: infinite floats such as a YAML ``.inf`` value.
        return default
    return parsed if parsed > 0 else default

View file

@ -0,0 +1,24 @@
"""Subprocess entry point for isolating LiteParse native runtime crashes."""
from __future__ import annotations
import json
import sys
def main() -> int:
    """Run one LiteParse job described by a JSON request on stdin.

    The request must contain ``"file_path"`` and may contain ``"kwargs"``
    (constructor options for ``LiteParse``; missing or null means none).
    The parsed text is written to stdout as ``{"text": ...}``; a missing or
    falsy ``text`` attribute on the parse result becomes an empty string.

    Returns:
        0 on success. Errors are not handled here and propagate as
        exceptions.
    """
    request = json.load(sys.stdin)
    source_path = request["file_path"]
    options = request.get("kwargs") or {}

    # Deferred import: the request is read before liteparse is loaded.
    from liteparse import LiteParse

    parsed = LiteParse(**options).parse(source_path)
    extracted = getattr(parsed, "text", "")
    if not extracted:
        extracted = ""
    sys.stdout.write(json.dumps({"text": extracted}))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View file

@ -44,6 +44,7 @@ def test_document_qa_uses_small_document_content_when_search_finds_no_chunks():
helper = object.__new__(DocumentQueryHelper)
helper.agent = agent
helper.store = FakeStore()
helper.config = {}
helper.progress_callback = progress.append
async def document_get_content(uri, add_to_db=False):

View file

@ -9,6 +9,7 @@ from plugins._document_query.helpers.fetch import FetchedDocument, fetch_public_
from plugins._document_query.helpers.document_query import DocumentQueryHelper
from plugins._document_query.helpers.parsers.base import BaseParser
from plugins._document_query.helpers.parsers import get_parsers_for_mimetype
from plugins._document_query.helpers.parsers.liteparse import LiteParseParser
from plugins._document_query.helpers.parsers.text import TextParser
@ -27,6 +28,24 @@ class ParserNameShouldNotLeak(BaseParser):
return "parsed"
class CountingAsyncParser(BaseParser):
    """Test parser that records how many async parses overlap.

    ``active`` is the number of ``_parse_async`` calls currently in flight and
    ``max_active`` the highest value it has reached. Both are class-level
    counters shared by all instances, so tests reset them before use.
    """

    mimetypes = ["text/plain"]
    active = 0
    max_active = 0

    async def _parse_async(self, document: FetchedDocument, config: dict) -> str:
        counters = type(self)
        counters.active += 1
        counters.max_active = max(counters.max_active, counters.active)
        try:
            # Yield to the loop so concurrently scheduled parses could overlap.
            await asyncio.sleep(0.02)
            return document.uri
        finally:
            counters.active -= 1

    def _parse_sync(self, document: FetchedDocument, config: dict) -> str:
        return document.uri
def test_fetch_file_detects_mimetype_and_reads_once(tmp_path):
document = tmp_path / "notes.txt"
document.write_text("hello\nworld\n", encoding="utf-8")
@ -92,6 +111,25 @@ def test_liteparse_is_installed_by_docker_and_plugin_hook_requirements():
assert plugin_requirements.strip().splitlines() == ["liteparse>=2.0.0,<3.0.0"]
def test_default_config_bounds_liteparse_runtime_concurrency():
    """The shipped default config must keep the parser runtime bounded."""
    config_path = ROOT / "plugins" / "_document_query" / "default_config.yaml"
    default_config = config_path.read_text(encoding="utf-8")

    required_settings = (
        "parser_concurrency: 1",
        "context_intro_chunks: 2",
        "liteparse_num_workers: 1",
        "liteparse_subprocess: true",
    )
    for setting in required_settings:
        assert setting in default_config
def test_liteparse_parser_caps_workers_by_default():
    """``num_workers`` falls back to 1 unless a positive value is configured."""
    parser = LiteParseParser()
    cases = (
        ({}, 1),
        ({"liteparse_num_workers": "3"}, 3),
        ({"liteparse_num_workers": ""}, 1),
    )
    for config, expected_workers in cases:
        assert parser._liteparse_kwargs(config)["num_workers"] == expected_workers
def test_query_optimize_prompt_filename_is_spelled_correctly():
prompt_dir = ROOT / "plugins" / "_document_query" / "prompts"
helper_source = (
@ -129,6 +167,52 @@ def test_parser_progress_is_user_facing_and_generic():
assert progress == ["Parsing document content"]
def test_parse_document_limits_parser_concurrency_across_helpers():
    """Two helpers with ``parser_concurrency: 1`` must never parse in parallel."""
    CountingAsyncParser.active = 0
    CountingAsyncParser.max_active = 0

    def make_document(path: str, payload: bytes) -> FetchedDocument:
        return FetchedDocument(
            uri=path,
            source_uri=path,
            scheme="file",
            mimetype="text/plain",
            content=payload,
            local_path=None,
        )

    def make_helper() -> DocumentQueryHelper:
        # Bypass __init__; only the attributes _parse_document reads are set.
        helper = object.__new__(DocumentQueryHelper)
        helper.config = {"parser_concurrency": 1}
        helper.progress_callback = lambda _msg: None
        return helper

    fetched_a = make_document("/tmp/a.txt", b"a")
    fetched_b = make_document("/tmp/b.txt", b"b")
    helper_a = make_helper()
    helper_b = make_helper()

    async def parse_both():
        jobs = [
            helper._parse_document(
                document=fetched,
                parsers=[CountingAsyncParser()],
                timeout=1,
                thread_offload=False,
            )
            for helper, fetched in ((helper_a, fetched_a), (helper_b, fetched_b))
        ]
        return await asyncio.gather(*jobs)

    assert sorted(run_async(parse_both())) == ["/tmp/a.txt", "/tmp/b.txt"]
    assert CountingAsyncParser.max_active == 1
def test_document_query_prompt_uses_progressive_skill_disclosure():
from helpers.skills import find_skill