feat: handle child SSE creation

This commit is contained in:
a7m-1st 2025-10-09 03:23:50 +03:00
parent f31b9d1ad9
commit 5cb3da9844

View file

@ -46,12 +46,14 @@ export interface ChatStore {
updateCount: number;
activeTaskId: string | null;
tasks: { [key: string]: Task };
activeConnections: Set<string>; // Track active SSE connections by task ID
create: (id?: string, type?: any) => string;
removeTask: (taskId: string) => void;
setStatus: (taskId: string, status: 'running' | 'finished' | 'pending' | 'pause') => void;
setActiveTaskId: (taskId: string) => void;
replay: (taskId: string, question: string, time: number) => Promise<void>;
startTask: (taskId: string, type?: string, shareToken?: string, delayTime?: number) => Promise<void>;
startChildTask: (taskId: string, content: string, projectId: string) => Promise<void>;
handleConfirmTask: (project_id:string, taskId: string, type?: string) => void;
addMessages: (taskId: string, messages: Message) => void;
setMessages: (taskId: string, messages: Message[]) => void;
@ -105,6 +107,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
activeTaskId: null,
tasks: initial?.tasks ?? {},
updateCount: 0,
activeConnections: new Set(), // Track active SSE connections
create(id?: string, type?: any) {
const taskId = id ? id : generateUniqueId();
console.log("Create Task", taskId)
@ -176,7 +179,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
startTask: async (taskId: string, type?: string, shareToken?: string, delayTime?: number) => {
const { token, language, modelType, cloud_model_type, email } = getAuthStore()
const workerList = useWorkerList();
const { getLastUserMessage, setDelayTime, setType } = get();
const { getLastUserMessage, setDelayTime, setType, activeConnections } = get();
const baseURL = await getBaseURL();
let systemLanguage = language
if (language === 'system') {
@ -301,6 +304,11 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
})
}
const browser_port = await window.ipcRenderer.invoke('get-browser-port');
// Track this connection
activeConnections.add(taskId);
console.log(`Starting SSE connection for task: ${taskId}. Active connections: ${activeConnections.size}`);
fetchEventSource(api, {
method: !type ? "POST" : "GET",
openWhenHidden: true,
@ -508,6 +516,22 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
setTaskAssigning(taskId, taskAssigning)
return;
}
// New Task State from queue - now handled by add_task with new SSE
if (agentMessages.step === "new_task_state") {
const { task_id, content, state, result, failure_count } = agentMessages.data;
if (!task_id || !content || !project_id) return;
console.log(`new_task_state received for task: ${task_id}, creating a new SSE connection`);
// This creates a completely new SSE connection for the child task
// Log for debugging but don't process
// Automatically start a new SSE connection for this child task
const { startChildTask } = get();
await startChildTask(task_id, content, project_id);
console.log(`Started new SSE stream for child task: ${task_id}`);
return;
}
// Activate agent
if (agentMessages.step === "activate_agent" || agentMessages.step === "deactivate_agent") {
@ -886,7 +910,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
if (agentMessages.step === "add_task") {
try {
const taskData = agentMessages.data;
if (taskData && taskData.project_id && taskData.content) {
if (taskData && taskData.project_id && taskData.content && taskData.task_id) {
console.log(`Task added to project queue: ${taskData.project_id}`);
}
} catch (error) {
@ -916,15 +940,17 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
const taskIdToRemove = agentMessages.data.task_id as string;
if (taskIdToRemove) {
const projectStore = useProjectStore.getState();
if (agentMessages.data.project_id) {
// Try to remove from current project otherwise
const project_id = agentMessages.data.project_id ?? projectStore.activeProjectId;
if (project_id) {
// Find and remove the message with matching task ID
const project = projectStore.getProjectById(agentMessages.data.project_id);
const project = projectStore.getProjectById(project_id);
if (project && project.queuedMessages) {
const messageToRemove = project.queuedMessages.find(msg =>
msg.task_id === taskIdToRemove || msg.content.includes(taskIdToRemove)
);
if (messageToRemove) {
projectStore.removeQueuedMessage(agentMessages.data.project_id, messageToRemove.task_id);
projectStore.removeQueuedMessage(project_id, messageToRemove.task_id);
console.log(`Task removed from project queue: ${taskIdToRemove}`);
}
}
@ -1186,11 +1212,72 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
// Server closes connection
onclose() {
console.log("server closed");
// Clean up connection tracking
const { activeConnections } = get();
activeConnections.delete(taskId);
console.log(`SSE connection closed for task: ${taskId}. Active connections: ${activeConnections.size}`);
},
});
},
startChildTask: async (taskId: string, content: string, projectId: string) => {
console.log(`Starting child task: ${taskId} for project: ${projectId}`);
// Get project store to create new chat instance
const projectStore = useProjectStore.getState();
if (!projectId) {
console.warn("No project ID provided for child task");
return;
}
// Create new chat store in the specified project
const newChatId = projectStore.createChatStore(projectId, `Task: ${content.substring(0, 50)}...`);
if (!newChatId) {
console.error("Failed to create new chat store for child task");
return;
}
// Get the new chat store instance
const newChatStore = projectStore.getChatStore(projectId, newChatId);
if (!newChatStore) {
console.error("Failed to get new chat store instance for child task");
return;
}
// Create a new task in the new chat store with the specific task ID
const newTaskId = newChatStore.getState().create(taskId);
// Add the user message (the task content)
newChatStore.getState().addMessages(newTaskId, {
id: generateUniqueId(),
role: "user",
content: content,
});
// Set the task as having messages
newChatStore.getState().setHasMessages(newTaskId, true);
// Set Active Task to the latest one
newChatStore.getState().setActiveTaskId(newTaskId);
// Set up initial task state
newChatStore.getState().setActiveWorkSpace(newTaskId, 'workflow');
newChatStore.getState().setStatus(newTaskId, 'pending');
newChatStore.getState().setTaskTime(newTaskId, Date.now());
// Set the new chat store as active
projectStore.setActiveChatStore(projectId, newChatId);
// Now start a completely new SSE connection for this child task
await newChatStore.getState().startTask(newTaskId);
console.log(`Started new SSE connection for child task: ${taskId}`);
},
replay: async (taskId: string, question: string, time: number) => {
const { create, setHasMessages, addMessages, startTask, setActiveTaskId, handleConfirmTask } = get();
//get project id