diff --git a/src/components/ChatBox/index.tsx b/src/components/ChatBox/index.tsx index 41c66f352..1343e2fff 100644 --- a/src/components/ChatBox/index.tsx +++ b/src/components/ChatBox/index.tsx @@ -416,26 +416,30 @@ export default function ChatBox(): JSX.Element { const handleSkip = async () => { const taskId = chatStore.activeTaskId as string; setIsPauseResumeLoading(true); - + try { - // Skip the current task + // First, stop the SSE connection and update local state + chatStore.stopTask(taskId); + + // Then notify backend to skip the task await fetchPost(`/chat/${projectStore.activeProjectId}/skip-task`, { project_id: projectStore.activeProjectId }); - // Update task status to finished - chatStore.setStatus(taskId, 'finished'); + // Ensure pending state is cleared chatStore.setIsPending(taskId, false); - - // toast.success("Task skipped successfully", { - // closeButton: true, - // }); + toast.success("Task stopped successfully", { closeButton: true, }); } catch (error) { console.error("Failed to skip task:", error); - toast.error("Failed to skip task", { + + // If backend call failed, just ensure pending state is cleared + // Don't call stopTask again since it was already called above + chatStore.setIsPending(taskId, false); + + toast.error("Task stopped locally, but backend notification failed", { closeButton: true, }); } finally { diff --git a/src/store/chatStore.ts b/src/store/chatStore.ts index 296a15bdf..1e5d000a9 100644 --- a/src/store/chatStore.ts +++ b/src/store/chatStore.ts @@ -51,6 +51,7 @@ export interface ChatStore { tasks: { [key: string]: Task }; create: (id?: string, type?: any) => string; removeTask: (taskId: string) => void; + stopTask: (taskId: string) => void; setStatus: (taskId: string, status: 'running' | 'finished' | 'pending' | 'pause') => void; setActiveTaskId: (taskId: string) => void; replay: (taskId: string, question: string, time: number) => Promise; @@ -114,6 +115,9 @@ export type VanillaChatStore = { // Track auto-confirm timers per task to avoid reusing stale timers across rounds const autoConfirmTimers: Record> = {}; +// Track active SSE connections for proper cleanup +const activeSSEControllers: Record = {}; + const chatStore = (initial?: Partial) => createStore()( (set, get) => ({ activeTaskId: null, @@ -189,6 +193,16 @@ const chatStore = (initial?: Partial) => createStore()( console.warn('Error clearing auto-confirm timer in removeTask:', error); } + // Clean up SSE connection if it exists + try { + if (activeSSEControllers[taskId]) { + activeSSEControllers[taskId].abort(); + delete activeSSEControllers[taskId]; + } + } catch (error) { + console.warn('Error aborting SSE connection in removeTask:', error); + } + set((state) => { delete state.tasks[taskId]; return ({ @@ -198,6 +212,40 @@ const chatStore = (initial?: Partial) => createStore()( }) }) }, + stopTask(taskId: string) { + // Abort the SSE connection for this task + try { + if (activeSSEControllers[taskId]) { + console.log(`Stopping SSE connection for task ${taskId}`); + activeSSEControllers[taskId].abort(); + delete activeSSEControllers[taskId]; + } + } catch (error) { + console.warn('Error aborting SSE connection in stopTask:', error); + } + + // Clean up any pending auto-confirm timers + try { + if (autoConfirmTimers[taskId]) { + clearTimeout(autoConfirmTimers[taskId]); + delete autoConfirmTimers[taskId]; + } + } catch (error) { + console.warn('Error clearing auto-confirm timer in stopTask:', error); + } + + // Update task status to finished + set((state) => ({ + ...state, + tasks: { + ...state.tasks, + [taskId]: { + ...state.tasks[taskId], + status: 'finished' + }, + }, + })) + }, startTask: async (taskId: string, type?: string, shareToken?: string, delayTime?: number, messageContent?: string, messageAttaches?: File[]) => { const { token, language, modelType, cloud_model_type, email } = getAuthStore() const workerList = useWorkerList(); @@ -403,26 +451,31 @@ const chatStore = (initial?: Partial) => createStore()( // during active message processing let lockedChatStore = targetChatStore; let lockedTaskId = newTaskId; - + + // Create AbortController for this task's SSE connection + const abortController = new AbortController(); + activeSSEControllers[newTaskId] = abortController; + // Getter functions that use the locked references instead of dynamic ones const getCurrentChatStore = () => { return lockedChatStore.getState(); }; - + // Get the locked task ID - this won't change during the SSE session const getCurrentTaskId = () => { return lockedTaskId; }; - + // Function to update locked references (only for special cases like replay) const updateLockedReferences = (newChatStore: VanillaChatStore, newTaskId: string) => { lockedChatStore = newChatStore; lockedTaskId = newTaskId; }; - + fetchEventSource(api, { method: !type ? "POST" : "GET", openWhenHidden: true, + signal: abortController.signal, // Add abort signal for proper cleanup headers: { "Content-Type": "application/json", "Authorization": type == 'replay' ? `Bearer ${token}` : undefined as unknown as string }, body: !type ? JSON.stringify({ project_id: project_id, @@ -469,6 +522,21 @@ const chatStore = (initial?: Partial) => createStore()( return; } + // Check if this task has been stopped before processing any message + // But allow messages that switch to new tasks (like confirmed events) + const lockedTaskId = getCurrentTaskId(); + const currentTask = getCurrentChatStore().tasks[lockedTaskId]; + + // Only ignore messages if: + // 1. The task doesn't exist, OR + // 2. The task is finished AND it's not a task-switching event (confirmed, new_task_state) + const isTaskSwitchingEvent = agentMessages.step === "confirmed" || agentMessages.step === "new_task_state"; + if (!currentTask || (currentTask.status === 'finished' && !isTaskSwitchingEvent)) { + // Task was stopped, ignore any incoming messages + console.log(`Ignoring SSE message for stopped task ${lockedTaskId}, step: ${agentMessages.step}`); + return; + } + console.log("agentMessages", agentMessages); const agentNameMap = { developer_agent: "Developer Agent", @@ -1598,13 +1666,29 @@ const chatStore = (initial?: Partial) => createStore()( }, onerror(err) { - console.error("Error:", err); + console.error("SSE Error:", err); + // Clean up AbortController on error + try { + if (activeSSEControllers[newTaskId]) { + delete activeSSEControllers[newTaskId]; + } + } catch (error) { + console.warn('Error cleaning up AbortController on SSE error:', error); + } throw err; }, // Server closes connection onclose() { - console.log("server closed"); + console.log("SSE connection closed"); + // Clean up AbortController when connection closes + try { + if (activeSSEControllers[newTaskId]) { + delete activeSSEControllers[newTaskId]; + } + } catch (error) { + console.warn('Error cleaning up AbortController on SSE close:', error); + } }, }); @@ -2223,6 +2307,22 @@ const chatStore = (initial?: Partial) => createStore()( console.error('Error during timer cleanup in clearTasks:', error); } + // Clean up all active SSE connections + try { + Object.keys(activeSSEControllers).forEach(taskId => { + try { + if (activeSSEControllers[taskId]) { + activeSSEControllers[taskId].abort(); + delete activeSSEControllers[taskId]; + } + } catch (error) { + console.warn(`Error aborting SSE connection for task ${taskId}:`, error); + } + }); + } catch (error) { + console.error('Error during SSE cleanup in clearTasks:', error); + } + window.ipcRenderer.invoke('restart-backend') .then((res) => { console.log('restart-backend', res)