refactor: extract _collect_results and unify internal helpers

1. Extract _collect_results method — Deduplicated ~30 lines of identical result processing from route_event and process_client_event (Exception→error / WsResult→as_result / dict→wrap / None→strategy branch) into a private method with a skip_none parameter.
    * route_event calls _collect_results(skip_none=False) — None becomes ok=True (server-initiated, callers expect a result for every handler)
    * process_client_event calls _collect_results(skip_none=True) — None is skipped (client-initiated, matching legacy _dispatch fire-and-forget semantics)
2. Document None semantics difference — Added # NOTE: comment at the route_event call site explaining why skip_none=False differs from process_client_event.
3. Unify _timestamp() usage — Replaced inline timestamp formatting in _wrap_envelope and handle_connect with self._timestamp() method reuse.#
This commit is contained in:
keyboardstaff 2026-03-27 23:00:09 -07:00
parent b351de456e
commit 6e8c9d8224

View file

@ -582,6 +582,51 @@ class WsManager:
]
)
results = self._collect_results(
executions, event_type, correlation_id, skip_none=True,
)
await self._publish_diagnostic_event(
lambda: {
"kind": "inbound",
"sourceNamespace": namespace,
"namespace": namespace,
"eventType": event_type,
"sid": sid,
"correlationId": correlation_id,
"timestamp": self._timestamp(),
"handlerCount": len(handlers),
"durationMs": sum(
(exec.duration_ms or 0.0) for exec in executions
),
"resultSummary": self._summarize_results(results),
"payloadSummary": self._summarize_payload(handler_payload),
}
)
response = {"correlationId": correlation_id, "results": results}
self._debug(
f"Completed client event namespace={namespace} '{event_type}' "
f"sid={sid} correlation={correlation_id}"
)
return response
def _collect_results(
self,
executions: list[_HandlerExecution],
event_type: str,
correlation_id: str,
*,
skip_none: bool = False,
) -> List[dict[str, Any]]:
"""Build a result list from handler executions.
Args:
skip_none: When ``True``, handlers that return ``None`` are omitted
from the results (fire-and-forget / opt-out semantics used by
``process_client_event``). When ``False``, ``None`` is converted
to ``WsResult(ok=True)`` (default ``route_event`` behaviour).
"""
results: List[dict[str, Any]] = []
for execution in executions:
handler = execution.handler
@ -615,12 +660,11 @@ class WsManager:
)
continue
# Skip handlers that return None — they opted out of contributing
# a result (fire-and-forget semantics, matching legacy _dispatch).
if value is None:
continue
if isinstance(value, dict):
if skip_none:
continue
helper_result = WsResult(ok=True)
elif isinstance(value, dict):
helper_result = WsResult(ok=True, data=value)
else:
helper_result = WsResult(ok=True, data={"result": value})
@ -632,31 +676,7 @@ class WsManager:
duration_ms=duration_ms,
)
)
await self._publish_diagnostic_event(
lambda: {
"kind": "inbound",
"sourceNamespace": namespace,
"namespace": namespace,
"eventType": event_type,
"sid": sid,
"correlationId": correlation_id,
"timestamp": self._timestamp(),
"handlerCount": len(handlers),
"durationMs": sum(
(exec.duration_ms or 0.0) for exec in executions
),
"resultSummary": self._summarize_results(results),
"payloadSummary": self._summarize_payload(handler_payload),
}
)
response = {"correlationId": correlation_id, "results": results}
self._debug(
f"Completed client event namespace={namespace} '{event_type}' "
f"sid={sid} correlation={correlation_id}"
)
return response
return results
async def _invoke_handler(
self,
@ -707,9 +727,7 @@ class WsManager:
sid,
"server_restart",
{
"emittedAt": _utcnow()
.isoformat(timespec="milliseconds")
.replace("+00:00", "Z"),
"emittedAt": self._timestamp(),
"runtimeId": runtime.get_runtime_id(),
},
handler_id=self._identifier,
@ -942,52 +960,13 @@ class WsManager:
]
)
results: List[dict[str, Any]] = []
for execution in executions:
handler = execution.handler
value = execution.value
duration_ms = execution.duration_ms
if isinstance(value, Exception): # pragma: no cover - defensive logging
PrintStyle.error(
f"Error in handler {handler.identifier} for '{event_type}' (correlation {correlation_id}): {value}"
)
results.append(
self._build_error_result(
handler_id=handler.identifier,
code="HANDLER_ERROR",
message="Internal server error",
details=str(value),
correlation_id=correlation_id,
duration_ms=duration_ms,
)
)
continue
if isinstance(value, WsResult):
results.append(
value.as_result(
handler_id=handler.identifier,
fallback_correlation_id=correlation_id,
duration_ms=duration_ms,
)
)
continue
if value is None:
helper_result = WsResult(ok=True)
elif isinstance(value, dict):
helper_result = WsResult(ok=True, data=value)
else:
helper_result = WsResult(ok=True, data={"result": value})
results.append(
helper_result.as_result(
handler_id=handler.identifier,
fallback_correlation_id=correlation_id,
duration_ms=duration_ms,
)
)
# NOTE: skip_none=False here — route_event converts None to ok=True,
# unlike process_client_event which skips None (fire-and-forget).
# This is intentional: route_event is server-initiated and callers
# expect a result entry for every handler.
results = self._collect_results(
executions, event_type, correlation_id, skip_none=False,
)
await self._publish_diagnostic_event(
lambda: {
@ -1221,7 +1200,7 @@ class WsManager:
correlation_id: str | None = None,
) -> dict[str, Any]:
hid = handler_id or self._identifier
ts = _utcnow().isoformat(timespec="milliseconds").replace("+00:00", "Z")
ts = self._timestamp()
event_id = str(uuid.uuid4())
correlation = correlation_id or str(uuid.uuid4())
return {