Squashed commit of the following:

commit 5193ef7501
Author: frdel <38891707+frdel@users.noreply.github.com>
Date:   Tue Mar 31 09:47:02 2026 +0200

    refactor: change default mode from dedicated to self-chat and reorder UI settings

    - Change default mode to self-chat across all modules
    - Update README to reflect self-chat as primary mode with security warning
    - Move session/media storage from usr/whatsapp to tmp/whatsapp
    - Reorder config UI: move Mode above Allowed Numbers
    - Add warning banner when allowed_numbers is empty in self-chat mode
    - Move Bridge Port and Poll Interval to bottom of settings
    - Update mode descriptions to clarify self-chat handles both self

commit 9fece911b5
Author: frdel <38891707+frdel@users.noreply.github.com>
Date:   Tue Mar 31 09:20:35 2026 +0200

    refactor: centralize WhatsApp storage paths and improve bridge dependency handling

    - Add storage_paths.py helper for consistent session/media/runtime paths
    - Replace hardcoded usr/whatsapp paths across all modules
    - Fix bridge lock to be event-loop-aware (recreate per loop)
    - Add automatic dependency reinstall on startup failures
    - Track bridge startup output for better error diagnostics
    - Add dependency state tracking with package.json hash validation
    - Implement force reinstall when node_modules appears

commit bc511d221d
Author: linuztx <linuztx@gmail.com>
Date:   Tue Mar 31 09:07:46 2026 +0800

    fix: stop poll loop immediately when Node.js is not installed

commit a9554e132f
Author: linuztx <linuztx@gmail.com>
Date:   Tue Mar 31 08:49:15 2026 +0800

    fix: auto-reinstall corrupt node_modules and stop poll loop after repeated bridge failures

    _ensure_npm_install now verifies key package exists, not just
    node_modules dir. Wipes and reinstalls if corrupt.

    Poll loop stops after 5 consecutive bridge start failures instead
    of spamming errors and making A0 unusable.

commit 61fa1bf487
Author: linuztx <linuztx@gmail.com>
Date:   Tue Mar 31 08:38:51 2026 +0800

    fix: move allowed_numbers filtering from JS bridge to Python handler

    The JS bridge used LIDs (internal WhatsApp identifiers) for sender
    matching which never matched actual phone numbers. Moved filtering
    to Python handler.py where config is read fresh each poll cycle.

    - Add senderNumber (resolved phone) to bridge message payload
    - Filter in poll_messages() with normalized number comparison
    - Remove --allowed-numbers CLI arg and JS-side filtering
    - Fix ensure_bridge_http_up not recording _bridge_config
    - Fix falsy empty-dict check in bridge restart detection

commit 64ee177897
Author: linuztx <linuztx@gmail.com>
Date:   Sat Mar 28 23:34:23 2026 +0800

    refactor: move email agent instructions to system prompt and update prompt labels

commit 0f53b41d80
Author: linuztx <linuztx@gmail.com>
Date:   Sat Mar 28 10:59:44 2026 +0800

    Add node_modules to gitignore

commit eb6a4d3ad2
Author: linuztx <linuztx@gmail.com>
Date:   Sat Mar 28 10:53:59 2026 +0800

    Add WhatsApp plugin thumbnail

commit 39bed4f538
Author: linuztx <linuztx@gmail.com>
Date:   Sat Mar 28 10:51:47 2026 +0800

    refactor: rename allowed_users to allowed_numbers across plugin

commit e4991b6e6e
Author: linuztx <linuztx@gmail.com>
Date:   Fri Mar 27 21:58:29 2026 +0800

    improve: move agent instructions from per-message to system prompt

commit 4f1be15fa7
Author: linuztx <linuztx@gmail.com>
Date:   Fri Mar 27 21:00:25 2026 +0800

    improve: add macOS port kill support and bridge process destructor cleanup

commit f5349753d7
Author: linuztx <linuztx@gmail.com>
Date:   Fri Mar 27 17:09:56 2026 +0800

    improve: remove redundant bridge_manager from execute, rely on poll loop finally

commit 9d9dd4bd7f
Author: linuztx <linuztx@gmail.com>
Date:   Fri Mar 27 14:41:14 2026 +0800

    fix: stop bridge and poll loop when plugin is disabled or toggled off

commit 66b0a7d3e0
Author: linuztx <linuztx@gmail.com>
Date:   Fri Mar 27 11:05:58 2026 +0800

    improve: fix allowed users input, auto-strip + prefix, log ignored messages

commit 938e7b9312
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 23:26:42 2026 +0800

    improve: add line break to allowed users description

commit 4ef64b9121
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 22:44:55 2026 +0800

    feat: convert markdown to WhatsApp formatting before sending replies

commit f549b49f44
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 22:34:56 2026 +0800

    improve: add progress update instructions to system context prompt

commit 66e5d51dcf
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 22:23:32 2026 +0800

    fix: stop typing indicator on agent error or generation failure

commit 3dd01cd04c
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 18:31:38 2026 +0800

    improve: persistent typing indicator with poll-based refresh

commit 8d0ec86f15
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 17:11:25 2026 +0800

    Update README.md

commit e664673c1c
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 16:05:44 2026 +0800

    feat: add agent prefix to self-chat replies for visual distinction

commit 18c5716d10
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 15:43:01 2026 +0800

    fix: clear typing indicator after sending reply in self-chat mode

commit 7c653c9d56
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 14:43:06 2026 +0800

    improve: merge WhatsApp Link and Disconnect into single Account field

commit 57c95e6f13
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 14:11:05 2026 +0800

    feat: add disconnect account option to switch WhatsApp accounts

commit c62695356e
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 14:00:00 2026 +0800

    improve: move mode description inline and reorder Allow Group after Allowed Users

commit 18a56ea446
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 13:44:17 2026 +0800

    fix: remove duplicate typing indicator before sending reply

commit 44c90a118f
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 13:30:06 2026 +0800

    improve: remove sender number from DM prompt

commit 64fe7d0302
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 13:17:29 2026 +0800

    fix: handle documentWithCaptionMessage wrapper for captioned documents

commit 00b6657185
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 13:06:40 2026 +0800

    feat: add attachment reader/writer with RFC and download all media types

commit 8041c085d2
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 11:45:17 2026 +0800

    improve: update group prompt and reply instructions

commit 71a6eb7557
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 11:26:36 2026 +0800

    feat: reply to specific messages in group chats with quote

commit 6bf63eb9c6
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 09:57:34 2026 +0800

    feat: detect replies to bot messages in group chats

commit b4492e0759
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 09:20:27 2026 +0800

    improve: resolve group names and sender LIDs in bridge messages

commit 14e673f165
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 04:44:50 2026 +0800

    feat: add allow_group toggle to respond only when mentioned in group chats

commit 40f4884319
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 03:20:02 2026 +0800

    refactor: rename mode value from bot to dedicated

commit 50af7c2bde
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 02:34:51 2026 +0800

    fix: kill orphaned bridge process on port before starting new one

commit 45b21c093a
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 02:07:45 2026 +0800

    improve: auto-restart bridge when config changes

commit a12183ba6e
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 01:39:55 2026 +0800

    feat: add bot and self-chat mode selection for WhatsApp bridge

commit bb8961ab73
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 00:56:56 2026 +0800

    improve: send typing indicator immediately on message receive

commit 84c12b0c23
Author: linuztx <linuztx@gmail.com>
Date:   Thu Mar 26 00:29:04 2026 +0800

    feat: add WhatsApp integration plugin with Baileys bridge and QR pairing
This commit is contained in:
frdel 2026-03-31 09:47:25 +02:00
parent 3df7065685
commit 80518f22a6
44 changed files with 5551 additions and 15 deletions

View file

@ -1,10 +1,14 @@
"""Inject email conversation context into system prompt for email sessions."""
from helpers.extension import Extension
from helpers import plugins
from agent import LoopData
from plugins._email_integration.helpers.dispatcher import CTX_EMAIL_HANDLER
PLUGIN_NAME = "_email_integration"
class EmailContextPrompt(Extension):
async def execute(
@ -16,7 +20,23 @@ class EmailContextPrompt(Extension):
if not self.agent:
return
if self.agent.context.data.get(CTX_EMAIL_HANDLER):
system_prompt.append(
self.agent.read_prompt("fw.email.system_context_reply.md")
)
handler_name = self.agent.context.data.get(CTX_EMAIL_HANDLER)
if not handler_name:
return
system_prompt.append(
self.agent.read_prompt("fw.email.system_context_reply.md")
)
config = plugins.get_plugin_config(PLUGIN_NAME) or {}
for h in config.get("handlers", []):
if h.get("name") == handler_name:
instructions = h.get("agent_instructions", "")
if instructions:
system_prompt.append(
self.agent.read_prompt(
"fw.email.user_message_instructions.md",
instructions=instructions,
)
)
break

View file

@ -394,19 +394,13 @@ def _is_own_email(sender: str, own_address: str) -> bool:
def _build_user_message(agent: Agent, msg: InboundMessage, handler_cfg: dict) -> str:
recipient = handler_cfg.get("username", "")
text = agent.read_prompt(
return agent.read_prompt(
"fw.email.user_message.md",
sender=msg.sender,
recipient=recipient,
subject=msg.subject,
body=msg.body,
)
instructions = handler_cfg.get("agent_instructions", "")
if instructions:
text += agent.read_prompt(
"fw.email.user_message_instructions.md", instructions=instructions,
)
return text
# ------------------------------------------------------------------

View file

@ -1,2 +1,2 @@
[Handler instructions: {{instructions}}]
# Email custom rules
{{instructions}}

View file

@ -1,2 +1,2 @@
[Handler instructions: {{instructions}}]
# Telegram custom rules
{{instructions}}

View file

@ -0,0 +1,63 @@
# WhatsApp Integration Plugin
Communicate with Agent Zero via WhatsApp using a Baileys-based Node.js bridge.
## Requirements
- **Node.js** (v18+) and npm installed on the system
- A WhatsApp account on a phone (for QR code pairing)
## Setup
### Install bridge dependencies
```bash
cd plugins/_whatsapp_integration/whatsapp-bridge
npm install --production
```
Dependencies are auto-installed on first bridge start if missing.
### Configure and pair
1. Enable the plugin in Settings > External > WhatsApp Integration
2. Configure allowed phone numbers
3. Click Show QR Code and scan with WhatsApp on your phone
4. Send a message from an allowed number to start a chat
The WhatsApp session persists across restarts in `tmp/whatsapp/session/`. No re-pairing needed unless you disconnect via settings.
Be careful: if you use your personal number and leave `allowed_numbers` open, other people could misuse your Agent Zero.
## Configuration
| Setting | Description | Default |
|---------|-------------|---------|
| `enabled` | Enable bridge and polling | `false` |
| `mode` | `self-chat` (personal number) or `dedicated` (separate number) | `self-chat` |
| `allow_group` | Respond in group chats when mentioned or replied to | `false` |
| `bridge_port` | Local HTTP port for bridge | `3100` |
| `poll_interval_seconds` | Poll frequency (min 2) | `3` |
| `allowed_numbers` | Phone numbers without + prefix | `[]` (all) |
| `project` | Activate project for WA chats | `""` |
| `agent_instructions` | Extra agent instructions | `""` |
## How It Works
1. The bridge connects to WhatsApp via Baileys and exposes HTTP endpoints on localhost
2. In personal-number mode, you can message your own WhatsApp number to talk to the agent, and the agent can also handle messages that other people send to that number
3. The plugin polls the bridge for new messages every few seconds
4. Incoming messages are routed to existing chats by WhatsApp chat ID or new chats are created
5. Agent responses are sent back via the bridge as WhatsApp messages
6. Media (images, documents) is supported in both directions
## Architecture
```
WhatsApp Phone
↕ (WhatsApp protocol via Baileys)
whatsapp-bridge/bridge.js (Node.js subprocess)
↕ (HTTP API on localhost)
Python helpers (wa_client, handler, bridge_manager)
↕ (Framework extensions)
Agent Zero
```

View file

@ -0,0 +1,27 @@
"""Disconnect WhatsApp account by stopping bridge and clearing session."""
import os
import shutil
from helpers.api import ApiHandler, Request
from helpers.errors import format_error
class Disconnect(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
try:
from plugins._whatsapp_integration.helpers import bridge_manager
from plugins._whatsapp_integration.helpers.storage_paths import get_bridge_session_dir
# Stop bridge first
await bridge_manager.stop_bridge()
# Delete session files
session_dir = get_bridge_session_dir()
if os.path.exists(session_dir):
shutil.rmtree(session_dir, ignore_errors=True)
return {"success": True, "message": "Account disconnected"}
except Exception as e:
return {"success": False, "message": format_error(e)}

View file

@ -0,0 +1,83 @@
"""WhatsApp QR code pairing endpoint."""
from helpers.api import ApiHandler, Request
from helpers.errors import format_error
from helpers import plugins
PLUGIN_NAME = "_whatsapp_integration"
class QrCode(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
config = plugins.get_plugin_config(PLUGIN_NAME) or {}
port = int(config.get("bridge_port", 3100))
mode = config.get("mode", "self-chat")
from plugins._whatsapp_integration.helpers.bridge_manager import (
ensure_bridge_http_up,
get_bridge_url,
is_process_alive,
)
from plugins._whatsapp_integration.helpers.storage_paths import (
get_bridge_media_dir,
get_bridge_session_dir,
)
from plugins._whatsapp_integration.helpers.wa_client import get_qr
session_dir = get_bridge_session_dir()
cache_dir = get_bridge_media_dir()
base_url = get_bridge_url(port)
# Start bridge if not running
if not is_process_alive():
try:
ok = await ensure_bridge_http_up(
port, session_dir, cache_dir, mode=mode,
)
if not ok:
return {
"status": "error",
"message": "Failed to start bridge",
"qr": None,
}
except Exception as e:
return {
"status": "error",
"message": format_error(e),
"qr": None,
}
# Fetch QR status from bridge
try:
data = await get_qr(base_url)
status = data.get("status", "error")
qr = data.get("qr")
if status == "connected":
return {
"status": "connected",
"message": "WhatsApp is already connected",
"qr": None,
}
if status == "waiting_scan" and qr:
return {
"status": "waiting_scan",
"message": "Scan the QR code with WhatsApp",
"qr": qr,
}
return {
"status": "waiting_qr",
"message": "Generating QR code...",
"qr": None,
}
except Exception as e:
return {
"status": "error",
"message": f"Bridge not reachable: {format_error(e)}",
"qr": None,
}

View file

@ -0,0 +1,41 @@
"""Start WhatsApp bridge immediately."""
from helpers.api import ApiHandler, Request
from helpers.errors import format_error
from helpers import plugins
PLUGIN_NAME = "_whatsapp_integration"
class Start(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
config = plugins.get_plugin_config(PLUGIN_NAME) or {}
port = int(config.get("bridge_port", 3100))
mode = config.get("mode", "self-chat")
from plugins._whatsapp_integration.helpers.bridge_manager import (
ensure_bridge_http_up,
is_process_alive,
)
from plugins._whatsapp_integration.helpers.storage_paths import (
get_bridge_media_dir,
get_bridge_session_dir,
)
session_dir = get_bridge_session_dir()
cache_dir = get_bridge_media_dir()
if is_process_alive():
return {"success": True, "message": "Bridge already running"}
try:
ok = await ensure_bridge_http_up(
port, session_dir, cache_dir, mode=mode,
)
if ok:
return {"success": True, "message": "Bridge started"}
return {"success": False, "message": "Failed to start bridge"}
except Exception as e:
return {"success": False, "message": format_error(e)}

View file

@ -0,0 +1,50 @@
"""Test WhatsApp bridge connection health."""
from helpers.api import ApiHandler, Request
from helpers.errors import format_error
from helpers import plugins
PLUGIN_NAME = "_whatsapp_integration"
class TestConnection(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
config = input.get("config") or plugins.get_plugin_config(PLUGIN_NAME) or {}
port = int(config.get("bridge_port", 3100))
results: list[dict] = []
await self._test_bridge(port, results)
ok = all(r["ok"] for r in results)
return {"success": ok, "results": results}
async def _test_bridge(self, port: int, results: list[dict]) -> None:
from plugins._whatsapp_integration.helpers.wa_client import get_health
from plugins._whatsapp_integration.helpers.bridge_manager import get_bridge_url
try:
health = await get_health(get_bridge_url(port))
status = health.get("status", "unknown")
queue = health.get("queueLength", 0)
uptime = health.get("uptime", 0)
if status == "connected":
results.append({
"test": "Bridge",
"ok": True,
"message": f"Connected (uptime: {uptime:.0f}s, queue: {queue})",
})
else:
results.append({
"test": "Bridge",
"ok": False,
"message": f"Bridge running but status: {status}",
})
except Exception as e:
results.append({
"test": "Bridge",
"ok": False,
"message": f"Bridge not reachable: {format_error(e)}",
})

View file

@ -0,0 +1,12 @@
enabled: false
mode: self-chat
# dedicated: separate phone number — people message it directly
# self-chat: personal number — you can message yourself, and the agent can also handle messages sent to your number
bridge_port: 3100
poll_interval_seconds: 3
allow_group: false
# allow_group: respond in group chats when mentioned or replied to
allowed_numbers: []
# Example: ["34652029134", "1234567890"]
project: ""
agent_instructions: ""

View file

@ -0,0 +1,121 @@
"""WhatsApp poll loop — start bridge and poll for incoming messages."""
import asyncio
from typing import Any
from helpers.extension import Extension
from helpers.errors import format_error
from helpers.print_style import PrintStyle
from helpers import plugins
PLUGIN_NAME: str = "_whatsapp_integration"
DEFAULT_INTERVAL: int = 3
MIN_INTERVAL: int = 2
MAX_CONSECUTIVE_FAILURES: int = 5
# ------------------------------------------------------------------
# Extension entry point
# ------------------------------------------------------------------
class WhatsAppAutoPoll(Extension):
async def execute(self, **kwargs: Any) -> None:
import plugins._whatsapp_integration.helpers.handler as handler_mod
config = plugins.get_plugin_config(PLUGIN_NAME) or {}
enabled = config.get("enabled", False)
if not enabled:
if handler_mod._poll_task and not handler_mod._poll_task.done():
handler_mod._poll_task.cancel()
handler_mod._poll_task = None
return
if not handler_mod._poll_task or handler_mod._poll_task.done():
handler_mod._poll_task = asyncio.create_task(_poll_loop())
# ------------------------------------------------------------------
# Poll loop
# ------------------------------------------------------------------
async def _poll_loop() -> None:
from plugins._whatsapp_integration.helpers import bridge_manager
from plugins._whatsapp_integration.helpers.handler import poll_messages
from plugins._whatsapp_integration.helpers.storage_paths import (
get_bridge_media_dir,
get_bridge_session_dir,
)
bridge_started = False
consecutive_failures = 0
try:
while True:
config = plugins.get_plugin_config(PLUGIN_NAME) or {}
if not config.get("enabled", False):
break
if PLUGIN_NAME not in plugins.get_enabled_plugins(None):
break
port = int(config.get("bridge_port", 3100))
session_dir = get_bridge_session_dir()
cache_dir = get_bridge_media_dir()
mode = config.get("mode", "self-chat")
# Detect config changes that require bridge restart
desired = {"port": port, "mode": mode}
running = bridge_manager.get_running_config()
if bridge_started and bridge_manager.is_process_alive() and running != desired:
PrintStyle.info(f"WhatsApp: config changed, restarting bridge")
await bridge_manager.stop_bridge()
bridge_started = False
consecutive_failures = 0
# Start bridge if needed
if not bridge_started or not bridge_manager.is_process_alive():
try:
bridge_started = await bridge_manager.start_bridge(
port, session_dir, cache_dir, mode=mode,
)
if bridge_started:
consecutive_failures = 0
else:
consecutive_failures += 1
except FileNotFoundError:
PrintStyle.error(
"WhatsApp: Node.js is not installed. "
"Stopping poll loop — install Node.js and re-enable the plugin."
)
break
except Exception as e:
consecutive_failures += 1
PrintStyle.error(f"WhatsApp bridge start error: {format_error(e)}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
PrintStyle.error(
f"WhatsApp: bridge failed {consecutive_failures} times in a row, "
f"stopping poll loop. Disable and re-enable the plugin to retry."
)
break
if not bridge_started:
await asyncio.sleep(10)
continue
try:
await poll_messages(config)
except Exception as e:
PrintStyle.error(f"WhatsApp poll error: {format_error(e)}")
sleep_sec = max(config.get("poll_interval_seconds", DEFAULT_INTERVAL), MIN_INTERVAL)
await asyncio.sleep(sleep_sec)
finally:
# Ensure bridge stops when poll loop exits (plugin disabled or task cancelled)
try:
if bridge_manager.is_process_alive():
await bridge_manager.stop_bridge()
except Exception:
pass

View file

@ -0,0 +1,33 @@
from helpers.extension import Extension
from helpers import plugins
from plugins._whatsapp_integration.helpers.handler import (
CTX_WA_CHAT_ID,
CTX_WA_TYPING_ACTIVE,
)
from plugins._whatsapp_integration.helpers import bridge_manager, wa_client
PLUGIN_NAME = "_whatsapp_integration"
class WhatsAppTypingCleanup(Extension):
async def execute(self, **kwargs): # type: ignore[override]
if not self.agent:
return
context = self.agent.context
if not context.data.get(CTX_WA_TYPING_ACTIVE):
return
chat_id = context.data.get(CTX_WA_CHAT_ID, "")
if not chat_id:
return
config = plugins.get_plugin_config(PLUGIN_NAME) or {}
port = int(config.get("bridge_port", 3100))
base_url = bridge_manager.get_bridge_url(port)
await wa_client.send_typing(base_url, chat_id, paused=True)
context.data[CTX_WA_TYPING_ACTIVE] = False

View file

@ -0,0 +1,80 @@
"""Auto-send WhatsApp reply when agent responds in a WhatsApp session."""
import asyncio
from helpers.extension import Extension
from helpers.print_style import PrintStyle
from agent import AgentContext, LoopData, UserMessage
from plugins._whatsapp_integration.helpers.handler import CTX_WA_CHAT_ID, CTX_WA_ATTACHMENTS, CTX_WA_REPLY_TO
MAX_SEND_RETRIES: int = 2
CTX_SEND_FAILURES: str = "_wa_send_failures"
class WhatsAppAutoReply(Extension):
async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
if not self.agent or self.agent.number != 0:
return
context = self.agent.context
if not context.data.get(CTX_WA_CHAT_ID):
return
response_text = _extract_last_response(context)
if not response_text:
return
attachments = context.data.pop(CTX_WA_ATTACHMENTS, [])
reply_to = context.data.pop(CTX_WA_REPLY_TO, "")
if attachments:
PrintStyle.info(f"WhatsApp: sending reply with {len(attachments)} attachment(s)")
asyncio.create_task(self._send_reply(context, response_text, attachments, reply_to))
async def _send_reply(
self, context: AgentContext, response_text: str, attachments: list[str],
reply_to: str = "",
):
from plugins._whatsapp_integration.helpers.handler import send_wa_reply
error = await send_wa_reply(context, response_text, attachments, reply_to=reply_to)
if not error:
context.data[CTX_SEND_FAILURES] = 0
return
failures = context.data.get(CTX_SEND_FAILURES, 0) + 1
context.data[CTX_SEND_FAILURES] = failures
if failures <= MAX_SEND_RETRIES:
_notify_agent_of_failure(context, error, failures)
else:
PrintStyle.error(
f"WhatsApp send failed {failures} times, giving up: {error}"
)
context.log.log(
type="error",
heading="WhatsApp send failed (max retries reached)",
content=error,
)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _extract_last_response(context: AgentContext) -> str:
with context.log._lock:
logs = list(context.log.logs)
if not logs:
return ""
for item in reversed(logs):
if item.type == "response":
return item.content or ""
return ""
def _notify_agent_of_failure(
context: AgentContext, error: str, attempt: int,
):
msg = context.agent0.read_prompt(
"fw.wa.send_failed.md", error=error, attempt=str(attempt),
max_retries=str(MAX_SEND_RETRIES),
)
context.log.log(type="error", heading="WhatsApp send failed", content=error)
context.communicate(UserMessage(message="", system_message=[msg]))

View file

@ -0,0 +1,32 @@
"""Inject WhatsApp conversation context into system prompt."""
from helpers.extension import Extension
from helpers import plugins
from agent import LoopData
from plugins._whatsapp_integration.helpers.handler import CTX_WA_CHAT_ID, PLUGIN_NAME
class WhatsAppContextPrompt(Extension):
async def execute(
self,
system_prompt: list[str] = [],
loop_data: LoopData = LoopData(),
**kwargs,
):
if not self.agent:
return
if self.agent.context.data.get(CTX_WA_CHAT_ID):
system_prompt.append(
self.agent.read_prompt("fw.wa.system_context_reply.md")
)
config = plugins.get_plugin_config(PLUGIN_NAME) or {}
instructions = config.get("agent_instructions", "")
if instructions:
system_prompt.append(
self.agent.read_prompt(
"fw.wa.user_message_instructions.md",
instructions=instructions,
)
)

View file

@ -0,0 +1,64 @@
"""Intercept response tool in WhatsApp sessions for attachments and break_loop."""
from helpers.extension import Extension
from helpers.print_style import PrintStyle
from helpers.tool import Response
from plugins._whatsapp_integration.helpers.handler import CTX_WA_CHAT_ID, CTX_WA_ATTACHMENTS, CTX_WA_REPLY_TO
class WhatsAppResponseIntercept(Extension):
async def execute(
self, tool_name: str = "", response: Response | None = None, **kwargs,
):
if tool_name != "response":
return
if not self.agent:
return
context = self.agent.context
if not context.data.get(CTX_WA_CHAT_ID):
return
tool = self.agent.loop_data.current_tool
if not tool:
return
# Capture attachments and reply_to for later (process_chain_end) or inline send
attachments = tool.args.get("attachments", [])
if attachments:
context.data[CTX_WA_ATTACHMENTS] = attachments
reply_to = tool.args.get("reply_to", "")
if reply_to:
context.data[CTX_WA_REPLY_TO] = reply_to
# Check break_loop arg from agent
agent_break = tool.args.get("break_loop", True)
if agent_break is False and response:
await self._send_inline(context, tool, response)
async def _send_inline(
self, context, tool, response: Response,
):
from plugins._whatsapp_integration.helpers.handler import send_wa_reply
agent = self.agent
assert agent is not None
text = tool.args.get("text", tool.args.get("message", ""))
attachments = context.data.pop(CTX_WA_ATTACHMENTS, [])
reply_to = context.data.pop(CTX_WA_REPLY_TO, "")
if attachments:
PrintStyle.info(f"WhatsApp: sending update with {len(attachments)} attachment(s)")
error = await send_wa_reply(context, text, attachments or None, reply_to=reply_to, keep_typing=True)
if error:
result = agent.read_prompt("fw.wa.update_error.md", error=error)
else:
result = agent.read_prompt("fw.wa.update_ok.md")
# Override response: don't break loop, add result to history
response.break_loop = False
response.message = result
agent.hist_add_tool_result("response", result)

View file

@ -0,0 +1,40 @@
"""
Read attachment files from execution runtime.
No agent/tool dependencies.
"""
import base64
import os
from typing import TypedDict
# ------------------------------------------------------------------
# Data models
# ------------------------------------------------------------------
class AttachmentData(TypedDict):
name: str
content_b64: str
error: str
# ------------------------------------------------------------------
# File reader
# ------------------------------------------------------------------
def read_attachment(path: str) -> AttachmentData:
try:
if not os.path.isfile(path):
return AttachmentData(
name="", content_b64="", error=f"file not found: {path}")
name = os.path.basename(path)
with open(path, "rb") as f:
content = f.read()
return AttachmentData(
name=name,
content_b64=base64.b64encode(content).decode(),
error="",
)
except Exception as e:
return AttachmentData(name="", content_b64="", error=str(e))

View file

@ -0,0 +1,35 @@
"""
Write attachment files into execution runtime.
No agent/tool dependencies.
"""
import base64
from typing import TypedDict
# ------------------------------------------------------------------
# Data models
# ------------------------------------------------------------------
class WriteResult(TypedDict):
path: str
error: str
# ------------------------------------------------------------------
# File writer
# ------------------------------------------------------------------
def write_attachment(rel_path: str, content_b64: str) -> WriteResult:
try:
from helpers import files
abs_path = files.get_abs_path(rel_path)
files.make_dirs(abs_path)
content = base64.b64decode(content_b64)
files.write_file_bin(abs_path, content)
return WriteResult(path=abs_path, error="")
except Exception as e:
return WriteResult(path="", error=str(e))

View file

@ -0,0 +1,548 @@
"""
WhatsApp bridge subprocess manager.
No agent/tool dependencies.
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import os
import platform
import shutil
import subprocess
import threading
from collections import deque
from pathlib import Path
from typing import Any, Sequence
from helpers.print_style import PrintStyle
_bridge_lock: asyncio.Lock | None = None
_bridge_lock_loop: asyncio.AbstractEventLoop | None = None
_bridge_config: dict = {} # config the running bridge was started with
MAX_STARTUP_LOG_LINES = 80
STARTUP_WAIT_ATTEMPTS = 20
STARTUP_WAIT_SECONDS = 0.5
DEPENDENCY_FAILURE_MARKERS = (
"ERR_MODULE_NOT_FOUND",
"MODULE_NOT_FOUND",
"Cannot find module",
"Cannot find package",
)
# ------------------------------------------------------------------
# Process wrapper with destructor
# ------------------------------------------------------------------
class _BridgeProcess:
"""Thin wrapper around Popen — kills the process on garbage collection."""
def __init__(self, process: subprocess.Popen, port: int):
self._process = process
self._port = port
self._recent_output: deque[str] = deque(maxlen=MAX_STARTUP_LOG_LINES)
def poll(self) -> int | None:
return self._process.poll()
def terminate(self) -> None:
self._process.terminate()
def wait(self, timeout: float | None = None) -> int:
return self._process.wait(timeout=timeout)
def kill(self) -> None:
self._process.kill()
def remember_output(self, text: str) -> None:
self._recent_output.append(text)
def recent_output(self) -> str:
return "\n".join(self._recent_output)
@property
def stdout(self):
return self._process.stdout
def __del__(self) -> None:
try:
if self._process.poll() is None:
PrintStyle.error("WhatsApp: bridge still running on GC, killing")
self._process.terminate()
try:
self._process.wait(timeout=5)
except subprocess.TimeoutExpired:
self._process.kill()
_kill_port_process(self._port)
except Exception as e:
PrintStyle.error(f"WhatsApp: bridge destructor error: {e}")
_bridge_process: _BridgeProcess | None = None
REPO_ROOT = Path(__file__).resolve().parents[3]
BRIDGE_DIR = str(Path(__file__).parent.parent / "whatsapp-bridge")
BRIDGE_SCRIPT = os.path.join(BRIDGE_DIR, "bridge.js")
BRIDGE_PACKAGE_JSON = os.path.join(BRIDGE_DIR, "package.json")
BRIDGE_PACKAGE_LOCK = os.path.join(BRIDGE_DIR, "package-lock.json")
BRIDGE_RUNTIME_DIR = os.path.join(REPO_ROOT, "usr", "whatsapp", "bridge-runtime")
BRIDGE_INSTALL_STATE = os.path.join(BRIDGE_RUNTIME_DIR, "deps-state.json")
BRIDGE_NPM_CACHE = os.path.join(BRIDGE_RUNTIME_DIR, "npm-cache")
NODE_MODULES_DIR = os.path.join(BRIDGE_DIR, "node_modules")
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
async def start_bridge(
port: int,
session_dir: str,
cache_dir: str,
mode: str = "self-chat",
) -> bool:
async with _get_bridge_lock():
return await _ensure_bridge_started(
port=port,
session_dir=session_dir,
cache_dir=cache_dir,
mode=mode,
require_connection=True,
start_label="WhatsApp: starting bridge",
)
async def stop_bridge() -> None:
async with _get_bridge_lock():
_stop_bridge_process()
async def is_bridge_running(port: int) -> bool:
if not _bridge_process or _bridge_process.poll() is not None:
return False
return await _check_health(port)
def get_bridge_url(port: int) -> str:
return f"http://127.0.0.1:{port}"
async def ensure_bridge_http_up(
port: int,
session_dir: str,
cache_dir: str,
mode: str = "self-chat",
) -> bool:
"""Start bridge if needed and wait for HTTP server only (not WA connection)."""
async with _get_bridge_lock():
return await _ensure_bridge_started(
port=port,
session_dir=session_dir,
cache_dir=cache_dir,
mode=mode,
require_connection=False,
start_label="WhatsApp: starting bridge for pairing",
)
def is_process_alive() -> bool:
return _bridge_process is not None and _bridge_process.poll() is None
def get_running_config() -> dict:
return dict(_bridge_config)
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _get_bridge_lock() -> asyncio.Lock:
global _bridge_lock, _bridge_lock_loop
loop = asyncio.get_running_loop()
if _bridge_lock is None or _bridge_lock_loop is not loop:
_bridge_lock = asyncio.Lock()
_bridge_lock_loop = loop
return _bridge_lock
async def _ensure_bridge_started(
*,
port: int,
session_dir: str,
cache_dir: str,
mode: str,
require_connection: bool,
start_label: str,
) -> bool:
global _bridge_process
if _bridge_process and _bridge_process.poll() is None:
if require_connection:
return True
if await _check_http_up(port):
return True
PrintStyle.warning("WhatsApp: bridge is running but HTTP is not responding, restarting")
_stop_bridge_process()
await _ensure_bridge_dependencies()
attempt = 0
while attempt < 2:
attempt += 1
success, output = await _start_bridge_once(
port=port,
session_dir=session_dir,
cache_dir=cache_dir,
mode=mode,
require_connection=require_connection,
start_label=start_label,
)
if success:
return True
if attempt == 1 and _looks_like_dependency_failure(output):
PrintStyle.warning(
"WhatsApp: bridge startup looks like a dependency issue, "
"reinstalling dependencies and retrying"
)
await _ensure_bridge_dependencies(force_reinstall=True)
continue
return False
return False
async def _start_bridge_once(
*,
port: int,
session_dir: str,
cache_dir: str,
mode: str,
require_connection: bool,
start_label: str,
) -> tuple[bool, str]:
global _bridge_process
cmd = [
"node", BRIDGE_SCRIPT,
"--port", str(port),
"--session", session_dir,
"--cache-dir", cache_dir,
"--mode", mode,
]
_kill_port_process(port)
PrintStyle.info(start_label)
_bridge_process = _BridgeProcess(subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=BRIDGE_DIR,
), port)
_start_log_reader(_bridge_process)
_bridge_config.clear()
_bridge_config.update({"port": port, "mode": mode})
healthy, output = await _wait_for_bridge_startup(
port=port,
require_connection=require_connection,
)
if healthy:
return True, output
if output:
PrintStyle.error(f"WhatsApp: bridge startup failed\n{output}")
return False, output
async def _wait_for_bridge_startup(*, port: int, require_connection: bool) -> tuple[bool, str]:
for _ in range(STARTUP_WAIT_ATTEMPTS):
await asyncio.sleep(STARTUP_WAIT_SECONDS)
process = _bridge_process
if process is None:
return False, ""
if process.poll() is not None:
output = _summarize_output(process.recent_output())
PrintStyle.error("WhatsApp: bridge process exited unexpectedly")
_clear_bridge_process()
return False, output
if require_connection:
if await _check_health(port):
return True, process.recent_output()
else:
if await _check_http_up(port):
return True, process.recent_output()
if require_connection:
PrintStyle.warning("WhatsApp: bridge started but not yet connected")
process = _bridge_process
return True, process.recent_output() if process else ""
process = _bridge_process
return False, _summarize_output(process.recent_output()) if process else ""
def _looks_like_dependency_failure(output: str) -> bool:
return any(marker in output for marker in DEPENDENCY_FAILURE_MARKERS)
async def _check_health(port: int) -> bool:
try:
from plugins._whatsapp_integration.helpers.wa_client import get_health
health = await get_health(get_bridge_url(port))
return health.get("status") == "connected"
except Exception:
return False
async def _check_http_up(port: int) -> bool:
try:
from plugins._whatsapp_integration.helpers.wa_client import get_health
await get_health(get_bridge_url(port))
return True
except Exception:
return False
async def _ensure_bridge_dependencies(force_reinstall: bool = False) -> None:
expected_state = await _build_dependency_state()
if not force_reinstall:
install_state = _load_dependency_state()
if os.path.isdir(NODE_MODULES_DIR) and await _validate_bridge_dependencies():
if install_state is None:
_write_dependency_state(expected_state)
return
if install_state == expected_state:
return
await _reinstall_bridge_dependencies()
if not await _validate_bridge_dependencies():
raise RuntimeError("WhatsApp: bridge dependencies failed validation after reinstall")
_write_dependency_state(await _build_dependency_state())
async def _build_dependency_state() -> dict[str, Any]:
return {
"package_json_hash": _sha256_file(BRIDGE_PACKAGE_JSON),
"package_lock_hash": _sha256_file(BRIDGE_PACKAGE_LOCK) if os.path.isfile(BRIDGE_PACKAGE_LOCK) else "",
"platform": platform.system(),
"arch": platform.machine(),
"node_version": (await _run_subprocess(["node", "--version"], cwd=BRIDGE_DIR)).strip(),
"npm_version": (await _run_subprocess(["npm", "--version"], cwd=BRIDGE_DIR)).strip(),
}
async def _validate_bridge_dependencies() -> bool:
dependency_names = _bridge_dependency_names()
if not dependency_names or not os.path.isdir(NODE_MODULES_DIR):
return False
imports = ", ".join(json.dumps(name) for name in dependency_names)
script = (
f"for (const name of [{imports}]) {{ await import(name); }}\n"
"process.stdout.write('ok');\n"
)
try:
output = await _run_subprocess(
["node", "--input-type=module", "--eval", script],
cwd=BRIDGE_DIR,
)
except RuntimeError as e:
PrintStyle.warning(f"WhatsApp: dependency validation failed: {e}")
return False
return output.strip() == "ok"
async def _reinstall_bridge_dependencies() -> None:
_ensure_runtime_dir()
if os.path.isdir(NODE_MODULES_DIR):
PrintStyle.warning("WhatsApp: bridge dependencies missing, outdated, or corrupt; reinstalling")
shutil.rmtree(NODE_MODULES_DIR, ignore_errors=True)
else:
PrintStyle.info("WhatsApp: installing bridge dependencies")
if os.path.isfile(BRIDGE_INSTALL_STATE):
try:
os.remove(BRIDGE_INSTALL_STATE)
except OSError:
pass
commands: list[list[str]] = []
if os.path.isfile(BRIDGE_PACKAGE_LOCK):
commands.append(["npm", "ci", "--omit=dev", "--no-audit", "--no-fund"])
commands.append(["npm", "install", "--omit=dev", "--no-audit", "--no-fund"])
env = {"npm_config_cache": BRIDGE_NPM_CACHE}
last_error: RuntimeError | None = None
for command in commands:
try:
await _run_subprocess(command, cwd=BRIDGE_DIR, env=env)
return
except RuntimeError as e:
last_error = e
PrintStyle.warning(f"WhatsApp: {' '.join(command)} failed: {e}")
raise RuntimeError(str(last_error) if last_error else "npm install failed")
def _bridge_dependency_names() -> list[str]:
with open(BRIDGE_PACKAGE_JSON, "r", encoding="utf-8") as f:
package_json = json.load(f)
dependencies = package_json.get("dependencies") or {}
return sorted(dependencies.keys())
def _load_dependency_state() -> dict[str, Any] | None:
if not os.path.isfile(BRIDGE_INSTALL_STATE):
return None
try:
with open(BRIDGE_INSTALL_STATE, "r", encoding="utf-8") as f:
return json.load(f)
except (OSError, json.JSONDecodeError):
return None
def _write_dependency_state(state: dict[str, Any]) -> None:
_ensure_runtime_dir()
with open(BRIDGE_INSTALL_STATE, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2, sort_keys=True)
def _sha256_file(path: str) -> str:
digest = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
digest.update(chunk)
return digest.hexdigest()
def _ensure_runtime_dir() -> None:
os.makedirs(BRIDGE_RUNTIME_DIR, exist_ok=True)
async def _run_subprocess(
command: Sequence[str],
*,
cwd: str,
env: dict[str, str] | None = None,
) -> str:
process_env = os.environ.copy()
if env:
process_env.update(env)
proc = await asyncio.create_subprocess_exec(
*command,
cwd=cwd,
env=process_env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
stdout, _ = await proc.communicate()
output = stdout.decode("utf-8", errors="replace").strip()
if proc.returncode != 0:
raise RuntimeError(output or f"{' '.join(command)} exited with code {proc.returncode}")
return output
def _stop_bridge_process() -> None:
global _bridge_process
if not _bridge_process:
return
try:
_bridge_process.terminate()
try:
_bridge_process.wait(timeout=5)
except subprocess.TimeoutExpired:
_bridge_process.kill()
except Exception:
pass
_clear_bridge_process()
PrintStyle.info("WhatsApp: bridge stopped")
def _clear_bridge_process() -> None:
global _bridge_process
_bridge_process = None
_bridge_config.clear()
def _kill_port_process(port: int) -> None:
"""Kill any orphaned process listening on the given TCP port."""
try:
system = platform.system()
if system == "Windows":
result = subprocess.run(
["netstat", "-ano", "-p", "TCP"],
capture_output=True, text=True, timeout=5,
)
for line in result.stdout.splitlines():
parts = line.split()
if len(parts) >= 5 and parts[3] == "LISTENING":
if parts[1].endswith(f":{port}"):
try:
subprocess.run(
["taskkill", "/PID", parts[4], "/F"],
capture_output=True, timeout=5,
)
except subprocess.SubprocessError:
pass
elif system == "Darwin":
result = subprocess.run(
["lsof", "-ti", f"tcp:{port}"],
capture_output=True, text=True, timeout=5,
)
for pid_str in result.stdout.strip().splitlines():
try:
os.kill(int(pid_str.strip()), 9)
except (ValueError, OSError):
pass
else:
result = subprocess.run(
["fuser", f"{port}/tcp"],
capture_output=True, timeout=5,
)
if result.returncode == 0:
subprocess.run(
["fuser", "-k", f"{port}/tcp"],
capture_output=True, timeout=5,
)
except Exception:
pass
def _start_log_reader(process: _BridgeProcess) -> None:
def _reader() -> None:
assert process.stdout
for line in iter(process.stdout.readline, b""):
text = line.decode("utf-8", errors="replace").rstrip()
if text:
process.remember_output(text)
PrintStyle.standard(f"WhatsApp bridge: {text}")
process.stdout.close()
thread = threading.Thread(target=_reader, daemon=True)
thread.start()
def _summarize_output(output: str, max_lines: int = 12) -> str:
if not output:
return ""
lines = [line for line in output.splitlines() if line.strip()]
return "\n".join(lines[-max_lines:])

View file

@ -0,0 +1,406 @@
"""
WhatsApp handler orchestrates poll, dispatch, and reply.
Requires agent context.
"""
import asyncio
import base64
import os
import re
import uuid
from agent import Agent, AgentContext, UserMessage
from helpers import plugins, files, runtime
from helpers import message_queue as mq
from helpers.persist_chat import save_tmp_chat
from helpers.print_style import PrintStyle
from helpers.errors import format_error
from initialize import initialize_agent
from plugins._whatsapp_integration.helpers import wa_client
from plugins._whatsapp_integration.helpers import bridge_manager
from plugins._whatsapp_integration.helpers.number_utils import (
normalize_allowed_numbers,
normalize_number,
)
PLUGIN_NAME = "_whatsapp_integration"
MEDIA_FOLDER = "usr/whatsapp/media"
# Context data keys (no underscore prefix — must persist across restarts)
CTX_WA_CHAT_ID = "wa_chat_id"
CTX_WA_SENDER_NAME = "wa_sender_name"
CTX_WA_SENDER_NUMBER = "wa_sender_number"
CTX_WA_IS_GROUP = "wa_is_group"
CTX_WA_LAST_BODY = "wa_last_body"
CTX_WA_LAST_MSG_ID = "wa_last_msg_id"
# Transient — consumed per-reply, not persisted
CTX_WA_ATTACHMENTS = "_wa_response_attachments"
CTX_WA_REPLY_TO = "_wa_reply_to"
CTX_WA_TYPING_ACTIVE = "_wa_typing_active"
# Poll task — lives here (not in extension module) because
# extension modules are re-executed on each job_loop tick,
# which would reset module-level state and orphan running tasks.
_poll_task: asyncio.Task | None = None # type: ignore[type-arg]
# ------------------------------------------------------------------
# Poll loop
# ------------------------------------------------------------------
async def _refresh_typing(base_url: str) -> None:
"""Re-send composing for all contexts with active typing flag."""
for ctx in AgentContext._contexts.values():
if not isinstance(ctx, AgentContext):
continue
if not ctx.data.get(CTX_WA_TYPING_ACTIVE):
continue
chat_id = ctx.data.get(CTX_WA_CHAT_ID, "")
if chat_id:
await wa_client.send_typing(base_url, chat_id)
async def poll_messages(config: dict) -> None:
if not config.get("enabled", False):
return
if PLUGIN_NAME not in plugins.get_enabled_plugins(None):
return
port = int(config.get("bridge_port", 3100))
base_url = bridge_manager.get_bridge_url(port)
# Refresh typing indicator for active sessions (beats 25s WhatsApp timeout)
await _refresh_typing(base_url)
try:
messages = await wa_client.get_messages(base_url)
except Exception as e:
PrintStyle.error(f"WhatsApp poll error: {format_error(e)}")
return
if not messages:
return
# Allowed-numbers filtering is authoritative in Python.
allowed_set = normalize_allowed_numbers(config.get("allowed_numbers"))
for msg in messages:
try:
# Filter by allowed numbers if configured
if allowed_set:
sender_num = normalize_number(msg.get("senderNumber", "") or msg.get("senderId", ""))
if sender_num not in allowed_set:
PrintStyle.debug(
f"WhatsApp: ignored message from {sender_num} "
f"(senderId: {msg.get('senderId', '')}, allowed: {allowed_set})"
)
continue
await _dispatch_message(config, msg)
except Exception as e:
PrintStyle.error(f"WhatsApp dispatch error: {format_error(e)}")
# ------------------------------------------------------------------
# Dispatch a single inbound message
# ------------------------------------------------------------------
async def _dispatch_message(config: dict, msg: dict) -> None:
chat_id = msg.get("chatId", "")
is_group = msg.get("isGroup", False)
# Group filtering: skip unless allow_group enabled AND bot was mentioned or replied to
if is_group:
if not config.get("allow_group", False):
PrintStyle.debug(f"WhatsApp: skipping group message (allow_group disabled)")
return
if not msg.get("mentionedMe", False) and not msg.get("repliedToMe", False):
PrintStyle.debug(f"WhatsApp: skipping group message (not mentioned or replied to)")
return
# Show typing indicator immediately so user sees activity
port = int(config.get("bridge_port", 3100))
base_url = bridge_manager.get_bridge_url(port)
await wa_client.send_typing(base_url, chat_id)
existing = _find_chats_by_jid(chat_id)
if existing:
# Continue most recent chat for this JID
await _route_to_chat(msg, existing[0])
else:
await _start_new_chat(config, msg)
# ------------------------------------------------------------------
# Chat creation and routing
# ------------------------------------------------------------------
async def _start_new_chat(config: dict, msg: dict) -> None:
from helpers import projects
sender_name = msg.get("senderName", "Unknown")
sender_number = msg.get("senderNumber", "") or normalize_number(msg.get("senderId", ""))
chat_id = msg.get("chatId", "")
is_group = msg.get("isGroup", False)
agent_config = initialize_agent()
context = AgentContext(agent_config, name=f"WhatsApp: {sender_name[:50]}")
context.data[CTX_WA_CHAT_ID] = chat_id
context.data[CTX_WA_SENDER_NAME] = sender_name
context.data[CTX_WA_SENDER_NUMBER] = sender_number
context.data[CTX_WA_IS_GROUP] = is_group
context.data[CTX_WA_LAST_BODY] = msg.get("body", "")
context.data[CTX_WA_LAST_MSG_ID] = msg.get("messageId", "")
context.data[CTX_WA_TYPING_ACTIVE] = True
project = config.get("project", "")
if project:
projects.activate_project(context.id, project)
save_tmp_chat(context)
user_msg = _build_user_message(context.agent0, msg)
system_ctx = context.agent0.read_prompt("fw.wa.system_context.md")
msg_id = str(uuid.uuid4())
media_urls = msg.get("mediaUrls", [])
attachments = await _save_incoming_media(media_urls) if media_urls else []
mq.log_user_message(
context, user_msg, attachments, message_id=msg_id, source=" (whatsapp)",
)
context.communicate(UserMessage(
message=user_msg,
system_message=[system_ctx],
attachments=attachments,
id=msg_id,
))
PrintStyle.success(
f"WhatsApp: new chat {context.id} for {sender_name} ({sender_number})"
)
async def _route_to_chat(
msg: dict, context_id: str,
) -> None:
context = AgentContext.get(context_id)
if not context:
return
context.data[CTX_WA_LAST_BODY] = msg.get("body", "")
context.data[CTX_WA_LAST_MSG_ID] = msg.get("messageId", "")
context.data[CTX_WA_TYPING_ACTIVE] = True
user_msg = _build_user_message(context.agent0, msg)
msg_id = str(uuid.uuid4())
media_urls = msg.get("mediaUrls", [])
attachments = await _save_incoming_media(media_urls) if media_urls else []
mq.log_user_message(
context, user_msg, attachments, message_id=msg_id, source=" (whatsapp)",
)
context.communicate(UserMessage(
message=user_msg,
attachments=attachments,
id=msg_id,
))
save_tmp_chat(context)
PrintStyle.info(f"WhatsApp: continuing chat {context_id}")
# ------------------------------------------------------------------
# Chat discovery
# ------------------------------------------------------------------
def _find_chats_by_jid(chat_id: str) -> list[str]:
"""Return context IDs for chats matching the given WhatsApp JID, newest first."""
results = []
for ctx_id, ctx in AgentContext._contexts.items():
if not isinstance(ctx, AgentContext):
continue
if ctx.data.get(CTX_WA_CHAT_ID) != chat_id:
continue
results.append(ctx_id)
results.sort(reverse=True)
return results
# ------------------------------------------------------------------
# Markdown to WhatsApp formatting
# ------------------------------------------------------------------
def _md_to_whatsapp(text: str) -> str:
"""Convert markdown formatting to WhatsApp formatting."""
# Protect code blocks from conversion
code_blocks: list[str] = []
def _save_code(m: re.Match) -> str:
code_blocks.append(m.group(0))
return f"\x00CB{len(code_blocks) - 1}\x00"
text = re.sub(r"```[\s\S]*?```", _save_code, text)
# Protect inline code
inline_codes: list[str] = []
def _save_inline(m: re.Match) -> str:
inline_codes.append(m.group(0))
return f"\x00IC{len(inline_codes) - 1}\x00"
text = re.sub(r"`[^`]+`", _save_inline, text)
# Bold+italic ***text*** → *_text_*
text = re.sub(r"\*{3}(.+?)\*{3}", r"*_\1_*", text)
# Bold **text** or __text__ → *text*
text = re.sub(r"\*{2}(.+?)\*{2}", r"*\1*", text)
text = re.sub(r"__(.+?)__", r"*\1*", text)
# Italic _text_ stays _text_ (same in WhatsApp)
# Strikethrough ~~text~~ → ~text~
text = re.sub(r"~~(.+?)~~", r"~\1~", text)
# Headings → bold
text = re.sub(r"^#{1,6}\s+(.+)$", r"*\1*", text, flags=re.MULTILINE)
# Restore code blocks and inline code
for i, block in enumerate(code_blocks):
text = text.replace(f"\x00CB{i}\x00", block)
for i, code in enumerate(inline_codes):
text = text.replace(f"\x00IC{i}\x00", code)
return text
# ------------------------------------------------------------------
# Message builders
# ------------------------------------------------------------------
def _build_user_message(agent: Agent, msg: dict) -> str:
sender_name = msg.get("senderName", "Unknown")
sender_number = msg.get("senderNumber", "") or normalize_number(msg.get("senderId", ""))
is_group = msg.get("isGroup", False)
prompt = "fw.wa.user_message_group.md" if is_group else "fw.wa.user_message.md"
text = agent.read_prompt(
prompt,
sender_name=sender_name,
sender_number=sender_number,
group_name=msg.get("chatName", ""),
message_id=msg.get("messageId", ""),
body=msg.get("body", ""),
)
return text
# ------------------------------------------------------------------
# Reply sending (called from process_chain_end extension)
# ------------------------------------------------------------------
async def send_wa_reply(
context: AgentContext,
response_text: str,
attachments: list[str] | None = None,
reply_to: str = "",
keep_typing: bool = False,
) -> str | None:
chat_id = context.data.get(CTX_WA_CHAT_ID)
if not chat_id:
return "No WhatsApp chat ID"
config = plugins.get_plugin_config(PLUGIN_NAME) or {}
port = int(config.get("bridge_port", 3100))
base_url = bridge_manager.get_bridge_url(port)
# For group chats, auto-reply to last received message if no explicit reply_to
if not reply_to and context.data.get(CTX_WA_IS_GROUP):
reply_to = context.data.get(CTX_WA_LAST_MSG_ID, "")
# Convert markdown to WhatsApp formatting
response_text = _md_to_whatsapp(response_text)
# Prefix response in self-chat mode so user can distinguish agent messages
mode = config.get("mode", "self-chat")
if mode == "self-chat":
response_text = context.agent0.read_prompt(
"fw.wa.self_chat_prefix.md", response_text=response_text,
)
# Send text
try:
result = await wa_client.send_message(base_url, chat_id, response_text, reply_to=reply_to)
if result.get("error"):
return result["error"]
except Exception as e:
return str(e)
# Send attachments via RFC (files may live in execution runtime)
if attachments:
host_paths = await _read_attachments_to_host(attachments)
for host_path in host_paths:
try:
result = await wa_client.send_media(
base_url, chat_id, host_path,
)
if result.get("error"):
PrintStyle.warning(f"WhatsApp: attachment error: {result['error']}")
except Exception as e:
PrintStyle.warning(f"WhatsApp: attachment error: {e}")
# Typing: restart if agent is still working, stop if final reply
if keep_typing:
await wa_client.send_typing(base_url, chat_id)
else:
await wa_client.send_typing(base_url, chat_id, paused=True)
context.data[CTX_WA_TYPING_ACTIVE] = False
return None
# ------------------------------------------------------------------
# Attachment reading (via RFC into execution runtime)
# ------------------------------------------------------------------
async def _read_attachments_to_host(
paths: list[str],
) -> list[str]:
"""Read files from execution runtime and write to host media cache."""
from plugins._whatsapp_integration.helpers.attachment_reader import read_attachment
host_paths: list[str] = []
for path in paths:
data = await runtime.call_development_function(read_attachment, path)
if data["error"]:
PrintStyle.warning(f"WhatsApp attachment: {data['error']}")
continue
# Write decoded bytes to host-side media cache
host_path = os.path.join(
files.get_abs_path(MEDIA_FOLDER), data["name"],
)
os.makedirs(os.path.dirname(host_path), exist_ok=True)
with open(host_path, "wb") as f:
f.write(base64.b64decode(data["content_b64"]))
host_paths.append(host_path)
return host_paths
async def _save_incoming_media(
media_urls: list[str],
) -> list[str]:
"""Save incoming media files into execution runtime via RFC."""
from plugins._whatsapp_integration.helpers.attachment_writer import write_attachment
runtime_paths: list[str] = []
for host_path in media_urls:
if not os.path.isfile(host_path):
continue
name = os.path.basename(host_path)
with open(host_path, "rb") as f:
content_b64 = base64.b64encode(f.read()).decode()
rel_path = os.path.join(MEDIA_FOLDER, name)
result = await runtime.call_development_function(
write_attachment, rel_path, content_b64,
)
if result.get("error"):
PrintStyle.warning(f"WhatsApp media save: {result['error']}")
runtime_paths.append(host_path) # fallback to host path
else:
runtime_paths.append(result["path"])
return runtime_paths

View file

@ -0,0 +1,32 @@
"""Phone-number normalization helpers for WhatsApp integration."""
from __future__ import annotations
import re
from collections.abc import Iterable
_JID_SUFFIX_RE = re.compile(r"[@:].*")
_NON_DIGIT_RE = re.compile(r"\D+")
_LEADING_ZERO_RE = re.compile(r"^0+")
def normalize_number(raw: str) -> str:
"""Normalize WhatsApp sender identifiers and phone numbers to comparable digits."""
text = _JID_SUFFIX_RE.sub("", str(raw or ""))
digits = _NON_DIGIT_RE.sub("", text)
return _LEADING_ZERO_RE.sub("", digits)
def normalize_allowed_numbers(value: object) -> set[str]:
"""Accept stored config as a list/tuple/set or comma-delimited string."""
if isinstance(value, str):
candidates = value.split(",")
elif isinstance(value, Iterable):
candidates = value
else:
return set()
normalized = {normalize_number(item) for item in candidates}
normalized.discard("")
return normalized

View file

@ -0,0 +1,11 @@
"""Shared storage paths for the WhatsApp bridge."""
from helpers import files
def get_bridge_session_dir() -> str:
return files.get_abs_path(files.TEMP_DIR, "whatsapp", "session")
def get_bridge_media_dir() -> str:
return files.get_abs_path(files.TEMP_DIR, "whatsapp", "media")

View file

@ -0,0 +1,103 @@
"""
WhatsApp bridge HTTP client.
No agent/tool dependencies.
"""
import aiohttp
async def get_messages(base_url: str) -> list[dict]:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{base_url}/messages", timeout=aiohttp.ClientTimeout(total=10),
) as resp:
if resp.status == 200:
return await resp.json()
return []
async def send_message(
base_url: str, chat_id: str, message: str, reply_to: str = "",
) -> dict:
payload: dict = {"chatId": chat_id, "message": message}
if reply_to:
payload["replyTo"] = reply_to
async with aiohttp.ClientSession() as session:
async with session.post(
f"{base_url}/send",
json=payload,
timeout=aiohttp.ClientTimeout(total=30),
) as resp:
return await resp.json()
async def send_media(
base_url: str,
chat_id: str,
file_path: str,
caption: str = "",
media_type: str = "",
file_name: str = "",
) -> dict:
payload: dict = {"chatId": chat_id, "filePath": file_path}
if caption:
payload["caption"] = caption
if media_type:
payload["mediaType"] = media_type
if file_name:
payload["fileName"] = file_name
async with aiohttp.ClientSession() as session:
async with session.post(
f"{base_url}/send-media",
json=payload,
timeout=aiohttp.ClientTimeout(total=30),
) as resp:
return await resp.json()
async def send_typing(base_url: str, chat_id: str, paused: bool = False) -> None:
try:
payload: dict = {"chatId": chat_id}
if paused:
payload["status"] = "paused"
async with aiohttp.ClientSession() as session:
async with session.post(
f"{base_url}/typing",
json=payload,
timeout=aiohttp.ClientTimeout(total=5),
) as resp:
await resp.json()
except Exception:
pass
async def get_health(base_url: str) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{base_url}/health", timeout=aiohttp.ClientTimeout(total=5),
) as resp:
if resp.status == 200:
return await resp.json()
return {"status": "error", "queueLength": 0, "uptime": 0}
async def get_qr(base_url: str) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{base_url}/qr", timeout=aiohttp.ClientTimeout(total=5),
) as resp:
if resp.status == 200:
return await resp.json()
return {"status": "error", "qr": None}
async def get_chat_info(base_url: str, chat_id: str) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{base_url}/chat/{chat_id}",
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
if resp.status == 200:
return await resp.json()
return {"name": "", "isGroup": False, "participants": []}

View file

@ -0,0 +1,8 @@
name: _whatsapp_integration
title: WhatsApp Integration
description: Communicate with Agent Zero via WhatsApp. Uses Baileys bridge for message sending and receiving.
version: 1.0.0
settings_sections:
- external
per_project_config: false
per_agent_config: false

View file

@ -0,0 +1 @@
whatsapp poll error: {{error}}

View file

@ -0,0 +1 @@
whatsapp poll complete: {{result}}

View file

@ -0,0 +1,2 @@
[🤖 Agent Zero]
{{response_text}}

View file

@ -0,0 +1,3 @@
whatsapp send failed (attempt {{attempt}}/{{max_retries}}): {{error}}
previous response was NOT delivered to user
retry or inform user issue exists

View file

@ -0,0 +1,6 @@
whatsapp session user communicates via whatsapp
response tool sends whatsapp message dont use python
break_loop true > stop working and wait for user reply
break_loop false > only for mid-task progress updates then keep working
always keep user informed send progress before long tasks
tell user what you are doing now and what comes next

View file

@ -0,0 +1,37 @@
# WhatsApp session behavior
user communicates via whatsapp
response tool = send whatsapp message to user
dont use code to send message
break_loop true > stop working and wait for user reply
break_loop false > only for mid-task progress updates then keep working
include file paths in attachments array for sending files
multiple files zip first attach single archive
in group chats use reply_to with msg id to quote a specific message
group replies auto-quote last received message if reply_to omitted
usage:
~~~json
{
...
"tool_name": "response",
"tool_args": {
"text": "Working on it...",
"break_loop": false
}
}
~~~
~~~json
{
...
"tool_name": "response",
"tool_args": {
"text": "Here is...",
"attachments": [
"/path/file.zip"
],
"reply_to": "msg_id",
"break_loop": true
}
}
~~~

View file

@ -0,0 +1 @@
whatsapp update failed: {{error}}

View file

@ -0,0 +1 @@
whatsapp update sent continue working

View file

@ -0,0 +1,3 @@
[WhatsApp from {{sender_name}}]
{{body}}

View file

@ -0,0 +1,3 @@
[WhatsApp group "{{group_name}}" from {{sender_name}} id:{{message_id}}]
{{body}}

View file

@ -0,0 +1,2 @@
# WhatsApp custom rules
{{instructions}}

View file

@ -0,0 +1,337 @@
<html>
<head>
<title>WhatsApp Integration</title>
</head>
<body>
<div x-data="{
testing: false,
test_results: null,
projects: [],
qr_visible: false,
qr_status: '',
qr_message: '',
qr_data_url: null,
qr_poll_timer: null,
disconnecting: false,
disconnect_message: '',
async init() {
try {
const { callJsonApi } = await import('/js/api.js');
const res = await callJsonApi('projects', { action: 'list' });
this.projects = res.data || [];
} catch (e) { this.projects = []; }
},
allowed_text() {
const value = config?.allowed_numbers;
if (Array.isArray(value)) return value.join(', ');
return typeof value === 'string' ? value : '';
},
allowed_is_empty() {
const value = config?.allowed_numbers;
if (Array.isArray(value)) return value.length === 0;
return !String(value || '').trim();
},
set_allowed(val) {
config.allowed_numbers = val.split(',')
.map(s => s.trim())
.filter(s => s);
},
async test_connection() {
this.testing = true;
this.test_results = null;
try {
const { callJsonApi } = await import('/js/api.js');
const res = await callJsonApi('/plugins/_whatsapp_integration/test_connection', {
config: { bridge_port: config.bridge_port }
});
this.test_results = res;
} catch (e) {
this.test_results = { success: false, results: [{ test: 'Connection', ok: false, message: String(e) }] };
}
this.testing = false;
},
async show_qr() {
this.qr_visible = true;
this.qr_status = 'loading';
this.qr_message = 'Starting bridge...';
this.qr_data_url = null;
await this.poll_qr();
this.qr_poll_timer = setInterval(() => this.poll_qr(), 3000);
},
hide_qr() {
this.qr_visible = false;
this.qr_data_url = null;
this.qr_status = '';
if (this.qr_poll_timer) {
clearInterval(this.qr_poll_timer);
this.qr_poll_timer = null;
}
},
async poll_qr() {
try {
const { callJsonApi } = await import('/js/api.js');
const res = await callJsonApi('/plugins/_whatsapp_integration/qr_code', {});
this.qr_status = res.status || 'error';
this.qr_message = res.message || '';
this.qr_data_url = res.qr || null;
if (res.status === 'connected') {
if (this.qr_poll_timer) {
clearInterval(this.qr_poll_timer);
this.qr_poll_timer = null;
}
}
} catch (e) {
this.qr_status = 'error';
this.qr_message = String(e);
this.qr_data_url = null;
}
},
async disconnect_account() {
if (!confirm('Disconnect this WhatsApp account? You will need to scan a new QR code to reconnect.')) return;
this.disconnecting = true;
this.disconnect_message = '';
try {
const { callJsonApi } = await import('/js/api.js');
const res = await callJsonApi('/plugins/_whatsapp_integration/disconnect', {});
this.disconnect_message = res.success ? 'Account disconnected' : (res.message || 'Failed');
} catch (e) {
this.disconnect_message = String(e);
}
this.disconnecting = false;
}
}">
<template x-if="config">
<div>
<div class="section-title">WhatsApp Integration</div>
<div class="field">
<div class="field-label">
<div class="field-title">Enabled</div>
<div class="field-description">Enable WhatsApp bridge and message polling</div>
</div>
<div class="field-control">
<label class="toggle">
<input type="checkbox" x-model="config.enabled" />
<span class="toggler"></span>
</label>
</div>
</div>
<!-- WhatsApp Account (shown when enabled) -->
<template x-if="config.enabled">
<div>
<div class="field">
<div class="field-label">
<div class="field-title">WhatsApp Account</div>
<div class="field-description">
<span x-show="!disconnect_message">Pair or switch your WhatsApp account</span>
<span x-show="disconnect_message" x-text="disconnect_message"
:style="'color:' + (disconnect_message === 'Account disconnected' ? '#4caf50' : '#f44336')"></span>
</div>
</div>
<div class="field-control" style="display: flex; gap: 8px;">
<button class="btn btn-field" @click="show_qr()">
Show QR Code
</button>
<button class="btn btn-field" @click="disconnect_account()" :disabled="disconnecting">
<span x-show="!disconnecting">Disconnect</span>
<span x-show="disconnecting">Disconnecting...</span>
</button>
</div>
</div>
<!-- QR Code panel -->
<template x-if="qr_visible">
<div style="margin-top: 8px; padding: 16px; border-radius: 8px;
border: 1px solid var(--border-color, #333);
text-align: center;">
<!-- Connected state -->
<template x-if="qr_status === 'connected'">
<div>
<div style="font-size: 1.5rem; margin-bottom: 8px;">&#10003;</div>
<div style="font-weight: 500; color: #4caf50;" x-text="qr_message"></div>
<button class="btn btn-field" @click="hide_qr()" style="margin-top: 12px;">
Close
</button>
</div>
</template>
<!-- QR code ready -->
<template x-if="qr_status === 'waiting_scan' && qr_data_url">
<div>
<div style="font-weight: 500; margin-bottom: 12px;">
Scan with WhatsApp on your phone
</div>
<img :src="qr_data_url" alt="WhatsApp QR Code"
style="width: 256px; height: 256px; border-radius: 8px;
background: white; padding: 4px;" />
<div style="margin-top: 8px; font-size: 0.8rem; opacity: 0.6;">
QR code refreshes automatically
</div>
<button class="btn btn-field" @click="hide_qr()" style="margin-top: 12px;">
Cancel
</button>
</div>
</template>
<!-- Loading / waiting for QR -->
<template x-if="qr_status !== 'connected' && !(qr_status === 'waiting_scan' && qr_data_url)">
<div>
<div style="font-weight: 500; margin-bottom: 8px;" x-text="qr_message || 'Connecting...'"></div>
<div style="font-size: 0.85rem; opacity: 0.6;">
<template x-if="qr_status === 'error'">
<span style="color: #f44336;" x-text="qr_message"></span>
</template>
<template x-if="qr_status !== 'error'">
<span>Please wait...</span>
</template>
</div>
<button class="btn btn-field" @click="hide_qr()" style="margin-top: 12px;">
Cancel
</button>
</div>
</template>
</div>
</template>
</div>
</template>
<div class="field">
<div class="field-label">
<div class="field-title">Mode</div>
<div class="field-description">
<span x-show="config.mode === 'self-chat'">
Use your personal number. You can message yourself to talk to the agent, and the agent can also handle messages that other people send to your number.
</span>
<span x-show="config.mode !== 'self-chat'">
Use a separate WhatsApp number dedicated to Agent Zero conversations.
</span>
</div>
</div>
<div class="field-control">
<select x-model="config.mode">
<option value="self-chat">Personal number (self-chat)</option>
<option value="dedicated">Separate number (dedicated)</option>
</select>
</div>
</div>
<template x-if="config.enabled && allowed_is_empty()">
<div style="margin: 8px 0 20px; padding: 12px 14px; border-radius: 10px;
border: 1px solid rgba(255, 170, 0, 0.45);
background: rgba(255, 170, 0, 0.12);
color: var(--color-warning-text);">
<div style="font-weight: 600; display: flex; align-items: center; gap: 8px;">
<span aria-hidden="true">&#9888;</span>
<span>Warning</span>
</div>
<div style="margin-top: 4px; line-height: 1.45;">
Allowed Numbers is empty. If other people can message this WhatsApp number, they can use your Agent Zero.
</div>
</div>
</template>
<div class="field">
<div class="field-label">
<div class="field-title">Allowed Numbers</div>
<div class="field-description">Comma-separated phone numbers. Matching is normalized by the backend, so punctuation and + prefixes are okay. Empty = allow all.</div>
</div>
<div class="field-control">
<input type="text" :value="allowed_text()" @change="set_allowed($event.target.value)" placeholder="+1 (415) 555-1234, +44 7911 123456" />
</div>
</div>
<div class="field">
<div class="field-label">
<div class="field-title">Allow Group</div>
<div class="field-description">Respond in group chats when mentioned or replied to</div>
</div>
<div class="field-control">
<label class="toggle">
<input type="checkbox" x-model="config.allow_group" />
<span class="toggler"></span>
</label>
</div>
</div>
<div class="field">
<div class="field-label">
<div class="field-title">Project</div>
<div class="field-description">Project to activate for WhatsApp chats</div>
</div>
<div class="field-control">
<select :value="config.project" @change="config.project = $event.target.value">
<option value="">No project</option>
<template x-for="proj in projects" :key="proj.name">
<option :value="proj.name" x-text="proj.title || proj.name" :selected="config.project === proj.name"></option>
</template>
</select>
</div>
</div>
<div class="field">
<div class="field-label">
<div class="field-title">Agent Instructions</div>
<div class="field-description">Extra instructions for the agent in WhatsApp chats</div>
</div>
<div class="field-control">
<textarea x-model="config.agent_instructions" rows="3" placeholder="e.g. Always respond concisely..."></textarea>
</div>
</div>
<div class="field">
<div class="field-label">
<div class="field-title">Bridge Port</div>
<div class="field-description">Local port for the WhatsApp bridge HTTP server</div>
</div>
<div class="field-control">
<input type="number" x-model.number="config.bridge_port" placeholder="3100" />
</div>
</div>
<div class="field">
<div class="field-label">
<div class="field-title">Poll Interval (seconds)</div>
<div class="field-description">How often to check for new messages (minimum 2)</div>
</div>
<div class="field-control">
<input type="number" x-model.number="config.poll_interval_seconds" min="2" placeholder="3" />
</div>
</div>
<!-- Test connection -->
<div style="margin-top: 16px; display: flex; align-items: center; gap: 12px;">
<button class="btn btn-field" @click="test_connection()" :disabled="testing">
<span x-show="!testing">Test Connection</span>
<span x-show="testing">Testing...</span>
</button>
</div>
<!-- Test results -->
<template x-if="test_results">
<div style="margin-top: 8px; padding: 8px 12px; border-radius: 6px; font-size: 0.85rem;
border: 1px solid var(--border-color, #333);">
<template x-for="r in test_results.results" :key="r.test">
<div style="display: flex; align-items: center; gap: 8px; padding: 4px 0;">
<span x-text="r.ok ? '✓' : '✗'"
:style="'font-weight: bold; color:' + (r.ok ? '#4caf50' : '#f44336')"></span>
<span style="font-weight: 500; min-width: 50px;" x-text="r.test"></span>
<span style="opacity: 0.8;" x-text="r.message"></span>
</div>
</template>
</div>
</template>
</div>
</template>
</div>
</body>
</html>

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

View file

@ -0,0 +1,615 @@
#!/usr/bin/env node
/**
* Agent Zero WhatsApp Bridge
*
* Standalone Node.js process that connects to WhatsApp via Baileys
* and exposes HTTP endpoints for the Python plugin.
*
* Endpoints:
* GET /messages - Poll for new incoming messages
* POST /send - Send a message { chatId, message, replyTo? }
* POST /edit - Edit a sent message { chatId, messageId, message }
* POST /send-media - Send media { chatId, filePath, mediaType?, caption?, fileName? }
* POST /typing - Send typing indicator { chatId }
* GET /chat/:id - Get chat info
* GET /health - Health check
*
* Usage:
* node bridge.js --port 3100 --session /path/to/session --cache-dir /path/to/media
*/
import { makeWASocket, useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion, downloadMediaMessage } from '@whiskeysockets/baileys';
import express from 'express';
import { Boom } from '@hapi/boom';
import pino from 'pino';
import path from 'path';
import { mkdirSync, readFileSync, writeFileSync, existsSync, readdirSync } from 'fs';
import { randomBytes } from 'crypto';
import qrcode from 'qrcode-terminal';
import QRCode from 'qrcode';
// Parse CLI args
const args = process.argv.slice(2);
function getArg(name, defaultVal) {
const idx = args.indexOf(`--${name}`);
return idx !== -1 && args[idx + 1] ? args[idx + 1] : defaultVal;
}
const WHATSAPP_DEBUG =
typeof process !== 'undefined' &&
process.env &&
typeof process.env.WHATSAPP_DEBUG === 'string' &&
['1', 'true', 'yes', 'on'].includes(process.env.WHATSAPP_DEBUG.toLowerCase());
const DEFAULT_DATA_ROOT = path.resolve(path.dirname(new URL(import.meta.url).pathname), '..', '..', '..', 'tmp', 'whatsapp');
const PORT = parseInt(getArg('port', '3100'), 10);
const SESSION_DIR = getArg('session', path.join(DEFAULT_DATA_ROOT, 'session'));
const CACHE_DIR = getArg('cache-dir', path.join(DEFAULT_DATA_ROOT, 'media'));
const PAIR_ONLY = args.includes('--pair-only');
const MODE = getArg('mode', 'self-chat'); // "dedicated" or "self-chat"
mkdirSync(SESSION_DIR, { recursive: true });
mkdirSync(CACHE_DIR, { recursive: true });
// Build LID -> phone reverse map from session files (lid-mapping-{phone}.json)
function buildLidMap() {
const map = {};
try {
for (const f of readdirSync(SESSION_DIR)) {
const m = f.match(/^lid-mapping-(\d+)\.json$/);
if (!m) continue;
const phone = m[1];
const lid = JSON.parse(readFileSync(path.join(SESSION_DIR, f), 'utf8'));
if (lid) map[String(lid)] = phone;
}
} catch {}
return map;
}
let lidToPhone = buildLidMap();
// Cache group names to avoid repeated metadata fetches
const groupNameCache = {};
// Extract raw number from a JID (strips @domain and :device)
function numOf(jid) {
return (jid || '').split('@')[0].split(':')[0];
}
// Resolve LID-based number to phone number using lidToPhone map
function resolveNumber(num) {
const raw = num.split(':')[0];
return lidToPhone[raw] || lidToPhone[num] || raw;
}
const logger = pino({ level: 'warn' });
// Message queue for polling
const messageQueue = [];
const MAX_QUEUE_SIZE = 100;
// Track recently sent message IDs to prevent echo-back loops
const recentlySentIds = new Set();
const MAX_RECENT_IDS = 50;
// Store received messages for reply quoting
const messageStore = new Map();
const MAX_STORED_MESSAGES = 200;
let sock = null;
let connectionState = 'disconnected';
let latestQrDataUrl = null;
async function startSocket() {
const { state, saveCreds } = await useMultiFileAuthState(SESSION_DIR);
const { version } = await fetchLatestBaileysVersion();
sock = makeWASocket({
version,
auth: state,
logger,
printQRInTerminal: false,
browser: ['Agent Zero', 'Chrome', '120.0'],
syncFullHistory: false,
markOnlineOnConnect: false,
getMessage: async (key) => {
return { conversation: '' };
},
});
sock.ev.on('creds.update', () => { saveCreds(); lidToPhone = buildLidMap(); });
sock.ev.on('connection.update', (update) => {
const { connection, lastDisconnect, qr } = update;
if (qr) {
console.log('\n[bridge] Scan this QR code with WhatsApp on your phone:\n');
qrcode.generate(qr, { small: true });
console.log('\n[bridge] Waiting for scan...\n');
QRCode.toDataURL(qr, { width: 256, margin: 2 }).then(url => {
latestQrDataUrl = url;
}).catch(() => {});
}
if (connection === 'close') {
const reason = new Boom(lastDisconnect?.error)?.output?.statusCode;
connectionState = 'disconnected';
if (reason === DisconnectReason.loggedOut) {
console.log('[bridge] Logged out. Delete session and restart to re-authenticate.');
process.exit(1);
} else {
if (reason === 515) {
console.log('[bridge] WhatsApp requested restart (code 515). Reconnecting...');
} else {
console.log(`[bridge] Connection closed (reason: ${reason}). Reconnecting in 3s...`);
}
setTimeout(startSocket, reason === 515 ? 1000 : 3000);
}
} else if (connection === 'open') {
connectionState = 'connected';
latestQrDataUrl = null;
console.log('[bridge] WhatsApp connected');
if (PAIR_ONLY) {
console.log('[bridge] Pairing complete. Credentials saved.');
setTimeout(() => process.exit(0), 2000);
}
}
});
sock.ev.on('messages.upsert', async ({ messages, type }) => {
if (type !== 'notify' && type !== 'append') return;
for (const msg of messages) {
if (!msg.message) continue;
const chatId = msg.key.remoteJid;
if (WHATSAPP_DEBUG) {
try {
console.log(JSON.stringify({
event: 'upsert', type,
fromMe: !!msg.key.fromMe, chatId,
senderId: msg.key.participant || chatId,
messageKeys: Object.keys(msg.message || {}),
}));
} catch {}
}
const senderId = msg.key.participant || chatId;
const isGroup = chatId.endsWith('@g.us');
const senderNumber = senderId.replace(/@.*/, '');
// Handle fromMe messages based on mode
if (msg.key.fromMe) {
if (isGroup || chatId.includes('status')) continue;
if (MODE === 'dedicated') {
// Dedicated mode: separate number — all fromMe are echo-backs, skip
continue;
}
// Self-chat mode: only accept messages in the user's own self-chat
const myNumber = (sock.user?.id || '').replace(/:.*@/, '@').replace(/@.*/, '');
const myLid = (sock.user?.lid || '').replace(/:.*@/, '@').replace(/@.*/, '');
const chatNumber = chatId.replace(/@.*/, '');
const isSelfChat = (myNumber && chatNumber === myNumber) || (myLid && chatNumber === myLid);
if (!isSelfChat) continue;
}
// Skip status broadcasts
if (chatId === 'status@broadcast') continue;
// Unwrap documentWithCaptionMessage (Baileys wraps captioned docs)
if (msg.message.documentWithCaptionMessage?.message?.documentMessage) {
msg.message.documentMessage = msg.message.documentWithCaptionMessage.message.documentMessage;
}
// Extract message body
let body = '';
let hasMedia = false;
let mediaType = '';
const mediaUrls = [];
if (msg.message.conversation) {
body = msg.message.conversation;
} else if (msg.message.extendedTextMessage?.text) {
body = msg.message.extendedTextMessage.text;
} else if (msg.message.imageMessage) {
body = msg.message.imageMessage.caption || '';
hasMedia = true;
mediaType = 'image';
} else if (msg.message.videoMessage) {
body = msg.message.videoMessage.caption || '';
hasMedia = true;
mediaType = 'video';
} else if (msg.message.audioMessage || msg.message.pttMessage) {
hasMedia = true;
mediaType = msg.message.pttMessage ? 'ptt' : 'audio';
} else if (msg.message.documentMessage) {
body = msg.message.documentMessage.caption || '';
hasMedia = true;
mediaType = 'document';
}
// Download media to disk
if (hasMedia) {
try {
const buf = await downloadMediaMessage(msg, 'buffer', {}, { logger, reuploadRequest: sock.updateMediaMessage });
let ext = '.bin';
let prefix = mediaType;
if (mediaType === 'image') {
const mime = msg.message.imageMessage?.mimetype || 'image/jpeg';
const extMap = { 'image/jpeg': '.jpg', 'image/png': '.png', 'image/webp': '.webp', 'image/gif': '.gif' };
ext = extMap[mime] || '.jpg';
} else if (mediaType === 'video') {
const mime = msg.message.videoMessage?.mimetype || 'video/mp4';
ext = mime.includes('mp4') ? '.mp4' : '.mkv';
} else if (mediaType === 'audio' || mediaType === 'ptt') {
const mime = msg.message.audioMessage?.mimetype || msg.message.pttMessage?.mimetype || 'audio/ogg';
ext = mime.includes('opus') || mime.includes('ogg') ? '.ogg' : '.mp3';
} else if (mediaType === 'document') {
const docMsg = msg.message.documentMessage;
const fileName = docMsg?.fileName || '';
if (fileName) {
// Use original filename for documents
const filePath = path.join(CACHE_DIR, `${randomBytes(4).toString('hex')}_${fileName}`);
writeFileSync(filePath, buf);
mediaUrls.push(filePath);
if (!body) body = fileName;
} else {
const mime = docMsg?.mimetype || 'application/octet-stream';
const docExtMap = { 'application/pdf': '.pdf', 'application/msword': '.doc' };
ext = docExtMap[mime] || '.bin';
}
}
// Write file if not already handled (document with fileName)
if (mediaUrls.length === 0) {
const filePath = path.join(CACHE_DIR, `${prefix}_${randomBytes(6).toString('hex')}${ext}`);
writeFileSync(filePath, buf);
mediaUrls.push(filePath);
}
} catch (err) {
console.error(`[bridge] Failed to download ${mediaType}:`, err.message);
}
}
// For media without caption, use a placeholder
if (hasMedia && !body) {
body = `[${mediaType} received]`;
}
// Skip echo-backs via recently sent IDs
if (recentlySentIds.has(msg.key.id)) {
if (WHATSAPP_DEBUG) {
try { console.log(JSON.stringify({ event: 'ignored', reason: 'agent_echo', chatId, messageId: msg.key.id })); } catch {}
}
continue;
}
// Skip empty messages
if (!body && !hasMedia) {
if (WHATSAPP_DEBUG) {
try {
console.log(JSON.stringify({ event: 'ignored', reason: 'empty', chatId, messageKeys: Object.keys(msg.message || {}) }));
} catch {}
}
continue;
}
// Detect if the bot was mentioned or replied to in a group message
let mentionedMe = false;
let repliedToMe = false;
if (isGroup && sock.user) {
const contextInfo = msg.message.extendedTextMessage?.contextInfo
|| msg.message.imageMessage?.contextInfo
|| msg.message.videoMessage?.contextInfo
|| msg.message.documentMessage?.contextInfo
|| null;
// Build set of bot's own numbers for comparison
const myNums = new Set();
if (sock.user.id) myNums.add(numOf(sock.user.id));
if (sock.user.lid) myNums.add(numOf(sock.user.lid));
for (const [lid, phone] of Object.entries(lidToPhone)) {
if (myNums.has(lid)) myNums.add(String(phone));
if (myNums.has(String(phone))) myNums.add(lid);
}
// Check @mentions
const mentionedJids = contextInfo?.mentionedJid || [];
for (const jid of mentionedJids) {
if (myNums.has(numOf(jid))) { mentionedMe = true; break; }
}
// Check if replying to a bot message
if (contextInfo?.stanzaId) {
const replyParticipant = contextInfo.participant || '';
if (replyParticipant && myNums.has(numOf(replyParticipant))) {
repliedToMe = true;
} else if (recentlySentIds.has(contextInfo.stanzaId)) {
repliedToMe = true;
}
}
if (WHATSAPP_DEBUG && (mentionedJids.length > 0 || repliedToMe)) {
try { console.log(JSON.stringify({ event: 'mention_reply_check', myNums: [...myNums], mentionedJids, mentionedMe, repliedToMe, stanzaId: contextInfo?.stanzaId })); } catch {}
}
}
// Resolve sender number (LID -> phone if possible)
const resolvedSender = resolveNumber(senderNumber);
// Resolve group name from metadata cache or fetch
let chatName;
if (isGroup) {
if (groupNameCache[chatId]) {
chatName = groupNameCache[chatId];
} else {
try {
const meta = await sock.groupMetadata(chatId);
chatName = meta.subject || chatId.split('@')[0];
groupNameCache[chatId] = chatName;
} catch {
chatName = chatId.split('@')[0];
}
}
} else {
chatName = msg.pushName || resolvedSender;
}
// Strip bot's own @mention from body so agent gets clean text
let cleanBody = body;
if (isGroup && mentionedMe && sock.user) {
const myNum = numOf(sock.user.id || '');
const myLidNum = numOf(sock.user.lid || '');
// Remove @number or @lid patterns matching the bot
for (const n of [myNum, myLidNum, resolveNumber(myNum), resolveNumber(myLidNum)]) {
if (n) cleanBody = cleanBody.replace(new RegExp(`@${n}\\b`, 'g'), '').trim();
}
}
const event = {
messageId: msg.key.id,
chatId,
senderId,
senderNumber: resolvedSender,
senderName: msg.pushName || resolvedSender,
chatName,
isGroup,
mentionedMe,
repliedToMe,
body: cleanBody,
hasMedia,
mediaType,
mediaUrls,
timestamp: msg.messageTimestamp,
};
// Store raw message for reply quoting
messageStore.set(msg.key.id, msg);
if (messageStore.size > MAX_STORED_MESSAGES) {
messageStore.delete(messageStore.keys().next().value);
}
messageQueue.push(event);
if (messageQueue.length > MAX_QUEUE_SIZE) {
messageQueue.shift();
}
}
});
}
// HTTP server
const app = express();
app.use(express.json());
// Poll for new messages
app.get('/messages', (req, res) => {
const msgs = messageQueue.splice(0, messageQueue.length);
res.json(msgs);
});
// Send a message
app.post('/send', async (req, res) => {
if (!sock || connectionState !== 'connected') {
return res.status(503).json({ error: 'Not connected to WhatsApp' });
}
const { chatId, message, replyTo } = req.body;
if (!chatId || !message) {
return res.status(400).json({ error: 'chatId and message are required' });
}
try {
const opts = {};
if (replyTo && messageStore.has(replyTo)) {
opts.quoted = messageStore.get(replyTo);
}
const sent = await sock.sendMessage(chatId, { text: message }, opts);
if (sent?.key?.id) {
recentlySentIds.add(sent.key.id);
if (recentlySentIds.size > MAX_RECENT_IDS) {
recentlySentIds.delete(recentlySentIds.values().next().value);
}
}
res.json({ success: true, messageId: sent?.key?.id });
} catch (err) {
res.status(500).json({ error: err.message });
}
});
// Edit a previously sent message
app.post('/edit', async (req, res) => {
if (!sock || connectionState !== 'connected') {
return res.status(503).json({ error: 'Not connected to WhatsApp' });
}
const { chatId, messageId, message } = req.body;
if (!chatId || !messageId || !message) {
return res.status(400).json({ error: 'chatId, messageId, and message are required' });
}
try {
const key = { id: messageId, fromMe: true, remoteJid: chatId };
await sock.sendMessage(chatId, { text: message, edit: key });
res.json({ success: true });
} catch (err) {
res.status(500).json({ error: err.message });
}
});
// MIME type map and media type inference
const MIME_MAP = {
jpg: 'image/jpeg', jpeg: 'image/jpeg', png: 'image/png',
webp: 'image/webp', gif: 'image/gif',
mp4: 'video/mp4', mov: 'video/quicktime', avi: 'video/x-msvideo',
mkv: 'video/x-matroska', '3gp': 'video/3gpp',
pdf: 'application/pdf',
doc: 'application/msword',
docx: 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
xlsx: 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
};
function inferMediaType(ext) {
if (['jpg', 'jpeg', 'png', 'webp', 'gif'].includes(ext)) return 'image';
if (['mp4', 'mov', 'avi', 'mkv', '3gp'].includes(ext)) return 'video';
if (['ogg', 'opus', 'mp3', 'wav', 'm4a'].includes(ext)) return 'audio';
return 'document';
}
// Send media natively
app.post('/send-media', async (req, res) => {
if (!sock || connectionState !== 'connected') {
return res.status(503).json({ error: 'Not connected to WhatsApp' });
}
const { chatId, filePath, mediaType, caption, fileName } = req.body;
if (!chatId || !filePath) {
return res.status(400).json({ error: 'chatId and filePath are required' });
}
try {
if (!existsSync(filePath)) {
return res.status(404).json({ error: `File not found: ${filePath}` });
}
const buffer = readFileSync(filePath);
const ext = filePath.toLowerCase().split('.').pop();
const type = mediaType || inferMediaType(ext);
let msgPayload;
switch (type) {
case 'image':
msgPayload = { image: buffer, caption: caption || undefined, mimetype: MIME_MAP[ext] || 'image/jpeg' };
break;
case 'video':
msgPayload = { video: buffer, caption: caption || undefined, mimetype: MIME_MAP[ext] || 'video/mp4' };
break;
case 'audio': {
const audioMime = (ext === 'ogg' || ext === 'opus') ? 'audio/ogg; codecs=opus' : 'audio/mpeg';
msgPayload = { audio: buffer, mimetype: audioMime, ptt: ext === 'ogg' || ext === 'opus' };
break;
}
case 'document':
default:
msgPayload = {
document: buffer,
fileName: fileName || path.basename(filePath),
caption: caption || undefined,
mimetype: MIME_MAP[ext] || 'application/octet-stream',
};
break;
}
const sent = await sock.sendMessage(chatId, msgPayload);
if (sent?.key?.id) {
recentlySentIds.add(sent.key.id);
if (recentlySentIds.size > MAX_RECENT_IDS) {
recentlySentIds.delete(recentlySentIds.values().next().value);
}
}
res.json({ success: true, messageId: sent?.key?.id });
} catch (err) {
res.status(500).json({ error: err.message });
}
});
// Typing indicator
app.post('/typing', async (req, res) => {
if (!sock || connectionState !== 'connected') {
return res.status(503).json({ error: 'Not connected' });
}
const { chatId, status } = req.body;
if (!chatId) return res.status(400).json({ error: 'chatId required' });
try {
await sock.sendPresenceUpdate(status === 'paused' ? 'paused' : 'composing', chatId);
res.json({ success: true });
} catch (err) {
res.json({ success: false });
}
});
// Chat info
app.get('/chat/:id', async (req, res) => {
const chatId = req.params.id;
const isGroup = chatId.endsWith('@g.us');
if (isGroup && sock) {
try {
const metadata = await sock.groupMetadata(chatId);
return res.json({
name: metadata.subject,
isGroup: true,
participants: metadata.participants.map(p => p.id),
});
} catch {
// Fall through to default
}
}
res.json({
name: chatId.replace(/@.*/, ''),
isGroup,
participants: [],
});
});
// QR code for web UI pairing
app.get('/qr', (req, res) => {
if (connectionState === 'connected') {
return res.json({ status: 'connected', qr: null });
}
if (latestQrDataUrl) {
return res.json({ status: 'waiting_scan', qr: latestQrDataUrl });
}
res.json({ status: 'waiting_qr', qr: null });
});
// Health check
app.get('/health', (req, res) => {
res.json({
status: connectionState,
queueLength: messageQueue.length,
uptime: process.uptime(),
});
});
// Start
if (PAIR_ONLY) {
console.log('[bridge] WhatsApp pairing mode');
console.log(`[bridge] Session: ${SESSION_DIR}`);
console.log();
startSocket();
} else {
app.listen(PORT, '127.0.0.1', () => {
console.log(`[bridge] WhatsApp bridge listening on port ${PORT} (mode: ${MODE})`);
console.log(`[bridge] Session: ${SESSION_DIR}`);
console.log();
startSocket();
});
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,17 @@
{
"name": "a0-whatsapp-bridge",
"version": "1.0.0",
"description": "WhatsApp bridge for Agent Zero using Baileys",
"private": true,
"type": "module",
"scripts": {
"start": "node bridge.js"
},
"dependencies": {
"@whiskeysockets/baileys": "7.0.0-rc.9",
"express": "^4.21.0",
"qrcode-terminal": "^0.12.0",
"qrcode": "^1.5.0",
"pino": "^9.0.0"
}
}