mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-28 06:31:11 +00:00
fix(memory-core): cap detached dream narratives (#73287)
Cap detached Dream Diary narrative subagent runs across cron dreaming sweeps so multi-workspace runs cannot fan out unbounded subagent sessions. Adds regression coverage that queued detached narratives resume and clean up, plus a unit-fast lane correction for the security symlink audit test.
This commit is contained in:
parent
89079a32ef
commit
b4e9f1bd1c
7 changed files with 208 additions and 37 deletions
|
|
@ -69,6 +69,7 @@ Docs: https://docs.openclaw.ai
|
|||
- Gateway/Docker: keep config-triggered restarts in-process inside containers instead of spawning a detached child and exiting PID 1 cleanly, so Docker Swarm and other on-failure supervisors do not leave the service stuck at 0/1 replicas. Fixes #73178. Thanks @du-nguyen-IT007.
|
||||
- CLI/tasks: ship the task-registry control runtime in npm packages so `openclaw tasks cancel` can load ACP/subagent cancellation helpers from published builds. Fixes #68997. Thanks @1OAKDesign.
|
||||
- Channels/Telegram: preserve unsent generated media after partial reply streaming has already delivered the text, so `image_generate` outputs still reach Telegram as photos instead of being dropped from the final payload. Fixes #73253. Thanks @mlaihk.
|
||||
- Memory-core/dreaming: cap detached Dream Diary narrative subagents across cron sweeps so multi-workspace dreaming no longer fans out unbounded subagent sessions, lock contention, and cascading narrative timeouts. Fixes #73198. (#73287) Thanks @KeWang0622.
|
||||
- Export/session: keep inline export HTML scripts and vendor libraries injected after template formatting so generated session exports open with the app code, markdown renderer, and syntax highlighter present. Fixes #41862 and #49957; carries forward #41861 and #68947. Thanks @briannewman, @martenzi, and @armanddp.
|
||||
- Agents/ACPX: stage the patched Claude ACP adapter as an ACPX runtime dependency and route known Codex/Claude ACP commands through local wrappers, so Gateway runtime no longer depends on live `npx` adapter resolution. Fixes #73202. Thanks @joerod26.
|
||||
- Memory/compaction: let pre-compaction memory flush use an exact `agents.defaults.compaction.memoryFlush.model` override such as `ollama/qwen3:8b` without inheriting the active session fallback chain, so local housekeeping can avoid paid conversation models. Fixes #53772. Thanks @limen96.
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import {
|
|||
formatBackfillDiaryDate,
|
||||
generateAndAppendDreamNarrative,
|
||||
removeBackfillDiaryEntries,
|
||||
runDetachedDreamNarrative,
|
||||
type NarrativePhaseData,
|
||||
writeBackfillDiaryEntries,
|
||||
} from "./dreaming-narrative.js";
|
||||
|
|
@ -1003,3 +1004,117 @@ describe("generateAndAppendDreamNarrative", () => {
|
|||
expect(subagent.deleteSession.mock.calls[1]?.[0]?.sessionKey).toBe(secondSessionKey);
|
||||
});
|
||||
});
|
||||
|
||||
describe("runDetachedDreamNarrative", () => {
|
||||
type Deferred<T> = { promise: Promise<T>; resolve: (v: T) => void };
|
||||
function deferred<T>(): Deferred<T> {
|
||||
let resolve!: (v: T) => void;
|
||||
const promise = new Promise<T>((r) => {
|
||||
resolve = r;
|
||||
});
|
||||
return { promise, resolve };
|
||||
}
|
||||
|
||||
function createBlockingSubagent() {
|
||||
const runDeferreds: Array<Deferred<{ runId: string }>> = [];
|
||||
const subagent = {
|
||||
run: vi.fn(() => {
|
||||
const d = deferred<{ runId: string }>();
|
||||
runDeferreds.push(d);
|
||||
return d.promise;
|
||||
}),
|
||||
// Resolve the rest of the pipeline as a no-op so a single resolve()
|
||||
// on a deferred unblocks the slot for the queued task.
|
||||
waitForRun: vi.fn().mockResolvedValue({ status: "timeout" }),
|
||||
getSessionMessages: vi.fn().mockResolvedValue({ messages: [] }),
|
||||
deleteSession: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
return { subagent, runDeferreds };
|
||||
}
|
||||
|
||||
function createMockLogger() {
|
||||
return { info: vi.fn(), warn: vi.fn(), error: vi.fn() };
|
||||
}
|
||||
|
||||
async function drainMicrotasks(rounds = 30): Promise<void> {
|
||||
for (let i = 0; i < rounds; i += 1) {
|
||||
await Promise.resolve();
|
||||
}
|
||||
}
|
||||
|
||||
it("caps the number of in-flight detached narratives at 3", async () => {
|
||||
const { subagent, runDeferreds } = createBlockingSubagent();
|
||||
const workspaceDir = await createTempWorkspace("openclaw-dreaming-detach-");
|
||||
const logger = createMockLogger();
|
||||
|
||||
for (let i = 0; i < 5; i += 1) {
|
||||
runDetachedDreamNarrative({
|
||||
subagent,
|
||||
workspaceDir,
|
||||
data: { phase: "light", snippets: [`fragment-${i}`] },
|
||||
nowMs: Date.parse("2026-04-28T03:00:00Z"),
|
||||
logger,
|
||||
});
|
||||
}
|
||||
|
||||
await drainMicrotasks();
|
||||
|
||||
// Only the first 3 should have reached subagent.run; the rest are queued.
|
||||
expect(subagent.run).toHaveBeenCalledTimes(3);
|
||||
|
||||
// Drain the rest so module-level concurrency state does not leak into
|
||||
// subsequent tests. The mock subagent creates a new deferred every time
|
||||
// queued tasks acquire a slot, so loop until no new deferreds appear.
|
||||
for (let iter = 0; iter < 10; iter += 1) {
|
||||
const before = runDeferreds.length;
|
||||
for (const d of runDeferreds) {
|
||||
d.resolve({ runId: "drain" });
|
||||
}
|
||||
if (before >= 5) {
|
||||
break;
|
||||
}
|
||||
await vi.waitFor(() => {
|
||||
expect(runDeferreds.length).toBeGreaterThan(before);
|
||||
});
|
||||
}
|
||||
for (const d of runDeferreds) {
|
||||
d.resolve({ runId: "drain" });
|
||||
}
|
||||
await vi.waitFor(() => {
|
||||
expect(subagent.deleteSession).toHaveBeenCalledTimes(5);
|
||||
});
|
||||
expect(subagent.run).toHaveBeenCalledTimes(5);
|
||||
expect(subagent.waitForRun).toHaveBeenCalledTimes(5);
|
||||
});
|
||||
|
||||
it("swallows underlying narrative errors instead of leaving an unhandled rejection", async () => {
|
||||
const error = new Error("boom");
|
||||
const subagent = {
|
||||
run: vi.fn().mockRejectedValue(error),
|
||||
waitForRun: vi.fn().mockResolvedValue({ status: "ok" }),
|
||||
getSessionMessages: vi.fn().mockResolvedValue({ messages: [] }),
|
||||
deleteSession: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
const logger = createMockLogger();
|
||||
const workspaceDir = await createTempWorkspace("openclaw-dreaming-detach-");
|
||||
const unhandled = vi.fn();
|
||||
process.on("unhandledRejection", unhandled);
|
||||
|
||||
try {
|
||||
runDetachedDreamNarrative({
|
||||
subagent,
|
||||
workspaceDir,
|
||||
data: { phase: "light", snippets: ["fragment"] },
|
||||
nowMs: Date.parse("2026-04-28T03:00:00Z"),
|
||||
logger,
|
||||
});
|
||||
|
||||
await drainMicrotasks();
|
||||
|
||||
expect(subagent.run).toHaveBeenCalledOnce();
|
||||
expect(unhandled).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
process.off("unhandledRejection", unhandled);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1045,3 +1045,53 @@ export async function generateAndAppendDreamNarrative(params: {
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
// ── Detached narrative concurrency limit ───────────────────────────────
|
||||
//
|
||||
// Cron-driven dreaming detaches narrative generation across light, REM, and
|
||||
// deep phases for every workspace, so a 10-workspace cron sweep used to fire
|
||||
// 30 concurrent narrative subagents at once. Each one holds the session
|
||||
// write-lock while it runs and burns a model slot, which caused lock
|
||||
// contention (>30 s) and cascading narrative timeouts (#73198).
|
||||
//
|
||||
// `runDetachedDreamNarrative` wraps `generateAndAppendDreamNarrative` with a
|
||||
// FIFO queue capped at `DETACHED_NARRATIVE_CONCURRENCY` so the total in-flight
|
||||
// detached narratives across phases/workspaces stays bounded.
|
||||
const DETACHED_NARRATIVE_CONCURRENCY = 3;
|
||||
|
||||
let activeDetachedNarratives = 0;
|
||||
const detachedNarrativeQueue: Array<() => void> = [];
|
||||
|
||||
function releaseDetachedNarrativeSlot(): void {
|
||||
activeDetachedNarratives -= 1;
|
||||
detachedNarrativeQueue.shift()?.();
|
||||
}
|
||||
|
||||
async function acquireDetachedNarrativeSlot(): Promise<void> {
|
||||
if (activeDetachedNarratives >= DETACHED_NARRATIVE_CONCURRENCY) {
|
||||
await new Promise<void>((resolve) => {
|
||||
detachedNarrativeQueue.push(resolve);
|
||||
});
|
||||
}
|
||||
activeDetachedNarratives += 1;
|
||||
}
|
||||
|
||||
export function runDetachedDreamNarrative(
|
||||
params: Parameters<typeof generateAndAppendDreamNarrative>[0],
|
||||
): void {
|
||||
queueMicrotask(() => {
|
||||
void (async () => {
|
||||
await acquireDetachedNarrativeSlot();
|
||||
try {
|
||||
await generateAndAppendDreamNarrative(params);
|
||||
} catch {
|
||||
// Detached narratives intentionally swallow errors — callers (cron
|
||||
// sweeps) cannot recover, and surfacing here would only cause noisy
|
||||
// unhandled rejections. Logging happens inside
|
||||
// generateAndAppendDreamNarrative.
|
||||
} finally {
|
||||
releaseDetachedNarrativeSlot();
|
||||
}
|
||||
})();
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,11 @@ import {
|
|||
} from "openclaw/plugin-sdk/memory-core-host-status";
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk/plugin-entry";
|
||||
import { writeDailyDreamingPhaseBlock } from "./dreaming-markdown.js";
|
||||
import { generateAndAppendDreamNarrative, type NarrativePhaseData } from "./dreaming-narrative.js";
|
||||
import {
|
||||
generateAndAppendDreamNarrative,
|
||||
type NarrativePhaseData,
|
||||
runDetachedDreamNarrative,
|
||||
} from "./dreaming-narrative.js";
|
||||
import { asRecord, formatErrorMessage, normalizeTrimmedString } from "./dreaming-shared.js";
|
||||
import {
|
||||
filterLiveShortTermRecallEntries,
|
||||
|
|
@ -1574,16 +1578,14 @@ async function runLightDreaming(params: {
|
|||
...(themes.length > 0 ? { themes } : {}),
|
||||
};
|
||||
if (params.detachNarratives) {
|
||||
queueMicrotask(() => {
|
||||
void generateAndAppendDreamNarrative({
|
||||
subagent: params.subagent!,
|
||||
workspaceDir: params.workspaceDir,
|
||||
data,
|
||||
nowMs,
|
||||
timezone: params.config.timezone,
|
||||
model: params.config.execution?.model,
|
||||
logger: params.logger,
|
||||
}).catch(() => undefined);
|
||||
runDetachedDreamNarrative({
|
||||
subagent: params.subagent,
|
||||
workspaceDir: params.workspaceDir,
|
||||
data,
|
||||
nowMs,
|
||||
timezone: params.config.timezone,
|
||||
model: params.config.execution?.model,
|
||||
logger: params.logger,
|
||||
});
|
||||
} else {
|
||||
await generateAndAppendDreamNarrative({
|
||||
|
|
@ -1672,16 +1674,14 @@ async function runRemDreaming(params: {
|
|||
...(themes.length > 0 ? { themes } : {}),
|
||||
};
|
||||
if (params.detachNarratives) {
|
||||
queueMicrotask(() => {
|
||||
void generateAndAppendDreamNarrative({
|
||||
subagent: params.subagent!,
|
||||
workspaceDir: params.workspaceDir,
|
||||
data,
|
||||
nowMs,
|
||||
timezone: params.config.timezone,
|
||||
model: params.config.execution?.model,
|
||||
logger: params.logger,
|
||||
}).catch(() => undefined);
|
||||
runDetachedDreamNarrative({
|
||||
subagent: params.subagent,
|
||||
workspaceDir: params.workspaceDir,
|
||||
data,
|
||||
nowMs,
|
||||
timezone: params.config.timezone,
|
||||
model: params.config.execution?.model,
|
||||
logger: params.logger,
|
||||
});
|
||||
} else {
|
||||
await generateAndAppendDreamNarrative({
|
||||
|
|
|
|||
|
|
@ -1917,19 +1917,23 @@ describe("short-term dreaming trigger", () => {
|
|||
});
|
||||
|
||||
expect(result?.handled).toBe(true);
|
||||
expect(subagent.run).toHaveBeenCalled();
|
||||
expect(subagent.run.mock.calls[0]?.[0]).toMatchObject({
|
||||
model: "anthropic/claude-sonnet-4-6",
|
||||
});
|
||||
const memoryText = await fs.readFile(path.join(workspaceDir, "MEMORY.md"), "utf-8");
|
||||
expect(memoryText).toContain("Move backups to S3 Glacier.");
|
||||
// Detached cron narratives now go through a bounded queue
|
||||
// (see runDetachedDreamNarrative), so subagent.run lands a few extra
|
||||
// microtasks after promotion returns. Wait for the full delivery chain
|
||||
// rather than asserting on the exact tick order.
|
||||
await vi.waitFor(async () => {
|
||||
expect(subagent.run).toHaveBeenCalled();
|
||||
expect(subagent.waitForRun).toHaveBeenCalled();
|
||||
expect(subagent.getSessionMessages).toHaveBeenCalled();
|
||||
expect(subagent.deleteSession).toHaveBeenCalled();
|
||||
const dreamsText = await fs.readFile(path.join(workspaceDir, "DREAMS.md"), "utf-8");
|
||||
expect(dreamsText).toContain("A diary entry.");
|
||||
});
|
||||
expect(subagent.run.mock.calls[0]?.[0]).toMatchObject({
|
||||
model: "anthropic/claude-sonnet-4-6",
|
||||
});
|
||||
});
|
||||
|
||||
it("skips dreaming promotion cleanly when limit is zero", async () => {
|
||||
|
|
|
|||
|
|
@ -23,7 +23,11 @@ import type { OpenClawPluginApi } from "openclaw/plugin-sdk/plugin-entry";
|
|||
import { peekSystemEventEntries } from "openclaw/plugin-sdk/system-event-runtime";
|
||||
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { writeDeepDreamingReport } from "./dreaming-markdown.js";
|
||||
import { generateAndAppendDreamNarrative, type NarrativePhaseData } from "./dreaming-narrative.js";
|
||||
import {
|
||||
generateAndAppendDreamNarrative,
|
||||
type NarrativePhaseData,
|
||||
runDetachedDreamNarrative,
|
||||
} from "./dreaming-narrative.js";
|
||||
import { runDreamingSweepPhases } from "./dreaming-phases.js";
|
||||
import {
|
||||
formatErrorMessage,
|
||||
|
|
@ -631,16 +635,14 @@ export async function runShortTermDreamingPromotionIfTriggered(params: {
|
|||
promotions: applied.appliedCandidates.map((c) => c.snippet).filter(Boolean),
|
||||
};
|
||||
if (detachNarratives) {
|
||||
queueMicrotask(() => {
|
||||
void generateAndAppendDreamNarrative({
|
||||
subagent: params.subagent!,
|
||||
workspaceDir,
|
||||
data,
|
||||
nowMs: sweepNowMs,
|
||||
timezone: params.config.timezone,
|
||||
model: params.config.execution?.model,
|
||||
logger: params.logger,
|
||||
}).catch(() => undefined);
|
||||
runDetachedDreamNarrative({
|
||||
subagent: params.subagent,
|
||||
workspaceDir,
|
||||
data,
|
||||
nowMs: sweepNowMs,
|
||||
timezone: params.config.timezone,
|
||||
model: params.config.execution?.model,
|
||||
logger: params.logger,
|
||||
});
|
||||
} else {
|
||||
await generateAndAppendDreamNarrative({
|
||||
|
|
|
|||
|
|
@ -121,7 +121,6 @@ export const forcedUnitFastTestFiles = [
|
|||
"src/realtime-voice/session-runtime.test.ts",
|
||||
"src/security/audit-channel-dm-policy.test.ts",
|
||||
"src/security/audit-channel-readonly-resolution.test.ts",
|
||||
"src/security/audit-config-symlink.test.ts",
|
||||
"src/security/audit-exec-surface.test.ts",
|
||||
"src/security/audit-exec-safe-bins.test.ts",
|
||||
"src/security/audit-extra.sync.test.ts",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue