refactor(backend): simplify DeerFlowClient streaming helpers (#1969)

Post-review cleanup for the token-level streaming fix. No behavior
change for correct inputs; one efficiency regression fixed.

Fix: chat() O(n²) accumulator
-----------------------------
`chat()` accumulated per-id text via `buffers[id] = buffers.get(id,"") + delta`,
which is O(n) per concat → O(n²) total over a streamed response. At
~2 KB cumulative text this becomes user-visible; at 50 KB / 5000 chunks
it costs roughly 100-300 ms of pure copying. Switched to
`dict[str, list[str]]` + `"".join()` once at return.

Cleanup
-------
- Extract `_serialize_tool_calls`, `_ai_text_event`, `_ai_tool_calls_event`,
  and `_tool_message_event` static helpers. The messages-mode and
  values-mode branches previously repeated four inline dict literals each;
  they now call the same builders.
- `StreamEvent.type` is now typed as `Literal["values", "messages-tuple",
  "custom", "end"]` via a `StreamEventType` alias. Makes the closed set
  explicit and catches typos at type-check time.
- Direct attribute access on `AIMessage`/`AIMessageChunk`: `.usage_metadata`,
  `.tool_calls`, `.id` all have default values on the base class, so the
  `getattr(..., None)` fallbacks were dead code. Removed from the hot
  path.
- `_account_usage` parameter type loosened to `Any` so that LangChain's
  `UsageMetadata` TypedDict is accepted under strict type checking.
- Trimmed narrating comments on `seen_ids` / `streamed_ids` / the
  values-synthesis skip block; kept the non-obvious ones that document
  the cross-mode dedup invariant.

Net diff: -15 lines. All 132 unit tests + harness boundary test still
pass; ruff check and ruff format pass.
This commit is contained in:
greatmengqi 2026-04-08 10:53:24 +08:00
parent f3486bb37d
commit 1f11ba10a9

View file

@ -25,7 +25,7 @@ import uuid
from collections.abc import Generator, Sequence
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from typing import Any, Literal
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware
@ -55,6 +55,9 @@ from deerflow.uploads.manager import (
logger = logging.getLogger(__name__)
StreamEventType = Literal["values", "messages-tuple", "custom", "end"]
@dataclass
class StreamEvent:
"""A single event from the streaming agent response.
@ -69,7 +72,7 @@ class StreamEvent:
data: Event payload. Contents vary by type.
"""
type: str
type: StreamEventType
data: dict[str, Any] = field(default_factory=dict)
@ -254,13 +257,53 @@ class DeerFlowClient:
return get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled)
@staticmethod
def _serialize_tool_calls(tool_calls) -> list[dict]:
"""Reshape LangChain tool_calls into the wire format used in events."""
return [{"name": tc["name"], "args": tc["args"], "id": tc.get("id")} for tc in tool_calls]
@staticmethod
def _ai_text_event(msg_id: str | None, text: str, usage: dict | None) -> "StreamEvent":
"""Build a ``messages-tuple`` AI text event, attaching usage when present."""
data: dict[str, Any] = {"type": "ai", "content": text, "id": msg_id}
if usage:
data["usage_metadata"] = usage
return StreamEvent(type="messages-tuple", data=data)
@staticmethod
def _ai_tool_calls_event(msg_id: str | None, tool_calls) -> "StreamEvent":
"""Build a ``messages-tuple`` AI tool-calls event."""
return StreamEvent(
type="messages-tuple",
data={
"type": "ai",
"content": "",
"id": msg_id,
"tool_calls": DeerFlowClient._serialize_tool_calls(tool_calls),
},
)
@staticmethod
def _tool_message_event(msg) -> "StreamEvent":
"""Build a ``messages-tuple`` tool-result event from a ToolMessage."""
return StreamEvent(
type="messages-tuple",
data={
"type": "tool",
"content": DeerFlowClient._extract_text(msg.content),
"name": getattr(msg, "name", None),
"tool_call_id": getattr(msg, "tool_call_id", None),
"id": getattr(msg, "id", None),
},
)
@staticmethod
def _serialize_message(msg) -> dict:
"""Serialize a LangChain message to a plain dict for values events."""
if isinstance(msg, AIMessage):
d: dict[str, Any] = {"type": "ai", "content": msg.content, "id": getattr(msg, "id", None)}
if msg.tool_calls:
d["tool_calls"] = [{"name": tc["name"], "args": tc["args"], "id": tc.get("id")} for tc in msg.tool_calls]
d["tool_calls"] = DeerFlowClient._serialize_tool_calls(msg.tool_calls)
if getattr(msg, "usage_metadata", None):
d["usage_metadata"] = msg.usage_metadata
return d
@ -409,28 +452,24 @@ class DeerFlowClient:
if self._agent_name:
context["agent_name"] = self._agent_name
# ids already emitted as a complete message via the ``values``
# snapshot path — used by the values path itself to avoid
# duplicate per-message synthesis when the same message appears
# in consecutive snapshots.
seen_ids: set[str] = set()
# ids whose text / tool_calls have already been streamed via the
# LangGraph ``messages`` mode. The ``values`` path uses this set
# to skip re-emitting synthesized messages-tuple events for the
# same message.
# Cross-mode handoff: ids already streamed via LangGraph ``messages``
# mode so the ``values`` path skips re-synthesis of the same message.
streamed_ids: set[str] = set()
# ids whose ``usage_metadata`` has already been counted into
# ``cumulative_usage``. The same message id shows up both in
# ``messages`` chunks (last chunk carries usage) and in ``values``
# snapshots (final AIMessage carries the same usage) — count once.
# The same message id carries identical cumulative ``usage_metadata``
# in both the final ``messages`` chunk and the values snapshot —
# count it only on whichever arrives first.
counted_usage_ids: set[str] = set()
cumulative_usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
def _account_usage(msg_id: str | None, usage: dict | None) -> dict | None:
def _account_usage(msg_id: str | None, usage: Any) -> dict | None:
"""Add *usage* to cumulative totals if this id has not been counted.
Returns the normalized usage dict (for attaching to an event)
when we accepted it, otherwise ``None``.
``usage`` is a ``langchain_core.messages.UsageMetadata`` TypedDict
or ``None``; typed as ``Any`` because TypedDicts are not
structurally assignable to plain ``dict`` under strict type
checking. Returns the normalized usage dict (for attaching
to an event) when we accepted it, otherwise ``None``.
"""
if not usage:
return None
@ -467,11 +506,7 @@ class DeerFlowClient:
continue
if mode == "messages":
# LangGraph emits ``(message_chunk, metadata_dict)`` for
# each LLM delta and each tool message. ``message_chunk``
# is typically an ``AIMessageChunk`` (subclass of
# ``AIMessage``) during LLM streaming; for tool nodes it
# is a ``ToolMessage``.
# LangGraph ``messages`` mode emits ``(message_chunk, metadata)``.
if isinstance(chunk, tuple) and len(chunk) == 2:
msg_chunk, _metadata = chunk
else:
@ -481,44 +516,22 @@ class DeerFlowClient:
if isinstance(msg_chunk, AIMessage):
text = self._extract_text(msg_chunk.content)
usage = getattr(msg_chunk, "usage_metadata", None)
counted_usage = _account_usage(msg_id, usage)
counted_usage = _account_usage(msg_id, msg_chunk.usage_metadata)
if text:
if msg_id:
streamed_ids.add(msg_id)
event_data: dict[str, Any] = {"type": "ai", "content": text, "id": msg_id}
if counted_usage:
event_data["usage_metadata"] = counted_usage
yield StreamEvent(type="messages-tuple", data=event_data)
yield self._ai_text_event(msg_id, text, counted_usage)
tool_calls = getattr(msg_chunk, "tool_calls", None)
if tool_calls:
if msg_chunk.tool_calls:
if msg_id:
streamed_ids.add(msg_id)
yield StreamEvent(
type="messages-tuple",
data={
"type": "ai",
"content": "",
"id": msg_id,
"tool_calls": [{"name": tc["name"], "args": tc["args"], "id": tc.get("id")} for tc in tool_calls],
},
)
yield self._ai_tool_calls_event(msg_id, msg_chunk.tool_calls)
elif isinstance(msg_chunk, ToolMessage):
if msg_id:
streamed_ids.add(msg_id)
yield StreamEvent(
type="messages-tuple",
data={
"type": "tool",
"content": self._extract_text(msg_chunk.content),
"name": getattr(msg_chunk, "name", None),
"tool_call_id": getattr(msg_chunk, "tool_call_id", None),
"id": msg_id,
},
)
yield self._tool_message_event(msg_chunk)
continue
# mode == "values"
@ -531,50 +544,25 @@ class DeerFlowClient:
if msg_id:
seen_ids.add(msg_id)
# Already streamed through ``messages`` mode — capture
# usage once more (defensive: the final AIMessage in the
# snapshot may carry usage_metadata that the streamed
# chunks did not) but skip synthesizing messages-tuple
# events, which would duplicate what the consumer already
# received.
# Already streamed via ``messages`` mode; only (defensively)
# capture usage here and skip re-synthesizing the event.
if msg_id and msg_id in streamed_ids:
if isinstance(msg, AIMessage):
_account_usage(msg_id, getattr(msg, "usage_metadata", None))
continue
if isinstance(msg, AIMessage):
usage = getattr(msg, "usage_metadata", None)
counted_usage = _account_usage(msg_id, usage)
counted_usage = _account_usage(msg_id, msg.usage_metadata)
if msg.tool_calls:
yield StreamEvent(
type="messages-tuple",
data={
"type": "ai",
"content": "",
"id": msg_id,
"tool_calls": [{"name": tc["name"], "args": tc["args"], "id": tc.get("id")} for tc in msg.tool_calls],
},
)
yield self._ai_tool_calls_event(msg_id, msg.tool_calls)
text = self._extract_text(msg.content)
if text:
event_data = {"type": "ai", "content": text, "id": msg_id}
if counted_usage:
event_data["usage_metadata"] = counted_usage
yield StreamEvent(type="messages-tuple", data=event_data)
yield self._ai_text_event(msg_id, text, counted_usage)
elif isinstance(msg, ToolMessage):
yield StreamEvent(
type="messages-tuple",
data={
"type": "tool",
"content": self._extract_text(msg.content),
"name": getattr(msg, "name", None),
"tool_call_id": getattr(msg, "tool_call_id", None),
"id": msg_id,
},
)
yield self._tool_message_event(msg)
# Emit a values event for each state snapshot
yield StreamEvent(
@ -607,21 +595,18 @@ class DeerFlowClient:
The accumulated text of the last AI message, or empty string
if no AI text was produced.
"""
# Accumulator keyed by message id. Token-level streaming yields
# multiple ``messages-tuple`` events sharing the same id, each
# carrying a delta that must be concatenated. Non-streaming mock
# sources that emit a single event per id are a degenerate case
# of the same logic.
buffers: dict[str, str] = {}
# Per-id delta lists joined once at the end — avoids the O(n²) cost
# of repeated ``str + str`` on a growing buffer for long responses.
chunks: dict[str, list[str]] = {}
last_id: str = ""
for event in self.stream(message, thread_id=thread_id, **kwargs):
if event.type == "messages-tuple" and event.data.get("type") == "ai":
msg_id = event.data.get("id") or ""
delta = event.data.get("content", "")
if delta:
buffers[msg_id] = buffers.get(msg_id, "") + delta
chunks.setdefault(msg_id, []).append(delta)
last_id = msg_id
return buffers.get(last_id, "")
return "".join(chunks.get(last_id, ()))
# ------------------------------------------------------------------
# Public API — configuration queries