This commit is contained in:
mkdev11 2026-01-22 03:21:31 +01:00
parent 35f4ca259d
commit b3b7ed6fe8
6 changed files with 113 additions and 7 deletions

View file

@ -1,8 +1,6 @@
VITE_BASE_URL=/api
VITE_PROXY_URL=https://dev.eigent.ai
VITE_USE_LOCAL_PROXY=false
VITE_BASE_URL=http://localhost:8000
VITE_PROXY_URL=http://localhost:8000
VITE_USE_LOCAL_PROXY=true
# VITE_PROXY_URL=http://localhost:3001
# VITE_USE_LOCAL_PROXY=true

View file

@ -891,6 +891,8 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
logger.warning(f"Cannot resume: workforce is None for project {options.project_id}")
elif item.action == Action.decompose_text:
yield sse_json("decompose_text", item.data)
elif item.action == Action.streaming_agent_output:
yield sse_json("streaming_agent_output", item.data)
elif item.action == Action.decompose_progress:
yield sse_json("to_sub_tasks", item.data)
elif item.action == Action.new_agent:

View file

@ -22,6 +22,7 @@ class Action(str, Enum):
new_task_state = "new_task_state" # backend -> user
decompose_progress = "decompose_progress" # backend -> user (streaming decomposition)
decompose_text = "decompose_text" # backend -> user (raw streaming text)
streaming_agent_output = "streaming_agent_output" # backend -> user (streaming agent output during task execution)
start = "start" # user -> backend
create_agent = "create_agent" # backend -> user
activate_agent = "activate_agent" # backend -> user
@ -78,6 +79,11 @@ class ActionDecomposeTextData(BaseModel):
data: dict
class ActionStreamingAgentOutputData(BaseModel):
action: Literal[Action.streaming_agent_output] = Action.streaming_agent_output
data: dict[Literal["agent_name", "process_task_id", "agent_id", "content", "is_final"], str | bool]
class ActionNewTaskStateData(BaseModel):
action: Literal[Action.new_task_state] = Action.new_task_state
data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int]
@ -249,6 +255,7 @@ ActionData = (
| ActionSkipTaskData
| ActionDecomposeTextData
| ActionDecomposeProgressData
| ActionStreamingAgentOutputData
)

View file

@ -125,6 +125,7 @@ from app.service.task import (
ActionCreateAgentData,
ActionDeactivateAgentData,
ActionDeactivateToolkitData,
ActionStreamingAgentOutputData,
Agents,
get_task_lock,
)
@ -269,7 +270,22 @@ class ListenChatAgent(ChatAgent):
last_response = chunk
# Accumulate content from each chunk (delta mode)
if chunk.msg and chunk.msg.content:
accumulated_content += chunk.msg.content
delta_content = chunk.msg.content
accumulated_content += delta_content
# Stream output chunk to frontend (non-blocking)
asyncio.create_task(
task_lock.put_queue(
ActionStreamingAgentOutputData(
data={
"agent_name": self.agent_name,
"process_task_id": self.process_task_id,
"agent_id": self.agent_id,
"content": delta_content,
"is_final": False,
},
)
)
)
yield chunk
finally:
total_tokens = 0
@ -281,6 +297,20 @@ class ListenChatAgent(ChatAgent):
)
if usage_info:
total_tokens = usage_info.get("total_tokens", 0)
# Send final streaming output marker
asyncio.create_task(
task_lock.put_queue(
ActionStreamingAgentOutputData(
data={
"agent_name": self.agent_name,
"process_task_id": self.process_task_id,
"agent_id": self.agent_id,
"content": "",
"is_final": True,
},
)
)
)
asyncio.create_task(
task_lock.put_queue(
ActionDeactivateAgentData(

View file

@ -713,7 +713,15 @@ export function Node({ id, data }: NodeProps) {
<div>{task.content}</div>
</div>
{task?.status === "running" && (
<div className="flex items-center gap-2 mt-xs animate-in fade-in-0 slide-in-from-bottom-2 duration-400">
<div className="flex flex-col gap-1 mt-xs animate-in fade-in-0 slide-in-from-bottom-2 duration-400">
{/* Streaming agent output */}
{chatStore.tasks[chatStore.activeTaskId as string]?.streamingAgentOutput?.[task.id] && (
<div className="flex-1 min-w-0 text-text-secondary text-xs leading-tight overflow-hidden animate-in fade-in-0 duration-200">
<div className="line-clamp-2 whitespace-pre-wrap break-words">
{chatStore.tasks[chatStore.activeTaskId as string].streamingAgentOutput[task.id]}
</div>
</div>
)}
{/* active toolkit */}
{task.toolkits &&
task.toolkits.length > 0 &&

View file

@ -44,6 +44,8 @@ interface Task {
isContextExceeded?: boolean;
// Streaming decompose text - stored separately to avoid frequent re-renders
streamingDecomposeText: string;
// Streaming agent output - real-time agent output during task execution
streamingAgentOutput: { [processTaskId: string]: string };
}
export interface ChatStore {
@ -107,6 +109,8 @@ export interface ChatStore {
setNextTaskId: (taskId: string | null) => void;
setStreamingDecomposeText: (taskId: string, text: string) => void;
clearStreamingDecomposeText: (taskId: string) => void;
updateStreamingAgentOutput: (taskId: string, processTaskId: string, content: string) => void;
clearStreamingAgentOutput: (taskId: string, processTaskId: string) => void;
}
export type VanillaChatStore = {
@ -207,6 +211,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
isTakeControl: false,
isTaskEdit: false,
streamingDecomposeText: '',
streamingAgentOutput: {},
},
}
}))
@ -794,6 +799,8 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
setIsContextExceeded,
setStreamingDecomposeText,
clearStreamingDecomposeText,
updateStreamingAgentOutput,
clearStreamingAgentOutput,
setIsTaskEdit } = getCurrentChatStore()
currentTaskId = getCurrentTaskId();
@ -840,6 +847,24 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
return;
}
// Handle streaming agent output during task execution
if (agentMessages.step === "streaming_agent_output") {
const data = agentMessages.data as { process_task_id?: string; content?: string; is_final?: boolean };
const { process_task_id, content, is_final } = data;
const currentId = getCurrentTaskId();
if (!process_task_id) return;
if (is_final) {
// Clear streaming output when final marker received
clearStreamingAgentOutput(currentId, process_task_id);
} else if (content) {
// Append streaming content
updateStreamingAgentOutput(currentId, process_task_id, content);
}
return;
}
if (agentMessages.step === "to_sub_tasks") {
// Clear streaming decompose text when task splitting is done
clearStreamingDecomposeText(currentTaskId);
@ -2606,6 +2631,42 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
},
};
});
},
updateStreamingAgentOutput: (taskId, processTaskId, content) => {
set((state) => {
if (!state.tasks[taskId]) return state;
const currentOutput = state.tasks[taskId].streamingAgentOutput[processTaskId] || '';
return {
...state,
tasks: {
...state.tasks,
[taskId]: {
...state.tasks[taskId],
streamingAgentOutput: {
...state.tasks[taskId].streamingAgentOutput,
[processTaskId]: currentOutput + content,
},
},
},
};
});
},
clearStreamingAgentOutput: (taskId, processTaskId) => {
set((state) => {
if (!state.tasks[taskId]) return state;
const newStreamingAgentOutput = { ...state.tasks[taskId].streamingAgentOutput };
delete newStreamingAgentOutput[processTaskId];
return {
...state,
tasks: {
...state.tasks,
[taskId]: {
...state.tasks[taskId],
streamingAgentOutput: newStreamingAgentOutput,
},
},
};
});
}
})
);