fix(whatsapp): emit message received hooks (#71217)

* fix(whatsapp): emit message received hooks

* fix(whatsapp): harden message received hooks
This commit is contained in:
Vincent Koc 2026-04-24 13:05:10 -07:00 committed by GitHub
parent 7a168150e6
commit 8154337cb6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 498 additions and 18 deletions

View file

@ -58,6 +58,7 @@ Docs: https://docs.openclaw.ai
- Config/plugins: accept `plugins.entries.*.hooks.allowConversationAccess` in validation, generated schema metadata, and plugin policy inspection so trusted external plugins can enable conversation-access hooks such as `agent_end` without local schema patches. Fixes #71215. (#71221) Thanks @BillChirico.
- Codex harness/models: keep legacy `codex/*` harness shorthand out of model picker and `/models` choice surfaces while migrating primary legacy refs to canonical `openai/*` plus explicit Codex harness config. (#71193) Thanks @vincentkoc.
- Plugins/runtime deps: respect explicit plugin and channel disablement when repairing bundled runtime dependencies, so doctor and health checks no longer install deps for disabled configured channels.
- WhatsApp/plugins: support an explicit opt-in for inbound `message_received` hooks with canonical channel, conversation, session, and sender fields. Thanks @vincentkoc.
- Diagnostics: harden tool and model diagnostic events against hostile errors, blocking listeners, and unsafe stability reason fields. Thanks @vincentkoc.
- Plugins/onboarding: record local plugin install source metadata without duplicating raw absolute local paths in persisted `plugins.installs`, while preserving linked load-path cleanup. (#70970) Thanks @vincentkoc.
- Browser/tool: tell agents not to pass per-call `timeoutMs` on existing-session type, evaluate, and other Chrome MCP actions that reject timeout overrides.

View file

@ -152,6 +152,46 @@ OpenClaw recommends running WhatsApp on a separate number when possible. (The ch
- Group sessions are isolated (`agent:<agentId>:whatsapp:group:<jid>`).
- WhatsApp Web transport honors standard proxy environment variables on the gateway host (`HTTPS_PROXY`, `HTTP_PROXY`, `NO_PROXY` / lowercase variants). Prefer host-level proxy config over channel-specific WhatsApp proxy settings.
## Plugin hooks and privacy
WhatsApp inbound messages can contain personal message content, phone numbers,
group identifiers, sender names, and session correlation fields. For that reason,
WhatsApp does not broadcast inbound `message_received` hook payloads to plugins
unless you explicitly opt in:
```json5
{
channels: {
whatsapp: {
pluginHooks: {
messageReceived: true,
},
},
},
}
```
You can scope the opt-in to one account:
```json5
{
channels: {
whatsapp: {
accounts: {
work: {
pluginHooks: {
messageReceived: true,
},
},
},
},
},
}
```
Only enable this for plugins you trust to receive inbound WhatsApp message
content and identifiers.
## Access control and activation
<Tabs>

View file

@ -4,6 +4,17 @@
"configSchema": {
"type": "object",
"additionalProperties": false,
"properties": {}
"properties": {
"pluginHooks": {
"type": "object",
"additionalProperties": false,
"properties": {
"messageReceived": {
"type": "boolean",
"description": "Opt in to broadcasting inbound WhatsApp message_received hook payloads to loaded plugins."
}
}
}
}
}
}

View file

@ -1,9 +1,10 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// Hoisted mocks used across tests so vi.mock factories can reference them.
const { resolvePolicyMock, buildContextMock } = vi.hoisted(() => ({
const { resolvePolicyMock, buildContextMock, runMessageReceivedMock } = vi.hoisted(() => ({
resolvePolicyMock: vi.fn(),
buildContextMock: vi.fn(),
runMessageReceivedMock: vi.fn(async () => undefined),
}));
vi.mock("../../inbound-policy.js", async (importOriginal) => {
@ -30,6 +31,13 @@ vi.mock("./inbound-dispatch.js", async (importOriginal) => {
};
});
vi.mock("openclaw/plugin-sdk/plugin-runtime", () => ({
getGlobalHookRunner: () => ({
hasHooks: (hookName: string) => hookName === "message_received",
runMessageReceived: runMessageReceivedMock,
}),
}));
vi.mock("../../identity.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../identity.js")>();
return {
@ -113,6 +121,7 @@ vi.mock("./runtime-api.js", async (importOriginal) => {
};
});
import { clearInternalHooks, registerInternalHook } from "openclaw/plugin-sdk/hook-runtime";
import { processMessage } from "./process-message.js";
// ---------------------------------------------------------------------------
@ -172,9 +181,9 @@ const baseRoute = {
matchedBy: "default",
};
function callProcessMessage() {
function callProcessMessage(overrides: { cfg?: unknown } = {}) {
return processMessage({
cfg: {} as never,
cfg: (overrides.cfg ?? {}) as never,
msg: baseMsg as never,
route: baseRoute as never,
groupHistoryKey: "whatsapp:default:group:123@g.us",
@ -201,6 +210,8 @@ describe("processMessage group system prompt wiring", () => {
beforeEach(() => {
buildContextMock.mockReset();
resolvePolicyMock.mockReset();
runMessageReceivedMock.mockClear();
clearInternalHooks();
buildContextMock.mockImplementation(
(params: { groupSystemPrompt?: string; combinedBody?: string }) => ({
GroupSystemPrompt: params.groupSystemPrompt,
@ -209,6 +220,10 @@ describe("processMessage group system prompt wiring", () => {
);
});
afterEach(() => {
clearInternalHooks();
});
it("resolves group systemPrompt from account config and passes it into buildWhatsAppInboundContext", async () => {
resolvePolicyMock.mockReturnValue(
makePolicy(makeAccount({ [GROUP_JID]: { systemPrompt: "from config" } })),
@ -218,4 +233,91 @@ describe("processMessage group system prompt wiring", () => {
expect(buildContextMock.mock.calls[0][0].groupSystemPrompt).toBe("from config");
});
it("fires message_received hooks with canonical WhatsApp correlation fields", async () => {
const internalReceived = vi.fn();
registerInternalHook("message:received", internalReceived);
resolvePolicyMock.mockReturnValue(makePolicy(makeAccount()));
buildContextMock.mockImplementationOnce(() => ({
Body: "hi",
BodyForCommands: "hi",
RawBody: "hi",
CommandBody: "hi",
From: GROUP_JID,
To: "+15550001111",
SessionKey: baseRoute.sessionKey,
AccountId: "default",
MessageSid: "msg1",
SenderId: "+15550002222",
SenderName: "Alice",
SenderE164: "+15550002222",
Timestamp: 1710000000,
Provider: "whatsapp",
Surface: "whatsapp",
OriginatingChannel: "whatsapp",
OriginatingTo: GROUP_JID,
GroupSubject: "Test Group",
}));
await callProcessMessage({
cfg: {
channels: {
whatsapp: {
pluginHooks: {
messageReceived: true,
},
},
},
},
});
await Promise.resolve();
await Promise.resolve();
expect(runMessageReceivedMock).toHaveBeenCalledTimes(1);
expect(runMessageReceivedMock).toHaveBeenCalledWith(
expect.objectContaining({
from: GROUP_JID,
content: "hi",
timestamp: 1710000000,
messageId: "msg1",
senderId: "+15550002222",
sessionKey: baseRoute.sessionKey,
}),
expect.objectContaining({
channelId: "whatsapp",
accountId: "default",
conversationId: GROUP_JID,
sessionKey: baseRoute.sessionKey,
messageId: "msg1",
senderId: "+15550002222",
}),
);
expect(internalReceived).toHaveBeenCalledWith(
expect.objectContaining({
type: "message",
action: "received",
sessionKey: baseRoute.sessionKey,
context: expect.objectContaining({
from: GROUP_JID,
content: "hi",
timestamp: 1710000000,
channelId: "whatsapp",
accountId: "default",
conversationId: GROUP_JID,
messageId: "msg1",
}),
}),
);
});
it("does not fire WhatsApp message_received hooks without explicit opt-in", async () => {
const internalReceived = vi.fn();
registerInternalHook("message:received", internalReceived);
resolvePolicyMock.mockReturnValue(makePolicy(makeAccount()));
await callProcessMessage();
expect(runMessageReceivedMock).not.toHaveBeenCalled();
expect(internalReceived).not.toHaveBeenCalled();
});
});

View file

@ -1,3 +1,13 @@
import {
createInternalHookEvent,
deriveInboundMessageHookContext,
fireAndForgetBoundedHook,
toInternalMessageReceivedContext,
toPluginMessageContext,
toPluginMessageReceivedEvent,
triggerInternalHook,
} from "openclaw/plugin-sdk/hook-runtime";
import { getGlobalHookRunner } from "openclaw/plugin-sdk/plugin-runtime";
import { resolveBatchedReplyThreadingPolicy } from "openclaw/plugin-sdk/reply-reference";
import { getPrimaryIdentityId, getSelfIdentity, getSenderIdentity } from "../../identity.js";
import {
@ -49,6 +59,100 @@ import {
type resolveAgentRoute,
} from "./runtime-api.js";
const WHATSAPP_MESSAGE_RECEIVED_HOOK_LIMITS = {
maxConcurrency: 8,
maxQueue: 128,
timeoutMs: 2_000,
};
type WhatsAppMessageReceivedHookConfig = {
pluginHooks?: {
messageReceived?: unknown;
};
accounts?: Record<string, unknown>;
};
function readWhatsAppMessageReceivedHookOptIn(value: unknown): boolean | undefined {
if (!value || typeof value !== "object") {
return undefined;
}
const pluginHooks = (value as WhatsAppMessageReceivedHookConfig).pluginHooks;
return pluginHooks?.messageReceived === true ? true : undefined;
}
function shouldEmitWhatsAppMessageReceivedHooks(params: {
cfg: ReturnType<LoadConfigFn>;
accountId?: string;
}): boolean {
const channelConfig = params.cfg.channels?.whatsapp as
| WhatsAppMessageReceivedHookConfig
| undefined;
const accountConfig =
params.accountId && channelConfig?.accounts
? channelConfig.accounts[params.accountId]
: undefined;
return (
readWhatsAppMessageReceivedHookOptIn(accountConfig) ??
readWhatsAppMessageReceivedHookOptIn(channelConfig) ??
false
);
}
function emitWhatsAppMessageReceivedHooks(params: {
ctx: ReturnType<typeof buildWhatsAppInboundContext>;
sessionKey: string;
}): void {
const canonical = deriveInboundMessageHookContext(params.ctx);
const hookRunner = getGlobalHookRunner();
if (hookRunner?.hasHooks("message_received")) {
fireAndForgetBoundedHook(
() =>
hookRunner.runMessageReceived(
toPluginMessageReceivedEvent(canonical),
toPluginMessageContext(canonical),
),
"whatsapp: message_received plugin hook failed",
undefined,
WHATSAPP_MESSAGE_RECEIVED_HOOK_LIMITS,
);
}
fireAndForgetBoundedHook(
() =>
triggerInternalHook(
createInternalHookEvent(
"message",
"received",
params.sessionKey,
toInternalMessageReceivedContext(canonical),
),
),
"whatsapp: message_received internal hook failed",
undefined,
WHATSAPP_MESSAGE_RECEIVED_HOOK_LIMITS,
);
}
function emitWhatsAppMessageReceivedHooksIfEnabled(params: {
cfg: ReturnType<LoadConfigFn>;
ctx: ReturnType<typeof buildWhatsAppInboundContext>;
accountId?: string;
sessionKey: string;
}): void {
if (
!shouldEmitWhatsAppMessageReceivedHooks({
cfg: params.cfg,
accountId: params.accountId,
})
) {
return;
}
emitWhatsAppMessageReceivedHooks({
ctx: params.ctx,
sessionKey: params.sessionKey,
});
}
function resolvePinnedMainDmRecipient(params: {
cfg: ReturnType<LoadConfigFn>;
allowFrom?: string[];
@ -171,7 +275,7 @@ export async function processMessage(params: {
sessionKey: params.route.sessionKey,
conversationId,
verbose: params.verbose,
accountId: params.route.accountId,
accountId: account.accountId,
info: params.replyLogger.info.bind(params.replyLogger),
warn: params.replyLogger.warn.bind(params.replyLogger),
});
@ -265,6 +369,12 @@ export async function processMessage(params: {
replyThreading,
visibleReplyTo: visibleReplyTo ?? undefined,
});
emitWhatsAppMessageReceivedHooksIfEnabled({
cfg: params.cfg,
ctx: ctxPayload,
accountId: params.route.accountId,
sessionKey: params.route.sessionKey,
});
const pinnedMainDmRecipient = resolvePinnedMainDmRecipient({
cfg: params.cfg,

View file

@ -1,12 +1,18 @@
import { describe, expect, it, vi } from "vitest";
import { fireAndForgetHook } from "./fire-and-forget.js";
import { fireAndForgetBoundedHook, fireAndForgetHook } from "./fire-and-forget.js";
describe("fireAndForgetHook", () => {
it("logs rejection errors", async () => {
it("logs rejection errors as sanitized single-line messages", async () => {
const logger = vi.fn();
fireAndForgetHook(Promise.reject(new Error("boom")), "hook failed", logger);
fireAndForgetHook(
Promise.reject(new Error("boom\nforged\tsecret sk-test1234567890")),
"hook failed",
logger,
);
await Promise.resolve();
expect(logger).toHaveBeenCalledWith("hook failed: Error: boom");
expect(logger).toHaveBeenCalledWith(expect.stringMatching(/^hook failed: boom forged secret/));
expect(logger.mock.calls[0]?.[0]).not.toContain("\n");
expect(logger.mock.calls[0]?.[0]).not.toContain("sk-test1234567890");
});
it("does not log for resolved tasks", async () => {
@ -16,3 +22,48 @@ describe("fireAndForgetHook", () => {
expect(logger).not.toHaveBeenCalled();
});
});
describe("fireAndForgetBoundedHook", () => {
it("limits queued fire-and-forget hooks", async () => {
const logger = vi.fn();
let resolveFirst: (() => void) | undefined;
const first = new Promise<void>((resolve) => {
resolveFirst = resolve;
});
const starts: string[] = [];
fireAndForgetBoundedHook(
async () => {
starts.push("first");
await first;
},
"hook failed",
logger,
{ maxConcurrency: 1, maxQueue: 1, timeoutMs: 10_000 },
);
fireAndForgetBoundedHook(
async () => {
starts.push("second");
},
"hook failed",
logger,
{ maxConcurrency: 1, maxQueue: 1, timeoutMs: 10_000 },
);
fireAndForgetBoundedHook(
async () => {
starts.push("third");
},
"hook failed",
logger,
{ maxConcurrency: 1, maxQueue: 1, timeoutMs: 10_000 },
);
await Promise.resolve();
expect(starts).toEqual(["first"]);
expect(logger).toHaveBeenCalledWith("hook failed: queue full; dropping hook");
resolveFirst?.();
await new Promise((resolve) => setTimeout(resolve, 0));
expect(starts).toEqual(["first", "second"]);
});
});

View file

@ -1,4 +1,68 @@
import { logVerbose } from "../globals.js";
import { formatErrorMessage } from "../infra/errors.js";
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
const DEFAULT_MAX_CONCURRENT_FIRE_AND_FORGET_HOOKS = 16;
const DEFAULT_MAX_QUEUED_FIRE_AND_FORGET_HOOKS = 256;
const DEFAULT_FIRE_AND_FORGET_HOOK_TIMEOUT_MS = 2_000;
const MAX_HOOK_LOG_MESSAGE_LENGTH = 500;
type FireAndForgetHookJob = {
task: () => Promise<unknown>;
label: string;
logger: (message: string) => void;
timeoutMs: number;
};
type FireAndForgetHookState = {
active: number;
queue: FireAndForgetHookJob[];
};
export type FireAndForgetBoundedHookOptions = {
maxConcurrency?: number;
maxQueue?: number;
timeoutMs?: number;
};
const getFireAndForgetHookState = () =>
resolveGlobalSingleton<FireAndForgetHookState>(
Symbol.for("openclaw.fireAndForgetHookState"),
() => ({
active: 0,
queue: [],
}),
);
function positiveIntegerOrDefault(value: number | undefined, fallback: number): number {
return typeof value === "number" && Number.isInteger(value) && value > 0 ? value : fallback;
}
function replaceLogControlCharacters(value: string): string {
let result = "";
for (const char of value) {
const codePoint = char.codePointAt(0);
if (
codePoint === undefined ||
codePoint <= 0x1f ||
codePoint === 0x7f ||
codePoint === 0x2028 ||
codePoint === 0x2029
) {
result += " ";
continue;
}
result += char;
}
return result;
}
export function formatHookErrorForLog(err: unknown): string {
const formatted = replaceLogControlCharacters(formatErrorMessage(err))
.replace(/\s+/g, " ")
.trim();
return (formatted || "unknown error").slice(0, MAX_HOOK_LOG_MESSAGE_LENGTH);
}
export function fireAndForgetHook(
task: Promise<unknown>,
@ -6,6 +70,79 @@ export function fireAndForgetHook(
logger: (message: string) => void = logVerbose,
): void {
void task.catch((err) => {
logger(`${label}: ${String(err)}`);
logger(`${label}: ${formatHookErrorForLog(err)}`);
});
}
function runFireAndForgetHookJob(
state: FireAndForgetHookState,
job: FireAndForgetHookJob,
limits: { maxConcurrency: number },
): void {
state.active += 1;
let didLogTimeout = false;
const timeout =
job.timeoutMs > 0
? setTimeout(() => {
didLogTimeout = true;
job.logger(`${job.label}: timed out after ${job.timeoutMs}ms`);
}, job.timeoutMs)
: undefined;
void Promise.resolve()
.then(job.task)
.catch((err) => {
if (!didLogTimeout) {
job.logger(`${job.label}: ${formatHookErrorForLog(err)}`);
}
})
.finally(() => {
if (timeout) {
clearTimeout(timeout);
}
state.active -= 1;
drainFireAndForgetHookQueue(state, limits);
});
}
function drainFireAndForgetHookQueue(
state: FireAndForgetHookState,
limits: { maxConcurrency: number },
): void {
while (state.active < limits.maxConcurrency) {
const next = state.queue.shift();
if (!next) {
return;
}
runFireAndForgetHookJob(state, next, limits);
}
}
export function fireAndForgetBoundedHook(
task: () => Promise<unknown>,
label: string,
logger: (message: string) => void = logVerbose,
options: FireAndForgetBoundedHookOptions = {},
): void {
const state = getFireAndForgetHookState();
const maxConcurrency = positiveIntegerOrDefault(
options.maxConcurrency,
DEFAULT_MAX_CONCURRENT_FIRE_AND_FORGET_HOOKS,
);
const maxQueue = positiveIntegerOrDefault(
options.maxQueue,
DEFAULT_MAX_QUEUED_FIRE_AND_FORGET_HOOKS,
);
const timeoutMs = positiveIntegerOrDefault(
options.timeoutMs,
DEFAULT_FIRE_AND_FORGET_HOOK_TIMEOUT_MS,
);
if (state.active >= maxConcurrency && state.queue.length >= maxQueue) {
logger(`${label}: queue full; dropping hook`);
return;
}
state.queue.push({ task, label, logger, timeoutMs });
drainFireAndForgetHookQueue(state, { maxConcurrency });
}

View file

@ -107,7 +107,7 @@ describe("before_agent_reply hook runner (claiming pattern)", () => {
expect(result).toEqual({ handled: true, reply: { text: "ok" } });
expect(logger.error).toHaveBeenCalledWith(
expect.stringContaining("before_agent_reply handler from test-plugin failed: Error: boom"),
expect.stringContaining("before_agent_reply handler from test-plugin failed: boom"),
);
});

View file

@ -186,9 +186,38 @@ describe("before_tool_call terminal block semantics", () => {
});
await expect(runner.runBeforeToolCall(toolEvent, toolCtx)).rejects.toThrow(
"before_tool_call handler from failing failed: Error: boom",
"before_tool_call handler from failing failed: boom",
);
});
it("sanitizes caught hook error logs", async () => {
const logger = {
error: vi.fn(),
warn: vi.fn(),
};
addStaticTestHooks(registry, {
hookName: "message_received",
hooks: [
{
pluginId: "failing",
result: undefined,
handler: () => {
throw new Error("boom\nforged\tsecret sk-test1234567890");
},
},
],
});
const runner = createHookRunner(registry, { catchErrors: true, logger });
await runner.runMessageReceived({ from: "user-1", content: "hi" }, { channelId: "whatsapp" });
const message = String(logger.error.mock.calls[0]?.[0] ?? "");
expect(message).toMatch(
/^\[hooks\] message_received handler from failing failed: boom forged secret/,
);
expect(message).not.toContain("\n");
expect(message).not.toContain("sk-test1234567890");
});
});
describe("message_sending terminal cancel semantics", () => {

View file

@ -5,6 +5,7 @@
* error handling, priority ordering, and async support.
*/
import { formatHookErrorForLog } from "../hooks/fire-and-forget.js";
import { formatErrorMessage } from "../infra/errors.js";
import { concatOptionalTextSegments } from "../shared/text/join-segments.js";
import type { GlobalHookRunnerRegistry, HookRunnerRegistry } from "./hook-registry.types.js";
@ -281,9 +282,7 @@ export function createHookRunner(
pluginId: string;
error: unknown;
}): never | void => {
const msg = `[hooks] ${params.hookName} handler from ${params.pluginId} failed: ${String(
params.error,
)}`;
const msg = `[hooks] ${params.hookName} handler from ${params.pluginId} failed: ${formatHookErrorForLog(params.error)}`;
if (shouldCatchHookErrors(params.hookName)) {
logger?.error(msg);
return;

View file

@ -82,7 +82,7 @@ describe("inbound_claim hook runner", () => {
expect(result).toEqual({ handled: true });
expect(logger.error).toHaveBeenCalledWith(
expect.stringContaining("inbound_claim handler from test-plugin failed: Error: boom"),
expect.stringContaining("inbound_claim handler from test-plugin failed: boom"),
);
expect(succeeding).toHaveBeenCalledTimes(1);
});

View file

@ -81,7 +81,7 @@ describe("reply_dispatch hook runner", () => {
counts: { tool: 1, block: 0, final: 0 },
});
expect(logger.error).toHaveBeenCalledWith(
expect.stringContaining("reply_dispatch handler from test-plugin failed: Error: boom"),
expect.stringContaining("reply_dispatch handler from test-plugin failed: boom"),
);
expect(succeeding).toHaveBeenCalledTimes(1);
});