mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-28 06:31:11 +00:00
fix(gateway): unblock sidecar startup
This commit is contained in:
parent
e60905d754
commit
59faa023fe
12 changed files with 287 additions and 21 deletions
|
|
@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai
|
|||
- CLI/message: resolve targeted `openclaw message` channels to their owning plugin before loading the registry, and fall back to configured channel plugins when the channel must be inferred, so scripted sends avoid full bundled plugin registry scans without assuming channel ids match plugin ids. Fixes #73006. Thanks @jasonftl.
|
||||
- Plugins/startup: parse strict JSON plugin manifests with native JSON first and keep JSON5 as the compatibility fallback, reducing manifest registry CPU during Gateway boot and CLI startup. Fixes #73011. Thanks @jasonftl.
|
||||
- CLI/models: keep route-first `models status --json` stdout reserved for the JSON payload by routing auth-profile and startup diagnostics to stderr. Fixes #72962. Thanks @vishutdhar.
|
||||
- Gateway/runtime: keep dirty-tree status calls from rebuilding live `dist`, clear stale task and restart state across in-process restarts, retry transient Discord lazy imports, and let channel startup continue after slow model warmup so browser, Discord, and voice-call sidecars come online. Thanks @vincentkoc.
|
||||
- Sessions: ignore future-dated session activity timestamps during reset freshness checks and cap future `updatedAt` values at the merge boundary so clock-skewed messages cannot keep stale sessions alive forever. Fixes #72989. Thanks @martingarramon.
|
||||
- Sessions: apply search, activity filters, and limits before gateway row enrichment so bounded session lists avoid scanning discarded transcripts. Carries forward #72978. Thanks @yeager.
|
||||
- Sessions: remove trajectory runtime and pointer sidecars when session maintenance prunes, caps, or disk-evicts their owning session, while preserving sidecars still referenced by live rows. Fixes #73000. Thanks @jared-rebel.
|
||||
|
|
|
|||
|
|
@ -145,21 +145,45 @@ async function loadDiscordVoiceRuntime(): Promise<DiscordVoiceRuntimeModule> {
|
|||
if (loadDiscordVoiceRuntimeForTesting) {
|
||||
return await loadDiscordVoiceRuntimeForTesting();
|
||||
}
|
||||
discordVoiceRuntimePromise ??= import("../voice/manager.runtime.js");
|
||||
return await discordVoiceRuntimePromise;
|
||||
const promise = discordVoiceRuntimePromise ?? import("../voice/manager.runtime.js");
|
||||
discordVoiceRuntimePromise = promise;
|
||||
try {
|
||||
return await promise;
|
||||
} catch (error) {
|
||||
if (discordVoiceRuntimePromise === promise) {
|
||||
discordVoiceRuntimePromise = undefined;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function loadDiscordProviderSessionRuntime(): Promise<DiscordProviderSessionRuntimeModule> {
|
||||
if (loadDiscordProviderSessionRuntimeForTesting) {
|
||||
return await loadDiscordProviderSessionRuntimeForTesting();
|
||||
}
|
||||
discordProviderSessionRuntimePromise ??= import("./provider-session.runtime.js");
|
||||
return await discordProviderSessionRuntimePromise;
|
||||
const promise = discordProviderSessionRuntimePromise ?? import("./provider-session.runtime.js");
|
||||
discordProviderSessionRuntimePromise = promise;
|
||||
try {
|
||||
return await promise;
|
||||
} catch (error) {
|
||||
if (discordProviderSessionRuntimePromise === promise) {
|
||||
discordProviderSessionRuntimePromise = undefined;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function loadPluginRuntime() {
|
||||
pluginRuntimePromise ??= import("openclaw/plugin-sdk/plugin-runtime");
|
||||
return await pluginRuntimePromise;
|
||||
const promise = pluginRuntimePromise ?? import("openclaw/plugin-sdk/plugin-runtime");
|
||||
pluginRuntimePromise = promise;
|
||||
try {
|
||||
return await promise;
|
||||
} catch (error) {
|
||||
if (pluginRuntimePromise === promise) {
|
||||
pluginRuntimePromise = undefined;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeBooleanForTesting(value: unknown): boolean | undefined {
|
||||
|
|
|
|||
|
|
@ -801,6 +801,15 @@ const writeBuildStamp = (deps) => {
|
|||
|
||||
const shouldSkipCleanWatchRuntimeSync = (deps) => deps.env.OPENCLAW_WATCH_MODE === "1";
|
||||
|
||||
const isGatewayClientCommand = (args) =>
|
||||
args[0] === "gateway" && (args[1] === "call" || args[1] === "status");
|
||||
|
||||
const shouldUseExistingDistForGatewayClient = (deps, buildRequirement) =>
|
||||
buildRequirement.reason === "dirty_watched_tree" &&
|
||||
isGatewayClientCommand(deps.args) &&
|
||||
deps.env.OPENCLAW_FORCE_BUILD !== "1" &&
|
||||
statMtime(deps.distEntry, deps.fs) != null;
|
||||
|
||||
export async function runNodeMain(params = {}) {
|
||||
const deps = {
|
||||
spawn: params.spawn ?? spawn,
|
||||
|
|
@ -834,9 +843,16 @@ export async function runNodeMain(params = {}) {
|
|||
|
||||
try {
|
||||
let exitCode = 1;
|
||||
const buildRequirement = resolveBuildRequirement(deps);
|
||||
let buildRequirement = resolveBuildRequirement(deps);
|
||||
const useExistingGatewayClientDist = shouldUseExistingDistForGatewayClient(
|
||||
deps,
|
||||
buildRequirement,
|
||||
);
|
||||
if (useExistingGatewayClientDist) {
|
||||
buildRequirement = { shouldBuild: false, reason: "gateway_client_existing_dist" };
|
||||
}
|
||||
if (!buildRequirement.shouldBuild) {
|
||||
if (!shouldSkipCleanWatchRuntimeSync(deps)) {
|
||||
if (!useExistingGatewayClientDist && !shouldSkipCleanWatchRuntimeSync(deps)) {
|
||||
const runtimePostBuildRequirement = resolveRuntimePostBuildRequirement(deps);
|
||||
if (runtimePostBuildRequirement.shouldSync) {
|
||||
const synced = await withRunNodeBuildLock(deps, async () => {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ const consumeGatewayRestartIntentSync = vi.fn(() => false);
|
|||
const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false);
|
||||
const markGatewaySigusr1RestartHandled = vi.fn();
|
||||
const peekGatewaySigusr1RestartReason = vi.fn<() => string | undefined>(() => undefined);
|
||||
const resetGatewayRestartStateForInProcessRestart = vi.fn();
|
||||
const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?: string }) => ({
|
||||
ok: true,
|
||||
pid: process.pid,
|
||||
|
|
@ -23,6 +24,7 @@ const getActiveTaskCount = vi.fn(() => 0);
|
|||
const markGatewayDraining = vi.fn();
|
||||
const waitForActiveTasks = vi.fn(async (_timeoutMs?: number) => ({ drained: true }));
|
||||
const resetAllLanes = vi.fn();
|
||||
const reloadTaskRegistryFromStore = vi.fn();
|
||||
const getActiveBundledRuntimeDepsInstallCount = vi.fn(() => 0);
|
||||
const waitForBundledRuntimeDepsInstallIdle = vi.fn(async (_timeoutMs?: number) => ({
|
||||
drained: true,
|
||||
|
|
@ -71,6 +73,7 @@ vi.mock("../../infra/restart.js", () => ({
|
|||
isGatewaySigusr1RestartExternallyAllowed: () => isGatewaySigusr1RestartExternallyAllowed(),
|
||||
markGatewaySigusr1RestartHandled: () => markGatewaySigusr1RestartHandled(),
|
||||
peekGatewaySigusr1RestartReason: () => peekGatewaySigusr1RestartReason(),
|
||||
resetGatewayRestartStateForInProcessRestart: () => resetGatewayRestartStateForInProcessRestart(),
|
||||
scheduleGatewaySigusr1Restart: (opts?: { delayMs?: number; reason?: string }) =>
|
||||
scheduleGatewaySigusr1Restart(opts),
|
||||
}));
|
||||
|
|
@ -91,6 +94,10 @@ vi.mock("../../process/command-queue.js", () => ({
|
|||
resetAllLanes: () => resetAllLanes(),
|
||||
}));
|
||||
|
||||
vi.mock("../../tasks/runtime-internal.js", () => ({
|
||||
reloadTaskRegistryFromStore: () => reloadTaskRegistryFromStore(),
|
||||
}));
|
||||
|
||||
vi.mock("../../plugins/bundled-runtime-deps-activity.js", () => ({
|
||||
getActiveBundledRuntimeDepsInstallCount: () => getActiveBundledRuntimeDepsInstallCount(),
|
||||
waitForBundledRuntimeDepsInstallIdle: (timeoutMs?: number) =>
|
||||
|
|
@ -306,7 +313,7 @@ describe("runGatewayLoop", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("restarts after SIGUSR1 even when drain times out, and resets lanes for the new iteration", async () => {
|
||||
it("restarts after SIGUSR1 even when drain times out, and resets runtime state for the new iteration", async () => {
|
||||
vi.clearAllMocks();
|
||||
loadConfig.mockReturnValue({
|
||||
gateway: {
|
||||
|
|
@ -395,6 +402,8 @@ describe("runGatewayLoop", () => {
|
|||
});
|
||||
expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(1);
|
||||
expect(resetAllLanes).toHaveBeenCalledTimes(1);
|
||||
expect(resetGatewayRestartStateForInProcessRestart).toHaveBeenCalledTimes(1);
|
||||
expect(reloadTaskRegistryFromStore).toHaveBeenCalledTimes(1);
|
||||
|
||||
sigusr1();
|
||||
|
||||
|
|
@ -407,6 +416,8 @@ describe("runGatewayLoop", () => {
|
|||
expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(2);
|
||||
expect(markGatewayDraining).toHaveBeenCalledTimes(2);
|
||||
expect(resetAllLanes).toHaveBeenCalledTimes(2);
|
||||
expect(resetGatewayRestartStateForInProcessRestart).toHaveBeenCalledTimes(2);
|
||||
expect(reloadTaskRegistryFromStore).toHaveBeenCalledTimes(2);
|
||||
expect(acquireGatewayLock).toHaveBeenCalledTimes(3);
|
||||
|
||||
sigterm();
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import {
|
|||
isGatewaySigusr1RestartExternallyAllowed,
|
||||
markGatewaySigusr1RestartHandled,
|
||||
peekGatewaySigusr1RestartReason,
|
||||
resetGatewayRestartStateForInProcessRestart,
|
||||
scheduleGatewaySigusr1Restart,
|
||||
} from "../../infra/restart.js";
|
||||
import { detectRespawnSupervisor } from "../../infra/supervisor-markers.js";
|
||||
|
|
@ -36,6 +37,7 @@ import {
|
|||
} from "../../process/command-queue.js";
|
||||
import { createRestartIterationHook } from "../../process/restart-recovery.js";
|
||||
import type { RuntimeEnv } from "../../runtime.js";
|
||||
import { reloadTaskRegistryFromStore } from "../../tasks/runtime-internal.js";
|
||||
|
||||
const gatewayLog = createSubsystemLogger("gateway");
|
||||
const LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS = 1500;
|
||||
|
|
@ -422,10 +424,12 @@ export async function runGatewayLoop(params: {
|
|||
// After an in-process restart (SIGUSR1), reset command-queue lane state.
|
||||
// Interrupted tasks from the previous lifecycle may have left `active`
|
||||
// counts elevated (their finally blocks never ran), permanently blocking
|
||||
// new work from draining. This must happen here — at the restart
|
||||
// coordinator level — rather than inside individual subsystem init
|
||||
// functions, to avoid surprising cross-cutting side effects.
|
||||
// new work from draining. The same boundary also discards stale restart
|
||||
// deferral timers and reloads the task registry from durable state so
|
||||
// cancelled/completed work is not kept alive by old in-memory maps.
|
||||
resetAllLanes();
|
||||
resetGatewayRestartStateForInProcessRestart();
|
||||
reloadTaskRegistryFromStore();
|
||||
});
|
||||
|
||||
// Keep process alive; SIGUSR1 triggers an in-process restart (no supervisor required).
|
||||
|
|
|
|||
|
|
@ -127,7 +127,7 @@ vi.mock("./server-tailscale.js", () => ({
|
|||
startGatewayTailscaleExposure: hoisted.startGatewayTailscaleExposure,
|
||||
}));
|
||||
|
||||
const { startGatewayPostAttachRuntime, startGatewaySidecars } =
|
||||
const { startGatewayPostAttachRuntime, startGatewaySidecars, __testing } =
|
||||
await import("./server-startup-post-attach.js");
|
||||
const { STARTUP_UNAVAILABLE_GATEWAY_METHODS } =
|
||||
await import("./server-startup-unavailable-methods.js");
|
||||
|
|
@ -225,6 +225,35 @@ describe("startGatewayPostAttachRuntime", () => {
|
|||
expect(returned).toBe(true);
|
||||
});
|
||||
|
||||
it("continues channel startup when primary model prewarm hangs", async () => {
|
||||
vi.useFakeTimers();
|
||||
const log = { warn: vi.fn() };
|
||||
const prewarm = vi.fn(async () => {
|
||||
await new Promise(() => undefined);
|
||||
});
|
||||
|
||||
try {
|
||||
const promise = __testing.prewarmConfiguredPrimaryModelWithTimeout(
|
||||
{
|
||||
cfg: {} as never,
|
||||
log,
|
||||
timeoutMs: 25,
|
||||
},
|
||||
prewarm as never,
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(25);
|
||||
await promise;
|
||||
|
||||
expect(prewarm).toHaveBeenCalledTimes(1);
|
||||
expect(log.warn).toHaveBeenCalledWith(
|
||||
"startup model warmup timed out after 25ms; continuing channel startup",
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps startup-gated methods unavailable while sidecars are still resuming", async () => {
|
||||
let resumeSidecars!: () => void;
|
||||
const sidecarsReady = new Promise<{ pluginServices: null }>((resolve) => {
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import type { startGatewayTailscaleExposure } from "./server-tailscale.js";
|
|||
const SESSION_LOCK_STALE_MS = 30 * 60 * 1000;
|
||||
const ACP_BACKEND_READY_TIMEOUT_MS = 5_000;
|
||||
const ACP_BACKEND_READY_POLL_MS = 50;
|
||||
const PRIMARY_MODEL_PREWARM_TIMEOUT_MS = 5_000;
|
||||
|
||||
type Awaitable<T> = T | Promise<T>;
|
||||
|
||||
|
|
@ -162,6 +163,34 @@ async function prewarmConfiguredPrimaryModel(params: {
|
|||
}
|
||||
}
|
||||
|
||||
async function prewarmConfiguredPrimaryModelWithTimeout(
|
||||
params: {
|
||||
cfg: OpenClawConfig;
|
||||
log: { warn: (msg: string) => void };
|
||||
timeoutMs?: number;
|
||||
},
|
||||
prewarm: typeof prewarmConfiguredPrimaryModel = prewarmConfiguredPrimaryModel,
|
||||
): Promise<void> {
|
||||
let settled = false;
|
||||
const warmup = prewarm(params)
|
||||
.catch((err) => {
|
||||
params.log.warn(`startup model warmup failed: ${String(err)}`);
|
||||
})
|
||||
.finally(() => {
|
||||
settled = true;
|
||||
});
|
||||
const timeout = sleep(params.timeoutMs ?? PRIMARY_MODEL_PREWARM_TIMEOUT_MS, undefined, {
|
||||
ref: false,
|
||||
}).then(() => {
|
||||
if (!settled) {
|
||||
params.log.warn(
|
||||
`startup model warmup timed out after ${params.timeoutMs ?? PRIMARY_MODEL_PREWARM_TIMEOUT_MS}ms; continuing channel startup`,
|
||||
);
|
||||
}
|
||||
});
|
||||
await Promise.race([warmup, timeout]);
|
||||
}
|
||||
|
||||
export async function startGatewaySidecars(params: {
|
||||
cfg: OpenClawConfig;
|
||||
pluginRegistry: ReturnType<typeof loadOpenClawPlugins>;
|
||||
|
|
@ -289,7 +318,7 @@ export async function startGatewaySidecars(params: {
|
|||
await measureStartup(params.startupTrace, "sidecars.channels", async () => {
|
||||
if (!skipChannels) {
|
||||
try {
|
||||
await prewarmConfiguredPrimaryModel({
|
||||
await prewarmConfiguredPrimaryModelWithTimeout({
|
||||
cfg: params.cfg,
|
||||
log: params.log,
|
||||
});
|
||||
|
|
@ -608,4 +637,5 @@ export async function startGatewayPostAttachRuntime(
|
|||
|
||||
export const __testing = {
|
||||
prewarmConfiguredPrimaryModel,
|
||||
prewarmConfiguredPrimaryModelWithTimeout,
|
||||
};
|
||||
|
|
|
|||
|
|
@ -72,6 +72,11 @@ function clearActiveDeferralPolls(): void {
|
|||
activeDeferralPolls.clear();
|
||||
}
|
||||
|
||||
export function resetGatewayRestartStateForInProcessRestart(): void {
|
||||
clearActiveDeferralPolls();
|
||||
clearPendingScheduledRestart();
|
||||
}
|
||||
|
||||
export type RestartAuditInfo = {
|
||||
actor?: string;
|
||||
deviceId?: string;
|
||||
|
|
|
|||
|
|
@ -107,6 +107,10 @@ function statusCommandSpawn() {
|
|||
return [process.execPath, "openclaw.mjs", "status"];
|
||||
}
|
||||
|
||||
function gatewayCallStatusCommandSpawn() {
|
||||
return [process.execPath, "openclaw.mjs", "gateway", "call", "status", "--json"];
|
||||
}
|
||||
|
||||
function resolvePath(tmp: string, relativePath: string) {
|
||||
return path.join(tmp, relativePath);
|
||||
}
|
||||
|
|
@ -230,6 +234,32 @@ async function runStatusCommand(params: {
|
|||
});
|
||||
}
|
||||
|
||||
async function runGatewayCallStatusCommand(params: {
|
||||
tmp: string;
|
||||
spawn: (cmd: string, args: string[]) => ReturnType<typeof createExitedProcess>;
|
||||
spawnSync?: (cmd: string, args: string[]) => { status: number; stdout: string };
|
||||
env?: Record<string, string>;
|
||||
runRuntimePostBuild?: (params?: {
|
||||
cwd?: string;
|
||||
env?: Record<string, string | undefined>;
|
||||
}) => void | Promise<void>;
|
||||
}) {
|
||||
return await runNodeMain({
|
||||
cwd: params.tmp,
|
||||
args: ["gateway", "call", "status", "--json"],
|
||||
env: {
|
||||
...process.env,
|
||||
OPENCLAW_RUNNER_LOG: "0",
|
||||
...params.env,
|
||||
},
|
||||
spawn: params.spawn,
|
||||
...(params.spawnSync ? { spawnSync: params.spawnSync } : {}),
|
||||
...(params.runRuntimePostBuild ? { runRuntimePostBuild: params.runRuntimePostBuild } : {}),
|
||||
execPath: process.execPath,
|
||||
platform: process.platform,
|
||||
});
|
||||
}
|
||||
|
||||
async function runQaCommand(params: {
|
||||
tmp: string;
|
||||
spawn: (cmd: string, args: string[]) => ReturnType<typeof createExitedProcess>;
|
||||
|
|
@ -1057,6 +1087,41 @@ describe("run-node script", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("does not rebuild for gateway client calls against an existing dirty dist", async () => {
|
||||
await withTempDir({ prefix: "openclaw-run-node-" }, async (tmp) => {
|
||||
await setupTrackedProject(tmp, {
|
||||
files: {
|
||||
[ROOT_SRC]: "export const value = 1;\n",
|
||||
[RUNTIME_POSTBUILD_STAMP]: '{"head":"abc123"}\n',
|
||||
},
|
||||
buildPaths: [
|
||||
ROOT_SRC,
|
||||
ROOT_TSCONFIG,
|
||||
ROOT_PACKAGE,
|
||||
DIST_ENTRY,
|
||||
BUILD_STAMP,
|
||||
RUNTIME_POSTBUILD_STAMP,
|
||||
],
|
||||
});
|
||||
|
||||
const runRuntimePostBuild = vi.fn();
|
||||
const { spawnCalls, spawn, spawnSync } = createSpawnRecorder({
|
||||
gitHead: "abc123\n",
|
||||
gitStatus: ` M ${ROOT_SRC}\n`,
|
||||
});
|
||||
const exitCode = await runGatewayCallStatusCommand({
|
||||
tmp,
|
||||
spawn,
|
||||
spawnSync,
|
||||
runRuntimePostBuild,
|
||||
});
|
||||
|
||||
expect(exitCode).toBe(0);
|
||||
expect(spawnCalls).toEqual([gatewayCallStatusCommandSpawn()]);
|
||||
expect(runRuntimePostBuild).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("reports a clean tree explicitly when dist is current", async () => {
|
||||
await withTempDir({ prefix: "openclaw-run-node-" }, async (tmp) => {
|
||||
await setupTrackedProject(tmp, {
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ export {
|
|||
markTaskTerminalByRunId,
|
||||
maybeDeliverTaskTerminalUpdate,
|
||||
recordTaskProgressByRunId,
|
||||
reloadTaskRegistryFromStore,
|
||||
resetTaskRegistryDeliveryRuntimeForTests,
|
||||
resolveTaskForLookupToken,
|
||||
resetTaskRegistryForTests,
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import {
|
|||
markTaskTerminalById,
|
||||
markTaskTerminalByRunId,
|
||||
recordTaskProgressByRunId,
|
||||
reloadTaskRegistryFromStore,
|
||||
resetTaskRegistryControlRuntimeForTests,
|
||||
resetTaskRegistryDeliveryRuntimeForTests,
|
||||
resetTaskRegistryForTests,
|
||||
|
|
@ -1834,6 +1835,75 @@ describe("task-registry", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("reloads from durable state instead of preserving stale in-memory tasks", async () => {
|
||||
await withTaskRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
const now = Date.now();
|
||||
let durableTasks = new Map<string, ReturnType<typeof createTaskRecord>>();
|
||||
configureTaskRegistryRuntime({
|
||||
store: {
|
||||
loadSnapshot: () => ({
|
||||
tasks: durableTasks,
|
||||
deliveryStates: new Map(),
|
||||
}),
|
||||
saveSnapshot: () => {},
|
||||
upsertTask: () => {},
|
||||
upsertTaskWithDeliveryState: () => {},
|
||||
},
|
||||
});
|
||||
|
||||
const staleTask = createTaskRecord({
|
||||
runtime: "cli",
|
||||
ownerKey: "agent:main:main",
|
||||
scopeKind: "session",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
runId: "run-stale-memory",
|
||||
task: "Stale in-memory task",
|
||||
status: "running",
|
||||
deliveryStatus: "pending",
|
||||
notifyPolicy: "silent",
|
||||
});
|
||||
setTaskTimingById({
|
||||
taskId: staleTask.taskId,
|
||||
startedAt: now - 60_000,
|
||||
lastEventAt: now - 60_000,
|
||||
});
|
||||
expect(getTaskRegistrySummary().active).toBe(1);
|
||||
|
||||
durableTasks = new Map([
|
||||
[
|
||||
"task-durable",
|
||||
{
|
||||
taskId: "task-durable",
|
||||
runtime: "cli",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
ownerKey: "agent:main:main",
|
||||
scopeKind: "session",
|
||||
runId: "run-durable",
|
||||
task: "Durable terminal task",
|
||||
status: "cancelled",
|
||||
deliveryStatus: "not_applicable",
|
||||
notifyPolicy: "silent",
|
||||
createdAt: now - 30_000,
|
||||
startedAt: now - 30_000,
|
||||
endedAt: now - 10_000,
|
||||
lastEventAt: now - 10_000,
|
||||
},
|
||||
],
|
||||
]);
|
||||
|
||||
reloadTaskRegistryFromStore();
|
||||
|
||||
expect(findTaskByRunId("run-stale-memory")).toBeUndefined();
|
||||
expect(findTaskByRunId("run-durable")).toMatchObject({
|
||||
taskId: "task-durable",
|
||||
status: "cancelled",
|
||||
});
|
||||
expect(getTaskRegistrySummary().active).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
it("summarizes inspectable task audit findings", async () => {
|
||||
await withTaskRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
|
|
|
|||
|
|
@ -319,6 +319,16 @@ function persistTaskDeliveryStateDelete(taskId: string) {
|
|||
});
|
||||
}
|
||||
|
||||
function clearTaskRegistryMemory(): void {
|
||||
tasks.clear();
|
||||
taskDeliveryStates.clear();
|
||||
taskIdsByRunId.clear();
|
||||
taskIdsByOwnerKey.clear();
|
||||
taskIdsByParentFlowId.clear();
|
||||
taskIdsByRelatedSessionKey.clear();
|
||||
tasksWithPendingDelivery.clear();
|
||||
}
|
||||
|
||||
function ensureDeliveryStatus(params: {
|
||||
ownerKey: string;
|
||||
scopeKind: TaskScopeKind;
|
||||
|
|
@ -946,6 +956,12 @@ export function ensureTaskRegistryReady() {
|
|||
ensureListener();
|
||||
}
|
||||
|
||||
export function reloadTaskRegistryFromStore(): void {
|
||||
clearTaskRegistryMemory();
|
||||
restoreAttempted = false;
|
||||
restoreTaskRegistryOnce();
|
||||
}
|
||||
|
||||
function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | null {
|
||||
const current = tasks.get(taskId);
|
||||
if (!current) {
|
||||
|
|
@ -2057,13 +2073,7 @@ export function deleteTaskRecordById(taskId: string): boolean {
|
|||
}
|
||||
|
||||
export function resetTaskRegistryForTests(opts?: { persist?: boolean }) {
|
||||
tasks.clear();
|
||||
taskDeliveryStates.clear();
|
||||
taskIdsByRunId.clear();
|
||||
taskIdsByOwnerKey.clear();
|
||||
taskIdsByParentFlowId.clear();
|
||||
taskIdsByRelatedSessionKey.clear();
|
||||
tasksWithPendingDelivery.clear();
|
||||
clearTaskRegistryMemory();
|
||||
restoreAttempted = false;
|
||||
resetTaskRegistryRuntimeForTests();
|
||||
if (listenerStop) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue