diff --git a/src/components/ChatBox/index.tsx b/src/components/ChatBox/index.tsx index a95f183b..e35c18e9 100644 --- a/src/components/ChatBox/index.tsx +++ b/src/components/ChatBox/index.tsx @@ -139,6 +139,19 @@ export default function ChatBox(): JSX.Element { const _taskId = taskId || chatStore.activeTaskId; if (message.trim() === '' && !messageStr) return; const tempMessageContent = messageStr || message; + + if (executionId && projectStore.activeProjectId) { + const project = projectStore.getProjectById(projectStore.activeProjectId); + const isInQueue = project?.queuedMessages?.some( + (m) => m.executionId === executionId + ); + if (isInQueue) { + console.warn( + `[handleSend] Skipping message with executionId ${executionId} - already in queue, will be processed by useBackgroundTaskProcessor` + ); + return; + } + } chatStore.setHasMessages(_taskId as string, true); if (!_taskId) return; diff --git a/src/hooks/useBackgroundTaskProcessor.ts b/src/hooks/useBackgroundTaskProcessor.ts index a9ab0e50..eed81a64 100644 --- a/src/hooks/useBackgroundTaskProcessor.ts +++ b/src/hooks/useBackgroundTaskProcessor.ts @@ -14,6 +14,10 @@ import { generateUniqueId } from '@/lib'; import { proxyUpdateTriggerExecution } from '@/service/triggerApi'; +import { + closeSSEConnectionsForTasks, + hasActiveSSEConnection, +} from '@/store/chatStore'; import { useProjectStore } from '@/store/projectStore'; import { useTriggerTaskStore } from '@/store/triggerTaskStore'; import { ExecutionStatus } from '@/types'; @@ -107,7 +111,44 @@ export function useBackgroundTaskProcessor() { continue; } - const msg = projectData.queuedMessages.find((m) => m.executionId); + // If SSE is active, starting a new task would duplicate trigger processing. + // Wait for the active task to finish; if task is done but SSE lingers, close it + // so the trigger can start fresh on the next poll. + const allTaskIds = Object.values(projectData.chatStores || {}).flatMap( + (cs) => Object.keys(cs.getState().tasks) + ); + if (hasActiveSSEConnection(allTaskIds)) { + const activeChatStore = projectStore.getChatStore(project.id); + const activeState = activeChatStore?.getState(); + const activeTaskId = activeState?.activeTaskId; + const activeTask = activeTaskId + ? activeState?.tasks[activeTaskId] + : null; + + const isActiveTaskDone = + activeTask?.status === ChatTaskStatus.FINISHED || + activeTask?.hasWaitComfirm; + + if (isActiveTaskDone) { + console.log( + '[BackgroundTaskProcessor] Closing stale SSE for project', + project.id, + '- active task done, trigger waiting in queue' + ); + closeSSEConnectionsForTasks(allTaskIds); + } else { + console.log( + '[BackgroundTaskProcessor] Skipping project', + project.id, + '- SSE active, task still in progress' + ); + } + continue; + } + + const msg = projectData.queuedMessages.find( + (m) => m.executionId && !m.processing + ); if (msg && msg.executionId) { messageToProcess = { projectId: project.id, @@ -137,7 +178,14 @@ export function useBackgroundTaskProcessor() { triggerName, } = messageToProcess; - projectStore.removeQueuedMessage(projectId, task_id); + projectStore.markQueuedMessageAsProcessing(projectId, task_id); + + console.log( + '[BackgroundTaskProcessor] Marked message as processing:', + task_id, + 'executionId:', + executionId + ); try { // Get the latest project's chatStore @@ -196,6 +244,8 @@ export function useBackgroundTaskProcessor() { '[BackgroundTaskProcessor] Background task completed:', executionId ); + // Remove from queue after successful completion + projectStore.removeQueuedMessage(projectId, task_id); activeTasksRef.current.delete(executionId); }) .catch((err: any) => { @@ -203,6 +253,8 @@ export function useBackgroundTaskProcessor() { '[BackgroundTaskProcessor] Background task error:', err ); + // Remove from queue on error as well + projectStore.removeQueuedMessage(projectId, task_id); // Report failure to backend proxyUpdateTriggerExecution( executionId, @@ -239,6 +291,8 @@ export function useBackgroundTaskProcessor() { '[BackgroundTaskProcessor] Failed to start background task:', error ); + // Remove from queue on error + projectStore.removeQueuedMessage(projectId, task_id); // Report failure to backend proxyUpdateTriggerExecution( executionId, @@ -270,12 +324,13 @@ export function useBackgroundTaskProcessor() { for (const chatStore of Object.values(project.chatStores)) { const state = chatStore.getState(); const t = state.tasks[task.chatTaskId]; - if ( - t && - t.status !== ChatTaskStatus.RUNNING && - t.status !== ChatTaskStatus.PAUSE - ) { - toRemove.push(executionId); + if (t) { + if ( + t.status !== ChatTaskStatus.RUNNING && + t.status !== ChatTaskStatus.PAUSE + ) { + toRemove.push(executionId); + } break; } } diff --git a/src/hooks/useTriggerTaskExecutor.ts b/src/hooks/useTriggerTaskExecutor.ts index dc52b450..1d493cfc 100644 --- a/src/hooks/useTriggerTaskExecutor.ts +++ b/src/hooks/useTriggerTaskExecutor.ts @@ -182,11 +182,6 @@ export function useTriggerTaskExecutor() { // Format the message with all context const formattedMessage = formatTriggeredTaskMessage(task); - console.log( - '[TriggerTaskExecutor] Adding message to project queue:', - formattedMessage.substring(0, 100) + '...' - ); - // Add message directly to projectStore's queuedMessages // useBackgroundTaskProcessor will pick it up and execute it const queuedTaskId = store.addQueuedMessage( @@ -229,7 +224,7 @@ export function useTriggerTaskExecutor() { if (webSocketEvent) { console.log( '[TriggerTaskExecutor] WebSocket event received:', - webSocketEvent + webSocketEvent.executionId ); const task: TriggeredTask = { diff --git a/src/store/chatStore.ts b/src/store/chatStore.ts index 5605dcc6..9138f857 100644 --- a/src/store/chatStore.ts +++ b/src/store/chatStore.ts @@ -3448,3 +3448,26 @@ export const useChatStore = chatStore; export const createChatStoreInstance = chatStore; export const getToolStore = () => chatStore().getState(); + +/** Returns true if any task has an active SSE connection. */ +export function hasActiveSSEConnection(taskIds: string[]): boolean { + return taskIds.some((taskId) => !!activeSSEControllers[taskId]); +} + +/** Close SSE for given tasks (e.g. after completion, so triggers can start fresh). */ +export function closeSSEConnectionsForTasks(taskIds: string[]): void { + for (const taskId of taskIds) { + if (activeSSEControllers[taskId]) { + console.log( + '[closeSSEConnectionsForTasks] Closing SSE for task:', + taskId + ); + try { + activeSSEControllers[taskId].abort(); + } catch (_e) { + // Ignore if already aborted + } + delete activeSSEControllers[taskId]; + } + } +} diff --git a/src/store/projectStore.ts b/src/store/projectStore.ts index 24369dba..a5b4ff24 100644 --- a/src/store/projectStore.ts +++ b/src/store/projectStore.ts @@ -31,6 +31,7 @@ interface TaskQueue { triggerTaskId?: string; triggerId?: number; triggerName?: string; + processing?: boolean; } interface Project { @@ -112,6 +113,7 @@ interface ProjectStore { removeQueuedMessage: (projectId: string, taskId: string) => TaskQueue; restoreQueuedMessage: (projectId: string, messageData: TaskQueue) => void; clearQueuedMessages: (projectId: string) => void; + markQueuedMessageAsProcessing: (projectId: string, taskId: string) => void; // Chat store state management createChatStore: (projectId: string, chatName?: string) => string | null; @@ -873,6 +875,10 @@ const projectStore = create()((set, get) => ({ }, })); + console.log( + `[addQueuedMessage] Message added successfully: task_id=${actual_task_id}, queue length now: ${get().projects[projectId].queuedMessages.length}` + ); + return actual_task_id; }, @@ -966,6 +972,43 @@ const projectStore = create()((set, get) => ({ })); }, + markQueuedMessageAsProcessing: (projectId: string, taskId: string) => { + const { projects } = get(); + + if (!projects[projectId]) { + console.warn(`Project ${projectId} not found`); + return; + } + + const message = projects[projectId].queuedMessages.find( + (m) => m.task_id === taskId + ); + + if (!message) { + console.warn( + `Message with task_id ${taskId} not found in project ${projectId}` + ); + return; + } + + set((state) => ({ + projects: { + ...state.projects, + [projectId]: { + ...state.projects[projectId], + queuedMessages: state.projects[projectId].queuedMessages.map((m) => + m.task_id === taskId ? { ...m, processing: true } : m + ), + updatedAt: Date.now(), + }, + }, + })); + + console.log( + `[ProjectStore] Marked message as processing: ${taskId} in project ${projectId}` + ); + }, + getAllChatStores: (projectId: string) => { const { projects } = get();