feat(channels): add crash recovery and gateway mode support

- Add session persistence to SessionRouter for crash recovery
- Add loadSession method to AcpBridge for restoring sessions
- Add ChannelBaseOptions to support external router injection
- Refactor start.ts to support both standalone and gateway modes
- Extract config utilities into separate module

This enables channels to recover sessions after bridge crashes and
supports running multiple channels under a gateway process.

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
tanzhenxin 2026-03-26 02:55:18 +00:00
parent f6ae769736
commit 1a605ec973
11 changed files with 567 additions and 161 deletions

View file

@ -129,6 +129,16 @@ export class AcpBridge extends EventEmitter {
return response.sessionId;
}
async loadSession(sessionId: string, cwd: string): Promise<string> {
const conn = this.ensureConnection();
const response = await conn.loadSession({
sessionId,
cwd,
mcpServers: [],
});
return response.sessionId;
}
async prompt(
sessionId: string,
text: string,

View file

@ -5,6 +5,10 @@ import { PairingStore } from './PairingStore.js';
import { SessionRouter } from './SessionRouter.js';
import type { AcpBridge, ToolCallEvent } from './AcpBridge.js';
export interface ChannelBaseOptions {
router?: SessionRouter;
}
export abstract class ChannelBase {
protected config: ChannelConfig;
protected bridge: AcpBridge;
@ -14,7 +18,12 @@ export abstract class ChannelBase {
protected name: string;
private instructedSessions: Set<string> = new Set();
constructor(name: string, config: ChannelConfig, bridge: AcpBridge) {
constructor(
name: string,
config: ChannelConfig,
bridge: AcpBridge,
options?: ChannelBaseOptions,
) {
this.name = name;
this.config = config;
this.bridge = bridge;
@ -28,20 +37,31 @@ export abstract class ChannelBase {
config.allowedUsers,
pairingStore,
);
this.router = new SessionRouter(bridge, config.cwd, config.sessionScope);
this.router =
options?.router ||
new SessionRouter(bridge, config.cwd, config.sessionScope);
bridge.on('toolCall', (event: ToolCallEvent) => {
const target = this.router.getTarget(event.sessionId);
if (target) {
this.onToolCall(target.chatId, event);
}
});
// When running standalone (no gateway), register toolCall listener directly.
// In gateway mode, the ChannelManager dispatches events instead.
if (!options?.router) {
bridge.on('toolCall', (event: ToolCallEvent) => {
const target = this.router.getTarget(event.sessionId);
if (target) {
this.onToolCall(target.chatId, event);
}
});
}
}
abstract connect(): Promise<void>;
abstract sendMessage(chatId: string, text: string): Promise<void>;
abstract disconnect(): void;
/** Replace the bridge instance (used after crash recovery restart). */
setBridge(bridge: AcpBridge): void {
this.bridge = bridge;
}
onToolCall(_chatId: string, _event: ToolCallEvent): void {}
async handleInbound(envelope: Envelope): Promise<void> {
@ -65,6 +85,7 @@ export abstract class ChannelBase {
envelope.senderId,
envelope.chatId,
envelope.threadId,
this.config.cwd,
);
// Prepend channel instructions on first message of a session

View file

@ -1,18 +1,38 @@
import { existsSync, readFileSync, writeFileSync, unlinkSync } from 'node:fs';
import type { SessionScope, SessionTarget } from './types.js';
import type { AcpBridge } from './AcpBridge.js';
interface PersistedEntry {
sessionId: string;
target: SessionTarget;
cwd: string;
}
export class SessionRouter {
private toSession: Map<string, string> = new Map(); // routing key → session ID
private toTarget: Map<string, SessionTarget> = new Map(); // session ID → target
private toCwd: Map<string, string> = new Map(); // session ID → cwd
private bridge: AcpBridge;
private cwd: string;
private defaultCwd: string;
private scope: SessionScope;
private persistPath: string | undefined;
constructor(bridge: AcpBridge, cwd: string, scope: SessionScope = 'user') {
constructor(
bridge: AcpBridge,
defaultCwd: string,
scope: SessionScope = 'user',
persistPath?: string,
) {
this.bridge = bridge;
this.cwd = cwd;
this.defaultCwd = defaultCwd;
this.scope = scope;
this.persistPath = persistPath;
}
/** Replace the bridge instance (used after crash recovery restart). */
setBridge(bridge: AcpBridge): void {
this.bridge = bridge;
}
private routingKey(
@ -37,6 +57,7 @@ export class SessionRouter {
senderId: string,
chatId: string,
threadId?: string,
cwd?: string,
): Promise<string> {
const key = this.routingKey(channelName, senderId, chatId, threadId);
const existing = this.toSession.get(key);
@ -44,9 +65,12 @@ export class SessionRouter {
return existing;
}
const sessionId = await this.bridge.newSession(this.cwd);
const sessionCwd = cwd || this.defaultCwd;
const sessionId = await this.bridge.newSession(sessionCwd);
this.toSession.set(key, sessionId);
this.toTarget.set(sessionId, { channelName, senderId, chatId, threadId });
this.toCwd.set(sessionId, sessionCwd);
this.persist();
return sessionId;
}
@ -64,6 +88,107 @@ export class SessionRouter {
if (!sessionId) return false;
this.toSession.delete(key);
this.toTarget.delete(sessionId);
this.toCwd.delete(sessionId);
this.persist();
return true;
}
/** Get all session entries for crash recovery. */
getAll(): Array<{ key: string; sessionId: string; target: SessionTarget }> {
const entries: Array<{
key: string;
sessionId: string;
target: SessionTarget;
}> = [];
for (const [key, sessionId] of this.toSession) {
const target = this.toTarget.get(sessionId);
if (target) {
entries.push({ key, sessionId, target });
}
}
return entries;
}
/**
* Restore session mappings from a previous bridge.
* Called after bridge restart attempts loadSession for each saved mapping.
* Failed loads are silently dropped (new session on next message).
*/
async restoreSessions(): Promise<{
restored: number;
failed: number;
}> {
if (!this.persistPath || !existsSync(this.persistPath)) {
return { restored: 0, failed: 0 };
}
let entries: Record<string, PersistedEntry>;
try {
entries = JSON.parse(readFileSync(this.persistPath, 'utf-8'));
} catch {
return { restored: 0, failed: 0 };
}
let restored = 0;
let failed = 0;
for (const [key, entry] of Object.entries(entries)) {
try {
const sessionId = await this.bridge.loadSession(
entry.sessionId,
entry.cwd,
);
this.toSession.set(key, sessionId);
this.toTarget.set(sessionId, entry.target);
this.toCwd.set(sessionId, entry.cwd);
restored++;
} catch {
// Session can't be loaded — will create fresh on next message
failed++;
}
}
// Update persist file to only include successfully restored sessions
if (failed > 0) {
this.persist();
}
return { restored, failed };
}
/** Clear in-memory state and delete persist file. Used on clean shutdown. */
clearAll(): void {
this.toSession.clear();
this.toTarget.clear();
this.toCwd.clear();
if (this.persistPath && existsSync(this.persistPath)) {
try {
unlinkSync(this.persistPath);
} catch {
// best-effort
}
}
}
private persist(): void {
if (!this.persistPath) return;
const data: Record<string, PersistedEntry> = {};
for (const [key, sessionId] of this.toSession) {
const target = this.toTarget.get(sessionId);
if (target) {
data[key] = {
sessionId,
target,
cwd: this.toCwd.get(sessionId) || this.defaultCwd,
};
}
}
try {
writeFileSync(this.persistPath, JSON.stringify(data, null, 2), 'utf-8');
} catch {
// best-effort — don't break message flow for persistence failure
}
}
}

View file

@ -5,6 +5,7 @@ export type {
ToolCallEvent,
} from './AcpBridge.js';
export { ChannelBase } from './ChannelBase.js';
export type { ChannelBaseOptions } from './ChannelBase.js';
export { PairingStore } from './PairingStore.js';
export type { PairingRequest } from './PairingStore.js';
export { GroupGate } from './GroupGate.js';