diff --git a/src/store/chatStore.ts b/src/store/chatStore.ts index 34cbd8500..fb19823f2 100644 --- a/src/store/chatStore.ts +++ b/src/store/chatStore.ts @@ -46,12 +46,14 @@ export interface ChatStore { updateCount: number; activeTaskId: string | null; tasks: { [key: string]: Task }; + activeConnections: Set; // Track active SSE connections by task ID create: (id?: string, type?: any) => string; removeTask: (taskId: string) => void; setStatus: (taskId: string, status: 'running' | 'finished' | 'pending' | 'pause') => void; setActiveTaskId: (taskId: string) => void; replay: (taskId: string, question: string, time: number) => Promise; startTask: (taskId: string, type?: string, shareToken?: string, delayTime?: number) => Promise; + startChildTask: (taskId: string, content: string, projectId: string) => Promise; handleConfirmTask: (project_id:string, taskId: string, type?: string) => void; addMessages: (taskId: string, messages: Message) => void; setMessages: (taskId: string, messages: Message[]) => void; @@ -105,6 +107,7 @@ const chatStore = (initial?: Partial) => createStore()( activeTaskId: null, tasks: initial?.tasks ?? {}, updateCount: 0, + activeConnections: new Set(), // Track active SSE connections create(id?: string, type?: any) { const taskId = id ? id : generateUniqueId(); console.log("Create Task", taskId) @@ -176,7 +179,7 @@ const chatStore = (initial?: Partial) => createStore()( startTask: async (taskId: string, type?: string, shareToken?: string, delayTime?: number) => { const { token, language, modelType, cloud_model_type, email } = getAuthStore() const workerList = useWorkerList(); - const { getLastUserMessage, setDelayTime, setType } = get(); + const { getLastUserMessage, setDelayTime, setType, activeConnections } = get(); const baseURL = await getBaseURL(); let systemLanguage = language if (language === 'system') { @@ -301,6 +304,11 @@ const chatStore = (initial?: Partial) => createStore()( }) } const browser_port = await window.ipcRenderer.invoke('get-browser-port'); + + // Track this connection + activeConnections.add(taskId); + console.log(`Starting SSE connection for task: ${taskId}. Active connections: ${activeConnections.size}`); + fetchEventSource(api, { method: !type ? "POST" : "GET", openWhenHidden: true, @@ -508,6 +516,22 @@ const chatStore = (initial?: Partial) => createStore()( setTaskAssigning(taskId, taskAssigning) return; } + // New Task State from queue - now handled by add_task with new SSE + if (agentMessages.step === "new_task_state") { + const { task_id, content, state, result, failure_count } = agentMessages.data; + if (!task_id || !content || !project_id) return; + + console.log(`new_task_state received for task: ${task_id}, creating a new SSE connection`); + + // This creates a completely new SSE connection for the child task + // Log for debugging but don't process + // Automatically start a new SSE connection for this child task + const { startChildTask } = get(); + await startChildTask(task_id, content, project_id); + + console.log(`Started new SSE stream for child task: ${task_id}`); + return; + } // Activate agent if (agentMessages.step === "activate_agent" || agentMessages.step === "deactivate_agent") { @@ -886,7 +910,7 @@ const chatStore = (initial?: Partial) => createStore()( if (agentMessages.step === "add_task") { try { const taskData = agentMessages.data; - if (taskData && taskData.project_id && taskData.content) { + if (taskData && taskData.project_id && taskData.content && taskData.task_id) { console.log(`Task added to project queue: ${taskData.project_id}`); } } catch (error) { @@ -916,15 +940,17 @@ const chatStore = (initial?: Partial) => createStore()( const taskIdToRemove = agentMessages.data.task_id as string; if (taskIdToRemove) { const projectStore = useProjectStore.getState(); - if (agentMessages.data.project_id) { + // Try to remove from current project otherwise + const project_id = agentMessages.data.project_id ?? projectStore.activeProjectId; + if (project_id) { // Find and remove the message with matching task ID - const project = projectStore.getProjectById(agentMessages.data.project_id); + const project = projectStore.getProjectById(project_id); if (project && project.queuedMessages) { const messageToRemove = project.queuedMessages.find(msg => msg.task_id === taskIdToRemove || msg.content.includes(taskIdToRemove) ); if (messageToRemove) { - projectStore.removeQueuedMessage(agentMessages.data.project_id, messageToRemove.task_id); + projectStore.removeQueuedMessage(project_id, messageToRemove.task_id); console.log(`Task removed from project queue: ${taskIdToRemove}`); } } @@ -1186,11 +1212,72 @@ const chatStore = (initial?: Partial) => createStore()( // Server closes connection onclose() { console.log("server closed"); + // Clean up connection tracking + const { activeConnections } = get(); + activeConnections.delete(taskId); + console.log(`SSE connection closed for task: ${taskId}. Active connections: ${activeConnections.size}`); }, }); }, + startChildTask: async (taskId: string, content: string, projectId: string) => { + console.log(`Starting child task: ${taskId} for project: ${projectId}`); + + // Get project store to create new chat instance + const projectStore = useProjectStore.getState(); + + if (!projectId) { + console.warn("No project ID provided for child task"); + return; + } + + // Create new chat store in the specified project + const newChatId = projectStore.createChatStore(projectId, `Task: ${content.substring(0, 50)}...`); + + if (!newChatId) { + console.error("Failed to create new chat store for child task"); + return; + } + + // Get the new chat store instance + const newChatStore = projectStore.getChatStore(projectId, newChatId); + + if (!newChatStore) { + console.error("Failed to get new chat store instance for child task"); + return; + } + + // Create a new task in the new chat store with the specific task ID + const newTaskId = newChatStore.getState().create(taskId); + + // Add the user message (the task content) + newChatStore.getState().addMessages(newTaskId, { + id: generateUniqueId(), + role: "user", + content: content, + }); + + // Set the task as having messages + newChatStore.getState().setHasMessages(newTaskId, true); + + // Set Active Task to the latest one + newChatStore.getState().setActiveTaskId(newTaskId); + + // Set up initial task state + newChatStore.getState().setActiveWorkSpace(newTaskId, 'workflow'); + newChatStore.getState().setStatus(newTaskId, 'pending'); + newChatStore.getState().setTaskTime(newTaskId, Date.now()); + + // Set the new chat store as active + projectStore.setActiveChatStore(projectId, newChatId); + + // Now start a completely new SSE connection for this child task + await newChatStore.getState().startTask(newTaskId); + + console.log(`Started new SSE connection for child task: ${taskId}`); + }, + replay: async (taskId: string, question: string, time: number) => { const { create, setHasMessages, addMessages, startTask, setActiveTaskId, handleConfirmTask } = get(); //get project id