refactor: simplify SSE/trigger comments in background task processor

This commit is contained in:
4pmtong 2026-03-02 20:58:14 +08:00
parent a954166a94
commit 537a5d9eb9
5 changed files with 143 additions and 14 deletions

View file

@ -139,6 +139,19 @@ export default function ChatBox(): JSX.Element {
const _taskId = taskId || chatStore.activeTaskId;
if (message.trim() === '' && !messageStr) return;
const tempMessageContent = messageStr || message;
if (executionId && projectStore.activeProjectId) {
const project = projectStore.getProjectById(projectStore.activeProjectId);
const isInQueue = project?.queuedMessages?.some(
(m) => m.executionId === executionId
);
if (isInQueue) {
console.warn(
`[handleSend] Skipping message with executionId ${executionId} - already in queue, will be processed by useBackgroundTaskProcessor`
);
return;
}
}
chatStore.setHasMessages(_taskId as string, true);
if (!_taskId) return;

View file

@ -14,6 +14,10 @@
import { generateUniqueId } from '@/lib';
import { proxyUpdateTriggerExecution } from '@/service/triggerApi';
import {
closeSSEConnectionsForTasks,
hasActiveSSEConnection,
} from '@/store/chatStore';
import { useProjectStore } from '@/store/projectStore';
import { useTriggerTaskStore } from '@/store/triggerTaskStore';
import { ExecutionStatus } from '@/types';
@ -107,7 +111,44 @@ export function useBackgroundTaskProcessor() {
continue;
}
const msg = projectData.queuedMessages.find((m) => m.executionId);
// If SSE is active, starting a new task would duplicate trigger processing.
// Wait for the active task to finish; if task is done but SSE lingers, close it
// so the trigger can start fresh on the next poll.
const allTaskIds = Object.values(projectData.chatStores || {}).flatMap(
(cs) => Object.keys(cs.getState().tasks)
);
if (hasActiveSSEConnection(allTaskIds)) {
const activeChatStore = projectStore.getChatStore(project.id);
const activeState = activeChatStore?.getState();
const activeTaskId = activeState?.activeTaskId;
const activeTask = activeTaskId
? activeState?.tasks[activeTaskId]
: null;
const isActiveTaskDone =
activeTask?.status === ChatTaskStatus.FINISHED ||
activeTask?.hasWaitComfirm;
if (isActiveTaskDone) {
console.log(
'[BackgroundTaskProcessor] Closing stale SSE for project',
project.id,
'- active task done, trigger waiting in queue'
);
closeSSEConnectionsForTasks(allTaskIds);
} else {
console.log(
'[BackgroundTaskProcessor] Skipping project',
project.id,
'- SSE active, task still in progress'
);
}
continue;
}
const msg = projectData.queuedMessages.find(
(m) => m.executionId && !m.processing
);
if (msg && msg.executionId) {
messageToProcess = {
projectId: project.id,
@ -137,7 +178,14 @@ export function useBackgroundTaskProcessor() {
triggerName,
} = messageToProcess;
projectStore.removeQueuedMessage(projectId, task_id);
projectStore.markQueuedMessageAsProcessing(projectId, task_id);
console.log(
'[BackgroundTaskProcessor] Marked message as processing:',
task_id,
'executionId:',
executionId
);
try {
// Get the latest project's chatStore
@ -196,6 +244,8 @@ export function useBackgroundTaskProcessor() {
'[BackgroundTaskProcessor] Background task completed:',
executionId
);
// Remove from queue after successful completion
projectStore.removeQueuedMessage(projectId, task_id);
activeTasksRef.current.delete(executionId);
})
.catch((err: any) => {
@ -203,6 +253,8 @@ export function useBackgroundTaskProcessor() {
'[BackgroundTaskProcessor] Background task error:',
err
);
// Remove from queue on error as well
projectStore.removeQueuedMessage(projectId, task_id);
// Report failure to backend
proxyUpdateTriggerExecution(
executionId,
@ -239,6 +291,8 @@ export function useBackgroundTaskProcessor() {
'[BackgroundTaskProcessor] Failed to start background task:',
error
);
// Remove from queue on error
projectStore.removeQueuedMessage(projectId, task_id);
// Report failure to backend
proxyUpdateTriggerExecution(
executionId,
@ -270,12 +324,13 @@ export function useBackgroundTaskProcessor() {
for (const chatStore of Object.values(project.chatStores)) {
const state = chatStore.getState();
const t = state.tasks[task.chatTaskId];
if (
t &&
t.status !== ChatTaskStatus.RUNNING &&
t.status !== ChatTaskStatus.PAUSE
) {
toRemove.push(executionId);
if (t) {
if (
t.status !== ChatTaskStatus.RUNNING &&
t.status !== ChatTaskStatus.PAUSE
) {
toRemove.push(executionId);
}
break;
}
}

View file

@ -182,11 +182,6 @@ export function useTriggerTaskExecutor() {
// Format the message with all context
const formattedMessage = formatTriggeredTaskMessage(task);
console.log(
'[TriggerTaskExecutor] Adding message to project queue:',
formattedMessage.substring(0, 100) + '...'
);
// Add message directly to projectStore's queuedMessages
// useBackgroundTaskProcessor will pick it up and execute it
const queuedTaskId = store.addQueuedMessage(
@ -229,7 +224,7 @@ export function useTriggerTaskExecutor() {
if (webSocketEvent) {
console.log(
'[TriggerTaskExecutor] WebSocket event received:',
webSocketEvent
webSocketEvent.executionId
);
const task: TriggeredTask = {

View file

@ -3448,3 +3448,26 @@ export const useChatStore = chatStore;
export const createChatStoreInstance = chatStore;
export const getToolStore = () => chatStore().getState();
/** Returns true if any task has an active SSE connection. */
export function hasActiveSSEConnection(taskIds: string[]): boolean {
return taskIds.some((taskId) => !!activeSSEControllers[taskId]);
}
/** Close SSE for given tasks (e.g. after completion, so triggers can start fresh). */
export function closeSSEConnectionsForTasks(taskIds: string[]): void {
for (const taskId of taskIds) {
if (activeSSEControllers[taskId]) {
console.log(
'[closeSSEConnectionsForTasks] Closing SSE for task:',
taskId
);
try {
activeSSEControllers[taskId].abort();
} catch (_e) {
// Ignore if already aborted
}
delete activeSSEControllers[taskId];
}
}
}

View file

@ -31,6 +31,7 @@ interface TaskQueue {
triggerTaskId?: string;
triggerId?: number;
triggerName?: string;
processing?: boolean;
}
interface Project {
@ -112,6 +113,7 @@ interface ProjectStore {
removeQueuedMessage: (projectId: string, taskId: string) => TaskQueue;
restoreQueuedMessage: (projectId: string, messageData: TaskQueue) => void;
clearQueuedMessages: (projectId: string) => void;
markQueuedMessageAsProcessing: (projectId: string, taskId: string) => void;
// Chat store state management
createChatStore: (projectId: string, chatName?: string) => string | null;
@ -873,6 +875,10 @@ const projectStore = create<ProjectStore>()((set, get) => ({
},
}));
console.log(
`[addQueuedMessage] Message added successfully: task_id=${actual_task_id}, queue length now: ${get().projects[projectId].queuedMessages.length}`
);
return actual_task_id;
},
@ -966,6 +972,43 @@ const projectStore = create<ProjectStore>()((set, get) => ({
}));
},
markQueuedMessageAsProcessing: (projectId: string, taskId: string) => {
const { projects } = get();
if (!projects[projectId]) {
console.warn(`Project ${projectId} not found`);
return;
}
const message = projects[projectId].queuedMessages.find(
(m) => m.task_id === taskId
);
if (!message) {
console.warn(
`Message with task_id ${taskId} not found in project ${projectId}`
);
return;
}
set((state) => ({
projects: {
...state.projects,
[projectId]: {
...state.projects[projectId],
queuedMessages: state.projects[projectId].queuedMessages.map((m) =>
m.task_id === taskId ? { ...m, processing: true } : m
),
updatedAt: Date.now(),
},
},
}));
console.log(
`[ProjectStore] Marked message as processing: ${taskId} in project ${projectId}`
);
},
getAllChatStores: (projectId: string) => {
const { projects } = get();