fix: stop task not working as expected

This commit is contained in:
Wendong-Fan 2025-11-20 16:25:35 +08:00
parent 0cba98c2aa
commit c3e7e8ef42
2 changed files with 119 additions and 15 deletions

View file

@ -416,26 +416,30 @@ export default function ChatBox(): JSX.Element {
const handleSkip = async () => {
const taskId = chatStore.activeTaskId as string;
setIsPauseResumeLoading(true);
try {
// Skip the current task
// First, stop the SSE connection and update local state
chatStore.stopTask(taskId);
// Then notify backend to skip the task
await fetchPost(`/chat/${projectStore.activeProjectId}/skip-task`, {
project_id: projectStore.activeProjectId
});
// Update task status to finished
chatStore.setStatus(taskId, 'finished');
// Ensure pending state is cleared
chatStore.setIsPending(taskId, false);
// toast.success("Task skipped successfully", {
// closeButton: true,
// });
toast.success("Task stopped successfully", {
closeButton: true,
});
} catch (error) {
console.error("Failed to skip task:", error);
toast.error("Failed to skip task", {
// If backend call failed, just ensure pending state is cleared
// Don't call stopTask again since it was already called above
chatStore.setIsPending(taskId, false);
toast.error("Task stopped locally, but backend notification failed", {
closeButton: true,
});
} finally {

View file

@ -51,6 +51,7 @@ export interface ChatStore {
tasks: { [key: string]: Task };
create: (id?: string, type?: any) => string;
removeTask: (taskId: string) => void;
stopTask: (taskId: string) => void;
setStatus: (taskId: string, status: 'running' | 'finished' | 'pending' | 'pause') => void;
setActiveTaskId: (taskId: string) => void;
replay: (taskId: string, question: string, time: number) => Promise<void>;
@ -114,6 +115,9 @@ export type VanillaChatStore = {
// Track auto-confirm timers per task to avoid reusing stale timers across rounds
const autoConfirmTimers: Record<string, ReturnType<typeof setTimeout>> = {};
// Track active SSE connections for proper cleanup
const activeSSEControllers: Record<string, AbortController> = {};
const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
(set, get) => ({
activeTaskId: null,
@ -189,6 +193,16 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
console.warn('Error clearing auto-confirm timer in removeTask:', error);
}
// Clean up SSE connection if it exists
try {
if (activeSSEControllers[taskId]) {
activeSSEControllers[taskId].abort();
delete activeSSEControllers[taskId];
}
} catch (error) {
console.warn('Error aborting SSE connection in removeTask:', error);
}
set((state) => {
delete state.tasks[taskId];
return ({
@ -198,6 +212,40 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
})
})
},
stopTask(taskId: string) {
// Abort the SSE connection for this task
try {
if (activeSSEControllers[taskId]) {
console.log(`Stopping SSE connection for task ${taskId}`);
activeSSEControllers[taskId].abort();
delete activeSSEControllers[taskId];
}
} catch (error) {
console.warn('Error aborting SSE connection in stopTask:', error);
}
// Clean up any pending auto-confirm timers
try {
if (autoConfirmTimers[taskId]) {
clearTimeout(autoConfirmTimers[taskId]);
delete autoConfirmTimers[taskId];
}
} catch (error) {
console.warn('Error clearing auto-confirm timer in stopTask:', error);
}
// Update task status to finished
set((state) => ({
...state,
tasks: {
...state.tasks,
[taskId]: {
...state.tasks[taskId],
status: 'finished'
},
},
}))
},
startTask: async (taskId: string, type?: string, shareToken?: string, delayTime?: number, messageContent?: string, messageAttaches?: File[]) => {
const { token, language, modelType, cloud_model_type, email } = getAuthStore()
const workerList = useWorkerList();
@ -403,26 +451,31 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
// during active message processing
let lockedChatStore = targetChatStore;
let lockedTaskId = newTaskId;
// Create AbortController for this task's SSE connection
const abortController = new AbortController();
activeSSEControllers[newTaskId] = abortController;
// Getter functions that use the locked references instead of dynamic ones
const getCurrentChatStore = () => {
return lockedChatStore.getState();
};
// Get the locked task ID - this won't change during the SSE session
const getCurrentTaskId = () => {
return lockedTaskId;
};
// Function to update locked references (only for special cases like replay)
const updateLockedReferences = (newChatStore: VanillaChatStore, newTaskId: string) => {
lockedChatStore = newChatStore;
lockedTaskId = newTaskId;
};
fetchEventSource(api, {
method: !type ? "POST" : "GET",
openWhenHidden: true,
signal: abortController.signal, // Add abort signal for proper cleanup
headers: { "Content-Type": "application/json", "Authorization": type == 'replay' ? `Bearer ${token}` : undefined as unknown as string },
body: !type ? JSON.stringify({
project_id: project_id,
@ -469,6 +522,21 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
return;
}
// Check if this task has been stopped before processing any message
// But allow messages that switch to new tasks (like confirmed events)
const lockedTaskId = getCurrentTaskId();
const currentTask = getCurrentChatStore().tasks[lockedTaskId];
// Only ignore messages if:
// 1. The task doesn't exist, OR
// 2. The task is finished AND it's not a task-switching event (confirmed, new_task_state)
const isTaskSwitchingEvent = agentMessages.step === "confirmed" || agentMessages.step === "new_task_state";
if (!currentTask || (currentTask.status === 'finished' && !isTaskSwitchingEvent)) {
// Task was stopped, ignore any incoming messages
console.log(`Ignoring SSE message for stopped task ${lockedTaskId}, step: ${agentMessages.step}`);
return;
}
console.log("agentMessages", agentMessages);
const agentNameMap = {
developer_agent: "Developer Agent",
@ -1598,13 +1666,29 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
},
onerror(err) {
console.error("Error:", err);
console.error("SSE Error:", err);
// Clean up AbortController on error
try {
if (activeSSEControllers[newTaskId]) {
delete activeSSEControllers[newTaskId];
}
} catch (error) {
console.warn('Error cleaning up AbortController on SSE error:', error);
}
throw err;
},
// Server closes connection
onclose() {
console.log("server closed");
console.log("SSE connection closed");
// Clean up AbortController when connection closes
try {
if (activeSSEControllers[newTaskId]) {
delete activeSSEControllers[newTaskId];
}
} catch (error) {
console.warn('Error cleaning up AbortController on SSE close:', error);
}
},
});
@ -2223,6 +2307,22 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
console.error('Error during timer cleanup in clearTasks:', error);
}
// Clean up all active SSE connections
try {
Object.keys(activeSSEControllers).forEach(taskId => {
try {
if (activeSSEControllers[taskId]) {
activeSSEControllers[taskId].abort();
delete activeSSEControllers[taskId];
}
} catch (error) {
console.warn(`Error aborting SSE connection for task ${taskId}:`, error);
}
});
} catch (error) {
console.error('Error during SSE cleanup in clearTasks:', error);
}
window.ipcRenderer.invoke('restart-backend')
.then((res) => {
console.log('restart-backend', res)