feat(vscode-ide-companion/auth): deduplicate concurrent authentication calls

Prevent multiple simultaneous authentication flows by:
- Adding static authInFlight promise tracking in AcpConnection
- Implementing runExclusiveAuth method in AuthStateManager
- Adding sessionCreateInFlight tracking in QwenAgentManager
- Ensuring only one auth flow runs at a time across different components

This prevents race conditions and duplicate login prompts when multiple components request authentication simultaneously.
This commit is contained in:
yiliang114 2025-12-11 22:56:58 +08:00
parent 58d3a9c253
commit b34894c8ea
12 changed files with 589 additions and 464 deletions

View file

@ -23,6 +23,7 @@ import { QwenSessionUpdateHandler } from './qwenSessionUpdateHandler.js';
import { CliContextManager } from '../cli/cliContextManager.js';
import { authMethod } from '../types/acpTypes.js';
import { MIN_CLI_VERSION_FOR_SESSION_METHODS } from '../cli/cliVersionManager.js';
import { getConsoleLogger } from '../utils/logger.js';
export type { ChatMessage, PlanEntry, ToolCallUpdateData };
@ -45,11 +46,15 @@ export class QwenAgentManager {
// Cache the last used AuthStateManager so internal calls (e.g. fallback paths)
// can reuse it and avoid forcing a fresh authentication unnecessarily.
private defaultAuthStateManager?: AuthStateManager;
// Deduplicate concurrent session/new attempts
private sessionCreateInFlight: Promise<string | null> | null = null;
// Callback storage
private callbacks: QwenAgentCallbacks = {};
private consoleLog: (...args: unknown[]) => void;
constructor() {
constructor(consoleLogger = getConsoleLogger()) {
this.consoleLog = consoleLogger;
this.connection = new AcpConnection();
this.sessionReader = new QwenSessionReader();
this.sessionManager = new QwenSessionManager();
@ -76,7 +81,7 @@ export class QwenAgentManager {
).update;
const text = update?.content?.text || '';
if (update?.sessionUpdate === 'user_message_chunk' && text) {
console.log(
this.consoleLog(
'[QwenAgentManager] Rehydration: routing user message chunk',
);
this.callbacks.onMessage?.({
@ -87,7 +92,7 @@ export class QwenAgentManager {
return;
}
if (update?.sessionUpdate === 'agent_message_chunk' && text) {
console.log(
this.consoleLog(
'[QwenAgentManager] Rehydration: routing agent message chunk',
);
this.callbacks.onMessage?.({
@ -98,7 +103,7 @@ export class QwenAgentManager {
return;
}
// For other types during rehydration, fall through to normal handler
console.log(
this.consoleLog(
'[QwenAgentManager] Rehydration: non-text update, forwarding to handler',
);
}
@ -257,7 +262,7 @@ export class QwenAgentManager {
* @returns Session list
*/
async getSessionList(): Promise<Array<Record<string, unknown>>> {
console.log(
this.consoleLog(
'[QwenAgentManager] Getting session list with version-aware strategy',
);
@ -265,7 +270,7 @@ export class QwenAgentManager {
const cliContextManager = CliContextManager.getInstance();
const supportsSessionList = cliContextManager.supportsSessionList();
console.log(
this.consoleLog(
'[QwenAgentManager] CLI supports session/list:',
supportsSessionList,
);
@ -273,11 +278,14 @@ export class QwenAgentManager {
// Try ACP method first if supported
if (supportsSessionList) {
try {
console.log(
this.consoleLog(
'[QwenAgentManager] Attempting to get session list via ACP method',
);
const response = await this.connection.listSessions();
console.log('[QwenAgentManager] ACP session list response:', response);
this.consoleLog(
'[QwenAgentManager] ACP session list response:',
response,
);
// sendRequest resolves with the JSON-RPC "result" directly
// Newer CLI returns an object: { items: [...], nextCursor?, hasMore }
@ -295,7 +303,7 @@ export class QwenAgentManager {
: [];
}
console.log(
this.consoleLog(
'[QwenAgentManager] Sessions retrieved via ACP:',
res,
items.length,
@ -314,7 +322,7 @@ export class QwenAgentManager {
cwd: item.cwd,
}));
console.log(
this.consoleLog(
'[QwenAgentManager] Sessions retrieved via ACP:',
sessions.length,
);
@ -330,9 +338,11 @@ export class QwenAgentManager {
// Always fall back to file system method
try {
console.log('[QwenAgentManager] Getting session list from file system');
this.consoleLog(
'[QwenAgentManager] Getting session list from file system',
);
const sessions = await this.sessionReader.getAllSessions(undefined, true);
console.log(
this.consoleLog(
'[QwenAgentManager] Session list from file system (all projects):',
sessions.length,
);
@ -350,7 +360,7 @@ export class QwenAgentManager {
}),
);
console.log(
this.consoleLog(
'[QwenAgentManager] Sessions retrieved from file system:',
result.length,
);
@ -490,7 +500,7 @@ export class QwenAgentManager {
const item = list.find(
(s) => s.sessionId === sessionId || s.id === sessionId,
);
console.log(
this.consoleLog(
'[QwenAgentManager] Session list item for filePath lookup:',
item,
);
@ -561,7 +571,7 @@ export class QwenAgentManager {
}
}
// Simple linear reconstruction: filter user/assistant and sort by timestamp
console.log(
this.consoleLog(
'[QwenAgentManager] JSONL records read:',
records.length,
filePath,
@ -718,7 +728,7 @@ export class QwenAgentManager {
// Handle other types if needed
}
console.log(
this.consoleLog(
'[QwenAgentManager] JSONL messages reconstructed:',
msgs.length,
);
@ -856,7 +866,7 @@ export class QwenAgentManager {
tag: string,
): Promise<{ success: boolean; message?: string }> {
try {
console.log(
this.consoleLog(
'[QwenAgentManager] Saving session via /chat save command:',
sessionId,
'with tag:',
@ -867,7 +877,9 @@ export class QwenAgentManager {
// The CLI will handle this as a special command
await this.connection.sendPrompt(`/chat save "${tag}"`);
console.log('[QwenAgentManager] /chat save command sent successfully');
this.consoleLog(
'[QwenAgentManager] /chat save command sent successfully',
);
return {
success: true,
message: `Session saved with tag: ${tag}`,
@ -914,14 +926,14 @@ export class QwenAgentManager {
conversationId: string,
): Promise<{ success: boolean; tag?: string; message?: string }> {
try {
console.log('[QwenAgentManager] ===== CHECKPOINT SAVE START =====');
console.log('[QwenAgentManager] Conversation ID:', conversationId);
console.log('[QwenAgentManager] Message count:', messages.length);
console.log(
this.consoleLog('[QwenAgentManager] ===== CHECKPOINT SAVE START =====');
this.consoleLog('[QwenAgentManager] Conversation ID:', conversationId);
this.consoleLog('[QwenAgentManager] Message count:', messages.length);
this.consoleLog(
'[QwenAgentManager] Current working dir:',
this.currentWorkingDir,
);
console.log(
this.consoleLog(
'[QwenAgentManager] Current session ID (from CLI):',
this.currentSessionId,
);
@ -998,11 +1010,11 @@ export class QwenAgentManager {
try {
// Route upcoming session/update messages as discrete messages for replay
this.rehydratingSessionId = sessionId;
console.log(
this.consoleLog(
'[QwenAgentManager] Rehydration start for session:',
sessionId,
);
console.log(
this.consoleLog(
'[QwenAgentManager] Attempting session/load via ACP for session:',
sessionId,
);
@ -1010,7 +1022,7 @@ export class QwenAgentManager {
sessionId,
cwdOverride,
);
console.log(
this.consoleLog(
'[QwenAgentManager] Session load succeeded. Response:',
JSON.stringify(response).substring(0, 200),
);
@ -1050,7 +1062,10 @@ export class QwenAgentManager {
throw error;
} finally {
// End rehydration routing regardless of outcome
console.log('[QwenAgentManager] Rehydration end for session:', sessionId);
this.consoleLog(
'[QwenAgentManager] Rehydration end for session:',
sessionId,
);
this.rehydratingSessionId = null;
}
}
@ -1063,7 +1078,7 @@ export class QwenAgentManager {
* @returns Loaded session messages or null
*/
async loadSession(sessionId: string): Promise<ChatMessage[] | null> {
console.log(
this.consoleLog(
'[QwenAgentManager] Loading session with version-aware strategy:',
sessionId,
);
@ -1072,7 +1087,7 @@ export class QwenAgentManager {
const cliContextManager = CliContextManager.getInstance();
const supportsSessionLoad = cliContextManager.supportsSessionLoad();
console.log(
this.consoleLog(
'[QwenAgentManager] CLI supports session/load:',
supportsSessionLoad,
);
@ -1080,11 +1095,13 @@ export class QwenAgentManager {
// Try ACP method first if supported
if (supportsSessionLoad) {
try {
console.log(
this.consoleLog(
'[QwenAgentManager] Attempting to load session via ACP method',
);
await this.loadSessionViaAcp(sessionId);
console.log('[QwenAgentManager] Session loaded successfully via ACP');
this.consoleLog(
'[QwenAgentManager] Session loaded successfully via ACP',
);
// After loading via ACP, we still need to get messages from file system
// In future, we might get them directly from the ACP response
@ -1098,11 +1115,11 @@ export class QwenAgentManager {
// Always fall back to file system method
try {
console.log(
this.consoleLog(
'[QwenAgentManager] Loading session messages from file system',
);
const messages = await this.loadSessionMessagesFromFile(sessionId);
console.log(
this.consoleLog(
'[QwenAgentManager] Session messages loaded successfully from file system',
);
return messages;
@ -1125,7 +1142,7 @@ export class QwenAgentManager {
sessionId: string,
): Promise<ChatMessage[] | null> {
try {
console.log(
this.consoleLog(
'[QwenAgentManager] Loading session from file system:',
sessionId,
);
@ -1137,7 +1154,7 @@ export class QwenAgentManager {
);
if (!session) {
console.log(
this.consoleLog(
'[QwenAgentManager] Session not found in file system:',
sessionId,
);
@ -1183,93 +1200,67 @@ export class QwenAgentManager {
workingDir: string,
authStateManager?: AuthStateManager,
): Promise<string | null> {
console.log('[QwenAgentManager] Creating new session...');
// Reuse existing session if present
if (this.connection.currentSessionId) {
return this.connection.currentSessionId;
}
// Deduplicate concurrent session/new attempts
if (this.sessionCreateInFlight) {
return this.sessionCreateInFlight;
}
// Check if we have valid cached authentication
let hasValidAuth = false;
this.consoleLog('[QwenAgentManager] Creating new session...');
// Prefer the provided authStateManager, otherwise fall back to the one
// remembered during connect(). This prevents accidental re-auth in
// fallback paths (e.g. session switching) when the handler didn't pass it.
const effectiveAuth = authStateManager || this.defaultAuthStateManager;
if (effectiveAuth) {
hasValidAuth = await effectiveAuth.hasValidAuth(workingDir, authMethod);
console.log(
'[QwenAgentManager] Has valid cached auth for new session:',
hasValidAuth,
);
}
// Only authenticate if we don't have valid cached auth
if (!hasValidAuth) {
console.log(
'[QwenAgentManager] Authenticating before creating session...',
);
this.sessionCreateInFlight = (async () => {
try {
await this.connection.authenticate(authMethod);
console.log('[QwenAgentManager] Authentication successful');
// Save auth state
if (effectiveAuth) {
console.log(
'[QwenAgentManager] Saving auth state after successful authentication',
);
await effectiveAuth.saveAuthState(workingDir, authMethod);
}
} catch (authError) {
console.error('[QwenAgentManager] Authentication failed:', authError);
// Clear potentially invalid cache
if (effectiveAuth) {
console.log(
'[QwenAgentManager] Clearing auth cache due to authentication failure',
);
await effectiveAuth.clearAuthState();
}
throw authError;
}
} else {
console.log(
'[QwenAgentManager] Skipping authentication - using valid cached auth',
);
}
// Try to create a new ACP session. If Qwen asks for auth despite our
// cached flag (e.g. fresh process or expired tokens), re-authenticate and retry.
try {
await this.connection.newSession(workingDir);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
const requiresAuth =
msg.includes('Authentication required') ||
msg.includes('(code: -32000)');
if (requiresAuth) {
console.warn(
'[QwenAgentManager] session/new requires authentication. Retrying with authenticate...',
);
// Try to create a new ACP session. If Qwen asks for auth despite our
// cached flag (e.g. fresh process or expired tokens), re-authenticate and retry.
try {
await this.connection.authenticate(authMethod);
// Persist auth cache so subsequent calls can skip the web flow.
if (effectiveAuth) {
await effectiveAuth.saveAuthState(workingDir, authMethod);
}
await this.connection.newSession(workingDir);
} catch (reauthErr) {
// Clear potentially stale cache on failure and rethrow
if (effectiveAuth) {
await effectiveAuth.clearAuthState();
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
const requiresAuth =
msg.includes('Authentication required') ||
msg.includes('(code: -32000)');
if (requiresAuth) {
console.warn(
'[QwenAgentManager] session/new requires authentication. Retrying with authenticate...',
);
try {
await this.connection.authenticate(authMethod);
// Persist auth cache so subsequent calls can skip the web flow.
if (effectiveAuth) {
await effectiveAuth.saveAuthState(workingDir, authMethod);
}
await this.connection.newSession(workingDir);
} catch (reauthErr) {
// Clear potentially stale cache on failure and rethrow
if (effectiveAuth) {
await effectiveAuth.clearAuthState();
}
throw reauthErr;
}
} else {
throw err;
}
throw reauthErr;
}
} else {
throw err;
const newSessionId = this.connection.currentSessionId;
this.consoleLog(
'[QwenAgentManager] New session created with ID:',
newSessionId,
);
return newSessionId;
} finally {
this.sessionCreateInFlight = null;
}
}
const newSessionId = this.connection.currentSessionId;
console.log(
'[QwenAgentManager] New session created with ID:',
newSessionId,
);
return newSessionId;
})();
return this.sessionCreateInFlight;
}
/**
@ -1285,7 +1276,7 @@ export class QwenAgentManager {
* Cancel current prompt
*/
async cancelCurrentPrompt(): Promise<void> {
console.log('[QwenAgentManager] Cancelling current prompt');
this.consoleLog('[QwenAgentManager] Cancelling current prompt');
await this.connection.cancelSession();
}