fix(telegram): persist accepted update offsets

This commit is contained in:
Peter Steinberger 2026-04-25 01:34:41 +01:00
parent 7c0549bd9f
commit ae57a7998e
No known key found for this signature in database
5 changed files with 280 additions and 76 deletions

View file

@ -13,6 +13,8 @@ Docs: https://docs.openclaw.ai
- Agents/failover: forward embedded run abort signals into provider-owned model streams, cap implicit LLM idle watchdogs below long run timeouts, and mark 429 responses without usable retry timing as non-retryable so GitHub Copilot rate limits fail over or surface promptly instead of hanging until run timeout. Fixes #71120.
- Plugins/Google Meet: make meeting creation join by default, with an explicit URL-only opt-out, so agents that create a Meet also enter it.
- Telegram/polling: persist accepted update offsets before long-running handlers complete so poller restarts do not replay already-ingested updates, while keeping same-process retries for handler failures.
- Telegram/config: include generated Telegram channel config schema metadata in packaged plugin manifests so forum-topic/group config is accepted before runtime loads.
- Browser/tool: keep explicit AI snapshots from inheriting the efficient role-snapshot default and preserve numeric Playwright AI refs, so `--format ai` remains a real AI snapshot path. Fixes #62550. Thanks @ly85206559.
- Gateway/config: keep in-process config patch reload comparisons on the resolved source snapshot when `${VAR}` env refs are restored on disk, avoiding false full gateway restarts for unchanged gateway/plugin secrets. Fixes #71208. Thanks @robbiethompson18.
- Slack/messages: serialize write-client requests and whole outbound sends per target so rapid multi-message Slack replies preserve send order. Fixes #69101. (#69105) Thanks @nightq and @ztexydt-cqh.

View file

@ -283,51 +283,24 @@ export function createTelegramBotCore(
const activeHandledUpdateKeys = new Map<string, boolean>();
const initialUpdateId =
typeof opts.updateOffset?.lastUpdateId === "number" ? opts.updateOffset.lastUpdateId : null;
// Track update_ids that have entered the middleware pipeline but have not completed yet.
// This includes updates that are "queued" behind sequentialize(...) for a chat/topic key.
// We only persist a watermark that is strictly less than the smallest pending update_id,
// so we never write an offset that would skip an update still waiting to run.
const pendingUpdateIds = new Set<number>();
const failedUpdateIds = new Set<number>();
let highestCompletedUpdateId: number | null = initialUpdateId;
let highestAcceptedUpdateId: number | null = initialUpdateId;
let highestPersistedUpdateId: number | null = initialUpdateId;
const maybePersistSafeWatermark = () => {
const persistAcceptedUpdateId = (updateId: number) => {
if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) {
return;
}
highestAcceptedUpdateId = updateId;
if (typeof opts.updateOffset?.onUpdateId !== "function") {
return;
}
if (highestCompletedUpdateId === null) {
if (highestPersistedUpdateId !== null && updateId <= highestPersistedUpdateId) {
return;
}
let safe = highestCompletedUpdateId;
if (pendingUpdateIds.size > 0) {
let minPending: number | null = null;
for (const id of pendingUpdateIds) {
if (minPending === null || id < minPending) {
minPending = id;
}
}
if (minPending !== null) {
safe = Math.min(safe, minPending - 1);
}
}
if (failedUpdateIds.size > 0) {
let minFailed: number | null = null;
for (const id of failedUpdateIds) {
if (minFailed === null || id < minFailed) {
minFailed = id;
}
}
if (minFailed !== null) {
safe = Math.min(safe, minFailed - 1);
}
}
if (highestPersistedUpdateId !== null && safe <= highestPersistedUpdateId) {
return;
}
highestPersistedUpdateId = safe;
highestPersistedUpdateId = updateId;
void Promise.resolve()
.then(() => opts.updateOffset?.onUpdateId?.(safe))
.then(() => opts.updateOffset?.onUpdateId?.(updateId))
.catch((err) => {
runtime.error?.(`telegram: failed to persist update watermark: ${formatErrorMessage(err)}`);
});
@ -341,8 +314,7 @@ export function createTelegramBotCore(
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {
const updateId = resolveTelegramUpdateId(ctx);
const skipCutoff = highestPersistedUpdateId ?? initialUpdateId;
if (typeof updateId === "number" && skipCutoff !== null && updateId <= skipCutoff) {
if (typeof updateId === "number" && initialUpdateId !== null && updateId <= initialUpdateId) {
return true;
}
const key = buildTelegramUpdateKey(ctx);
@ -370,20 +342,26 @@ export function createTelegramBotCore(
const updateKey = buildTelegramUpdateKey(ctx);
let completed = false;
if (typeof updateId === "number") {
failedUpdateIds.delete(updateId);
pendingUpdateIds.add(updateId);
if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) {
if (!failedUpdateIds.has(updateId)) {
logSkippedUpdate(`update:${updateId}`);
return;
}
} else {
failedUpdateIds.delete(updateId);
}
}
if (updateKey) {
if (pendingUpdateKeys.has(updateKey) || recentUpdates.peek(updateKey)) {
logSkippedUpdate(updateKey);
if (typeof updateId === "number") {
pendingUpdateIds.delete(updateId);
}
return;
}
pendingUpdateKeys.add(updateKey);
activeHandledUpdateKeys.set(updateKey, false);
}
if (typeof updateId === "number") {
persistAcceptedUpdateId(updateId);
}
try {
await next();
completed = true;
@ -396,12 +374,8 @@ export function createTelegramBotCore(
pendingUpdateKeys.delete(updateKey);
}
if (typeof updateId === "number") {
pendingUpdateIds.delete(updateId);
if (completed) {
if (highestCompletedUpdateId === null || updateId > highestCompletedUpdateId) {
highestCompletedUpdateId = updateId;
}
maybePersistSafeWatermark();
failedUpdateIds.delete(updateId);
} else {
failedUpdateIds.add(updateId);
}

View file

@ -1091,7 +1091,7 @@ describe("createTelegramBot", () => {
expect(replySpy).toHaveBeenCalledTimes(1);
});
it("does not persist update offset past pending updates", async () => {
it("persists accepted update offsets before completion", async () => {
// For this test we need sequentialize(...) to behave like a normal middleware and call next().
sequentializeSpy.mockImplementationOnce(
() => async (_ctx: unknown, next: () => Promise<void>) => {
@ -1146,26 +1146,20 @@ describe("createTelegramBot", () => {
releaseUpdate101 = resolve;
});
// Start processing update 101 but keep it pending (simulates an update queued behind sequentialize()).
// Start processing update 101 but keep it pending (simulates a long-running turn).
const p101 = runMiddlewareChain({ update: { update_id: 101 } }, async () => update101Gate);
// Let update 101 enter the chain and mark itself pending before 102 completes.
// Let update 101 enter the chain and persist acceptance before 102 completes.
await Promise.resolve();
expect(onUpdateId).toHaveBeenCalledWith(101);
// Complete update 102 while 101 is still pending. The persisted watermark must not jump to 102.
// Complete update 102 while 101 is still pending. Restart replay protection is at-most-once.
await runMiddlewareChain({ update: { update_id: 102 } }, async () => {});
const persistedValues = onUpdateId.mock.calls.map((call) => Number(call[0]));
const maxPersisted = persistedValues.length > 0 ? Math.max(...persistedValues) : -Infinity;
expect(maxPersisted).toBeLessThan(101);
expect(onUpdateId).toHaveBeenCalledWith(102);
releaseUpdate101?.();
await p101;
// Once the pending update finishes, the watermark can safely catch up.
const persistedAfterDrain = onUpdateId.mock.calls.map((call) => Number(call[0]));
const maxPersistedAfterDrain =
persistedAfterDrain.length > 0 ? Math.max(...persistedAfterDrain) : -Infinity;
expect(maxPersistedAfterDrain).toBe(102);
expect(onUpdateId.mock.calls.map((call) => Number(call[0]))).toEqual([101, 102]);
});
it("logs and swallows update watermark persistence failures", async () => {
sequentializeSpy.mockImplementationOnce(
@ -1237,7 +1231,7 @@ describe("createTelegramBot", () => {
}
});
it("does not persist failed updates into the watermark", async () => {
it("persists failed updates once accepted while preserving same-process retries", async () => {
sequentializeSpy.mockImplementationOnce(
() => async (_ctx: unknown, next: () => Promise<void>) => {
await next();
@ -1288,18 +1282,100 @@ describe("createTelegramBot", () => {
throw new Error("middleware boom");
}),
).rejects.toThrow("middleware boom");
await flushTelegramTestMicrotasks();
expect(onUpdateId).toHaveBeenCalledWith(201);
await runMiddlewareChain({ update: { update_id: 202 } }, async () => {});
await new Promise((resolve) => setTimeout(resolve, 0));
expect(onUpdateId).not.toHaveBeenCalled();
expect(onUpdateId).not.toHaveBeenCalledWith(201);
expect(onUpdateId).not.toHaveBeenCalledWith(202);
await runMiddlewareChain({ update: { update_id: 201 } }, async () => {});
await flushTelegramTestMicrotasks();
expect(onUpdateId).toHaveBeenCalledWith(202);
const retryHandler = vi.fn();
await runMiddlewareChain({ update: { update_id: 201 } }, async () => {
retryHandler();
});
await flushTelegramTestMicrotasks();
expect(retryHandler).toHaveBeenCalledTimes(1);
expect(onUpdateId.mock.calls.map((call) => Number(call[0]))).toEqual([201, 202]);
});
it("skips replayed update ids even when the semantic update key differs", async () => {
sequentializeSpy.mockImplementationOnce(
() => async (_ctx: unknown, next: () => Promise<void>) => {
await next();
},
);
const onUpdateId = vi.fn();
createTelegramBot({
token: "tok",
updateOffset: {
lastUpdateId: 300,
onUpdateId,
},
});
type Middleware = (
ctx: Record<string, unknown>,
next: () => Promise<void>,
) => Promise<void> | void;
const middlewares = middlewareUseSpy.mock.calls
.map((call) => call[0])
.filter((fn): fn is Middleware => typeof fn === "function");
const runMiddlewareChain = async (
ctx: Record<string, unknown>,
finalNext: () => Promise<void>,
) => {
let idx = -1;
const dispatch = async (i: number): Promise<void> => {
if (i <= idx) {
throw new Error("middleware dispatch called multiple times");
}
idx = i;
const fn = middlewares[i];
if (!fn) {
await finalNext();
return;
}
await fn(ctx, async () => dispatch(i + 1));
};
await dispatch(0);
};
const handler = vi.fn();
await runMiddlewareChain(
{
update: {
update_id: 301,
message: { chat: { id: 1 }, message_id: 10 },
},
},
async () => {
handler();
},
);
const replayHandler = vi.fn();
await runMiddlewareChain(
{
update: {
update_id: 301,
message: { chat: { id: 1 }, message_id: 11 },
},
},
async () => {
replayHandler();
},
);
await flushTelegramTestMicrotasks();
expect(onUpdateId).toHaveBeenCalledWith(301);
expect(handler).toHaveBeenCalledTimes(1);
expect(replayHandler).not.toHaveBeenCalled();
});
it("allows distinct callback_query ids without update_id", async () => {
loadConfig.mockReturnValue({

View file

@ -1,6 +1,7 @@
import fs from "node:fs";
import path from "node:path";
import { pathToFileURL } from "node:url";
import JSON5 from "json5";
import { NON_PACKAGED_BUNDLED_PLUGIN_DIRS } from "./lib/bundled-plugin-build-entries.mjs";
import { shouldBuildBundledCluster } from "./lib/optional-bundled-clusters.mjs";
import {
@ -10,6 +11,8 @@ import {
} from "./runtime-postbuild-shared.mjs";
const GENERATED_BUNDLED_SKILLS_DIR = "bundled-skills";
const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA_PATH =
"src/config/bundled-channel-config-metadata.generated.ts";
const TRANSIENT_COPY_ERROR_CODES = new Set(["EEXIST", "ENOENT", "ENOTEMPTY", "EBUSY"]);
const COPY_RETRY_DELAYS_MS = [10, 25, 50];
@ -217,6 +220,86 @@ function copyDeclaredPluginSkillPaths(params) {
return copiedSkills;
}
function readGeneratedBundledChannelConfigs(repoRoot) {
const metadataPath = path.join(repoRoot, GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA_PATH);
if (!fs.existsSync(metadataPath)) {
return new Map();
}
const source = fs.readFileSync(metadataPath, "utf8");
const match = source.match(
/export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = ([\s\S]*?) as const;/u,
);
if (!match?.[1]) {
return new Map();
}
let entries;
try {
entries = JSON5.parse(match[1]);
} catch {
return new Map();
}
if (!Array.isArray(entries)) {
return new Map();
}
const byPlugin = new Map();
for (const entry of entries) {
if (
!entry ||
typeof entry !== "object" ||
typeof entry.pluginId !== "string" ||
typeof entry.channelId !== "string" ||
!entry.schema ||
typeof entry.schema !== "object"
) {
continue;
}
const pluginConfigs = byPlugin.get(entry.pluginId) ?? {};
pluginConfigs[entry.channelId] = {
schema: entry.schema,
...(typeof entry.label === "string" && entry.label ? { label: entry.label } : {}),
...(typeof entry.description === "string" && entry.description
? { description: entry.description }
: {}),
...(entry.uiHints && typeof entry.uiHints === "object" ? { uiHints: entry.uiHints } : {}),
};
byPlugin.set(entry.pluginId, pluginConfigs);
}
return byPlugin;
}
function mergeGeneratedChannelConfigs(manifest, generatedChannelConfigs) {
if (!generatedChannelConfigs || Object.keys(generatedChannelConfigs).length === 0) {
return manifest;
}
const existingChannelConfigs =
manifest.channelConfigs && typeof manifest.channelConfigs === "object"
? manifest.channelConfigs
: {};
const channelConfigs = { ...existingChannelConfigs };
for (const [channelId, generated] of Object.entries(generatedChannelConfigs)) {
const existing =
existingChannelConfigs[channelId] && typeof existingChannelConfigs[channelId] === "object"
? existingChannelConfigs[channelId]
: {};
channelConfigs[channelId] = {
...generated,
...existing,
schema: generated.schema,
...(generated.uiHints || existing.uiHints
? { uiHints: { ...generated.uiHints, ...existing.uiHints } }
: {}),
...(existing.label || generated.label ? { label: existing.label ?? generated.label } : {}),
...(existing.description || generated.description
? { description: existing.description ?? generated.description }
: {}),
};
}
return {
...manifest,
channelConfigs,
};
}
/**
* @param {{
* cwd?: string;
@ -233,6 +316,7 @@ export function copyBundledPluginMetadata(params = {}) {
return;
}
const generatedChannelConfigsByPlugin = readGeneratedBundledChannelConfigs(repoRoot);
const sourcePluginDirs = new Set();
for (const dirent of fs.readdirSync(extensionsRoot, { withFileTypes: true })) {
if (!dirent.isDirectory()) {
@ -275,18 +359,22 @@ export function copyBundledPluginMetadata(params = {}) {
if (fs.existsSync(manifestPath)) {
const manifest = JSON.parse(fs.readFileSync(manifestPath, "utf8"));
const manifestWithGeneratedChannelConfigs = mergeGeneratedChannelConfigs(
manifest,
generatedChannelConfigsByPlugin.get(manifest.id),
);
// Generated skill assets live under a dedicated dist-owned directory. Runtime
// dependency staging owns dist plugin node_modules; do not remove it here.
removePathIfExists(path.join(distPluginDir, GENERATED_BUNDLED_SKILLS_DIR));
const copiedSkills = copyDeclaredPluginSkillPaths({
manifest,
manifest: manifestWithGeneratedChannelConfigs,
pluginDir,
distPluginDir,
repoRoot,
});
const bundledManifest = Array.isArray(manifest.skills)
? { ...manifest, skills: copiedSkills }
: manifest;
const bundledManifest = Array.isArray(manifestWithGeneratedChannelConfigs.skills)
? { ...manifestWithGeneratedChannelConfigs, skills: copiedSkills }
: manifestWithGeneratedChannelConfigs;
writeTextFileIfChanged(distManifestPath, `${JSON.stringify(bundledManifest, null, 2)}\n`);
} else {
removeFileIfExists(distManifestPath);

View file

@ -45,13 +45,13 @@ function createPlugin(
return pluginDir;
}
function readBundledManifest(repoRoot: string, pluginId: string) {
function readBundledManifest(repoRoot: string, pluginId: string): Record<string, unknown> {
return JSON.parse(
fs.readFileSync(
path.join(repoRoot, "dist", "extensions", pluginId, "openclaw.plugin.json"),
"utf8",
),
) as { skills?: string[] };
) as Record<string, unknown>;
}
function readBundledPackageJson(repoRoot: string, pluginId: string) {
@ -126,6 +126,70 @@ describe("copyBundledPluginMetadata", () => {
expect(packageJson.openclaw?.extensions).toEqual(["./index.js"]);
});
it("copies generated bundled channel config schemas into dist manifests", () => {
const repoRoot = makeRepoRoot("openclaw-bundled-channel-config-meta-");
createPlugin(repoRoot, {
id: "telegram",
packageName: "@openclaw/telegram",
manifest: {
channels: ["telegram"],
channelConfigs: {
telegram: {
schema: { type: "object", properties: { stale: { type: "boolean" } } },
uiHints: {
"channels.telegram.stale": { help: "stale hint" },
},
},
},
},
packageOpenClaw: { extensions: ["./index.ts"] },
});
fs.mkdirSync(path.join(repoRoot, "src", "config"), { recursive: true });
fs.writeFileSync(
path.join(repoRoot, "src", "config", "bundled-channel-config-metadata.generated.ts"),
[
"// generated test fixture",
"export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [",
" {",
' pluginId: "telegram",',
' channelId: "telegram",',
' label: "Telegram",',
" schema: {",
' type: "object",',
" properties: {",
' groups: { type: "object" }',
" }",
" },",
" uiHints: {",
' "channels.telegram.groups": { help: "generated hint" }',
" }",
" }",
"] as const;",
"",
].join("\n"),
"utf8",
);
copyBundledPluginMetadata({ repoRoot });
const manifest = readBundledManifest(repoRoot, "telegram");
expect(manifest.channelConfigs).toEqual({
telegram: {
schema: {
type: "object",
properties: {
groups: { type: "object" },
},
},
label: "Telegram",
uiHints: {
"channels.telegram.groups": { help: "generated hint" },
"channels.telegram.stale": { help: "stale hint" },
},
},
});
});
it("relocates node_modules-backed skill paths into bundled-skills and rewrites the manifest", () => {
const repoRoot = makeRepoRoot("openclaw-bundled-plugin-node-modules-");
const pluginDir = createTlonSkillPlugin(repoRoot);