From b3b7ed6fe82595ef95d60eb0cc2e740fc3c11dd2 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Thu, 22 Jan 2026 03:21:31 +0100 Subject: [PATCH] . --- .env.development | 8 ++-- backend/app/service/chat_service.py | 2 + backend/app/service/task.py | 7 ++++ backend/app/utils/agent.py | 32 ++++++++++++++- src/components/WorkFlow/node.tsx | 10 ++++- src/store/chatStore.ts | 61 +++++++++++++++++++++++++++++ 6 files changed, 113 insertions(+), 7 deletions(-) diff --git a/.env.development b/.env.development index 9d26a2f4..9d5806aa 100644 --- a/.env.development +++ b/.env.development @@ -1,8 +1,6 @@ -VITE_BASE_URL=/api - -VITE_PROXY_URL=https://dev.eigent.ai - -VITE_USE_LOCAL_PROXY=false +VITE_BASE_URL=http://localhost:8000 +VITE_PROXY_URL=http://localhost:8000 +VITE_USE_LOCAL_PROXY=true # VITE_PROXY_URL=http://localhost:3001 # VITE_USE_LOCAL_PROXY=true diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py index 04accf8b..404ab646 100644 --- a/backend/app/service/chat_service.py +++ b/backend/app/service/chat_service.py @@ -891,6 +891,8 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock): logger.warning(f"Cannot resume: workforce is None for project {options.project_id}") elif item.action == Action.decompose_text: yield sse_json("decompose_text", item.data) + elif item.action == Action.streaming_agent_output: + yield sse_json("streaming_agent_output", item.data) elif item.action == Action.decompose_progress: yield sse_json("to_sub_tasks", item.data) elif item.action == Action.new_agent: diff --git a/backend/app/service/task.py b/backend/app/service/task.py index d4958dd9..33040cc3 100644 --- a/backend/app/service/task.py +++ b/backend/app/service/task.py @@ -22,6 +22,7 @@ class Action(str, Enum): new_task_state = "new_task_state" # backend -> user decompose_progress = "decompose_progress" # backend -> user (streaming decomposition) decompose_text = "decompose_text" # backend -> user (raw streaming text) + streaming_agent_output = "streaming_agent_output" # backend -> user (streaming agent output during task execution) start = "start" # user -> backend create_agent = "create_agent" # backend -> user activate_agent = "activate_agent" # backend -> user @@ -78,6 +79,11 @@ class ActionDecomposeTextData(BaseModel): data: dict +class ActionStreamingAgentOutputData(BaseModel): + action: Literal[Action.streaming_agent_output] = Action.streaming_agent_output + data: dict[Literal["agent_name", "process_task_id", "agent_id", "content", "is_final"], str | bool] + + class ActionNewTaskStateData(BaseModel): action: Literal[Action.new_task_state] = Action.new_task_state data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int] @@ -249,6 +255,7 @@ ActionData = ( | ActionSkipTaskData | ActionDecomposeTextData | ActionDecomposeProgressData + | ActionStreamingAgentOutputData ) diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index 91ef926e..22ea04d2 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -125,6 +125,7 @@ from app.service.task import ( ActionCreateAgentData, ActionDeactivateAgentData, ActionDeactivateToolkitData, + ActionStreamingAgentOutputData, Agents, get_task_lock, ) @@ -269,7 +270,22 @@ class ListenChatAgent(ChatAgent): last_response = chunk # Accumulate content from each chunk (delta mode) if chunk.msg and chunk.msg.content: - accumulated_content += chunk.msg.content + delta_content = chunk.msg.content + accumulated_content += delta_content + # Stream output chunk to frontend (non-blocking) + asyncio.create_task( + task_lock.put_queue( + ActionStreamingAgentOutputData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "content": delta_content, + "is_final": False, + }, + ) + ) + ) yield chunk finally: total_tokens = 0 @@ -281,6 +297,20 @@ class ListenChatAgent(ChatAgent): ) if usage_info: total_tokens = usage_info.get("total_tokens", 0) + # Send final streaming output marker + asyncio.create_task( + task_lock.put_queue( + ActionStreamingAgentOutputData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "content": "", + "is_final": True, + }, + ) + ) + ) asyncio.create_task( task_lock.put_queue( ActionDeactivateAgentData( diff --git a/src/components/WorkFlow/node.tsx b/src/components/WorkFlow/node.tsx index 30b261ab..b3fb8cad 100644 --- a/src/components/WorkFlow/node.tsx +++ b/src/components/WorkFlow/node.tsx @@ -713,7 +713,15 @@ export function Node({ id, data }: NodeProps) {
{task.content}
{task?.status === "running" && ( -
+
+ {/* Streaming agent output */} + {chatStore.tasks[chatStore.activeTaskId as string]?.streamingAgentOutput?.[task.id] && ( +
+
+ {chatStore.tasks[chatStore.activeTaskId as string].streamingAgentOutput[task.id]} +
+
+ )} {/* active toolkit */} {task.toolkits && task.toolkits.length > 0 && diff --git a/src/store/chatStore.ts b/src/store/chatStore.ts index 6cc93c98..f44e2639 100644 --- a/src/store/chatStore.ts +++ b/src/store/chatStore.ts @@ -44,6 +44,8 @@ interface Task { isContextExceeded?: boolean; // Streaming decompose text - stored separately to avoid frequent re-renders streamingDecomposeText: string; + // Streaming agent output - real-time agent output during task execution + streamingAgentOutput: { [processTaskId: string]: string }; } export interface ChatStore { @@ -107,6 +109,8 @@ export interface ChatStore { setNextTaskId: (taskId: string | null) => void; setStreamingDecomposeText: (taskId: string, text: string) => void; clearStreamingDecomposeText: (taskId: string) => void; + updateStreamingAgentOutput: (taskId: string, processTaskId: string, content: string) => void; + clearStreamingAgentOutput: (taskId: string, processTaskId: string) => void; } export type VanillaChatStore = { @@ -207,6 +211,7 @@ const chatStore = (initial?: Partial) => createStore()( isTakeControl: false, isTaskEdit: false, streamingDecomposeText: '', + streamingAgentOutput: {}, }, } })) @@ -794,6 +799,8 @@ const chatStore = (initial?: Partial) => createStore()( setIsContextExceeded, setStreamingDecomposeText, clearStreamingDecomposeText, + updateStreamingAgentOutput, + clearStreamingAgentOutput, setIsTaskEdit } = getCurrentChatStore() currentTaskId = getCurrentTaskId(); @@ -840,6 +847,24 @@ const chatStore = (initial?: Partial) => createStore()( return; } + // Handle streaming agent output during task execution + if (agentMessages.step === "streaming_agent_output") { + const data = agentMessages.data as { process_task_id?: string; content?: string; is_final?: boolean }; + const { process_task_id, content, is_final } = data; + const currentId = getCurrentTaskId(); + + if (!process_task_id) return; + + if (is_final) { + // Clear streaming output when final marker received + clearStreamingAgentOutput(currentId, process_task_id); + } else if (content) { + // Append streaming content + updateStreamingAgentOutput(currentId, process_task_id, content); + } + return; + } + if (agentMessages.step === "to_sub_tasks") { // Clear streaming decompose text when task splitting is done clearStreamingDecomposeText(currentTaskId); @@ -2606,6 +2631,42 @@ const chatStore = (initial?: Partial) => createStore()( }, }; }); + }, + updateStreamingAgentOutput: (taskId, processTaskId, content) => { + set((state) => { + if (!state.tasks[taskId]) return state; + const currentOutput = state.tasks[taskId].streamingAgentOutput[processTaskId] || ''; + return { + ...state, + tasks: { + ...state.tasks, + [taskId]: { + ...state.tasks[taskId], + streamingAgentOutput: { + ...state.tasks[taskId].streamingAgentOutput, + [processTaskId]: currentOutput + content, + }, + }, + }, + }; + }); + }, + clearStreamingAgentOutput: (taskId, processTaskId) => { + set((state) => { + if (!state.tasks[taskId]) return state; + const newStreamingAgentOutput = { ...state.tasks[taskId].streamingAgentOutput }; + delete newStreamingAgentOutput[processTaskId]; + return { + ...state, + tasks: { + ...state.tasks, + [taskId]: { + ...state.tasks[taskId], + streamingAgentOutput: newStreamingAgentOutput, + }, + }, + }; + }); } }) );