fix(core): throttle shell tool live text updates (#3902)

* fix(core): throttle shell tool live text updates

The previous lastUpdateTime = Date.now() at function entry meant the
first 'data' chunk's check (Date.now() - lastUpdateTime > INTERVAL)
was always false on first invocation, but shouldUpdate=true was set
unconditionally — so every text chunk forced a React render.

Initialize lastUpdateTime to NEGATIVE_INFINITY so the first chunk
always emits, then throttle subsequent text chunks to OUTPUT_UPDATE_INTERVAL_MS.
ANSI (Array<>) chunks are already throttled and deduped by
ShellExecutionService and continue to update at full rate.

Final ToolResult still carries the complete output after command
completion — only the live preview is throttled.

Generated with AI

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(core): add trailing-edge flush to shell throttle

wenshao CHANGES_REQUESTED (2026-05-07T22:42): the leading-edge-only throttle
left the last suppressed plain-text chunk unshown if the command went quiet
within the 1s window (e.g. a status line printed once and then no more output).

Fix: when a plain-text chunk is suppressed, schedule a setTimeout for the
remaining window duration that calls the existing doUpdate() helper. The timer
is cancelled if a subsequent leading-edge update arrives first (preventing a
redundant render), or when the command settles via await resultPromise.

The do-update logic is extracted into a local doUpdate() helper to avoid
duplicating the string/ANSI branching between the immediate path and the timer.

Test changes:
- Updated existing throttle test to reflect new 3-call sequence: leading-edge
  ('line 1'), trailing flush ('line 2'), leading-edge ('line 3').
- Added 'trailing flush' test: leading update fires, next chunk suppressed,
  time advances → trailing flush emits the last suppressed chunk.
- Added 'ANSI passthrough' test: two back-to-back ANSI chunks both trigger
  updateOutput immediately (ANSI branch bypasses throttle, regression guard).

Generated with AI

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(test): correct AnsiToken shape in shell ANSI throttle test

Use fg/bg string fields and remove non-existent properties
(strikethrough, hidden, blink, foreground, background) so the
object literals satisfy the AnsiToken interface and tsc passes.

Generated with AI

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(core): harden shell throttle timer lifecycle + add coverage

Centralizes trailing-flush timer cancellation in `doUpdate()` so every
emit path (leading-edge text, ANSI passthrough, binary_detected,
binary_progress) supersedes a pending timer instead of leaving a stale
one that could double-fire. Adds an abort listener so user-cancel /
timeout cancels the timer before the result settles, and wraps both
`ShellExecutionService.execute()` and `await resultPromise` in
try/finally so a thrown PTY import or rejected result still tears down
the timer + listener.

Adds five regression tests covering the lifecycle invariants flagged
in review:
  - 3+ rapid suppressed chunks coalesce into one trailing flush
  - command settling cancels a pending trailing-flush timer
  - leading-edge update path produces no duplicate trailing flush
  - abort signal cancels a pending trailing-flush timer
  - execute() rejection cleans up listeners (no late updateOutput)

Generated with AI

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

---------

Co-authored-by: 秦奇 <gary.gq@alibaba-inc.com>
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
ChiGao 2026-05-10 07:26:58 +08:00 committed by GitHub
parent 4e91dbaff0
commit f4d0ad6b7f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 495 additions and 65 deletions

View file

@ -736,7 +736,7 @@ describe('ShellTool', () => {
describe('Streaming to `updateOutput`', () => {
let updateOutputMock: Mock;
beforeEach(() => {
vi.useFakeTimers({ toFake: ['Date'] });
vi.useFakeTimers({ toFake: ['Date', 'setTimeout', 'clearTimeout'] });
updateOutputMock = vi.fn();
});
afterEach(() => {
@ -789,6 +789,364 @@ describe('ShellTool', () => {
});
await promise;
});
it('should throttle live text updates while preserving the latest output', async () => {
const invocation = shellTool.build({
command: 'npm test',
is_background: false,
});
const promise = invocation.execute(mockAbortSignal, updateOutputMock);
// Leading-edge fires immediately
mockShellOutputCallback({ type: 'data', chunk: 'line 1' });
expect(updateOutputMock).toHaveBeenCalledOnce();
expect(updateOutputMock).toHaveBeenLastCalledWith('line 1');
// Suppressed: trailing flush scheduled
mockShellOutputCallback({ type: 'data', chunk: 'line 2' });
expect(updateOutputMock).toHaveBeenCalledOnce();
// Advance time: trailing flush fires, emitting 'line 2'
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS + 1);
expect(updateOutputMock).toHaveBeenCalledTimes(2);
expect(updateOutputMock).toHaveBeenLastCalledWith('line 2');
// Advance time past the interval window again so next chunk fires immediately
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS + 1);
mockShellOutputCallback({ type: 'data', chunk: 'line 3' });
expect(updateOutputMock).toHaveBeenCalledTimes(3);
expect(updateOutputMock).toHaveBeenLastCalledWith('line 3');
resolveExecutionPromise({
rawOutput: Buffer.from('line 1\nline 2\nline 3'),
output: 'line 1\nline 2\nline 3',
exitCode: 0,
signal: null,
error: null,
aborted: false,
pid: 12345,
executionMethod: 'child_process',
});
await promise;
});
it('should flush the last suppressed text chunk when the command goes quiet', async () => {
const invocation = shellTool.build({
command: 'long-running-cmd',
is_background: false,
});
const promise = invocation.execute(mockAbortSignal, updateOutputMock);
// Leading-edge update
mockShellOutputCallback({ type: 'data', chunk: 'progress: 0%' });
expect(updateOutputMock).toHaveBeenCalledOnce();
// Suppressed: within the throttle window
mockShellOutputCallback({ type: 'data', chunk: 'progress: 50%' });
expect(updateOutputMock).toHaveBeenCalledOnce();
// Advance time to trigger the trailing flush timer
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS + 1);
// The trailing flush must have fired with the latest suppressed chunk
expect(updateOutputMock).toHaveBeenCalledTimes(2);
expect(updateOutputMock).toHaveBeenLastCalledWith('progress: 50%');
resolveExecutionPromise({
rawOutput: Buffer.from('progress: 50%'),
output: 'progress: 50%',
exitCode: 0,
signal: null,
error: null,
aborted: false,
pid: 12345,
executionMethod: 'child_process',
});
await promise;
});
it('should coalesce 3+ rapid text chunks within a window into a single trailing flush', async () => {
// Regression: in one throttle window, the leading-edge chunk fires
// immediately, and any subsequent chunks (regardless of count) are
// collapsed into ONE trailing flush carrying the latest text. The
// timer must not be repeatedly rescheduled per chunk — that would
// be wasteful and (depending on the math) could push the flush
// beyond the original window.
const invocation = shellTool.build({
command: 'streaming-cmd',
is_background: false,
});
const promise = invocation.execute(mockAbortSignal, updateOutputMock);
// Leading edge: fires immediately at t=0
mockShellOutputCallback({ type: 'data', chunk: 'chunk 1' });
expect(updateOutputMock).toHaveBeenCalledOnce();
expect(updateOutputMock).toHaveBeenLastCalledWith('chunk 1');
// Three rapid suppressed chunks within the same window. None of
// these should fire updateOutput synchronously, and the trailing
// flush should not have run yet.
await vi.advanceTimersByTimeAsync(50);
mockShellOutputCallback({ type: 'data', chunk: 'chunk 2' });
await vi.advanceTimersByTimeAsync(50);
mockShellOutputCallback({ type: 'data', chunk: 'chunk 3' });
await vi.advanceTimersByTimeAsync(50);
mockShellOutputCallback({ type: 'data', chunk: 'chunk 4' });
expect(updateOutputMock).toHaveBeenCalledOnce();
// Drain the throttle window. The single trailing flush should
// fire exactly once and carry the LATEST suppressed chunk.
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS + 1);
expect(updateOutputMock).toHaveBeenCalledTimes(2);
expect(updateOutputMock).toHaveBeenLastCalledWith('chunk 4');
resolveExecutionPromise({
rawOutput: Buffer.from('chunk 1chunk 2chunk 3chunk 4'),
output: 'chunk 1chunk 2chunk 3chunk 4',
exitCode: 0,
signal: null,
error: null,
aborted: false,
pid: 12345,
executionMethod: 'child_process',
});
await promise;
});
it('should cancel a pending trailing flush when the command completes', async () => {
// Lifecycle invariant: if the command resolves while a trailing
// flush timer is pending, the timer MUST be cancelled. Otherwise
// the timer would fire after `execute()` returns and trigger a
// phantom updateOutput call against stale `cumulativeOutput`,
// racing against the consumer that has already moved on.
const invocation = shellTool.build({
command: 'quick-cmd',
is_background: false,
});
const promise = invocation.execute(mockAbortSignal, updateOutputMock);
// Leading-edge update + suppressed chunk (timer pending)
mockShellOutputCallback({ type: 'data', chunk: 'first' });
expect(updateOutputMock).toHaveBeenCalledOnce();
mockShellOutputCallback({ type: 'data', chunk: 'second' });
expect(updateOutputMock).toHaveBeenCalledOnce();
// Resolve BEFORE the throttle window elapses. No further chunks.
resolveExecutionPromise({
rawOutput: Buffer.from('first\nsecond'),
output: 'first\nsecond',
exitCode: 0,
signal: null,
error: null,
aborted: false,
pid: 12345,
executionMethod: 'child_process',
});
await promise;
// Advancing time past the original window must not produce a
// late updateOutput call — the timer was cancelled on settle.
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS * 2);
expect(updateOutputMock).toHaveBeenCalledOnce();
});
it('should not fire a duplicate trailing flush after a leading-edge update', async () => {
// After a trailing flush emits in window N, the next chunk in
// window N+1 takes the leading-edge path. `doUpdate()` is the
// single point that cancels any pending trailing-flush timer,
// so even if a stale timer were somehow still scheduled when a
// leading-edge update fires, no duplicate updateOutput call can
// escape. This test asserts the end-to-end invariant: suppress
// → trailing flush → leading-edge → suppress → trailing flush
// produces exactly the expected sequence with no duplicates.
const invocation = shellTool.build({
command: 'multi-window-cmd',
is_background: false,
});
const promise = invocation.execute(mockAbortSignal, updateOutputMock);
// Window 1: leading-edge 'a' at t=0
mockShellOutputCallback({ type: 'data', chunk: 'a' });
expect(updateOutputMock).toHaveBeenCalledTimes(1);
expect(updateOutputMock).toHaveBeenLastCalledWith('a');
// Window 1: suppressed 'b' schedules trailing flush
await vi.advanceTimersByTimeAsync(100);
mockShellOutputCallback({ type: 'data', chunk: 'b' });
expect(updateOutputMock).toHaveBeenCalledTimes(1);
// Trailing flush fires at the window boundary with 'b'
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS);
expect(updateOutputMock).toHaveBeenCalledTimes(2);
expect(updateOutputMock).toHaveBeenLastCalledWith('b');
// Window 2: advance past the interval, next chunk takes the
// leading-edge path. If `doUpdate()` failed to cancel the (now
// already-fired) timer, no harm; if doUpdate fails to cancel a
// *future* timer scheduled later, we'd see duplicates below.
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS + 1);
mockShellOutputCallback({ type: 'data', chunk: 'c' });
expect(updateOutputMock).toHaveBeenCalledTimes(3);
expect(updateOutputMock).toHaveBeenLastCalledWith('c');
// Window 2: suppressed 'd' schedules another trailing flush
await vi.advanceTimersByTimeAsync(50);
mockShellOutputCallback({ type: 'data', chunk: 'd' });
expect(updateOutputMock).toHaveBeenCalledTimes(3);
// The trailing flush fires exactly once with 'd'.
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS);
expect(updateOutputMock).toHaveBeenCalledTimes(4);
expect(updateOutputMock).toHaveBeenLastCalledWith('d');
// Drain a long quiet period — no spurious late updates from
// any zombie timers.
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS * 5);
expect(updateOutputMock).toHaveBeenCalledTimes(4);
resolveExecutionPromise({
rawOutput: Buffer.from('abcd'),
output: 'abcd',
exitCode: 0,
signal: null,
error: null,
aborted: false,
pid: 12345,
executionMethod: 'child_process',
});
await promise;
});
it('should cancel a pending trailing flush when the abort signal fires', async () => {
// If the user cancels (or the timeout fires) while a trailing
// flush is pending, the abort listener must cancel the timer.
// Otherwise we'd flash a stale frame between the abort and the
// result promise settling with `aborted: true`.
const ac = new AbortController();
const invocation = shellTool.build({
command: 'sleep 1',
is_background: false,
});
const promise = invocation.execute(ac.signal, updateOutputMock);
// Leading-edge + suppressed (timer pending)
mockShellOutputCallback({ type: 'data', chunk: 'partial' });
expect(updateOutputMock).toHaveBeenCalledOnce();
mockShellOutputCallback({ type: 'data', chunk: 'more partial' });
expect(updateOutputMock).toHaveBeenCalledOnce();
// Abort. The timer must be cancelled synchronously.
ac.abort();
// Drain the would-be window. updateOutput must NOT be called.
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS * 2);
expect(updateOutputMock).toHaveBeenCalledOnce();
// Settle the execution as aborted so the test cleanly exits.
resolveExecutionPromise({
rawOutput: Buffer.from('partial'),
output: 'partial',
exitCode: null,
signal: 15,
error: null,
aborted: true,
pid: 12345,
executionMethod: 'child_process',
});
await promise;
// Even after settle + further time, no late update.
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS * 2);
expect(updateOutputMock).toHaveBeenCalledOnce();
});
it('should clean up a pending trailing flush if execute() rejects', async () => {
// ShellExecutionService.execute() can throw before resolving
// (e.g. PTY dynamic import failure). The tool must propagate the
// error AND ensure no scheduled timer survives to fire a late
// updateOutput call after the caller has already seen the error.
// (No chunks can arrive before execute() resolves, so the timer
// is never actually scheduled in this path. The contract we
// verify here is that the abort listener is torn down — which we
// observe indirectly via "no late update on subsequent abort".)
const ac = new AbortController();
mockShellExecutionService.mockImplementationOnce(() => {
throw new Error('pty-import-failed');
});
const invocation = shellTool.build({
command: 'pty-cmd',
is_background: false,
});
await expect(
invocation.execute(ac.signal, updateOutputMock),
).rejects.toThrow('pty-import-failed');
// After rejection, aborting must not crash and must not produce
// any updateOutput calls (no listener leak).
ac.abort();
await vi.advanceTimersByTimeAsync(OUTPUT_UPDATE_INTERVAL_MS * 2);
expect(updateOutputMock).not.toHaveBeenCalled();
});
it('should pass ANSI chunks through immediately without throttling', async () => {
const invocation = shellTool.build({
command: 'interactive-cmd',
is_background: false,
});
const promise = invocation.execute(mockAbortSignal, updateOutputMock);
const ansiChunk1: import('../utils/terminalSerializer.js').AnsiOutput =
[
[
{
text: 'Hello',
bold: false,
italic: false,
dim: false,
underline: false,
inverse: false,
fg: '',
bg: '',
},
],
];
const ansiChunk2: import('../utils/terminalSerializer.js').AnsiOutput =
[
[
{
text: 'World',
bold: false,
italic: false,
dim: false,
underline: false,
inverse: false,
fg: '',
bg: '',
},
],
];
// Both ANSI chunks should fire updateOutput immediately, back-to-back
mockShellOutputCallback({ type: 'data', chunk: ansiChunk1 });
mockShellOutputCallback({ type: 'data', chunk: ansiChunk2 });
expect(updateOutputMock).toHaveBeenCalledTimes(2);
resolveExecutionPromise({
rawOutput: Buffer.from(''),
output: '',
exitCode: 0,
signal: null,
error: null,
aborted: false,
pid: 12345,
executionMethod: 'child_process',
});
await promise;
});
});
describe('long-running foreground hint', () => {

View file

@ -1505,77 +1505,138 @@ export class ShellToolInvocation extends BaseToolInvocation<
: null;
let cumulativeOutput: string | AnsiOutput = '';
let lastUpdateTime = Date.now();
let lastUpdateTime = Number.NEGATIVE_INFINITY;
let isBinaryStream = false;
let totalLines = 0;
let totalBytes = 0;
let trailingFlushTimer: ReturnType<typeof setTimeout> | null = null;
const { result: resultPromise, pid } = await ShellExecutionService.execute(
commandToExecute,
cwd,
(event: ShellOutputEvent) => {
let shouldUpdate = false;
const cancelTrailingFlush = () => {
if (trailingFlushTimer !== null) {
clearTimeout(trailingFlushTimer);
trailingFlushTimer = null;
}
};
switch (event.type) {
case 'data':
if (isBinaryStream) break;
cumulativeOutput = event.chunk;
// Stats are only consumed by the ANSI-output branch below,
// so skip the per-chunk accounting for plain string chunks.
if (Array.isArray(event.chunk)) {
totalLines = event.chunk.length;
totalBytes = event.chunk.reduce(
(sum, line) =>
sum +
line.reduce(
(ls, token) => ls + Buffer.byteLength(token.text, 'utf-8'),
0,
),
0,
);
}
shouldUpdate = true;
break;
case 'binary_detected':
isBinaryStream = true;
cumulativeOutput = '[Binary output detected. Halting stream...]';
shouldUpdate = true;
break;
case 'binary_progress':
isBinaryStream = true;
cumulativeOutput = `[Receiving binary output... ${formatMemoryUsage(
event.bytesReceived,
)} received]`;
if (Date.now() - lastUpdateTime > OUTPUT_UPDATE_INTERVAL_MS) {
shouldUpdate = true;
}
break;
default: {
throw new Error('An unhandled ShellOutputEvent was found.');
const doUpdate = () => {
// Any path that emits an update supersedes a pending trailing flush —
// cancel centrally so leading-edge text, ANSI, binary_detected, and
// binary_progress branches all stay consistent without each having to
// remember to clear the timer themselves.
cancelTrailingFlush();
lastUpdateTime = Date.now();
if (!updateOutput) return;
if (typeof cumulativeOutput === 'string') {
updateOutput(cumulativeOutput);
} else {
updateOutput({
ansiOutput: cumulativeOutput,
totalLines,
totalBytes,
...(this.params.timeout != null && {
timeoutMs: this.params.timeout,
}),
});
}
};
// If the command is aborted (user cancel or timeout) while a trailing
// flush is pending, cancel the timer so we don't emit a stale frame
// between the abort signal firing and the result promise settling.
const onAbort = () => {
cancelTrailingFlush();
};
combinedSignal.addEventListener('abort', onAbort, { once: true });
const onShellOutputEvent = (event: ShellOutputEvent) => {
let shouldUpdate = false;
switch (event.type) {
case 'data':
if (isBinaryStream) break;
cumulativeOutput = event.chunk;
// Stats are only consumed by the ANSI-output branch below,
// so skip the per-chunk accounting for plain string chunks.
if (Array.isArray(event.chunk)) {
totalLines = event.chunk.length;
totalBytes = event.chunk.reduce(
(sum, line) =>
sum +
line.reduce(
(ls, token) => ls + Buffer.byteLength(token.text, 'utf-8'),
0,
),
0,
);
}
}
if (shouldUpdate && updateOutput) {
if (typeof cumulativeOutput === 'string') {
updateOutput(cumulativeOutput);
} else {
updateOutput({
ansiOutput: cumulativeOutput,
totalLines,
totalBytes,
// Only include timeout when user explicitly set it
...(this.params.timeout != null && {
timeoutMs: this.params.timeout,
}),
});
// ANSI output is already throttled and semantically deduped by
// ShellExecutionService, so preserve its live responsiveness.
// Plain text data can arrive in bursts and does not need every
// chunk to force a React render; the final ToolResult still
// carries the complete output after command completion.
if (Array.isArray(event.chunk)) {
shouldUpdate = true;
} else if (Date.now() - lastUpdateTime > OUTPUT_UPDATE_INTERVAL_MS) {
shouldUpdate = true;
} else if (trailingFlushTimer === null) {
// Throttled: schedule a trailing flush so the last suppressed
// chunk is still shown if the command goes quiet within the
// window. The timer's callback reads `cumulativeOutput` by
// closure, so subsequent suppressed chunks within the same
// window don't need to reschedule — the latest value will be
// emitted when the timer fires.
const remaining =
OUTPUT_UPDATE_INTERVAL_MS - (Date.now() - lastUpdateTime);
trailingFlushTimer = setTimeout(() => {
trailingFlushTimer = null;
doUpdate();
}, remaining);
}
lastUpdateTime = Date.now();
break;
case 'binary_detected':
isBinaryStream = true;
cumulativeOutput = '[Binary output detected. Halting stream...]';
shouldUpdate = true;
break;
case 'binary_progress':
isBinaryStream = true;
cumulativeOutput = `[Receiving binary output... ${formatMemoryUsage(
event.bytesReceived,
)} received]`;
if (Date.now() - lastUpdateTime > OUTPUT_UPDATE_INTERVAL_MS) {
shouldUpdate = true;
}
break;
default: {
throw new Error('An unhandled ShellOutputEvent was found.');
}
},
combinedSignal,
this.config.getShouldUseNodePtyShell(),
shellExecutionConfig ?? {},
);
}
if (shouldUpdate) {
doUpdate();
}
};
let executionHandle;
try {
executionHandle = await ShellExecutionService.execute(
commandToExecute,
cwd,
onShellOutputEvent,
combinedSignal,
this.config.getShouldUseNodePtyShell(),
shellExecutionConfig ?? {},
);
} catch (err) {
// ShellExecutionService.execute() can throw before resolving (e.g.
// PTY dynamic import failure). Tear down the abort listener and any
// (theoretically) scheduled trailing flush so nothing fires after we
// re-throw to the caller.
cancelTrailingFlush();
combinedSignal.removeEventListener('abort', onAbort);
throw err;
}
const { result: resultPromise, pid } = executionHandle;
if (pid && setPidCallback) {
setPidCallback(pid);
@ -1604,7 +1665,18 @@ export class ShellToolInvocation extends BaseToolInvocation<
// difference matters here.
const executionStartTime = performance.now();
const result = await resultPromise;
let result;
try {
result = await resultPromise;
} finally {
// Cancel any pending trailing flush — the command has settled (or
// threw) and either the final ToolResult carries the complete output
// or the caller will surface an error. Either way the timer must not
// fire a stale frame after we've returned. `finally` covers both the
// happy path and the (theoretical) reject path so no timer leaks.
cancelTrailingFlush();
combinedSignal.removeEventListener('abort', onAbort);
}
// Background-promote path: the user pressed Ctrl+B (PR-3 wires the
// keybind to `promoteAbortController.abort({ kind: 'background' })`),