mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-04-28 03:30:40 +00:00
feat(cli): add dual-output sidecar mode for TUI (#3352)
Some checks are pending
Qwen Code CI / Lint (push) Waiting to run
Qwen Code CI / Test (push) Blocked by required conditions
Qwen Code CI / Test-1 (push) Blocked by required conditions
Qwen Code CI / Test-2 (push) Blocked by required conditions
Qwen Code CI / Test-3 (push) Blocked by required conditions
Qwen Code CI / Test-4 (push) Blocked by required conditions
Qwen Code CI / Test-5 (push) Blocked by required conditions
Qwen Code CI / Test-6 (push) Blocked by required conditions
Qwen Code CI / Test-7 (push) Blocked by required conditions
Qwen Code CI / Test-8 (push) Blocked by required conditions
Qwen Code CI / Post Coverage Comment (push) Blocked by required conditions
Qwen Code CI / CodeQL (push) Waiting to run
E2E Tests / E2E Test (Linux) - sandbox:docker (push) Waiting to run
E2E Tests / E2E Test (Linux) - sandbox:none (push) Waiting to run
E2E Tests / E2E Test - macOS (push) Waiting to run
Some checks are pending
Qwen Code CI / Lint (push) Waiting to run
Qwen Code CI / Test (push) Blocked by required conditions
Qwen Code CI / Test-1 (push) Blocked by required conditions
Qwen Code CI / Test-2 (push) Blocked by required conditions
Qwen Code CI / Test-3 (push) Blocked by required conditions
Qwen Code CI / Test-4 (push) Blocked by required conditions
Qwen Code CI / Test-5 (push) Blocked by required conditions
Qwen Code CI / Test-6 (push) Blocked by required conditions
Qwen Code CI / Test-7 (push) Blocked by required conditions
Qwen Code CI / Test-8 (push) Blocked by required conditions
Qwen Code CI / Post Coverage Comment (push) Blocked by required conditions
Qwen Code CI / CodeQL (push) Waiting to run
E2E Tests / E2E Test (Linux) - sandbox:docker (push) Waiting to run
E2E Tests / E2E Test (Linux) - sandbox:none (push) Waiting to run
E2E Tests / E2E Test - macOS (push) Waiting to run
* feat(cli): add dual-output sidecar mode for TUI
Adds an optional **dual-output** mode for the interactive TUI: while Qwen
Code keeps rendering normally on stdout, it concurrently emits a structured
JSON event stream on a second channel (--json-fd / --json-file) and
optionally watches a JSONL command file (--input-file) for prompts and
tool-permission responses written by an external program.
This unlocks programmatic embedding of the TUI from IDE extensions, web
frontends, CI agents, or automation scripts without forcing them to give
up the rich interactive UI in favor of --output-format=stream-json.
## Design
The TUI already has a battle-tested JSON event emitter
(`StreamJsonOutputAdapter`). This change makes that adapter pluggable on
its output stream and wires a small `DualOutputBridge` that forwards TUI
events to a second instance of the adapter writing to fd / file.
For tool approvals, when a tool enters awaiting_approval the bridge emits
`control_request` (subtype `can_use_tool`); whichever side resolves first
(TUI's native UI or `confirmation_response` via --input-file) wins, and a
`control_response` is mirrored back so all observers stay in sync.
`session_start` is announced once when the bridge is constructed so
consumers can correlate the channel with a session before any other event
arrives.
## CLI surface
- `--json-fd <n>` — write JSON events to fd n (n >= 3; provided via spawn
stdio).
- `--json-file <path>` — write JSON events to a file / FIFO / /dev/fd/N.
- `--input-file <path>` — watch this file for JSONL commands.
`--json-fd` and `--json-file` are mutually exclusive. fds 0/1/2 are
rejected to prevent corrupting the TUI.
## Wire protocol
Output: existing stream-json schema with `includePartialMessages` always
enabled, plus:
- `system` / `subtype: session_start` — emitted once on bridge
construction.
- `control_request` / `subtype: can_use_tool` — pending tool approval.
- `control_response` — final approval outcome (mirrors TUI-native or
external resolution).
Input (--input-file):
{"type":"submit","text":"What does this function do?"}
{"type":"confirmation_response","request_id":"...","allowed":true}
`submit` is queued and retried when the TUI returns to idle.
`confirmation_response` is dispatched immediately — a pending tool call
is blocking and the response cannot wait behind earlier submits.
See `docs/users/features/dual-output.md` for the full schema, latency
notes, failure modes, and a spawn example.
## What changes when the flags are absent
Nothing. The bridge and watcher are constructed only when the relevant
flags are set; otherwise the React Context providers carry `null` and
every callsite short-circuits. No overhead, no behavioral change for
existing users.
## Failure handling
- Bad fd / unopenable path → warning on stderr, dual output stays
disabled, TUI launches normally.
- Consumer disconnect (EPIPE) → bridge silently disables itself, TUI
keeps running.
- Any exception inside the adapter → caught, logged, bridge disabled.
The TUI is never crashed by a dual-output failure.
## Files
New:
- packages/cli/src/dualOutput/{DualOutputBridge,DualOutputContext,index}.{ts,tsx}
- packages/cli/src/remoteInput/{RemoteInputWatcher,RemoteInputContext,index}.{ts,tsx}
- packages/cli/src/nonInteractive/io/index.ts
- docs/users/features/dual-output.md
Modified:
- packages/core/src/config/config.ts — 3 new ConfigParameters fields + getters
- packages/cli/src/config/config.ts — yargs options + mutex validation
- packages/cli/src/gemini.tsx — instantiate bridge / watcher in
startInteractiveUI, wrap with Context Providers, register cleanup
- packages/cli/src/ui/AppContainer.tsx — connect RemoteInput to
submitQuery, bridge tool confirmations
- packages/cli/src/ui/hooks/useGeminiStream.ts — call
dualOutput?.processEvent(...) at five existing event points
- packages/cli/src/nonInteractive/io/{Base,Stream}JsonOutputAdapter.ts —
StreamJsonOutputAdapter accepts an injected output stream; base adapter
exposes emitPermissionRequest / emitControlResponse through a new
emitControlMessageImpl hook (default no-op in batch mode).
## Tests
- packages/cli/src/dualOutput/DualOutputBridge.test.ts — fd validation,
auto session_start, control-event routing, post-shutdown safety.
- packages/cli/src/remoteInput/RemoteInputWatcher.test.ts — submit
forwarding, immediate confirmation dispatch, busy/idle retry,
malformed-line tolerance, shutdown.
- packages/cli/src/nonInteractive/io/StreamJsonOutputAdapter.dualOutput.test.ts —
custom outputStream injection and new emitPermissionRequest /
emitControlResponse paths.
tsc --noEmit -p packages/cli/tsconfig.json is clean.
vitest run src/nonInteractive src/dualOutput src/remoteInput → 297 passed,
1 skipped, 11 files.
* feat(cli): dual-output capability handshake, session_end, control_error, settings.json
Incremental improvements on top of the initial dual-output PR based on
reviewer feedback. All extensions are additive; older consumers that
ignore unknown fields keep working.
## Capability handshake in session_start
`session_start.data` now carries three new fields so consumers can
feature-detect without sniffing the stream:
- `protocol_version` (integer, currently 1) — bumped on any protocol
change consumers might care about.
- `version` (string) — the Qwen Code CLI version, threaded in from
`gemini.tsx`.
- `supported_events` (string[]) — the event kinds this bridge version
is known to emit, exported as `SUPPORTED_EVENTS` from the module.
## session_end on bridge shutdown
DualOutputBridge.shutdown() now emits a final
`system` / `session_end` event carrying `session_id` before closing the
stream. Gives consumers a definitive termination signal rather than
requiring them to infer it from EPIPE. Idempotent — calling shutdown
twice emits exactly one session_end.
## control_error emission path
`ControlErrorResponse` (already defined in types.ts) now has a first-
class emission path: `BaseJsonOutputAdapter.emitControlError(requestId,
message)` → `control_response` with `subtype: 'error'`. Wired into
AppContainer's remote-input confirmation handler so that a
`confirmation_response` referencing an unknown / already-resolved
request_id produces a structured error reply instead of silently
dropping, letting consumers retry or surface the error.
## settings.json support
New `dualOutput` top-level settings block with `jsonFile` and
`inputFile` properties. `--json-fd` has no settings equivalent (fd
passing is a spawn-time concern). CLI flag wins over settings when
both are present, so scripted one-off runs still work unchanged.
`requiresRestart: true` since the bridge is constructed once at
startup.
## Documentation
`docs/users/features/dual-output.md` gains three major sections:
- **Use cases** — concrete integration scenarios (terminal+chat dual
sync, IDE extensions, web frontends, CI observers, multi-agent
orchestration, session replay, observability, QA).
- **Why two output flags?** — detailed rationale for coexisting
`--json-fd` and `--json-file`, including the PTY constraint
(`node-pty` / `bun-pty` expose no stdio array, and `forkpty(3)` /
`login_tty` actively close fds >= 3 before exec).
- **Comparison with Claude Code's stream-json** — schema-parity
matrix, transport-topology differences, permission-control-plane
behavioral notes, and a "room to improve" section as a design
horizon.
- **Runnable demos** — seven copy-paste POCs: event observer, remote
submit, permission bridge, Node embedder with capability
feature-detection, session_end handling, failure drills.
- **Settings-based configuration** — example settings.json snippet and
precedence rules.
## Tests
- DualOutputBridge.test.ts: new cases for capability handshake shape,
session_end on shutdown, shutdown idempotency, and emitControlError.
- StreamJsonOutputAdapter.dualOutput.test.ts: new case for
emitControlError at the adapter level.
302 passed, 1 skipped, 11 files. tsc --noEmit -p packages/cli is clean.
* docs(dual-output): shrink Claude Code comparison to one honest sentence
After actually reading the Claude Code source (src/cli/structuredIO.ts,
src/bridge/*, src/utils/messages/systemInit.ts), the previous
"Comparison with Claude Code's stream-json" section was overstated:
- Claude Code has no equivalent of TUI + sidecar running simultaneously.
Its stream-json only works with --print (non-interactive); the bridge
in src/bridge/* is Anthropic's own remote worker protocol, not a
local embedding surface.
- CC uses `system/init` (not `session_start`) and has no session_end in
the wire protocol, so the schema-parity table contained false ticks.
- Framing this PR as "parity with Claude Code" is therefore inaccurate;
it's filling a gap Claude Code does not address.
Replace the whole multi-section comparison (schema matrix, transport
table, permission notes, borrow list, roadmap) with a single sentence
stating the accurate relation: same event format in spirit, different
topology — CC's is non-interactive only.
* fix(cli): address review feedback on dual-output sidecar mode
- Fix control_response mirror: external-initiated confirmations now
emit control_response via the same mirror useEffect as TUI-native
resolutions, making the emission path symmetric for all observers.
- Fix ENOENT: --json-file with a non-existent path now falls back to
createWriteStream (auto-creates the file) instead of throwing.
- Fix race: add reading guard to RemoteInputWatcher.readNewLines()
preventing duplicate command processing on rapid appends.
- Refactor confirmationHandler to use refs (pendingToolCallsRef,
dualOutputRef) and register once (deps: [remoteInput]) to eliminate
teardown/re-registration churn.
- Add debug logging to shutdown bare catch for ops correlation.
- Add ENOENT fallback test case for DualOutputBridge.
- Regenerate settings.schema.json for dualOutput section.
Generated with AI
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
* fix(cli): make RemoteInputWatcher poll interval configurable for CI reliability
RemoteInputWatcher.test.ts was timing out in CI (5s default) because
fs.watchFile's 500ms poll interval is unreliable under load. Fix:
- Accept optional `pollIntervalMs` in constructor (default 500ms).
- Tests use 100ms poll interval for faster feedback.
- Increase per-test timeout to 15s and waitFor timeout to 10s.
- Increase "TUI busy" wait from 800ms to 1500ms for CI headroom.
Generated with AI
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
* fix(cli): eliminate fs.watchFile timing dependency in RemoteInputWatcher tests
Tests were flaky across all CI platforms (macOS/ubuntu/windows) because
fs.watchFile polling (even at 100ms) is unreliable under CI load.
Fix: expose checkForNewInput() as a public method that directly triggers
file reading and returns a Promise. Tests now call it synchronously after
writing to the input file — no polling, no timeouts, deterministic.
Also fixes:
- Windows ENOTEMPTY: add delay in afterEach before rmSync
- Add active check in readNewLines to respect shutdown state
- readNewLines now returns Promise<void> for awaitable reads
Generated with AI
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
---------
Co-authored-by: 秦奇 <gary.gq@alibaba-inc.com>
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
parent
cbf409840d
commit
9e26424aa7
21 changed files with 2173 additions and 27 deletions
|
|
@ -7,6 +7,7 @@ export default {
|
|||
skills: 'Skills',
|
||||
memory: 'Memory',
|
||||
headless: 'Headless Mode',
|
||||
'dual-output': 'Dual Output',
|
||||
checkpointing: {
|
||||
display: 'hidden',
|
||||
},
|
||||
|
|
|
|||
593
docs/users/features/dual-output.md
Normal file
593
docs/users/features/dual-output.md
Normal file
|
|
@ -0,0 +1,593 @@
|
|||
# Dual Output
|
||||
|
||||
Dual Output is a sidecar mode for the interactive TUI: while Qwen Code keeps
|
||||
rendering normally on `stdout`, it concurrently emits a structured JSON event
|
||||
stream to a separate channel so an external program — an IDE extension, a web
|
||||
frontend, a CI pipeline, an automation script — can observe and steer the
|
||||
session.
|
||||
|
||||
It also provides a reverse channel: an external program can write JSONL
|
||||
commands into a file that the TUI watches, allowing it to submit prompts and
|
||||
respond to tool-permission requests as if a human were at the keyboard.
|
||||
|
||||
Dual Output is fully optional. When the flags below are absent the TUI behaves
|
||||
exactly as before with no extra I/O and no behavioral changes.
|
||||
|
||||
## Use cases
|
||||
|
||||
Dual Output is a low-level plumbing primitive. These are concrete integrations
|
||||
it unlocks:
|
||||
|
||||
### Terminal + Chat dual-mode real-time sync
|
||||
|
||||
The flagship use case. A web or desktop ChatUI hosts the TUI inside a PTY
|
||||
and renders a parallel conversation view driven by the structured event
|
||||
stream:
|
||||
|
||||
- User can type in either surface — the TUI (for terminal-native power-users)
|
||||
or the web UI (for richer UX, shareable links, mobile). Both views stay
|
||||
in sync because every message flows through the same JSON events.
|
||||
- Tool-approval prompts appear in both places; whoever approves first wins.
|
||||
- Session history is captured verbatim from `--json-file`, so the server
|
||||
side has a canonical machine-readable transcript without parsing ANSI.
|
||||
|
||||
### IDE extensions (VS Code / JetBrains / Cursor / Neovim)
|
||||
|
||||
Embed Qwen Code inside the IDE. The TUI runs in the editor's integrated
|
||||
terminal panel for users who want it, while the extension consumes
|
||||
`--json-fd` / `--json-file` events to drive:
|
||||
|
||||
- Inline diff overlays when the agent touches files.
|
||||
- A webview side panel with formatted markdown, syntax-highlighted tool
|
||||
calls, and clickable citations.
|
||||
- Status bar indicators (thinking / responding / awaiting approval).
|
||||
- Programmatic `confirmation_response` writes when the user clicks a
|
||||
native IDE approval button.
|
||||
|
||||
### Browser-based Chat frontends
|
||||
|
||||
A Node/Bun server spawns the TUI in a PTY for its rendering semantics but
|
||||
exposes a WebSocket channel to the browser. Events on `--json-file` are
|
||||
forwarded to the client; user messages typed in the browser are injected
|
||||
via `--input-file`. No ANSI parsing on either side.
|
||||
|
||||
### CI / automation observers
|
||||
|
||||
A CI job runs Qwen Code with a task prompt. The human sees the TUI in the
|
||||
job log; the CI system tails `--json-file` to:
|
||||
|
||||
- Fail the job if a `result` event reports an error.
|
||||
- Push `token usage` / `duration_ms` / `tool_use` counts to metrics.
|
||||
- Archive the full transcript as a build artifact.
|
||||
|
||||
### Multi-agent orchestration
|
||||
|
||||
A supervisor agent spawns multiple TUI workers, each with its own pair of
|
||||
event/input files. It watches progress, injects follow-up prompts, and
|
||||
enforces global budget / safety policies by approving or denying tool
|
||||
calls across all workers.
|
||||
|
||||
### Session recording, audit, and replay
|
||||
|
||||
Tee every TUI session to a regular file with `--json-file`. Later:
|
||||
|
||||
- Compliance audits can reconstruct exactly what was executed.
|
||||
- Automated regression tests can compare runs across model versions.
|
||||
- A replay tool can re-emit events through the same protocol to feed
|
||||
visualization dashboards.
|
||||
|
||||
### Observability dashboards
|
||||
|
||||
Stream `--json-file` into Loki / OTEL / any pipeline that accepts JSONL.
|
||||
Extract `usage.input_tokens`, `tool_use.name`, `result.duration_api_ms`
|
||||
as first-class metrics in Grafana. No need for log-parsing regex.
|
||||
|
||||
### Testing and QA
|
||||
|
||||
Integration tests spawn Qwen Code headlessly, drive it with `--input-file`
|
||||
scripts, and assert on `--json-file` events. Unlike parsing stdout ANSI,
|
||||
assertions are stable across UI refactors.
|
||||
|
||||
## Flags
|
||||
|
||||
| Flag | Type | Purpose |
|
||||
| --------------------- | ---------------- | ------------------------------------------------------------------------------------------------------------------------------------------ |
|
||||
| `--json-fd <n>` | number, `n >= 3` | Write structured JSON events to file descriptor `n`. The caller must provide this fd via spawn `stdio` configuration or shell redirection. |
|
||||
| `--json-file <path>` | path | Write structured JSON events to a file. The path can be a regular file, a FIFO (named pipe), or `/dev/fd/N`. |
|
||||
| `--input-file <path>` | path | Watch this file for JSONL commands written by an external program. |
|
||||
|
||||
`--json-fd` and `--json-file` are mutually exclusive. fds 0, 1, and 2 are
|
||||
rejected to prevent corrupting the TUI's own output.
|
||||
|
||||
## Why two output flags? (`--json-fd` vs `--json-file`)
|
||||
|
||||
At first glance `--json-fd` looks sufficient — the caller spawns Qwen Code
|
||||
with an extra file descriptor, the TUI writes events to it, done. In
|
||||
practice, fd passing breaks down under the most important embedding
|
||||
scenario: running the TUI inside a pseudo-terminal (PTY). That is why
|
||||
this feature also exposes a path-based alternative.
|
||||
|
||||
### When `--json-fd` works
|
||||
|
||||
Pure `child_process.spawn` with a `stdio` array:
|
||||
|
||||
```ts
|
||||
const child = spawn('qwen', ['--json-fd', '3'], {
|
||||
stdio: ['inherit', 'inherit', 'inherit', eventsFd],
|
||||
});
|
||||
```
|
||||
|
||||
Node's spawn supports arbitrary `stdio` entries; fd 3 is inherited by the
|
||||
child, which can write to it directly. Zero-copy, zero-buffer, zero
|
||||
filesystem — the fastest path.
|
||||
|
||||
### Why `--json-fd` does **not** work under PTY
|
||||
|
||||
PTY wrappers like [`node-pty`](https://github.com/microsoft/node-pty) and
|
||||
[`bun-pty`](https://github.com/oven-sh/bun) are how any serious embedder
|
||||
(IDE extensions, web terminals, tmux-like multiplexers) hosts an
|
||||
interactive TUI. They cannot forward extra fds to the child, for three
|
||||
reinforcing reasons:
|
||||
|
||||
1. **API surface.** `node-pty.spawn(file, args, options)` accepts `cwd`,
|
||||
`env`, `cols`, `rows`, `encoding`, etc. — but **no `stdio` array**. There
|
||||
is simply no place in the API to say "also attach this fd as fd 3 in
|
||||
the child". `bun-pty` exposes the same shape.
|
||||
2. **`forkpty(3)` semantics.** Under the hood, PTY wrappers call
|
||||
`forkpty(3)` (or the equivalent `posix_openpt` + `login_tty` dance).
|
||||
That syscall allocates a master/slave pseudo-terminal pair and
|
||||
redirects the child's fds 0/1/2 to the slave side so the child thinks
|
||||
it is attached to a real terminal. Any fds above 2 in the parent are
|
||||
closed by `login_tty`, which calls `close(fd)` for `fd >= 3` before
|
||||
`exec`. Extra fds are actively wiped, not inherited.
|
||||
3. **Controlling-terminal side effect.** Even if you hacked an extra fd
|
||||
through, it would not be a terminal, so the child's TUI renderer
|
||||
(which writes escape sequences assuming a TTY on fd 1) would still
|
||||
need the slave for its output. You would end up with two independent
|
||||
transports anyway.
|
||||
|
||||
In short: the moment an embedder needs a real TTY for TUI rendering —
|
||||
which is every IDE extension, every web terminal, every desktop chat
|
||||
app — fd inheritance is off the table.
|
||||
|
||||
### `--json-file` fills the gap
|
||||
|
||||
A file path is passed as an ordinary CLI argument, so it survives every
|
||||
spawn model:
|
||||
|
||||
```ts
|
||||
import { spawn } from 'node-pty';
|
||||
|
||||
const pty = spawn(
|
||||
'qwen',
|
||||
[
|
||||
'--json-file',
|
||||
'/tmp/qwen-events.jsonl',
|
||||
'--input-file',
|
||||
'/tmp/qwen-input.jsonl',
|
||||
],
|
||||
{ cols: 120, rows: 40 },
|
||||
);
|
||||
```
|
||||
|
||||
The child opens the file itself and writes events there; the embedder
|
||||
tails the same path with `fs.watch` + incremental reads. Three things to
|
||||
note:
|
||||
|
||||
- **Regular file**, FIFO (named pipe), or `/dev/fd/N` all work. FIFO is
|
||||
the lowest-latency option when both sides are on the same host.
|
||||
- The bridge opens FIFOs with `O_NONBLOCK` and falls back to blocking
|
||||
mode on `ENXIO` (no reader yet), so PTY startup is never deadlocked
|
||||
waiting for a consumer.
|
||||
- For multi-session isolation, use per-session paths under
|
||||
`$XDG_RUNTIME_DIR` or a `mkdtemp`'d directory with mode `0700`.
|
||||
|
||||
### Which flag should I use?
|
||||
|
||||
| Embedding style | Use |
|
||||
| ------------------------------------------------- | -------------------- |
|
||||
| `child_process.spawn` with plain stdio | `--json-fd` |
|
||||
| `node-pty` / `bun-pty` / any PTY host | `--json-file` |
|
||||
| Shell redirection / manual pipeline testing | either |
|
||||
| CI log collection (regular file, read after exit) | `--json-file` |
|
||||
| Lowest possible latency on same host | `--json-file` + FIFO |
|
||||
|
||||
The general rule: **if you need the TUI to render correctly, you need a
|
||||
PTY, which means you need `--json-file`.** `--json-fd` is for simpler
|
||||
embedders that do not care about TUI fidelity — typically programmatic
|
||||
wrappers that throw away stdout anyway.
|
||||
|
||||
## Quick start
|
||||
|
||||
Run Qwen Code with all three channels enabled:
|
||||
|
||||
```bash
|
||||
mkfifo /tmp/qwen-events.jsonl /tmp/qwen-input.jsonl
|
||||
qwen \
|
||||
--json-file /tmp/qwen-events.jsonl \
|
||||
--input-file /tmp/qwen-input.jsonl
|
||||
```
|
||||
|
||||
In a second terminal, tail the event stream:
|
||||
|
||||
```bash
|
||||
cat /tmp/qwen-events.jsonl
|
||||
```
|
||||
|
||||
In a third terminal, push a prompt into the running TUI:
|
||||
|
||||
```bash
|
||||
echo '{"type":"submit","text":"Explain this repo"}' >> /tmp/qwen-input.jsonl
|
||||
```
|
||||
|
||||
The prompt appears in the TUI exactly as if the user typed it, and the
|
||||
streaming response is mirrored on `/tmp/qwen-events.jsonl`.
|
||||
|
||||
## Output event schema
|
||||
|
||||
Events are emitted as JSON Lines (one object per line). The schema is the same
|
||||
one used by the non-interactive `--output-format=stream-json` mode, with
|
||||
`includePartialMessages` always enabled.
|
||||
|
||||
The first event on the channel is always `system` / `session_start`, emitted
|
||||
when the bridge is constructed. Use it to correlate the channel with a
|
||||
session id before any other event arrives.
|
||||
|
||||
```jsonc
|
||||
// Session lifecycle
|
||||
{
|
||||
"type": "system",
|
||||
"subtype": "session_start",
|
||||
"uuid": "...",
|
||||
"session_id": "...",
|
||||
"data": { "session_id": "...", "cwd": "/path/to/cwd" }
|
||||
}
|
||||
|
||||
// Streaming events for an in-progress assistant turn
|
||||
{ "type": "stream_event", "event": { "type": "message_start", "message": { ... } }, ... }
|
||||
{ "type": "stream_event", "event": { "type": "content_block_start", "index": 0, "content_block": { "type": "text" } }, ... }
|
||||
{ "type": "stream_event", "event": { "type": "content_block_delta", "index": 0, "delta": { "type": "text_delta", "text": "Hello" } }, ... }
|
||||
{ "type": "stream_event", "event": { "type": "content_block_stop", "index": 0 }, ... }
|
||||
{ "type": "stream_event", "event": { "type": "message_stop" }, ... }
|
||||
|
||||
// Completed messages
|
||||
{ "type": "user", "message": { "role": "user", "content": [...] }, ... }
|
||||
{ "type": "assistant", "message": { "role": "assistant", "content": [...], "usage": { ... } }, ... }
|
||||
{ "type": "user", "message": { "role": "user", "content": [{ "type": "tool_result", ... }] } }
|
||||
|
||||
// Permission control plane (only when a tool needs approval)
|
||||
{
|
||||
"type": "control_request",
|
||||
"request_id": "...",
|
||||
"request": {
|
||||
"subtype": "can_use_tool",
|
||||
"tool_name": "run_shell_command",
|
||||
"tool_use_id": "...",
|
||||
"input": { "command": "rm -rf /tmp/x" },
|
||||
"permission_suggestions": null,
|
||||
"blocked_path": null
|
||||
}
|
||||
}
|
||||
{
|
||||
"type": "control_response",
|
||||
"response": {
|
||||
"subtype": "success",
|
||||
"request_id": "...",
|
||||
"response": { "allowed": true }
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
`control_response` is emitted whether the decision was made in the TUI
|
||||
(native approval UI) or by an external `confirmation_response` (see below).
|
||||
Either way, all observers see the final outcome.
|
||||
|
||||
## Input command schema
|
||||
|
||||
Two command shapes are accepted on `--input-file`:
|
||||
|
||||
```jsonc
|
||||
// Submit a user message into the prompt queue
|
||||
{ "type": "submit", "text": "What does this function do?" }
|
||||
|
||||
// Reply to a pending control_request
|
||||
{ "type": "confirmation_response", "request_id": "...", "allowed": true }
|
||||
```
|
||||
|
||||
Behavior:
|
||||
|
||||
- `submit` commands are queued. If the TUI is busy responding, they are
|
||||
retried automatically the next time the TUI returns to the idle state.
|
||||
- `confirmation_response` commands are dispatched immediately and never
|
||||
queued, because a tool call is blocking and the response must reach the
|
||||
underlying `onConfirm` handler without waiting for any earlier `submit`.
|
||||
- Whichever side approves a tool first wins; the other side's late response
|
||||
is harmlessly dropped.
|
||||
- Lines that fail to parse as JSON are logged and skipped — they do not
|
||||
stop the watcher.
|
||||
|
||||
## Latency notes
|
||||
|
||||
The input file is observed with `fs.watchFile` at a 500 ms polling interval,
|
||||
so worst-case round-trip latency for a remote `submit` is about half a
|
||||
second. This is intentional: polling is portable across platforms and
|
||||
filesystems (including macOS / network mounts), and matches the typical
|
||||
human-in-the-loop pacing the feature targets. The output channel has no
|
||||
polling — events are written synchronously as the TUI emits them.
|
||||
|
||||
## Failure modes
|
||||
|
||||
- **Bad fd.** If the fd passed to `--json-fd` is not open or is one of
|
||||
0/1/2, the TUI prints a warning to `stderr` and continues without dual
|
||||
output enabled.
|
||||
- **Bad path.** If the file passed to `--json-file` cannot be opened, the
|
||||
TUI prints a warning and continues without dual output.
|
||||
- **Consumer disconnect.** If the reader on the other side of the channel
|
||||
goes away (`EPIPE`), the bridge silently disables itself and the TUI
|
||||
keeps running. No retry.
|
||||
- **Adapter exception.** Any exception thrown while emitting an event is
|
||||
caught, logged, and disables the bridge. The TUI is never crashed by a
|
||||
dual-output failure.
|
||||
|
||||
## Spawn example
|
||||
|
||||
A typical embedding parent process spawns Qwen Code with both channels:
|
||||
|
||||
```ts
|
||||
import { spawn } from 'node:child_process';
|
||||
import { openSync } from 'node:fs';
|
||||
|
||||
const eventsFd = openSync('/tmp/qwen-events.jsonl', 'w');
|
||||
const child = spawn(
|
||||
'qwen',
|
||||
['--json-fd', '3', '--input-file', '/tmp/qwen-input.jsonl'],
|
||||
{ stdio: ['inherit', 'inherit', 'inherit', eventsFd] },
|
||||
);
|
||||
```
|
||||
|
||||
The TUI still owns the user's terminal on stdio 0/1/2, while the embedder
|
||||
reads structured events on the file backing fd 3 and pushes commands by
|
||||
appending JSONL lines to `/tmp/qwen-input.jsonl`.
|
||||
|
||||
## Settings-based configuration
|
||||
|
||||
For long-lived embedders it is often inconvenient to thread CLI flags
|
||||
through every launch. The same channels can be configured in
|
||||
`settings.json` under the top-level `dualOutput` key:
|
||||
|
||||
```jsonc
|
||||
// ~/.qwen/settings.json (user-level)
|
||||
// or <workspace>/.qwen/settings.json (workspace-level)
|
||||
{
|
||||
"dualOutput": {
|
||||
"jsonFile": "/tmp/qwen-events.jsonl",
|
||||
"inputFile": "/tmp/qwen-input.jsonl",
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
Precedence rules:
|
||||
|
||||
- CLI flag **wins** over settings. Passing `--json-file /foo` on the
|
||||
command line overrides `dualOutput.jsonFile` in settings.
|
||||
- `--json-fd` has no settings equivalent — fd passing is a spawn-time
|
||||
concern that cannot be statically declared.
|
||||
- If neither flag nor setting is present, dual output stays disabled
|
||||
(identical to today's default).
|
||||
|
||||
The `requiresRestart: true` flag means changes only take effect on the
|
||||
next Qwen Code launch, since the bridge is constructed once during
|
||||
startup.
|
||||
|
||||
## Runnable demos
|
||||
|
||||
Every script below is copy-paste ready. Start with POC 1 to verify
|
||||
the build has dual output; POC 4 is the closest analogue to a real
|
||||
IDE-extension integration.
|
||||
|
||||
### POC 1 — observe the event stream
|
||||
|
||||
Watch every structured event the TUI emits while a human uses it
|
||||
normally:
|
||||
|
||||
```bash
|
||||
# Terminal A
|
||||
mkfifo /tmp/qwen-events.jsonl
|
||||
cat /tmp/qwen-events.jsonl | jq -c 'select(.type != "stream_event") | {type, subtype}'
|
||||
|
||||
# Terminal B
|
||||
qwen --json-file /tmp/qwen-events.jsonl
|
||||
# ...then chat normally; terminal A shows session_start,
|
||||
# user/assistant/result/control_request lifecycle in real time.
|
||||
```
|
||||
|
||||
Expected first line in terminal A:
|
||||
|
||||
```json
|
||||
{ "type": "system", "subtype": "session_start" }
|
||||
```
|
||||
|
||||
### POC 2 — inject prompts from outside
|
||||
|
||||
Drive the TUI from a second terminal without touching the keyboard of
|
||||
the first:
|
||||
|
||||
```bash
|
||||
# Terminal A
|
||||
touch /tmp/qwen-in.jsonl
|
||||
qwen --input-file /tmp/qwen-in.jsonl
|
||||
|
||||
# Terminal B — the TUI responds as if you typed it
|
||||
echo '{"type":"submit","text":"list files in the current directory"}' \
|
||||
>> /tmp/qwen-in.jsonl
|
||||
```
|
||||
|
||||
### POC 3 — remote tool-permission bridge
|
||||
|
||||
Approve or deny tool calls from a separate process:
|
||||
|
||||
```bash
|
||||
# Terminal A — observe control_requests
|
||||
mkfifo /tmp/qwen-out.jsonl
|
||||
touch /tmp/qwen-in.jsonl
|
||||
(cat /tmp/qwen-out.jsonl \
|
||||
| jq -c 'select(.type == "control_request")') &
|
||||
|
||||
# Terminal B
|
||||
qwen --json-file /tmp/qwen-out.jsonl --input-file /tmp/qwen-in.jsonl
|
||||
# Ask Qwen to do something that needs approval, e.g.
|
||||
# "run `ls -la /tmp`". A control_request will appear in terminal A.
|
||||
# Copy the request_id, then in a third terminal:
|
||||
echo '{"type":"confirmation_response","request_id":"<paste-id>","allowed":true}' \
|
||||
>> /tmp/qwen-in.jsonl
|
||||
# The TUI confirmation prompt dismisses and the tool executes.
|
||||
```
|
||||
|
||||
If you reply with an unknown `request_id`, the bridge emits a
|
||||
`control_response` with `subtype: "error"` on the output channel so your
|
||||
consumer can log it or retry:
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "control_response",
|
||||
"response": {
|
||||
"subtype": "error",
|
||||
"request_id": "...",
|
||||
"error": "unknown request_id (already resolved, cancelled, or never issued)"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### POC 4 — Node embedder (IDE-like)
|
||||
|
||||
The most realistic shape: a parent process spawns Qwen Code, tails
|
||||
events, and injects prompts on its own schedule.
|
||||
|
||||
```ts
|
||||
// demo-embedder.ts
|
||||
import { spawn } from 'node:child_process';
|
||||
import { appendFileSync, createReadStream, writeFileSync } from 'node:fs';
|
||||
import { createInterface } from 'node:readline';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
|
||||
const events = join(tmpdir(), `qwen-events-${process.pid}.jsonl`);
|
||||
const input = join(tmpdir(), `qwen-input-${process.pid}.jsonl`);
|
||||
writeFileSync(events, '');
|
||||
writeFileSync(input, '');
|
||||
|
||||
const child = spawn('qwen', ['--json-file', events, '--input-file', input], {
|
||||
stdio: 'inherit',
|
||||
});
|
||||
|
||||
// Tail the output channel. In production you'd use a proper
|
||||
// byte-offset tail; this one re-streams from 0 for brevity.
|
||||
const rl = createInterface({
|
||||
input: createReadStream(events, { encoding: 'utf8' }),
|
||||
});
|
||||
rl.on('line', (line) => {
|
||||
if (!line.trim()) return;
|
||||
const ev = JSON.parse(line);
|
||||
if (ev.type === 'system' && ev.subtype === 'session_start') {
|
||||
console.log('[embedder] handshake:', {
|
||||
protocol_version: ev.data.protocol_version,
|
||||
version: ev.data.version,
|
||||
supported_events: ev.data.supported_events,
|
||||
});
|
||||
// Feature-detect before using a capability
|
||||
if (ev.data.supported_events.includes('control_request')) {
|
||||
console.log('[embedder] permission control-plane available');
|
||||
}
|
||||
}
|
||||
if (ev.type === 'assistant') {
|
||||
console.log(
|
||||
'[embedder] assistant turn ended, tokens =',
|
||||
ev.message.usage?.output_tokens,
|
||||
);
|
||||
}
|
||||
if (ev.type === 'system' && ev.subtype === 'session_end') {
|
||||
console.log('[embedder] session ended cleanly');
|
||||
}
|
||||
});
|
||||
|
||||
// After 2s, inject a prompt as if the user typed it
|
||||
setTimeout(() => {
|
||||
appendFileSync(
|
||||
input,
|
||||
JSON.stringify({ type: 'submit', text: 'hello from embedder' }) + '\n',
|
||||
);
|
||||
}, 2000);
|
||||
|
||||
child.on('exit', () => process.exit(0));
|
||||
```
|
||||
|
||||
Run with:
|
||||
|
||||
```bash
|
||||
npx tsx demo-embedder.ts
|
||||
# Qwen Code TUI opens in the current terminal; the embedder logs
|
||||
# handshake + turn-end + session_end events to the parent's stdout.
|
||||
```
|
||||
|
||||
### POC 5 — capability handshake feature detection
|
||||
|
||||
Older Qwen Code versions won't emit `protocol_version`. Treat the field
|
||||
as optional and feature-detect:
|
||||
|
||||
```ts
|
||||
rl.on('line', (line) => {
|
||||
const ev = JSON.parse(line);
|
||||
if (ev.type === 'system' && ev.subtype === 'session_start') {
|
||||
const v = ev.data?.protocol_version ?? 0;
|
||||
if (v < 1) {
|
||||
console.error(
|
||||
'qwen-code dual output is present but protocol < 1; ' +
|
||||
'falling back to best-effort behavior',
|
||||
);
|
||||
} else {
|
||||
console.log('qwen-code dual output protocol v' + v);
|
||||
}
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
### POC 6 — session_end as a clean termination signal
|
||||
|
||||
```ts
|
||||
rl.on('line', (line) => {
|
||||
const ev = JSON.parse(line);
|
||||
if (ev.type === 'system' && ev.subtype === 'session_end') {
|
||||
console.log('[embedder] clean shutdown, session', ev.data.session_id);
|
||||
// Flush metrics, close WebSockets, etc.
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
If the TUI crashes before `session_end`, the output stream closes
|
||||
(`EPIPE` on next write); embedders should handle both paths.
|
||||
|
||||
### POC 7 — failure drills (prove the flags never break the TUI)
|
||||
|
||||
```bash
|
||||
qwen --json-fd 1
|
||||
# stderr: "Warning: dual output disabled — ..."
|
||||
# TUI still launches normally.
|
||||
|
||||
qwen --json-fd 9999
|
||||
# stderr: "Warning: dual output disabled — fd 9999 not open"
|
||||
# TUI still launches normally.
|
||||
|
||||
qwen --json-fd 3 --json-file /tmp/x.jsonl
|
||||
# yargs rejects: "--json-fd and --json-file are mutually exclusive."
|
||||
# Process exits before TUI starts.
|
||||
|
||||
qwen --json-file /nonexistent/dir/x.jsonl
|
||||
# stderr warning; TUI still launches.
|
||||
```
|
||||
|
||||
## Relation to Claude Code
|
||||
|
||||
Claude Code exposes a similar stream-json event format under
|
||||
`--print --output-format stream-json`, but only in non-interactive mode
|
||||
— it has no equivalent of running the TUI and a structured sidecar
|
||||
channel at the same time. Dual Output fills that gap.
|
||||
|
|
@ -162,6 +162,9 @@ export interface CliArgs {
|
|||
excludeTools: string[] | undefined;
|
||||
authType: string | undefined;
|
||||
channel: string | undefined;
|
||||
jsonFd?: number | undefined;
|
||||
jsonFile?: string | undefined;
|
||||
inputFile?: string | undefined;
|
||||
}
|
||||
|
||||
function normalizeOutputFormat(
|
||||
|
|
@ -460,6 +463,25 @@ export async function parseArguments(): Promise<CliArgs> {
|
|||
'Include partial assistant messages when using stream-json output.',
|
||||
default: false,
|
||||
})
|
||||
.option('json-fd', {
|
||||
type: 'number',
|
||||
description:
|
||||
'File descriptor for structured JSON event output (dual output mode). ' +
|
||||
'The TUI renders normally on stdout while JSON events are written to this fd. ' +
|
||||
'The caller must provide this fd via spawn stdio configuration.',
|
||||
})
|
||||
.option('json-file', {
|
||||
type: 'string',
|
||||
description:
|
||||
'File path for structured JSON event output (dual output mode). ' +
|
||||
'Can be a regular file, FIFO (named pipe), or /dev/fd/N.',
|
||||
})
|
||||
.option('input-file', {
|
||||
type: 'string',
|
||||
description:
|
||||
'File path for receiving remote input commands (bidirectional sync). ' +
|
||||
'An external process writes JSONL commands; the TUI watches and processes them.',
|
||||
})
|
||||
.option('continue', {
|
||||
alias: 'c',
|
||||
type: 'boolean',
|
||||
|
|
@ -575,6 +597,9 @@ export async function parseArguments(): Promise<CliArgs> {
|
|||
if (argv['resume'] && !isValidSessionId(argv['resume'] as string)) {
|
||||
return `Invalid --resume: "${argv['resume']}". Must be a valid UUID (e.g., "123e4567-e89b-12d3-a456-426614174000").`;
|
||||
}
|
||||
if (argv['jsonFd'] != null && argv['jsonFile'] != null) {
|
||||
return '--json-fd and --json-file are mutually exclusive. Use one or the other.';
|
||||
}
|
||||
return true;
|
||||
}),
|
||||
)
|
||||
|
|
@ -1140,6 +1165,13 @@ export async function loadCliConfig(
|
|||
hooks: settings.hooks, // Keep for backward compatibility
|
||||
disableAllHooks: settings.disableAllHooks ?? false,
|
||||
channel: argv.channel,
|
||||
// CLI flag wins over settings.json. `--json-fd` is fd-only (no settings
|
||||
// equivalent — fd passing is a spawn-time concern). `--json-file` and
|
||||
// `--input-file` fall back to settings.dualOutput.* when the flag is
|
||||
// absent.
|
||||
jsonFd: argv.jsonFd,
|
||||
jsonFile: argv.jsonFile ?? settings.dualOutput?.jsonFile,
|
||||
inputFile: argv.inputFile ?? settings.dualOutput?.inputFile,
|
||||
// Precedence: explicit CLI flag > settings file > default(true).
|
||||
// NOTE: do NOT set a yargs default for `chat-recording`, otherwise argv will
|
||||
// always be true and the settings file can never disable recording.
|
||||
|
|
|
|||
|
|
@ -448,6 +448,44 @@ const SETTINGS_SCHEMA = {
|
|||
},
|
||||
},
|
||||
|
||||
dualOutput: {
|
||||
type: 'object',
|
||||
label: 'Dual Output',
|
||||
category: 'Advanced',
|
||||
requiresRestart: true,
|
||||
default: {},
|
||||
description:
|
||||
'Dual-output sidecar mode: emit structured JSON events to a ' +
|
||||
'second channel while the TUI renders normally on stdout. See ' +
|
||||
'docs/users/features/dual-output.md. CLI flags take precedence ' +
|
||||
'over these settings.',
|
||||
showInDialog: false,
|
||||
properties: {
|
||||
jsonFile: {
|
||||
type: 'string',
|
||||
label: 'JSON Event File',
|
||||
category: 'Advanced',
|
||||
requiresRestart: true,
|
||||
default: undefined as string | undefined,
|
||||
description:
|
||||
'File path for structured JSON event output. Equivalent to ' +
|
||||
'--json-file. Ignored if --json-fd or --json-file is also set.',
|
||||
showInDialog: false,
|
||||
},
|
||||
inputFile: {
|
||||
type: 'string',
|
||||
label: 'Remote Input File',
|
||||
category: 'Advanced',
|
||||
requiresRestart: true,
|
||||
default: undefined as string | undefined,
|
||||
description:
|
||||
'File path for remote input commands (JSONL). Equivalent to ' +
|
||||
'--input-file. Ignored if --input-file is also set.',
|
||||
showInDialog: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
ui: {
|
||||
type: 'object',
|
||||
label: 'UI',
|
||||
|
|
|
|||
211
packages/cli/src/dualOutput/DualOutputBridge.test.ts
Normal file
211
packages/cli/src/dualOutput/DualOutputBridge.test.ts
Normal file
|
|
@ -0,0 +1,211 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
||||
import * as fs from 'node:fs';
|
||||
import * as os from 'node:os';
|
||||
import * as path from 'node:path';
|
||||
import type { Config } from '@qwen-code/qwen-code-core';
|
||||
import {
|
||||
DualOutputBridge,
|
||||
DUAL_OUTPUT_PROTOCOL_VERSION,
|
||||
SUPPORTED_EVENTS,
|
||||
} from './DualOutputBridge.js';
|
||||
|
||||
function createMockConfig(): Config {
|
||||
return {
|
||||
getSessionId: vi.fn().mockReturnValue('test-session'),
|
||||
getModel: vi.fn().mockReturnValue('test-model'),
|
||||
} as unknown as Config;
|
||||
}
|
||||
|
||||
function readJsonl(file: string): Array<Record<string, unknown>> {
|
||||
return fs
|
||||
.readFileSync(file, 'utf8')
|
||||
.split('\n')
|
||||
.filter(Boolean)
|
||||
.map((line) => JSON.parse(line) as Record<string, unknown>);
|
||||
}
|
||||
|
||||
describe('DualOutputBridge', () => {
|
||||
let tmpDir: string;
|
||||
let target: string;
|
||||
let config: Config;
|
||||
let bridge: DualOutputBridge | null = null;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'qwen-dual-output-'));
|
||||
target = path.join(tmpDir, 'events.jsonl');
|
||||
fs.writeFileSync(target, '');
|
||||
config = createMockConfig();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
bridge?.shutdown();
|
||||
bridge = null;
|
||||
// Give the stream a tick to flush before removing the directory
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
describe('--json-fd validation', () => {
|
||||
it.each([0, 1, 2])('rejects reserved fd %d', (fd) => {
|
||||
expect(() => new DualOutputBridge(config, { fd })).toThrow(/reserved/);
|
||||
});
|
||||
|
||||
it('rejects an unopened fd with a clear message', () => {
|
||||
// 9999 is extremely unlikely to be open in the test process
|
||||
expect(() => new DualOutputBridge(config, { fd: 9999 })).toThrow(
|
||||
/file descriptor is not open/,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('--json-file output', () => {
|
||||
it('creates the file automatically when it does not exist (ENOENT fallback)', async () => {
|
||||
const newFile = path.join(tmpDir, 'does-not-exist.jsonl');
|
||||
// newFile is NOT pre-created — tests the ENOENT fallback path
|
||||
bridge = new DualOutputBridge(config, { filePath: newFile });
|
||||
bridge.shutdown();
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const lines = readJsonl(newFile);
|
||||
expect(lines.length).toBeGreaterThan(0);
|
||||
expect(lines[0]).toMatchObject({
|
||||
type: 'system',
|
||||
subtype: 'session_start',
|
||||
});
|
||||
});
|
||||
|
||||
it('emits a session_start event immediately on construction', async () => {
|
||||
bridge = new DualOutputBridge(config, { filePath: target });
|
||||
bridge.shutdown();
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const lines = readJsonl(target);
|
||||
expect(lines.length).toBeGreaterThan(0);
|
||||
expect(lines[0]).toMatchObject({
|
||||
type: 'system',
|
||||
subtype: 'session_start',
|
||||
data: { session_id: 'test-session' },
|
||||
});
|
||||
});
|
||||
|
||||
it('session_start carries a capability handshake (version, protocol_version, supported_events)', async () => {
|
||||
bridge = new DualOutputBridge(
|
||||
config,
|
||||
{ filePath: target },
|
||||
{ version: '1.2.3' },
|
||||
);
|
||||
bridge.shutdown();
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const lines = readJsonl(target);
|
||||
const start = lines.find(
|
||||
(l) => l['type'] === 'system' && l['subtype'] === 'session_start',
|
||||
);
|
||||
expect(start).toBeDefined();
|
||||
const data = (start as { data: Record<string, unknown> }).data;
|
||||
expect(data['version']).toBe('1.2.3');
|
||||
expect(data['protocol_version']).toBe(DUAL_OUTPUT_PROTOCOL_VERSION);
|
||||
expect(data['supported_events']).toEqual([...SUPPORTED_EVENTS]);
|
||||
});
|
||||
|
||||
it('emits session_end on shutdown for a clean termination signal', async () => {
|
||||
bridge = new DualOutputBridge(config, { filePath: target });
|
||||
bridge.shutdown();
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const lines = readJsonl(target);
|
||||
const end = lines.find(
|
||||
(l) => l['type'] === 'system' && l['subtype'] === 'session_end',
|
||||
);
|
||||
expect(end).toMatchObject({
|
||||
type: 'system',
|
||||
subtype: 'session_end',
|
||||
data: { session_id: 'test-session' },
|
||||
});
|
||||
});
|
||||
|
||||
it('shutdown is idempotent — calling it twice emits session_end only once', async () => {
|
||||
bridge = new DualOutputBridge(config, { filePath: target });
|
||||
bridge.shutdown();
|
||||
bridge.shutdown();
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const lines = readJsonl(target);
|
||||
const endEvents = lines.filter(
|
||||
(l) => l['type'] === 'system' && l['subtype'] === 'session_end',
|
||||
);
|
||||
expect(endEvents).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('emitControlError routes through the adapter as a control_response error', async () => {
|
||||
bridge = new DualOutputBridge(config, { filePath: target });
|
||||
bridge.emitControlError('req-missing', 'unknown request_id');
|
||||
bridge.shutdown();
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const lines = readJsonl(target);
|
||||
const errorResponse = lines.find(
|
||||
(l) =>
|
||||
l['type'] === 'control_response' &&
|
||||
(l['response'] as { subtype?: string })?.subtype === 'error',
|
||||
);
|
||||
expect(errorResponse).toMatchObject({
|
||||
type: 'control_response',
|
||||
response: {
|
||||
subtype: 'error',
|
||||
request_id: 'req-missing',
|
||||
error: 'unknown request_id',
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('routes permission requests + responses through the adapter', async () => {
|
||||
bridge = new DualOutputBridge(config, { filePath: target });
|
||||
bridge.emitPermissionRequest('req-1', 'shell', 'tu-1', { cmd: 'ls' });
|
||||
bridge.emitControlResponse('req-1', false);
|
||||
bridge.shutdown();
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const lines = readJsonl(target);
|
||||
const request = lines.find((l) => l['type'] === 'control_request');
|
||||
const response = lines.find((l) => l['type'] === 'control_response');
|
||||
expect(request).toMatchObject({
|
||||
type: 'control_request',
|
||||
request_id: 'req-1',
|
||||
request: {
|
||||
subtype: 'can_use_tool',
|
||||
tool_name: 'shell',
|
||||
tool_use_id: 'tu-1',
|
||||
input: { cmd: 'ls' },
|
||||
blocked_path: null,
|
||||
},
|
||||
});
|
||||
expect(response).toMatchObject({
|
||||
type: 'control_response',
|
||||
response: {
|
||||
subtype: 'success',
|
||||
request_id: 'req-1',
|
||||
response: { allowed: false },
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('reports isConnected=false after shutdown and silently drops further events', async () => {
|
||||
bridge = new DualOutputBridge(config, { filePath: target });
|
||||
bridge.shutdown();
|
||||
expect(bridge.isConnected).toBe(false);
|
||||
|
||||
// Should not throw
|
||||
expect(() =>
|
||||
bridge!.emitPermissionRequest('req', 'tool', 'tu', {}),
|
||||
).not.toThrow();
|
||||
expect(() => bridge!.emitControlResponse('req', true)).not.toThrow();
|
||||
});
|
||||
});
|
||||
});
|
||||
314
packages/cli/src/dualOutput/DualOutputBridge.ts
Normal file
314
packages/cli/src/dualOutput/DualOutputBridge.ts
Normal file
|
|
@ -0,0 +1,314 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import {
|
||||
createWriteStream,
|
||||
fstatSync,
|
||||
openSync,
|
||||
constants,
|
||||
type WriteStream,
|
||||
} from 'node:fs';
|
||||
import type {
|
||||
Config,
|
||||
ServerGeminiStreamEvent,
|
||||
ToolCallRequestInfo,
|
||||
ToolCallResponseInfo,
|
||||
} from '@qwen-code/qwen-code-core';
|
||||
import { createDebugLogger } from '@qwen-code/qwen-code-core';
|
||||
import type { Part } from '@google/genai';
|
||||
import { StreamJsonOutputAdapter } from '../nonInteractive/io/index.js';
|
||||
|
||||
const debugLogger = createDebugLogger('DUAL_OUTPUT');
|
||||
|
||||
/**
|
||||
* Structured-event kinds this bridge version is known to emit. Exposed to
|
||||
* consumers in `session_start.data.supported_events` so they can
|
||||
* feature-detect rather than sniffing the stream or hard-coding a minimum
|
||||
* CLI version.
|
||||
*
|
||||
* When adding a new event kind, append it here and bump the handshake
|
||||
* `protocol_version` below so consumers can gate on the combination.
|
||||
*/
|
||||
export const SUPPORTED_EVENTS = [
|
||||
'system',
|
||||
'user',
|
||||
'assistant',
|
||||
'stream_event',
|
||||
'result',
|
||||
'control_request',
|
||||
'control_response',
|
||||
] as const;
|
||||
|
||||
/**
|
||||
* Monotonically-increasing integer bumped whenever the wire protocol
|
||||
* changes in a way consumers might care about (new event types,
|
||||
* new payload fields that are not purely additive, etc.).
|
||||
*
|
||||
* History:
|
||||
* 1 — initial release (session_start, session_end, full stream-json).
|
||||
*/
|
||||
export const DUAL_OUTPUT_PROTOCOL_VERSION = 1;
|
||||
|
||||
/**
|
||||
* Optional metadata wired into the `session_start` capability handshake.
|
||||
*/
|
||||
export interface DualOutputBridgeOptions {
|
||||
/** CLI version string (e.g. "0.14.5"). Surfaced in session_start. */
|
||||
version?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Bridges TUI-mode events to a sidecar StreamJsonOutputAdapter that writes
|
||||
* structured JSON events to a secondary output channel (fd or file).
|
||||
*
|
||||
* This enables "dual output" mode: the TUI renders normally on stdout while
|
||||
* a parallel JSON event stream is emitted on a separate channel for
|
||||
* programmatic consumption by IDE extensions, web frontends, CI pipelines, etc.
|
||||
*
|
||||
* Usage:
|
||||
* qwen --json-fd 3 # JSON events written to fd 3
|
||||
* qwen --json-file /path # JSON events written to file/FIFO
|
||||
*/
|
||||
export class DualOutputBridge {
|
||||
private readonly adapter: StreamJsonOutputAdapter;
|
||||
private readonly stream: WriteStream;
|
||||
private readonly sessionId: string;
|
||||
private active = true;
|
||||
private shutdownCalled = false;
|
||||
|
||||
constructor(
|
||||
config: Config,
|
||||
target: { fd: number } | { filePath: string },
|
||||
options: DualOutputBridgeOptions = {},
|
||||
) {
|
||||
this.sessionId = config.getSessionId();
|
||||
if ('fd' in target) {
|
||||
// Reject stdin/stdout/stderr to prevent corrupting TUI output
|
||||
if (target.fd <= 2) {
|
||||
throw new Error(
|
||||
`--json-fd ${target.fd}: file descriptors 0 (stdin), 1 (stdout), and 2 (stderr) ` +
|
||||
'are reserved. Use fd 3 or higher.',
|
||||
);
|
||||
}
|
||||
// Validate fd is open before attempting to use it
|
||||
try {
|
||||
fstatSync(target.fd);
|
||||
} catch {
|
||||
throw new Error(
|
||||
`--json-fd ${target.fd}: file descriptor is not open. ` +
|
||||
'The caller must provide this fd via spawn stdio configuration ' +
|
||||
'or shell redirection (e.g., 3>/tmp/events.jsonl).',
|
||||
);
|
||||
}
|
||||
this.stream = createWriteStream('', { fd: target.fd });
|
||||
} else {
|
||||
// Open with O_WRONLY|O_NONBLOCK to avoid blocking the event loop on FIFOs.
|
||||
// On FIFO, a regular open(O_WRONLY) blocks until a reader connects.
|
||||
// O_NONBLOCK makes it return immediately (ENXIO if no reader yet, which
|
||||
// createWriteStream handles via its internal retry/error mechanism).
|
||||
try {
|
||||
const fd = openSync(
|
||||
target.filePath,
|
||||
constants.O_WRONLY | constants.O_NONBLOCK,
|
||||
);
|
||||
this.stream = createWriteStream('', { fd });
|
||||
} catch (err) {
|
||||
const code = (err as NodeJS.ErrnoException).code;
|
||||
// ENXIO: FIFO has no reader yet — fall back to blocking open.
|
||||
// ENOENT: regular file doesn't exist yet — create it.
|
||||
if (code === 'ENXIO' || code === 'ENOENT') {
|
||||
this.stream = createWriteStream(target.filePath, { flags: 'w' });
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.stream.on('error', (err) => {
|
||||
const code = (err as NodeJS.ErrnoException).code;
|
||||
// Consumer disconnected — gracefully stop writing, don't crash the TUI
|
||||
if (code === 'EPIPE' || code === 'ERR_STREAM_DESTROYED') {
|
||||
debugLogger.warn('DualOutput: consumer disconnected, disabling');
|
||||
} else {
|
||||
debugLogger.error('DualOutput stream error:', err);
|
||||
}
|
||||
// Disable on any stream error to prevent repeated write failures
|
||||
this.active = false;
|
||||
});
|
||||
|
||||
this.adapter = new StreamJsonOutputAdapter(
|
||||
config,
|
||||
true, // includePartialMessages — always emit streaming events
|
||||
this.stream,
|
||||
);
|
||||
|
||||
// Announce the session immediately so consumers can correlate the channel
|
||||
// with a session before any other event arrives. The data payload also
|
||||
// serves as a capability handshake: consumers can read `protocol_version`
|
||||
// and `supported_events` to feature-detect without sniffing the stream.
|
||||
try {
|
||||
this.adapter.emitSystemMessage('session_start', {
|
||||
session_id: this.sessionId,
|
||||
cwd: process.cwd(),
|
||||
protocol_version: DUAL_OUTPUT_PROTOCOL_VERSION,
|
||||
version: options.version,
|
||||
supported_events: [...SUPPORTED_EVENTS],
|
||||
});
|
||||
} catch (err) {
|
||||
debugLogger.error('DualOutput session_start error:', err);
|
||||
this.active = false;
|
||||
}
|
||||
}
|
||||
|
||||
processEvent(event: ServerGeminiStreamEvent): void {
|
||||
if (!this.active) return;
|
||||
try {
|
||||
this.adapter.processEvent(event);
|
||||
} catch (err) {
|
||||
debugLogger.error('DualOutput processEvent error:', err);
|
||||
this.active = false;
|
||||
}
|
||||
}
|
||||
|
||||
startAssistantMessage(): void {
|
||||
if (!this.active) return;
|
||||
try {
|
||||
this.adapter.startAssistantMessage();
|
||||
} catch (err) {
|
||||
debugLogger.error('DualOutput startAssistantMessage error:', err);
|
||||
this.active = false;
|
||||
}
|
||||
}
|
||||
|
||||
finalizeAssistantMessage(): void {
|
||||
if (!this.active) return;
|
||||
try {
|
||||
this.adapter.finalizeAssistantMessage();
|
||||
} catch (err) {
|
||||
debugLogger.error('DualOutput finalizeAssistantMessage error:', err);
|
||||
this.active = false;
|
||||
}
|
||||
}
|
||||
|
||||
emitUserMessage(parts: Part[]): void {
|
||||
if (!this.active) return;
|
||||
try {
|
||||
this.adapter.emitUserMessage(parts);
|
||||
} catch (err) {
|
||||
debugLogger.error('DualOutput emitUserMessage error:', err);
|
||||
this.active = false;
|
||||
}
|
||||
}
|
||||
|
||||
emitToolResult(
|
||||
request: ToolCallRequestInfo,
|
||||
response: ToolCallResponseInfo,
|
||||
): void {
|
||||
if (!this.active) return;
|
||||
try {
|
||||
this.adapter.emitToolResult(request, response);
|
||||
} catch (err) {
|
||||
debugLogger.error('DualOutput emitToolResult error:', err);
|
||||
this.active = false;
|
||||
}
|
||||
}
|
||||
|
||||
/** Whether the underlying stream is still writable. */
|
||||
get isConnected(): boolean {
|
||||
return this.active;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits a `can_use_tool` permission request so an external consumer can
|
||||
* approve or deny the tool call. Pairs with {@link emitControlResponse}.
|
||||
*/
|
||||
emitPermissionRequest(
|
||||
requestId: string,
|
||||
toolName: string,
|
||||
toolUseId: string,
|
||||
input: unknown,
|
||||
blockedPath: string | null = null,
|
||||
): void {
|
||||
if (!this.active) return;
|
||||
try {
|
||||
this.adapter.emitPermissionRequest(
|
||||
requestId,
|
||||
toolName,
|
||||
toolUseId,
|
||||
input,
|
||||
blockedPath,
|
||||
);
|
||||
} catch (err) {
|
||||
debugLogger.error('DualOutput emitPermissionRequest error:', err);
|
||||
this.active = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits the result of a permission decision (made either in the TUI or by
|
||||
* the external consumer) so all observers stay in sync.
|
||||
*/
|
||||
emitControlResponse(requestId: string, allowed: boolean): void {
|
||||
if (!this.active) return;
|
||||
try {
|
||||
this.adapter.emitControlResponse(requestId, allowed);
|
||||
} catch (err) {
|
||||
debugLogger.error('DualOutput emitControlResponse error:', err);
|
||||
this.active = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits a `control_response` with subtype `error` — used when an external
|
||||
* `confirmation_response` cannot be satisfied (unknown request_id, the
|
||||
* tool call already resolved, stream already closed, etc.). Lets
|
||||
* consumers retry or surface the error instead of silently hanging.
|
||||
*/
|
||||
emitControlError(requestId: string, message: string): void {
|
||||
if (!this.active) return;
|
||||
try {
|
||||
this.adapter.emitControlError(requestId, message);
|
||||
} catch (err) {
|
||||
debugLogger.error('DualOutput emitControlError error:', err);
|
||||
this.active = false;
|
||||
}
|
||||
}
|
||||
|
||||
/** General-purpose system event escape hatch. */
|
||||
emitSystemMessage(subtype: string, data?: unknown): void {
|
||||
if (!this.active) return;
|
||||
try {
|
||||
this.adapter.emitSystemMessage(subtype, data);
|
||||
} catch (err) {
|
||||
debugLogger.error('DualOutput emitSystemMessage error:', err);
|
||||
this.active = false;
|
||||
}
|
||||
}
|
||||
|
||||
shutdown(): void {
|
||||
if (this.shutdownCalled) return;
|
||||
this.shutdownCalled = true;
|
||||
// Try to emit session_end before tearing the stream down so consumers
|
||||
// get a definitive termination signal rather than inferring it from
|
||||
// EPIPE. Failures here are swallowed — the stream may already be in an
|
||||
// error state if the consumer disconnected first.
|
||||
if (this.active) {
|
||||
try {
|
||||
this.adapter.emitSystemMessage('session_end', {
|
||||
session_id: this.sessionId,
|
||||
});
|
||||
} catch {
|
||||
// ignore — stream likely already closed
|
||||
}
|
||||
}
|
||||
this.active = false;
|
||||
try {
|
||||
this.stream.end();
|
||||
} catch (err) {
|
||||
debugLogger.debug('DualOutput: stream end error during shutdown:', err);
|
||||
}
|
||||
}
|
||||
}
|
||||
25
packages/cli/src/dualOutput/DualOutputContext.tsx
Normal file
25
packages/cli/src/dualOutput/DualOutputContext.tsx
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { createContext, useContext } from 'react';
|
||||
import type { DualOutputBridge } from './DualOutputBridge.js';
|
||||
|
||||
/**
|
||||
* React context for the dual output bridge.
|
||||
* Provides access to the sidecar JSON event emitter throughout the
|
||||
* interactive UI component tree.
|
||||
*/
|
||||
export const DualOutputContext = createContext<DualOutputBridge | null>(null);
|
||||
|
||||
/**
|
||||
* Hook to access the dual output bridge from any component or hook
|
||||
* within the interactive UI.
|
||||
*
|
||||
* Returns null when dual output is not enabled (no --json-fd or --json-file).
|
||||
*/
|
||||
export function useDualOutput(): DualOutputBridge | null {
|
||||
return useContext(DualOutputContext);
|
||||
}
|
||||
8
packages/cli/src/dualOutput/index.ts
Normal file
8
packages/cli/src/dualOutput/index.ts
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
export { DualOutputBridge } from './DualOutputBridge.js';
|
||||
export { DualOutputContext, useDualOutput } from './DualOutputContext.js';
|
||||
|
|
@ -66,6 +66,10 @@ import { computeWindowTitle } from './utils/windowTitle.js';
|
|||
import { validateNonInteractiveAuth } from './validateNonInterActiveAuth.js';
|
||||
import { showResumeSessionPicker } from './ui/components/StandaloneSessionPicker.js';
|
||||
import { initializeLlmOutputLanguage } from './utils/languageUtils.js';
|
||||
import { DualOutputBridge } from './dualOutput/DualOutputBridge.js';
|
||||
import { DualOutputContext } from './dualOutput/DualOutputContext.js';
|
||||
import { RemoteInputWatcher } from './remoteInput/RemoteInputWatcher.js';
|
||||
import { RemoteInputContext } from './remoteInput/RemoteInputContext.js';
|
||||
|
||||
const debugLogger = createDebugLogger('STARTUP');
|
||||
|
||||
|
|
@ -152,35 +156,84 @@ export async function startInteractiveUI(
|
|||
const version = await getCliVersion();
|
||||
setWindowTitle(basename(workspaceRoot), settings);
|
||||
|
||||
// Create dual output bridge if --json-fd or --json-file is specified.
|
||||
// Errors are caught so a bad fd/path degrades gracefully instead of
|
||||
// preventing the TUI from launching.
|
||||
let dualOutputBridge: DualOutputBridge | null = null;
|
||||
const jsonFd = config.getJsonFd?.();
|
||||
const jsonFile = config.getJsonFile?.();
|
||||
try {
|
||||
if (jsonFd != null) {
|
||||
dualOutputBridge = new DualOutputBridge(
|
||||
config,
|
||||
{ fd: jsonFd },
|
||||
{ version },
|
||||
);
|
||||
} else if (jsonFile != null) {
|
||||
dualOutputBridge = new DualOutputBridge(
|
||||
config,
|
||||
{ filePath: jsonFile },
|
||||
{ version },
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
debugLogger.error('Failed to initialize dual output bridge:', err);
|
||||
writeStderrLine(
|
||||
`Warning: dual output disabled — ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
}
|
||||
|
||||
// Create remote input watcher if --input-file is specified.
|
||||
// This enables bidirectional sync: an external process writes JSONL
|
||||
// commands to this file, and the TUI processes them as user messages.
|
||||
let remoteInputWatcher: RemoteInputWatcher | null = null;
|
||||
const inputFile = config.getInputFile?.();
|
||||
if (inputFile) {
|
||||
try {
|
||||
remoteInputWatcher = new RemoteInputWatcher(inputFile);
|
||||
} catch (err) {
|
||||
debugLogger.error('Failed to initialize remote input watcher:', err);
|
||||
writeStderrLine(
|
||||
`Warning: remote input disabled — ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Create wrapper component to use hooks inside render
|
||||
const AppWrapper = () => {
|
||||
const kittyProtocolStatus = useKittyKeyboardProtocol();
|
||||
const nodeMajorVersion = parseInt(process.versions.node.split('.')[0], 10);
|
||||
return (
|
||||
<SettingsContext.Provider value={settings}>
|
||||
<KeypressProvider
|
||||
kittyProtocolEnabled={kittyProtocolStatus.enabled}
|
||||
config={config}
|
||||
debugKeystrokeLogging={settings.merged.general?.debugKeystrokeLogging}
|
||||
pasteWorkaround={
|
||||
process.platform === 'win32' || nodeMajorVersion < 20
|
||||
}
|
||||
>
|
||||
<SessionStatsProvider sessionId={config.getSessionId()}>
|
||||
<VimModeProvider settings={settings}>
|
||||
<AgentViewProvider config={config}>
|
||||
<AppContainer
|
||||
config={config}
|
||||
settings={settings}
|
||||
startupWarnings={startupWarnings}
|
||||
version={version}
|
||||
initializationResult={initializationResult}
|
||||
/>
|
||||
</AgentViewProvider>
|
||||
</VimModeProvider>
|
||||
</SessionStatsProvider>
|
||||
</KeypressProvider>
|
||||
</SettingsContext.Provider>
|
||||
<RemoteInputContext.Provider value={remoteInputWatcher}>
|
||||
<DualOutputContext.Provider value={dualOutputBridge}>
|
||||
<SettingsContext.Provider value={settings}>
|
||||
<KeypressProvider
|
||||
kittyProtocolEnabled={kittyProtocolStatus.enabled}
|
||||
config={config}
|
||||
debugKeystrokeLogging={
|
||||
settings.merged.general?.debugKeystrokeLogging
|
||||
}
|
||||
pasteWorkaround={
|
||||
process.platform === 'win32' || nodeMajorVersion < 20
|
||||
}
|
||||
>
|
||||
<SessionStatsProvider sessionId={config.getSessionId()}>
|
||||
<VimModeProvider settings={settings}>
|
||||
<AgentViewProvider config={config}>
|
||||
<AppContainer
|
||||
config={config}
|
||||
settings={settings}
|
||||
startupWarnings={startupWarnings}
|
||||
version={version}
|
||||
initializationResult={initializationResult}
|
||||
/>
|
||||
</AgentViewProvider>
|
||||
</VimModeProvider>
|
||||
</SessionStatsProvider>
|
||||
</KeypressProvider>
|
||||
</SettingsContext.Provider>
|
||||
</DualOutputContext.Provider>
|
||||
</RemoteInputContext.Provider>
|
||||
);
|
||||
};
|
||||
|
||||
|
|
@ -211,7 +264,11 @@ export async function startInteractiveUI(
|
|||
});
|
||||
}
|
||||
|
||||
registerCleanup(() => instance.unmount());
|
||||
registerCleanup(() => {
|
||||
remoteInputWatcher?.shutdown();
|
||||
dualOutputBridge?.shutdown();
|
||||
instance.unmount();
|
||||
});
|
||||
}
|
||||
|
||||
export async function main() {
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import type {
|
|||
CLIResultMessageSuccess,
|
||||
CLIUserMessage,
|
||||
ContentBlock,
|
||||
ControlMessage,
|
||||
ExtendedUsage,
|
||||
TextBlock,
|
||||
ThinkingBlock,
|
||||
|
|
@ -408,6 +409,15 @@ export abstract class BaseJsonOutputAdapter {
|
|||
*/
|
||||
protected abstract emitMessageImpl(message: CLIMessage): void;
|
||||
|
||||
/**
|
||||
* Emits a control-plane message (control_request / control_response).
|
||||
* Only meaningful in streaming adapters; batch adapters inherit this
|
||||
* no-op since control messages are not collected into the final array.
|
||||
*/
|
||||
protected emitControlMessageImpl(_message: ControlMessage): void {
|
||||
// Default: no-op for non-streaming / batch adapters.
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract method to determine if stream events should be emitted.
|
||||
*
|
||||
|
|
@ -1061,6 +1071,67 @@ export abstract class BaseJsonOutputAdapter {
|
|||
this.emitMessageImpl(systemMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits a `can_use_tool` permission control_request so an external consumer
|
||||
* can approve or deny the tool call. Pairs with {@link emitControlResponse}.
|
||||
*/
|
||||
emitPermissionRequest(
|
||||
requestId: string,
|
||||
toolName: string,
|
||||
toolUseId: string,
|
||||
input: unknown,
|
||||
blockedPath: string | null = null,
|
||||
): void {
|
||||
const message: ControlMessage = {
|
||||
type: 'control_request',
|
||||
request_id: requestId,
|
||||
request: {
|
||||
subtype: 'can_use_tool',
|
||||
tool_name: toolName,
|
||||
tool_use_id: toolUseId,
|
||||
input,
|
||||
permission_suggestions: null,
|
||||
blocked_path: blockedPath,
|
||||
},
|
||||
};
|
||||
this.emitControlMessageImpl(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits a control_response carrying a permission approval result.
|
||||
* Used both to mirror TUI-native resolutions back to external consumers
|
||||
* and to acknowledge externally-supplied confirmation_responses.
|
||||
*/
|
||||
emitControlResponse(requestId: string, allowed: boolean): void {
|
||||
const message: ControlMessage = {
|
||||
type: 'control_response',
|
||||
response: {
|
||||
subtype: 'success',
|
||||
request_id: requestId,
|
||||
response: { allowed },
|
||||
},
|
||||
};
|
||||
this.emitControlMessageImpl(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits a control_response with subtype `error`. Used to reject an
|
||||
* external confirmation_response that cannot be honored (unknown
|
||||
* request_id, tool call already resolved, etc.) so the consumer can
|
||||
* surface or retry, instead of waiting forever for an implicit ack.
|
||||
*/
|
||||
emitControlError(requestId: string, errorMessage: string): void {
|
||||
const message: ControlMessage = {
|
||||
type: 'control_response',
|
||||
response: {
|
||||
subtype: 'error',
|
||||
request_id: requestId,
|
||||
error: errorMessage,
|
||||
},
|
||||
};
|
||||
this.emitControlMessageImpl(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits a tool progress stream event.
|
||||
* Default implementation is a no-op. StreamJsonOutputAdapter overrides this
|
||||
|
|
|
|||
|
|
@ -0,0 +1,149 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach, vi } from 'vitest';
|
||||
import type { Config } from '@qwen-code/qwen-code-core';
|
||||
import { GeminiEventType } from '@qwen-code/qwen-code-core';
|
||||
import { StreamJsonOutputAdapter } from './StreamJsonOutputAdapter.js';
|
||||
|
||||
/**
|
||||
* Tests covering the dual-output extensions to StreamJsonOutputAdapter:
|
||||
* - injected outputStream (used by DualOutputBridge to redirect output
|
||||
* to fd / file instead of stdout);
|
||||
* - emitPermissionRequest / emitControlResponse, which carry tool
|
||||
* approval events over the same channel.
|
||||
*
|
||||
* Kept in a separate file from StreamJsonOutputAdapter.test.ts so new
|
||||
* assertions do not force a relint of the pre-existing file.
|
||||
*/
|
||||
|
||||
function createMockConfig(): Config {
|
||||
return {
|
||||
getSessionId: vi.fn().mockReturnValue('test-session-id'),
|
||||
getModel: vi.fn().mockReturnValue('test-model'),
|
||||
} as unknown as Config;
|
||||
}
|
||||
|
||||
describe('StreamJsonOutputAdapter — dual-output extensions', () => {
|
||||
let mockConfig: Config;
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
let stdoutWriteSpy: any;
|
||||
|
||||
beforeEach(() => {
|
||||
mockConfig = createMockConfig();
|
||||
stdoutWriteSpy = vi
|
||||
.spyOn(process.stdout, 'write')
|
||||
.mockImplementation(() => true);
|
||||
});
|
||||
|
||||
describe('custom outputStream injection', () => {
|
||||
it('writes to the injected stream instead of stdout', () => {
|
||||
const writes: string[] = [];
|
||||
const customStream = {
|
||||
write(chunk: string): boolean {
|
||||
writes.push(chunk);
|
||||
return true;
|
||||
},
|
||||
} as unknown as NodeJS.WritableStream;
|
||||
|
||||
const adapter = new StreamJsonOutputAdapter(
|
||||
mockConfig,
|
||||
false,
|
||||
customStream,
|
||||
);
|
||||
adapter.startAssistantMessage();
|
||||
adapter.processEvent({
|
||||
type: GeminiEventType.Content,
|
||||
value: 'sidecar',
|
||||
});
|
||||
adapter.finalizeAssistantMessage();
|
||||
|
||||
expect(writes.length).toBeGreaterThan(0);
|
||||
expect(stdoutWriteSpy).not.toHaveBeenCalled();
|
||||
|
||||
const lastParsed = JSON.parse(writes[writes.length - 1]);
|
||||
expect(lastParsed.type).toBe('assistant');
|
||||
});
|
||||
});
|
||||
|
||||
describe('emitPermissionRequest / emitControlResponse', () => {
|
||||
it('emits a control_request with subtype can_use_tool', () => {
|
||||
const adapter = new StreamJsonOutputAdapter(mockConfig, false);
|
||||
stdoutWriteSpy.mockClear();
|
||||
|
||||
adapter.emitPermissionRequest(
|
||||
'req-1',
|
||||
'run_shell_command',
|
||||
'tool-use-1',
|
||||
{ command: 'ls' },
|
||||
'/etc/passwd',
|
||||
);
|
||||
|
||||
expect(stdoutWriteSpy).toHaveBeenCalledTimes(1);
|
||||
const parsed = JSON.parse(stdoutWriteSpy.mock.calls[0][0] as string);
|
||||
expect(parsed).toEqual({
|
||||
type: 'control_request',
|
||||
request_id: 'req-1',
|
||||
request: {
|
||||
subtype: 'can_use_tool',
|
||||
tool_name: 'run_shell_command',
|
||||
tool_use_id: 'tool-use-1',
|
||||
input: { command: 'ls' },
|
||||
permission_suggestions: null,
|
||||
blocked_path: '/etc/passwd',
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('emits a control_response with the supplied request_id and allowed flag', () => {
|
||||
const adapter = new StreamJsonOutputAdapter(mockConfig, false);
|
||||
stdoutWriteSpy.mockClear();
|
||||
|
||||
adapter.emitControlResponse('req-1', true);
|
||||
|
||||
expect(stdoutWriteSpy).toHaveBeenCalledTimes(1);
|
||||
const parsed = JSON.parse(stdoutWriteSpy.mock.calls[0][0] as string);
|
||||
expect(parsed).toEqual({
|
||||
type: 'control_response',
|
||||
response: {
|
||||
subtype: 'success',
|
||||
request_id: 'req-1',
|
||||
response: { allowed: true },
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('defaults blocked_path to null when omitted', () => {
|
||||
const adapter = new StreamJsonOutputAdapter(mockConfig, false);
|
||||
stdoutWriteSpy.mockClear();
|
||||
|
||||
adapter.emitPermissionRequest('req-2', 'read_file', 'tool-use-2', {
|
||||
path: 'README.md',
|
||||
});
|
||||
|
||||
const parsed = JSON.parse(stdoutWriteSpy.mock.calls[0][0] as string);
|
||||
expect(parsed.request.blocked_path).toBeNull();
|
||||
});
|
||||
|
||||
it('emitControlError produces a control_response with subtype error', () => {
|
||||
const adapter = new StreamJsonOutputAdapter(mockConfig, false);
|
||||
stdoutWriteSpy.mockClear();
|
||||
|
||||
adapter.emitControlError('req-x', 'unknown request_id');
|
||||
|
||||
expect(stdoutWriteSpy).toHaveBeenCalledTimes(1);
|
||||
const parsed = JSON.parse(stdoutWriteSpy.mock.calls[0][0] as string);
|
||||
expect(parsed).toEqual({
|
||||
type: 'control_response',
|
||||
response: {
|
||||
subtype: 'error',
|
||||
request_id: 'req-x',
|
||||
error: 'unknown request_id',
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -37,16 +37,19 @@ export class StreamJsonOutputAdapter
|
|||
implements JsonOutputAdapterInterface
|
||||
{
|
||||
private mainTurnMessageStartEmitted = false;
|
||||
private readonly outputStream: NodeJS.WritableStream;
|
||||
|
||||
constructor(
|
||||
config: Config,
|
||||
private readonly includePartialMessages: boolean,
|
||||
outputStream?: NodeJS.WritableStream,
|
||||
) {
|
||||
super(config);
|
||||
this.outputStream = outputStream ?? process.stdout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits message immediately to stdout (stream mode).
|
||||
* Emits message immediately to the output stream (stream mode).
|
||||
*/
|
||||
protected emitMessageImpl(message: CLIMessage | ControlMessage): void {
|
||||
// Track assistant messages for result generation
|
||||
|
|
@ -60,7 +63,15 @@ export class StreamJsonOutputAdapter
|
|||
}
|
||||
|
||||
// Emit messages immediately in stream mode
|
||||
process.stdout.write(`${JSON.stringify(message)}\n`);
|
||||
this.outputStream.write(`${JSON.stringify(message)}\n`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Control-plane messages (control_request / control_response) share the
|
||||
* same transport as data messages in stream mode.
|
||||
*/
|
||||
protected override emitControlMessageImpl(message: ControlMessage): void {
|
||||
this.outputStream.write(`${JSON.stringify(message)}\n`);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
7
packages/cli/src/nonInteractive/io/index.ts
Normal file
7
packages/cli/src/nonInteractive/io/index.ts
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
export { StreamJsonOutputAdapter } from './StreamJsonOutputAdapter.js';
|
||||
13
packages/cli/src/remoteInput/RemoteInputContext.tsx
Normal file
13
packages/cli/src/remoteInput/RemoteInputContext.tsx
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { createContext, useContext } from 'react';
|
||||
import type { RemoteInputWatcher } from './RemoteInputWatcher.js';
|
||||
|
||||
export const RemoteInputContext = createContext<RemoteInputWatcher | null>(
|
||||
null,
|
||||
);
|
||||
export const useRemoteInput = () => useContext(RemoteInputContext);
|
||||
134
packages/cli/src/remoteInput/RemoteInputWatcher.test.ts
Normal file
134
packages/cli/src/remoteInput/RemoteInputWatcher.test.ts
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
||||
import * as fs from 'node:fs';
|
||||
import * as os from 'node:os';
|
||||
import * as path from 'node:path';
|
||||
import { RemoteInputWatcher } from './RemoteInputWatcher.js';
|
||||
|
||||
describe('RemoteInputWatcher', () => {
|
||||
let tmpDir: string;
|
||||
let inputFile: string;
|
||||
let watcher: RemoteInputWatcher | null = null;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'qwen-remote-input-'));
|
||||
inputFile = path.join(tmpDir, 'input.jsonl');
|
||||
fs.writeFileSync(inputFile, '');
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
watcher?.shutdown();
|
||||
watcher = null;
|
||||
// Give fs handles a tick to release (needed on Windows)
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
try {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
} catch {
|
||||
// Ignore cleanup errors on Windows where file handles may linger
|
||||
}
|
||||
});
|
||||
|
||||
it('forwards submit commands to the registered submit fn', async () => {
|
||||
watcher = new RemoteInputWatcher(inputFile);
|
||||
const submitted: string[] = [];
|
||||
watcher.setSubmitFn((text) => {
|
||||
submitted.push(text);
|
||||
});
|
||||
|
||||
fs.appendFileSync(
|
||||
inputFile,
|
||||
JSON.stringify({ type: 'submit', text: 'hello' }) + '\n',
|
||||
);
|
||||
|
||||
await watcher.checkForNewInput();
|
||||
expect(submitted).toEqual(['hello']);
|
||||
});
|
||||
|
||||
it('dispatches confirmation_response immediately, bypassing the queue', async () => {
|
||||
watcher = new RemoteInputWatcher(inputFile);
|
||||
const handler = vi.fn();
|
||||
watcher.setConfirmationHandler(handler);
|
||||
|
||||
fs.appendFileSync(
|
||||
inputFile,
|
||||
JSON.stringify({
|
||||
type: 'confirmation_response',
|
||||
request_id: 'req-7',
|
||||
allowed: true,
|
||||
}) + '\n',
|
||||
);
|
||||
|
||||
await watcher.checkForNewInput();
|
||||
expect(handler).toHaveBeenCalledWith('req-7', true);
|
||||
});
|
||||
|
||||
it('retries queued submits when the TUI signals it has become idle', async () => {
|
||||
watcher = new RemoteInputWatcher(inputFile);
|
||||
|
||||
let busy = true;
|
||||
const accepted: string[] = [];
|
||||
watcher.setSubmitFn((text) => {
|
||||
if (busy) return false; // simulate TUI rejecting because it is responding
|
||||
accepted.push(text);
|
||||
return true;
|
||||
});
|
||||
|
||||
fs.appendFileSync(
|
||||
inputFile,
|
||||
JSON.stringify({ type: 'submit', text: 'queued' }) + '\n',
|
||||
);
|
||||
|
||||
// Trigger read — command will be queued then submitted, but TUI rejects (busy)
|
||||
await watcher.checkForNewInput();
|
||||
// processQueue runs async; give it a tick
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(accepted).toEqual([]);
|
||||
|
||||
busy = false;
|
||||
watcher.notifyIdle();
|
||||
|
||||
// processQueue runs async; give it a tick
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(accepted).toEqual(['queued']);
|
||||
});
|
||||
|
||||
it('skips malformed JSON lines without throwing', async () => {
|
||||
watcher = new RemoteInputWatcher(inputFile);
|
||||
const submitted: string[] = [];
|
||||
watcher.setSubmitFn((text) => {
|
||||
submitted.push(text);
|
||||
});
|
||||
|
||||
fs.appendFileSync(inputFile, 'not-json\n');
|
||||
fs.appendFileSync(
|
||||
inputFile,
|
||||
JSON.stringify({ type: 'submit', text: 'after-bad-line' }) + '\n',
|
||||
);
|
||||
|
||||
await watcher.checkForNewInput();
|
||||
expect(submitted).toEqual(['after-bad-line']);
|
||||
});
|
||||
|
||||
it('stops watching after shutdown', async () => {
|
||||
watcher = new RemoteInputWatcher(inputFile);
|
||||
const submitted: string[] = [];
|
||||
watcher.setSubmitFn((text) => {
|
||||
submitted.push(text);
|
||||
});
|
||||
watcher.shutdown();
|
||||
|
||||
fs.appendFileSync(
|
||||
inputFile,
|
||||
JSON.stringify({ type: 'submit', text: 'too-late' }) + '\n',
|
||||
);
|
||||
|
||||
// checkForNewInput should be a no-op after shutdown (active=false)
|
||||
await watcher.checkForNewInput();
|
||||
expect(submitted).toEqual([]);
|
||||
});
|
||||
});
|
||||
247
packages/cli/src/remoteInput/RemoteInputWatcher.ts
Normal file
247
packages/cli/src/remoteInput/RemoteInputWatcher.ts
Normal file
|
|
@ -0,0 +1,247 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { createReadStream, watchFile, unwatchFile, statSync } from 'node:fs';
|
||||
import { createInterface } from 'node:readline';
|
||||
import { createDebugLogger } from '@qwen-code/qwen-code-core';
|
||||
|
||||
const debugLogger = createDebugLogger('REMOTE_INPUT');
|
||||
|
||||
/**
|
||||
* JSONL command shapes written by an external process (IDE extension,
|
||||
* web frontend, automation script) into the file passed to --input-file.
|
||||
*
|
||||
* - `submit`: enqueue a user message that the TUI processes as if typed
|
||||
* into the prompt.
|
||||
* - `confirmation_response`: reply to a pending tool-permission
|
||||
* `control_request` previously emitted on the dual-output channel.
|
||||
*/
|
||||
export type RemoteInputCommand =
|
||||
| { type: 'submit'; text: string }
|
||||
| { type: 'confirmation_response'; request_id: string; allowed: boolean };
|
||||
|
||||
/**
|
||||
* Callback invoked when a `confirmation_response` command is read.
|
||||
*/
|
||||
export type ConfirmationHandler = (requestId: string, allowed: boolean) => void;
|
||||
|
||||
/**
|
||||
* Callback type for submitting a query from remote input.
|
||||
* Returns true if the submit was accepted, false if rejected (TUI busy).
|
||||
*/
|
||||
export type SubmitFn = (
|
||||
query: string,
|
||||
) => Promise<boolean | void> | boolean | void;
|
||||
|
||||
/**
|
||||
* Watches a JSONL file for remote input commands and calls the registered
|
||||
* submit function when new commands arrive.
|
||||
*
|
||||
* The watcher queues commands and retries when the TUI is busy (responding).
|
||||
* Call `notifyIdle()` when the TUI transitions to idle state to trigger
|
||||
* processing of queued commands.
|
||||
*/
|
||||
export class RemoteInputWatcher {
|
||||
private submitFn: SubmitFn | null = null;
|
||||
private confirmationHandler: ConfirmationHandler | null = null;
|
||||
private queue: Array<Extract<RemoteInputCommand, { type: 'submit' }>> = [];
|
||||
private processing = false;
|
||||
private active = true;
|
||||
private bytesRead = 0;
|
||||
private reading = false;
|
||||
private filePath: string;
|
||||
private retryTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private readonly pollIntervalMs: number;
|
||||
|
||||
constructor(filePath: string, options?: { pollIntervalMs?: number }) {
|
||||
this.filePath = filePath;
|
||||
this.pollIntervalMs = options?.pollIntervalMs ?? 500;
|
||||
this.startWatching();
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the TUI's submit function. Called from AppContainer
|
||||
* once useGeminiStream's submitQuery is available.
|
||||
*/
|
||||
setSubmitFn(fn: SubmitFn): void {
|
||||
this.submitFn = fn;
|
||||
this.processQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the handler invoked when a `confirmation_response` command is
|
||||
* read from the input file. Used to bridge external approvals back into
|
||||
* the tool's `onConfirm` callback.
|
||||
*/
|
||||
setConfirmationHandler(fn: ConfirmationHandler): void {
|
||||
this.confirmationHandler = fn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify the watcher that the TUI has become idle.
|
||||
* Call this when streamingState transitions to Idle — it triggers
|
||||
* processing of any queued commands that were deferred due to TUI busy.
|
||||
*/
|
||||
notifyIdle(): void {
|
||||
if (this.queue.length > 0 && !this.processing) {
|
||||
this.processQueue();
|
||||
}
|
||||
}
|
||||
|
||||
private startWatching(): void {
|
||||
try {
|
||||
const stat = statSync(this.filePath);
|
||||
this.bytesRead = stat.size;
|
||||
} catch {
|
||||
this.bytesRead = 0;
|
||||
}
|
||||
|
||||
watchFile(this.filePath, { interval: this.pollIntervalMs }, () => {
|
||||
if (!this.active) return;
|
||||
this.readNewLines();
|
||||
});
|
||||
|
||||
debugLogger.debug(`RemoteInput: watching ${this.filePath}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Manually trigger a check for new input. Returns a promise that resolves
|
||||
* once any new lines have been read and processed. In production the
|
||||
* `watchFile` poll calls this automatically; tests can call it directly
|
||||
* to avoid depending on filesystem-polling timing.
|
||||
*/
|
||||
checkForNewInput(): Promise<void> {
|
||||
return this.readNewLines();
|
||||
}
|
||||
|
||||
private readNewLines(): Promise<void> {
|
||||
if (!this.active || this.reading) return Promise.resolve();
|
||||
|
||||
let currentSize: number;
|
||||
try {
|
||||
const stat = statSync(this.filePath);
|
||||
currentSize = stat.size;
|
||||
} catch {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
if (currentSize <= this.bytesRead) return Promise.resolve();
|
||||
|
||||
this.reading = true;
|
||||
const stream = createReadStream(this.filePath, {
|
||||
start: this.bytesRead,
|
||||
encoding: 'utf-8',
|
||||
});
|
||||
const rl = createInterface({ input: stream, crlfDelay: Infinity });
|
||||
|
||||
rl.on('line', (line) => {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) return;
|
||||
try {
|
||||
const cmd = JSON.parse(trimmed);
|
||||
// confirmation_response is dispatched immediately rather than queued:
|
||||
// a pending tool call is blocking and the response must reach
|
||||
// onConfirm without waiting for any earlier `submit` to finish.
|
||||
if (
|
||||
cmd &&
|
||||
cmd.type === 'confirmation_response' &&
|
||||
typeof cmd.request_id === 'string' &&
|
||||
typeof cmd.allowed === 'boolean'
|
||||
) {
|
||||
debugLogger.debug(
|
||||
`RemoteInput: confirmation_response for ${cmd.request_id} (allowed=${cmd.allowed})`,
|
||||
);
|
||||
this.confirmationHandler?.(cmd.request_id, cmd.allowed);
|
||||
} else if (
|
||||
cmd &&
|
||||
cmd.type === 'submit' &&
|
||||
typeof cmd.text === 'string'
|
||||
) {
|
||||
debugLogger.debug(
|
||||
`RemoteInput: queued command: ${cmd.text.slice(0, 50)}...`,
|
||||
);
|
||||
this.queue.push(
|
||||
cmd as Extract<RemoteInputCommand, { type: 'submit' }>,
|
||||
);
|
||||
} else {
|
||||
debugLogger.warn(
|
||||
`RemoteInput: unknown command type: ${String(cmd?.type)}`,
|
||||
);
|
||||
}
|
||||
} catch (_err) {
|
||||
debugLogger.warn(`RemoteInput: failed to parse line: ${trimmed}`);
|
||||
}
|
||||
});
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
rl.on('close', () => {
|
||||
this.bytesRead = currentSize;
|
||||
this.reading = false;
|
||||
this.processQueue();
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private async processQueue(): Promise<void> {
|
||||
if (this.processing || !this.submitFn || this.queue.length === 0) return;
|
||||
|
||||
this.processing = true;
|
||||
if (this.retryTimer) {
|
||||
clearTimeout(this.retryTimer);
|
||||
this.retryTimer = null;
|
||||
}
|
||||
|
||||
try {
|
||||
while (this.queue.length > 0 && this.active) {
|
||||
if (!this.submitFn) break;
|
||||
const cmd = this.queue[0]!; // peek, don't shift yet
|
||||
debugLogger.debug(
|
||||
`RemoteInput: submitting: ${cmd.text.slice(0, 50)}...`,
|
||||
);
|
||||
try {
|
||||
const result = await this.submitFn(cmd.text);
|
||||
// If submitFn returns false explicitly, the TUI rejected it (busy)
|
||||
if (result === false) {
|
||||
debugLogger.debug('RemoteInput: TUI busy, will retry on idle');
|
||||
this.scheduleRetry();
|
||||
break;
|
||||
}
|
||||
// Success — remove from queue
|
||||
this.queue.shift();
|
||||
} catch (err) {
|
||||
debugLogger.error('RemoteInput: submit failed:', err);
|
||||
this.queue.shift(); // remove failed command to avoid infinite retry
|
||||
}
|
||||
// Small delay between commands to let the TUI process
|
||||
if (this.queue.length > 0) {
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.processing = false;
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleRetry(): void {
|
||||
if (this.retryTimer) return;
|
||||
// Retry after 2s if notifyIdle hasn't been called
|
||||
this.retryTimer = setTimeout(() => {
|
||||
this.retryTimer = null;
|
||||
if (this.queue.length > 0 && !this.processing) {
|
||||
this.processQueue();
|
||||
}
|
||||
}, 2000);
|
||||
}
|
||||
|
||||
shutdown(): void {
|
||||
this.active = false;
|
||||
unwatchFile(this.filePath);
|
||||
if (this.retryTimer) clearTimeout(this.retryTimer);
|
||||
this.queue.length = 0;
|
||||
debugLogger.debug('RemoteInput: shut down');
|
||||
}
|
||||
}
|
||||
8
packages/cli/src/remoteInput/index.ts
Normal file
8
packages/cli/src/remoteInput/index.ts
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
export { RemoteInputWatcher } from './RemoteInputWatcher.js';
|
||||
export { RemoteInputContext, useRemoteInput } from './RemoteInputContext.js';
|
||||
|
|
@ -54,6 +54,8 @@ import {
|
|||
ApprovalMode,
|
||||
ConditionalRulesRegistry,
|
||||
type PermissionMode,
|
||||
ToolConfirmationOutcome,
|
||||
type WaitingToolCall,
|
||||
} from '@qwen-code/qwen-code-core';
|
||||
import { buildResumedHistoryItems } from './utils/resumeHistoryUtils.js';
|
||||
import { validateAuthMethod } from '../config/auth.js';
|
||||
|
|
@ -128,6 +130,8 @@ import { useMemoryDialog } from './hooks/useMemoryDialog.js';
|
|||
import { useAttentionNotifications } from './hooks/useAttentionNotifications.js';
|
||||
import { useContextualTips } from './hooks/useContextualTips.js';
|
||||
import { getTipHistory } from '../services/tips/index.js';
|
||||
import { useRemoteInput } from '../remoteInput/RemoteInputContext.js';
|
||||
import { useDualOutput } from '../dualOutput/DualOutputContext.js';
|
||||
import {
|
||||
requestConsentInteractive,
|
||||
requestConsentOrFail,
|
||||
|
|
@ -753,6 +757,7 @@ export const AppContainer = (props: AppContainerProps) => {
|
|||
handleApprovalModeChange,
|
||||
activePtyId,
|
||||
loopDetectionConfirmationRequest,
|
||||
pendingToolCalls,
|
||||
} = useGeminiStream(
|
||||
config.getGeminiClient(),
|
||||
historyManager.history,
|
||||
|
|
@ -846,6 +851,132 @@ export const AppContainer = (props: AppContainerProps) => {
|
|||
// stays consistent with popAllMessages even before React re-renders.
|
||||
midTurnDrainRef.current = drainQueue;
|
||||
|
||||
// Connect remote input watcher to submitQuery for bidirectional sync.
|
||||
// When an external process writes a command to the input-file,
|
||||
// the watcher calls submitQuery as if the user typed it in the TUI.
|
||||
const remoteInput = useRemoteInput();
|
||||
useEffect(() => {
|
||||
if (!remoteInput) return;
|
||||
remoteInput.setSubmitFn((text: string) => submitQuery(text));
|
||||
}, [remoteInput, submitQuery]);
|
||||
|
||||
// Notify remote input watcher when TUI becomes idle so it can
|
||||
// retry queued commands that were deferred while TUI was busy.
|
||||
useEffect(() => {
|
||||
if (!remoteInput) return;
|
||||
if (streamingState === StreamingState.Idle) {
|
||||
remoteInput.notifyIdle();
|
||||
}
|
||||
}, [remoteInput, streamingState]);
|
||||
|
||||
// Dual-output tool-confirmation bridge.
|
||||
//
|
||||
// When a tool call enters awaiting_approval we emit a `control_request`
|
||||
// (subtype: can_use_tool) on the dual-output channel so an external
|
||||
// consumer can decide. Whichever side resolves first (TUI native UI or
|
||||
// `confirmation_response` written to --input-file) wins; we always emit
|
||||
// a `control_response` mirroring the final decision so observers stay in
|
||||
// sync.
|
||||
const dualOutput = useDualOutput();
|
||||
const confirmRequestMap = useRef(new Map<string, string>()); // requestId → callId
|
||||
const confirmCallIdMap = useRef(new Map<string, string>()); // callId → requestId
|
||||
const confirmEmitted = useRef(new Set<string>());
|
||||
|
||||
useEffect(() => {
|
||||
if (!dualOutput || !dualOutput.isConnected) return;
|
||||
for (const tc of pendingToolCalls) {
|
||||
if (
|
||||
tc.status === 'awaiting_approval' &&
|
||||
!confirmEmitted.current.has(tc.request.callId)
|
||||
) {
|
||||
const requestId = crypto.randomUUID();
|
||||
confirmRequestMap.current.set(requestId, tc.request.callId);
|
||||
confirmCallIdMap.current.set(tc.request.callId, requestId);
|
||||
confirmEmitted.current.add(tc.request.callId);
|
||||
dualOutput.emitPermissionRequest(
|
||||
requestId,
|
||||
tc.request.name,
|
||||
tc.request.callId,
|
||||
tc.request.args,
|
||||
);
|
||||
}
|
||||
}
|
||||
// Detect tools that left awaiting_approval (TUI-native resolution) so we
|
||||
// can emit a matching control_response back to external consumers.
|
||||
for (const [callId, requestId] of confirmCallIdMap.current) {
|
||||
const tc = pendingToolCalls.find((t) => t.request.callId === callId);
|
||||
if (
|
||||
tc &&
|
||||
tc.status !== 'awaiting_approval' &&
|
||||
confirmEmitted.current.has(callId)
|
||||
) {
|
||||
const allowed = tc.status !== 'cancelled' && tc.status !== 'error';
|
||||
dualOutput.emitControlResponse(requestId, allowed);
|
||||
confirmRequestMap.current.delete(requestId);
|
||||
confirmCallIdMap.current.delete(callId);
|
||||
confirmEmitted.current.delete(callId);
|
||||
}
|
||||
}
|
||||
}, [dualOutput, pendingToolCalls]);
|
||||
|
||||
// Keep latest state in refs so the confirmation handler (registered once)
|
||||
// always reads current values without needing re-registration.
|
||||
const pendingToolCallsRef = useRef(pendingToolCalls);
|
||||
pendingToolCallsRef.current = pendingToolCalls;
|
||||
const dualOutputRef = useRef(dualOutput);
|
||||
dualOutputRef.current = dualOutput;
|
||||
|
||||
// Route confirmation_response commands written to --input-file back into
|
||||
// the tool's onConfirm handler. Registered once (deps: [remoteInput]) to
|
||||
// avoid teardown/re-registration churn on every pendingToolCalls change.
|
||||
useEffect(() => {
|
||||
if (!remoteInput) return;
|
||||
remoteInput.setConfirmationHandler(
|
||||
(requestId: string, allowed: boolean) => {
|
||||
const callId = confirmRequestMap.current.get(requestId);
|
||||
if (!callId) {
|
||||
dualOutputRef.current?.emitControlError(
|
||||
requestId,
|
||||
'unknown request_id (already resolved, cancelled, or never issued)',
|
||||
);
|
||||
return;
|
||||
}
|
||||
const tc = pendingToolCallsRef.current.find(
|
||||
(t) =>
|
||||
t.request.callId === callId && t.status === 'awaiting_approval',
|
||||
);
|
||||
if (!tc) {
|
||||
dualOutputRef.current?.emitControlError(
|
||||
requestId,
|
||||
'tool call is no longer awaiting approval',
|
||||
);
|
||||
return;
|
||||
}
|
||||
const waitingTc = tc as WaitingToolCall;
|
||||
if (!waitingTc.confirmationDetails?.onConfirm) {
|
||||
dualOutputRef.current?.emitControlError(
|
||||
requestId,
|
||||
'tool call has no onConfirm handler',
|
||||
);
|
||||
return;
|
||||
}
|
||||
void waitingTc.confirmationDetails.onConfirm(
|
||||
allowed
|
||||
? ToolConfirmationOutcome.ProceedOnce
|
||||
: ToolConfirmationOutcome.Cancel,
|
||||
);
|
||||
// Do NOT clean up maps here — let the mirror useEffect (line ~870)
|
||||
// detect the state transition and emit control_response + clean up,
|
||||
// keeping the emission path symmetric for both TUI-native and
|
||||
// external-initiated resolutions.
|
||||
},
|
||||
);
|
||||
|
||||
return () => {
|
||||
remoteInput.setConfirmationHandler(() => {});
|
||||
};
|
||||
}, [remoteInput]);
|
||||
|
||||
// Callback for handling final submit (must be after addMessage from useMessageQueue)
|
||||
const handleFinalSubmit = useCallback(
|
||||
(submittedValue: string) => {
|
||||
|
|
|
|||
|
|
@ -76,6 +76,7 @@ import path from 'node:path';
|
|||
import { useSessionStats } from '../contexts/SessionContext.js';
|
||||
import type { LoadedSettings } from '../../config/settings.js';
|
||||
import { t } from '../../i18n/index.js';
|
||||
import { useDualOutput } from '../../dualOutput/DualOutputContext.js';
|
||||
|
||||
const debugLogger = createDebugLogger('GEMINI_STREAM');
|
||||
|
||||
|
|
@ -219,6 +220,7 @@ export const useGeminiStream = (
|
|||
const isSubmittingQueryRef = useRef(false);
|
||||
const lastPromptRef = useRef<PartListUnion | null>(null);
|
||||
const lastPromptErroredRef = useRef(false);
|
||||
const dualOutput = useDualOutput();
|
||||
const [isResponding, setIsResponding] = useState<boolean>(false);
|
||||
const [thought, setThought] = useState<ThoughtSummary | null>(null);
|
||||
const [pendingHistoryItem, pendingHistoryItemRef, setPendingHistoryItem] =
|
||||
|
|
@ -1110,7 +1112,9 @@ export const useGeminiStream = (
|
|||
let geminiMessageBuffer = '';
|
||||
let thoughtBuffer = '';
|
||||
const toolCallRequests: ToolCallRequestInfo[] = [];
|
||||
dualOutput?.startAssistantMessage();
|
||||
for await (const event of stream) {
|
||||
dualOutput?.processEvent(event);
|
||||
switch (event.type) {
|
||||
case ServerGeminiEventType.Thought:
|
||||
// If the thought has a subject, it's a discrete status update rather than
|
||||
|
|
@ -1212,6 +1216,7 @@ export const useGeminiStream = (
|
|||
}
|
||||
}
|
||||
}
|
||||
dualOutput?.finalizeAssistantMessage();
|
||||
if (toolCallRequests.length > 0) {
|
||||
scheduleToolCalls(toolCallRequests, signal);
|
||||
}
|
||||
|
|
@ -1236,6 +1241,7 @@ export const useGeminiStream = (
|
|||
handleUserPromptSubmitBlockedEvent,
|
||||
handleStopHookLoopEvent,
|
||||
addItem,
|
||||
dualOutput,
|
||||
],
|
||||
);
|
||||
|
||||
|
|
@ -1382,6 +1388,22 @@ export const useGeminiStream = (
|
|||
setInitError(null);
|
||||
|
||||
try {
|
||||
// Emit user message to dual output sidecar (if enabled).
|
||||
// Skip for tool-result submissions — those are emitted separately
|
||||
// when the tool completes.
|
||||
if (dualOutput && submitType !== SendMessageType.ToolResult) {
|
||||
const rawParts =
|
||||
typeof finalQueryToSend === 'string'
|
||||
? [finalQueryToSend]
|
||||
: Array.isArray(finalQueryToSend)
|
||||
? finalQueryToSend
|
||||
: [finalQueryToSend];
|
||||
const userParts: Part[] = rawParts.map((p) =>
|
||||
typeof p === 'string' ? { text: p } : p,
|
||||
);
|
||||
dualOutput.emitUserMessage(userParts);
|
||||
}
|
||||
|
||||
const stream = geminiClient.sendMessageStream(
|
||||
finalQueryToSend,
|
||||
abortSignal,
|
||||
|
|
@ -1489,6 +1511,7 @@ export const useGeminiStream = (
|
|||
pendingRetryCountdownItemRef,
|
||||
pendingRetryErrorItemRef,
|
||||
setPendingRetryErrorItem,
|
||||
dualOutput,
|
||||
],
|
||||
);
|
||||
|
||||
|
|
@ -1694,6 +1717,13 @@ export const useGeminiStream = (
|
|||
}
|
||||
}
|
||||
|
||||
// Emit tool results to dual output sidecar (if enabled)
|
||||
if (dualOutput) {
|
||||
for (const toolCall of geminiTools) {
|
||||
dualOutput.emitToolResult(toolCall.request, toolCall.response);
|
||||
}
|
||||
}
|
||||
|
||||
markToolsAsSubmitted(callIdsToMarkAsSubmitted);
|
||||
|
||||
// Don't continue if model was switched due to quota error
|
||||
|
|
@ -1730,6 +1760,7 @@ export const useGeminiStream = (
|
|||
config,
|
||||
midTurnDrainRef,
|
||||
addItem,
|
||||
dualOutput,
|
||||
],
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -443,6 +443,26 @@ export interface ConfigParameters {
|
|||
sdkMode?: boolean;
|
||||
sessionSubagents?: SubagentConfig[];
|
||||
channel?: string;
|
||||
/**
|
||||
* File descriptor number for structured JSON event output (dual output mode).
|
||||
* When set, Qwen Code outputs structured JSON events to this fd while
|
||||
* continuing to render the TUI on stdout. The caller must provide this fd
|
||||
* via spawn stdio configuration.
|
||||
* Mutually exclusive with jsonFile.
|
||||
*/
|
||||
jsonFd?: number;
|
||||
/**
|
||||
* File path for structured JSON event output (dual output mode).
|
||||
* Can be a regular file, FIFO (named pipe), or /dev/fd/N.
|
||||
* Mutually exclusive with jsonFd.
|
||||
*/
|
||||
jsonFile?: string;
|
||||
/**
|
||||
* File path for receiving remote input commands (bidirectional sync mode).
|
||||
* An external process writes JSONL commands to this file, and the TUI
|
||||
* watches it to process messages as if the user typed them.
|
||||
*/
|
||||
inputFile?: string;
|
||||
/** Model providers configuration grouped by authType */
|
||||
modelProvidersConfig?: ModelProvidersConfig;
|
||||
/** Multi-agent collaboration settings (Arena, Team, Swarm) */
|
||||
|
|
@ -658,6 +678,9 @@ export class Config {
|
|||
private readonly truncateToolOutputLines: number;
|
||||
private readonly eventEmitter?: EventEmitter;
|
||||
private readonly channel: string | undefined;
|
||||
private readonly jsonFd: number | undefined;
|
||||
private readonly jsonFile: string | undefined;
|
||||
private readonly inputFile: string | undefined;
|
||||
private readonly defaultFileEncoding: FileEncodingType | undefined;
|
||||
private readonly enableManagedAutoMemory: boolean;
|
||||
private readonly enableManagedAutoDream: boolean;
|
||||
|
|
@ -801,6 +824,9 @@ export class Config {
|
|||
this.truncateToolOutputLines =
|
||||
params.truncateToolOutputLines ?? DEFAULT_TRUNCATE_TOOL_OUTPUT_LINES;
|
||||
this.channel = params.channel;
|
||||
this.jsonFd = params.jsonFd;
|
||||
this.jsonFile = params.jsonFile;
|
||||
this.inputFile = params.inputFile;
|
||||
this.defaultFileEncoding = params.defaultFileEncoding;
|
||||
this.storage = new Storage(this.targetDir);
|
||||
this.inputFormat = params.inputFormat ?? InputFormat.TEXT;
|
||||
|
|
@ -2204,6 +2230,31 @@ export class Config {
|
|||
return this.channel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the file descriptor for dual output JSON event stream.
|
||||
* When set, the TUI mode will also emit structured JSON events to this fd.
|
||||
*/
|
||||
getJsonFd(): number | undefined {
|
||||
return this.jsonFd;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the file path for dual output JSON event stream.
|
||||
* When set, the TUI mode will also emit structured JSON events to this file.
|
||||
*/
|
||||
getJsonFile(): string | undefined {
|
||||
return this.jsonFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the file path for remote input commands (bidirectional sync).
|
||||
* When set, the TUI mode will watch this file for JSONL commands written
|
||||
* by an external process and submit them as user messages.
|
||||
*/
|
||||
getInputFile(): string | undefined {
|
||||
return this.inputFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the default file encoding for new files.
|
||||
* @returns FileEncodingType
|
||||
|
|
|
|||
|
|
@ -125,6 +125,20 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"dualOutput": {
|
||||
"description": "Dual-output sidecar mode: emit structured JSON events to a second channel while the TUI renders normally on stdout. See docs/users/features/dual-output.md. CLI flags take precedence over these settings.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"jsonFile": {
|
||||
"description": "File path for structured JSON event output. Equivalent to --json-file. Ignored if --json-fd or --json-file is also set.",
|
||||
"type": "string"
|
||||
},
|
||||
"inputFile": {
|
||||
"description": "File path for remote input commands (JSONL). Equivalent to --input-file. Ignored if --input-file is also set.",
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"ui": {
|
||||
"description": "User interface settings.",
|
||||
"type": "object",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue