feat: Stream mode task splitting (reopen #767) (#793)

This commit is contained in:
Tao Sun 2025-12-29 23:15:50 +08:00 committed by GitHub
commit a845fe1436
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 752 additions and 334 deletions

View file

@ -3,7 +3,7 @@ import datetime
import json
from pathlib import Path
import platform
from typing import Literal
from typing import Any, Literal
from fastapi import Request
from inflection import titleize
from pydash import chain
@ -17,6 +17,8 @@ from app.service.task import (
TaskLock,
delete_task_lock,
set_current_task_id,
ActionDecomposeProgressData,
ActionDecomposeTextData,
)
from camel.toolkits import AgentCommunicationToolkit, ToolkitMessageIntegration
from app.utils.toolkit.human_toolkit import HumanToolkit
@ -267,6 +269,8 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
last_completed_task_result = "" # Track the last completed task result
summary_task_content = "" # Track task summary
loop_iteration = 0
event_loop = asyncio.get_running_loop()
sub_tasks: list[Task] = []
logger.info("=" * 80)
logger.info("🚀 [LIFECYCLE] step_solve STARTED", extra={"project_id": options.project_id, "task_id": options.task_id})
@ -429,55 +433,118 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
if len(options.attaches) > 0:
camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches}
# Stream decomposition in background so queue items (decompose_text) are processed immediately
logger.info(f"[NEW-QUESTION] 🧩 Starting task decomposition via workforce.eigent_make_sub_tasks")
sub_tasks = await asyncio.to_thread(
workforce.eigent_make_sub_tasks,
camel_task,
context_for_coordinator
)
logger.info(f"[NEW-QUESTION] ✅ Task decomposed into {len(sub_tasks)} subtasks")
stream_state = {"subtasks": [], "seen_ids": set()}
state_holder: dict[str, Any] = {"sub_tasks": [], "summary_task": ""}
logger.info(f"[NEW-QUESTION] Generating task summary")
summary_task_agent = task_summary_agent(options)
try:
summary_task_content = await asyncio.wait_for(
summary_task(summary_task_agent, camel_task), timeout=10
)
task_lock.summary_generated = True
logger.info("[NEW-QUESTION] ✅ Summary generated successfully", extra={"project_id": options.project_id})
except asyncio.TimeoutError:
logger.warning("summary_task timeout", extra={"project_id": options.project_id, "task_id": options.task_id})
# Fallback to a minimal summary to unblock UI
fallback_name = "Task"
content_preview = camel_task.content if hasattr(camel_task, "content") else ""
if content_preview is None:
content_preview = ""
fallback_summary = (
(content_preview[:80] + "...") if len(content_preview) > 80 else content_preview
)
summary_task_content = f"{fallback_name}|{fallback_summary}"
task_lock.summary_generated = True
def on_stream_batch(new_tasks: list[Task], is_final: bool = False):
fresh_tasks = [t for t in new_tasks if t.id not in stream_state["seen_ids"]]
for t in fresh_tasks:
stream_state["seen_ids"].add(t.id)
stream_state["subtasks"].extend(fresh_tasks)
logger.info(f"[NEW-QUESTION] 📤 Sending to_sub_tasks SSE to frontend (task card)")
logger.info(f"[NEW-QUESTION] to_sub_tasks data: task_id={camel_task.id}, summary={summary_task_content[:50]}..., subtasks_count={len(camel_task.subtasks)}")
yield to_sub_tasks(camel_task, summary_task_content)
logger.info(f"[NEW-QUESTION] ✅ to_sub_tasks SSE sent")
# tracer.stop()
# tracer.save("trace.json")
def on_stream_text(chunk):
try:
# Extract content from chunk object (CAMEL now passes chunk instead of accumulated content)
text_content = chunk.msg.content if hasattr(chunk, 'msg') and chunk.msg else str(chunk)
asyncio.run_coroutine_threadsafe(
task_lock.put_queue(
ActionDecomposeTextData(
data={
"project_id": options.project_id,
"task_id": options.task_id,
"content": text_content,
}
)
),
event_loop,
)
except Exception as e:
logger.warning(f"Failed to stream decomposition text: {e}")
# Only auto-start in debug mode
if env("debug") == "on":
logger.info(f"[DEBUG] Auto-starting workforce in debug mode")
task_lock.status = Status.processing
task = asyncio.create_task(workforce.eigent_start(sub_tasks))
task_lock.add_background_task(task)
async def run_decomposition():
nonlocal camel_task, summary_task_content
try:
sub_tasks = await asyncio.to_thread(
workforce.eigent_make_sub_tasks,
camel_task,
context_for_coordinator,
on_stream_batch,
on_stream_text,
)
if stream_state["subtasks"]:
sub_tasks = stream_state["subtasks"]
state_holder["sub_tasks"] = sub_tasks
logger.info(f"[NEW-QUESTION] ✅ Task decomposed into {len(sub_tasks)} subtasks")
try:
setattr(task_lock, "decompose_sub_tasks", sub_tasks)
except Exception:
pass
logger.info(f"[NEW-QUESTION] Generating task summary")
summary_task_agent = task_summary_agent(options)
try:
summary_task_content = await asyncio.wait_for(
summary_task(summary_task_agent, camel_task), timeout=10
)
task_lock.summary_generated = True
logger.info("[NEW-QUESTION] ✅ Summary generated successfully", extra={"project_id": options.project_id})
except asyncio.TimeoutError:
logger.warning("summary_task timeout", extra={"project_id": options.project_id, "task_id": options.task_id})
task_lock.summary_generated = True
fallback_name = "Task"
content_preview = camel_task.content if hasattr(camel_task, "content") else ""
if content_preview is None:
content_preview = ""
summary_task_content = (
(content_preview[:80] + "...") if len(content_preview) > 80 else content_preview
)
summary_task_content = f"{fallback_name}|{summary_task_content}"
except Exception:
task_lock.summary_generated = True
fallback_name = "Task"
content_preview = camel_task.content if hasattr(camel_task, "content") else ""
if content_preview is None:
content_preview = ""
summary_task_content = (
(content_preview[:80] + "...") if len(content_preview) > 80 else content_preview
)
summary_task_content = f"{fallback_name}|{summary_task_content}"
state_holder["summary_task"] = summary_task_content
try:
setattr(task_lock, "summary_task_content", summary_task_content)
except Exception:
pass
logger.info(f"[NEW-QUESTION] 📤 Sending to_sub_tasks SSE to frontend (task card)")
logger.info(f"[NEW-QUESTION] to_sub_tasks data: task_id={camel_task.id}, summary={summary_task_content[:50]}..., subtasks_count={len(camel_task.subtasks)}")
payload = {
"project_id": options.project_id,
"task_id": options.task_id,
"sub_tasks": tree_sub_tasks(camel_task.subtasks),
"delta_sub_tasks": tree_sub_tasks(sub_tasks),
"is_final": True,
"summary_task": summary_task_content,
}
await task_lock.put_queue(ActionDecomposeProgressData(data=payload))
logger.info(f"[NEW-QUESTION] ✅ to_sub_tasks SSE sent")
except Exception as e:
logger.error(f"Error in background decomposition: {e}", exc_info=True)
bg_task = asyncio.create_task(run_decomposition())
task_lock.add_background_task(bg_task)
elif item.action == Action.update_task:
assert camel_task is not None
update_tasks = {item.id: item for item in item.data.task}
# Use stored decomposition results if available
if not sub_tasks:
sub_tasks = getattr(task_lock, "decompose_sub_tasks", [])
sub_tasks = update_sub_tasks(sub_tasks, update_tasks)
add_sub_tasks(camel_task, item.data.task)
yield to_sub_tasks(camel_task, summary_task_content)
summary_task_content_local = getattr(task_lock, "summary_task_content", summary_task_content)
yield to_sub_tasks(camel_task, summary_task_content_local)
elif item.action == Action.add_task:
# Check if this might be a misrouted second question
@ -596,6 +663,8 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
continue
task_lock.status = Status.processing
if not sub_tasks:
sub_tasks = getattr(task_lock, "decompose_sub_tasks", [])
task = asyncio.create_task(workforce.eigent_start(sub_tasks))
task_lock.add_background_task(task)
elif item.action == Action.task_state:
@ -700,11 +769,41 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
context_for_multi_turn = build_context_for_workforce(task_lock, options)
logger.info(f"[LIFECYCLE] Multi-turn: calling workforce.handle_decompose_append_task for new task decomposition")
stream_state = {"subtasks": [], "seen_ids": set()}
def on_stream_batch(new_tasks: list[Task], is_final: bool = False):
fresh_tasks = [t for t in new_tasks if t.id not in stream_state["seen_ids"]]
for t in fresh_tasks:
stream_state["seen_ids"].add(t.id)
stream_state["subtasks"].extend(fresh_tasks)
def on_stream_text(chunk):
try:
# Extract content from chunk object (CAMEL now passes chunk instead of accumulated content)
text_content = chunk.msg.content if hasattr(chunk, 'msg') and chunk.msg else str(chunk)
asyncio.run_coroutine_threadsafe(
task_lock.put_queue(
ActionDecomposeTextData(
data={
"project_id": options.project_id,
"task_id": options.task_id,
"content": text_content,
}
)
),
event_loop,
)
except Exception as e:
logger.warning(f"Failed to stream decomposition text: {e}")
new_sub_tasks = await workforce.handle_decompose_append_task(
camel_task,
reset=False,
coordinator_context=context_for_multi_turn
coordinator_context=context_for_multi_turn,
on_stream_batch=on_stream_batch,
on_stream_text=on_stream_text,
)
if stream_state["subtasks"]:
new_sub_tasks = stream_state["subtasks"]
logger.info(f"[LIFECYCLE] Multi-turn: task decomposed into {len(new_sub_tasks)} subtasks")
# Generate proper LLM summary for multi-turn tasks instead of hardcoded fallback
@ -731,8 +830,16 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
else:
new_summary_content = f"Follow-up Task|{task_content_for_summary}"
# Send the extracted events
yield to_sub_tasks(camel_task, new_summary_content)
# Emit final subtasks once when decomposition is complete
final_payload = {
"project_id": options.project_id,
"task_id": options.task_id,
"sub_tasks": tree_sub_tasks(camel_task.subtasks),
"delta_sub_tasks": tree_sub_tasks(new_sub_tasks),
"is_final": True,
"summary_task": new_summary_content,
}
await task_lock.put_queue(ActionDecomposeProgressData(data=final_payload))
# Update the context with new task data
sub_tasks = new_sub_tasks
@ -795,6 +902,10 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
logger.info(f"Workforce resumed for project {options.project_id}")
else:
logger.warning(f"Cannot resume: workforce is None for project {options.project_id}")
elif item.action == Action.decompose_text:
yield sse_json("decompose_text", item.data)
elif item.action == Action.decompose_progress:
yield sse_json("to_sub_tasks", item.data)
elif item.action == Action.new_agent:
if workforce is not None:
workforce.pause()

View file

@ -20,6 +20,8 @@ class Action(str, Enum):
update_task = "update_task" # user -> backend
task_state = "task_state" # backend -> user
new_task_state = "new_task_state" # backend -> user
decompose_progress = "decompose_progress" # backend -> user (streaming decomposition)
decompose_text = "decompose_text" # backend -> user (raw streaming text)
start = "start" # user -> backend
create_agent = "create_agent" # backend -> user
activate_agent = "activate_agent" # backend -> user
@ -64,6 +66,17 @@ class ActionTaskStateData(BaseModel):
action: Literal[Action.task_state] = Action.task_state
data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int]
class ActionDecomposeProgressData(BaseModel):
action: Literal[Action.decompose_progress] = Action.decompose_progress
data: dict
class ActionDecomposeTextData(BaseModel):
action: Literal[Action.decompose_text] = Action.decompose_text
data: dict
class ActionNewTaskStateData(BaseModel):
action: Literal[Action.new_task_state] = Action.new_task_state
data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int]
@ -227,6 +240,8 @@ ActionData = (
| ActionAddTaskData
| ActionRemoveTaskData
| ActionSkipTaskData
| ActionDecomposeTextData
| ActionDecomposeProgressData
)

View file

@ -108,6 +108,7 @@ class ListenChatAgent(ChatAgent):
prune_tool_calls_from_memory: bool = False,
enable_snapshot_clean: bool = False,
step_timeout: float | None = 900,
**kwargs: Any,
) -> None:
super().__init__(
system_message=system_message,
@ -130,6 +131,7 @@ class ListenChatAgent(ChatAgent):
prune_tool_calls_from_memory=prune_tool_calls_from_memory,
enable_snapshot_clean=enable_snapshot_clean,
step_timeout=step_timeout,
**kwargs,
)
self.api_task_id = api_task_id
self.agent_name = agent_name
@ -182,9 +184,49 @@ class ListenChatAgent(ChatAgent):
total_tokens = 0
if res is not None:
if isinstance(res, StreamingChatAgentResponse):
def _stream_with_deactivate():
last_response: ChatAgentResponse | None = None
try:
for chunk in res:
last_response = chunk
yield chunk
finally:
final_message = ""
total_tokens = 0
if last_response:
final_message = (
last_response.msg.content if last_response.msg else ""
)
usage_info = (
last_response.info.get("usage")
or last_response.info.get("token_usage")
or {}
)
if usage_info:
total_tokens = usage_info.get("total_tokens", 0)
asyncio.create_task(
task_lock.put_queue(
ActionDeactivateAgentData(
data={
"agent_name": self.agent_name,
"process_task_id": self.process_task_id,
"agent_id": self.agent_id,
"message": final_message,
"tokens": total_tokens,
},
)
)
)
return StreamingChatAgentResponse(_stream_with_deactivate())
message = res.msg.content if res.msg else ""
total_tokens = res.info["usage"]["total_tokens"]
traceroot_logger.info(f"Agent {self.agent_name} completed step, tokens used: {total_tokens}")
usage_info = res.info.get("usage") or res.info.get("token_usage") or {}
total_tokens = usage_info.get("total_tokens", 0) if usage_info else 0
traceroot_logger.info(
f"Agent {self.agent_name} completed step, tokens used: {total_tokens}"
)
assert message is not None
@ -533,6 +575,21 @@ def agent_model(
)
)
# Build model config, defaulting to streaming for planner
extra_params = options.extra_params or {}
model_config: dict[str, Any] = {}
if options.is_cloud():
model_config["user"] = str(options.project_id)
model_config.update(
{
k: v
for k, v in extra_params.items()
if k not in ["model_platform", "model_type", "api_key", "url"]
}
)
if agent_name == Agents.task_agent:
model_config["stream"] = True
return ListenChatAgent(
options.project_id,
agent_name,
@ -542,16 +599,7 @@ def agent_model(
model_type=options.model_type,
api_key=options.api_key,
url=options.api_url,
model_config_dict={
"user": str(options.project_id),
}
if options.is_cloud()
else None,
**{
k: v
for k, v in (options.extra_params or {}).items()
if k not in ["model_platform", "model_type", "api_key", "url"]
},
model_config_dict=model_config or None,
),
# output_language=options.language,
tools=tools,
@ -559,6 +607,7 @@ def agent_model(
prune_tool_calls_from_memory=prune_tool_calls_from_memory,
toolkits_to_register_agent=toolkits_to_register_agent,
enable_snapshot_clean=enable_snapshot_clean,
stream_accumulate=False,
)

View file

@ -12,6 +12,7 @@ from camel.societies.workforce.base import BaseNode
from camel.societies.workforce.utils import TaskAssignResult
from camel.societies.workforce.workforce_metrics import WorkforceMetrics
from camel.societies.workforce.events import WorkerCreatedEvent
from camel.societies.workforce.prompts import TASK_DECOMPOSE_PROMPT
from camel.tasks.task import Task, TaskState, validate_task_content
from app.component import code
from app.exception.exception import UserException
@ -65,7 +66,13 @@ class Workforce(BaseWorkforce):
)
logger.info(f"[WF-LIFECYCLE] ✅ Workforce.__init__ COMPLETED, id={id(self)}")
def eigent_make_sub_tasks(self, task: Task, coordinator_context: str = ""):
def eigent_make_sub_tasks(
self,
task: Task,
coordinator_context: str = "",
on_stream_batch=None,
on_stream_text=None,
):
"""
Split process_task method to eigent_make_sub_tasks and eigent_start method.
@ -73,6 +80,8 @@ class Workforce(BaseWorkforce):
task: The main task to decompose
coordinator_context: Optional context ONLY for coordinator agent during decomposition.
This context will NOT be passed to subtasks or worker agents.
on_stream_batch: Optional callback for streaming batches signature (List[Task], bool)
on_stream_text: Optional callback for raw streaming text chunks
"""
logger.info("=" * 80)
logger.info("🧩 [DECOMPOSE] eigent_make_sub_tasks CALLED", extra={
@ -103,7 +112,15 @@ class Workforce(BaseWorkforce):
logger.info(f"[DECOMPOSE] Workforce reset complete, state: {self._state.name}")
logger.info(f"[DECOMPOSE] Calling handle_decompose_append_task")
subtasks = asyncio.run(self.handle_decompose_append_task(task, reset=False, coordinator_context=coordinator_context))
subtasks = asyncio.run(
self.handle_decompose_append_task(
task,
reset=False,
coordinator_context=coordinator_context,
on_stream_batch=on_stream_batch,
on_stream_text=on_stream_text
)
)
logger.info("=" * 80)
logger.info(f"✅ [DECOMPOSE] Task decomposition COMPLETED", extra={
"api_task_id": self.api_task_id,
@ -142,8 +159,45 @@ class Workforce(BaseWorkforce):
self._state = WorkforceState.IDLE
logger.info(f"[WF-LIFECYCLE] Workforce state set to IDLE")
def _decompose_task(self, task: Task, stream_callback=None):
"""Decompose task with optional streaming text callback."""
decompose_prompt = str(
TASK_DECOMPOSE_PROMPT.format(
content=task.content,
child_nodes_info=self._get_child_nodes_info(),
additional_info=task.additional_info,
)
)
self.task_agent.reset()
result = task.decompose(
self.task_agent, decompose_prompt, stream_callback=stream_callback
)
if isinstance(result, Generator):
def streaming_with_dependencies():
all_subtasks = []
for new_tasks in result:
all_subtasks.extend(new_tasks)
if new_tasks:
self._update_dependencies_for_decomposition(
task, all_subtasks
)
yield new_tasks
return streaming_with_dependencies()
else:
subtasks = result
if subtasks:
self._update_dependencies_for_decomposition(task, subtasks)
return subtasks
async def handle_decompose_append_task(
self, task: Task, reset: bool = True, coordinator_context: str = ""
self,
task: Task,
reset: bool = True,
coordinator_context: str = "",
on_stream_batch=None,
on_stream_text=None,
) -> List[Task]:
"""
Override to support coordinator_context parameter.
@ -153,6 +207,8 @@ class Workforce(BaseWorkforce):
task: The task to be processed
reset: Should trigger workforce reset (Workforce must not be running)
coordinator_context: Optional context ONLY for coordinator during decomposition
on_stream_batch: Optional callback for streaming batches signature (List[Task], bool)
on_stream_text: Optional callback for raw streaming text chunks
Returns:
List[Task]: The decomposed subtasks or the original task
@ -186,18 +242,23 @@ class Workforce(BaseWorkforce):
task.content = task_with_context
logger.info(f"[DECOMPOSE] Calling _decompose_task with context")
subtasks_result = self._decompose_task(task)
subtasks_result = self._decompose_task(task, stream_callback=on_stream_text)
task.content = original_content
else:
logger.info(f"[DECOMPOSE] Calling _decompose_task without context")
subtasks_result = self._decompose_task(task)
subtasks_result = self._decompose_task(task, stream_callback=on_stream_text)
logger.info(f"[DECOMPOSE] _decompose_task returned, processing results")
if isinstance(subtasks_result, Generator):
subtasks = []
for new_tasks in subtasks_result:
subtasks.extend(new_tasks)
if on_stream_batch:
try:
on_stream_batch(new_tasks, False)
except Exception as e:
logger.warning(f"Streaming callback failed: {e}")
logger.info(f"[DECOMPOSE] Collected {len(subtasks)} subtasks from generator")
else:
subtasks = subtasks_result
@ -218,6 +279,12 @@ class Workforce(BaseWorkforce):
subtasks = [fallback_task]
logger.info(f"[DECOMPOSE] Created fallback task: {fallback_task.id}")
if on_stream_batch:
try:
on_stream_batch(subtasks, True)
except Exception as e:
logger.warning(f"Final streaming callback failed: {e}")
return subtasks
async def _find_assignee(self, tasks: List[Task]) -> TaskAssignResult:

View file

@ -23,7 +23,39 @@ export const ProjectSection = React.forwardRef<HTMLDivElement, ProjectSectionPro
onSkip,
isPauseResumeLoading
}, ref) => {
const chatState = chatStore.getState();
// Subscribe to store changes with throttling to prevent excessive re-renders
const [chatState, setChatState] = React.useState(() => chatStore.getState());
React.useEffect(() => {
let timeoutId: NodeJS.Timeout | null = null;
let latestState: any = null;
const unsubscribe = chatStore.subscribe((state) => {
latestState = state;
// Throttle updates to max once per 100ms
if (!timeoutId) {
timeoutId = setTimeout(() => {
if (latestState) {
setChatState(latestState);
}
timeoutId = null;
}, 100);
}
});
return () => {
unsubscribe();
if (timeoutId) {
clearTimeout(timeoutId);
// Apply final state on cleanup
if (latestState) {
setChatState(latestState);
}
}
};
}, [chatStore]);
const activeTaskId = chatState.activeTaskId;
if (!activeTaskId || !chatState.tasks[activeTaskId]) {
@ -33,8 +65,17 @@ export const ProjectSection = React.forwardRef<HTMLDivElement, ProjectSectionPro
const task = chatState.tasks[activeTaskId];
const messages = task.messages || [];
// Group messages by query cycles and show in chronological order (oldest first)
const queryGroups = groupMessagesByQuery(messages);
// Create a stable key based on messages content to prevent excessive re-renders
const lastMessage = messages[messages.length - 1];
const messagesKey = React.useMemo(() => {
// Only re-compute when message count or last message changes
return `${messages.length}-${lastMessage?.id || ''}-${lastMessage?.content?.length || 0}`;
}, [messages.length, lastMessage?.id, lastMessage?.content?.length]);
// Memoize grouping to prevent re-creating objects on every render
const queryGroups = React.useMemo(() => {
return groupMessagesByQuery(messages);
}, [messagesKey]);
return (
<motion.div
@ -152,7 +193,7 @@ function groupMessagesByQuery(messages: any[]) {
otherMessages: []
};
}
} else {
} else {
// Other messages (assistant responses, errors, etc.)
if (currentGroup) {
currentGroup.otherMessages.push(message);

View file

@ -1,4 +1,4 @@
import React, { useRef, useEffect, useState } from 'react';
import React, { useRef, useEffect, useState, useSyncExternalStore } from 'react';
import { motion, useMotionValue, useTransform } from 'framer-motion';
import { UserMessageCard } from './MessageItem/UserMessageCard';
import { AgentMessageCard } from './MessageItem/AgentMessageCard';
@ -38,24 +38,35 @@ export const UserQueryGroup: React.FC<UserQueryGroupProps> = ({
const chatState = chatStore.getState();
const activeTaskId = chatState.activeTaskId;
// Subscribe to streaming decompose text separately for efficient updates
const streamingDecomposeText = useSyncExternalStore(
(callback) => chatStore.subscribe(callback),
() => {
const state = chatStore.getState();
const taskId = state.activeTaskId;
if (!taskId || !state.tasks[taskId]) return '';
return state.tasks[taskId].streamingDecomposeText || '';
}
);
// Show task if this query group has a task message OR if it's the most recent user query during splitting
// During splitting phase (no to_sub_tasks yet), show task for the most recent query only
// Exclude human-reply scenarios (when user is replying to an activeAsk)
const isHumanReply = queryGroup.userMessage &&
const isHumanReply = queryGroup.userMessage &&
activeTaskId &&
chatState.tasks[activeTaskId] &&
(chatState.tasks[activeTaskId].activeAsk ||
// Check if this user message follows an 'ask' message in the message sequence
(() => {
const messages = chatState.tasks[activeTaskId].messages;
const userMessageIndex = messages.findIndex((m: any) => m.id === queryGroup.userMessage.id);
if (userMessageIndex > 0) {
// Check the previous message - if it's an agent message with step 'ask', this is a human-reply
const prevMessage = messages[userMessageIndex - 1];
return prevMessage?.role === 'agent' && prevMessage?.step === 'ask';
}
return false;
})());
(chatState.tasks[activeTaskId].activeAsk ||
// Check if this user message follows an 'ask' message in the message sequence
(() => {
const messages = chatState.tasks[activeTaskId].messages;
const userMessageIndex = messages.findIndex((m: any) => m.id === queryGroup.userMessage.id);
if (userMessageIndex > 0) {
// Check the previous message - if it's an agent message with step 'ask', this is a human-reply
const prevMessage = messages[userMessageIndex - 1];
return prevMessage?.role === 'agent' && prevMessage?.step === 'ask';
}
return false;
})());
const isLastUserQuery = !queryGroup.taskMessage &&
!isHumanReply &&
@ -68,8 +79,10 @@ export const UserQueryGroup: React.FC<UserQueryGroupProps> = ({
// Only show the fallback task box for the newest query while the agent is still splitting work.
// Simple Q&A sessions set hasWaitComfirm to true, so we should not render an empty task box there.
// Also, do not show fallback task if we are currently decomposing (streaming text).
const isDecomposing = streamingDecomposeText.length > 0;
const shouldShowFallbackTask =
isLastUserQuery && activeTaskId && !chatState.tasks[activeTaskId].hasWaitComfirm;
isLastUserQuery && activeTaskId && !chatState.tasks[activeTaskId].hasWaitComfirm && !isDecomposing;
const task =
(queryGroup.taskMessage || shouldShowFallbackTask) && activeTaskId
@ -114,7 +127,7 @@ export const UserQueryGroup: React.FC<UserQueryGroupProps> = ({
sentinel.style.height = '1px';
sentinel.style.pointerEvents = 'none';
sentinel.style.zIndex = '-1';
// Insert sentinel before the sticky element
taskBoxRef.current.parentNode?.insertBefore(sentinel, taskBoxRef.current);
@ -144,9 +157,9 @@ export const UserQueryGroup: React.FC<UserQueryGroupProps> = ({
const anyToSubTasksMessage = task?.messages.find((m: any) => m.step === "to_sub_tasks");
const isSkeletonPhase = task && (
(task.status !== 'finished' &&
!anyToSubTasksMessage &&
!task.hasWaitComfirm &&
task.messages.length > 0) ||
!anyToSubTasksMessage &&
!task.hasWaitComfirm &&
task.messages.length > 0) ||
(task.isTakeControl && !anyToSubTasksMessage)
);
@ -156,7 +169,7 @@ export const UserQueryGroup: React.FC<UserQueryGroupProps> = ({
data-query-id={queryGroup.queryId}
initial={{ opacity: 0, y: 10 }}
animate={{ opacity: 1, y: 0 }}
transition={{
transition={{
duration: 0.3,
delay: index * 0.1 // Stagger animation for multiple groups
}}
@ -191,16 +204,16 @@ export const UserQueryGroup: React.FC<UserQueryGroupProps> = ({
>
<motion.div
initial={{ opacity: 0, y: 20 }}
animate={{
opacity: 1,
animate={{
opacity: 1,
y: 0
}}
transition={{
transition={{
duration: 0.3,
delay: 0.1 // Slight delay for sequencing
}}
>
<div
<div
style={{
transition: 'all 0.3s ease-in-out',
transformOrigin: 'top'
@ -236,105 +249,28 @@ export const UserQueryGroup: React.FC<UserQueryGroupProps> = ({
{/* Other Messages */}
{queryGroup.otherMessages.map((message) => {
if (message.content.length > 0) {
if (message.step === "end") {
return (
<motion.div
key={`end-${message.id}`}
initial={{ opacity: 0, y: 10 }}
animate={{ opacity: 1, y: 0 }}
transition={{ delay: 0.2 }}
className="flex flex-col pl-3 gap-4"
>
<AgentMessageCard
typewriter={
task?.type !== "replay" ||
(task?.type === "replay" && task?.delayTime !== 0)
}
id={message.id}
content={message.content}
onTyping={() => {}}
/>
{/* File List */}
{message.fileList && (
<div className="flex pl-3 gap-2 flex-wrap">
{message.fileList.map((file: any, fileIndex: number) => (
<motion.div
key={`file-${message.id}-${file.name}-${fileIndex}`}
initial={{ opacity: 0, scale: 0.9 }}
animate={{ opacity: 1, scale: 1 }}
transition={{ delay: 0.3 }}
onClick={() => {
chatState.setSelectedFile(activeTaskId as string, file);
chatState.setActiveWorkSpace(activeTaskId as string, "documentWorkSpace");
}}
className="flex items-center gap-2 bg-message-fill-default rounded-sm px-2 py-1 w-[140px] cursor-pointer hover:bg-message-fill-hover transition-colors"
>
<div className="flex flex-col">
<div className="max-w-[100px] font-bold text-sm text-body text-text-body overflow-hidden text-ellipsis whitespace-nowrap">
{file.name.split(".")[0]}
</div>
<div className="font-medium leading-29 text-xs text-text-body">
{file.type}
</div>
</div>
</motion.div>
))}
</div>
)}
</motion.div>
);
} else if (message.content === "skip") {
return (
<motion.div
key={`skip-${message.id}`}
initial={{ opacity: 0, y: 10 }}
animate={{ opacity: 1, y: 0 }}
transition={{ delay: 0.2 }}
className="flex flex-col pl-3 gap-4"
>
if (message.content.length > 0) {
if (message.step === "end") {
return (
<motion.div
key={`end-${message.id}`}
initial={{ opacity: 0, y: 10 }}
animate={{ opacity: 1, y: 0 }}
transition={{ delay: 0.2 }}
className="flex flex-col pl-3 gap-4"
>
<AgentMessageCard
key={message.id}
id={message.id}
content="No reply received, task continues..."
onTyping={() => {}}
/>
</motion.div>
);
} else {
return (
<motion.div
key={`message-${message.id}`}
initial={{ opacity: 0, y: 10 }}
animate={{ opacity: 1, y: 0 }}
transition={{ delay: 0.2 }}
className="flex flex-col pl-3 gap-4"
>
<AgentMessageCard
key={message.id}
typewriter={
task?.type !== "replay" ||
(task?.type === "replay" && task?.delayTime !== 0)
}
id={message.id}
content={message.content}
onTyping={() => {}}
attaches={message.attaches}
onTyping={() => { }}
/>
</motion.div>
);
}
} else if (message.step === "end" && message.content === "") {
return (
<motion.div
key={`end-empty-${message.id}`}
initial={{ opacity: 0 }}
animate={{ opacity: 1 }}
transition={{ delay: 0.2 }}
className="flex flex-col pl-3 gap-4"
>
{/* File List */}
{message.fileList && (
<div className="flex gap-2 flex-wrap">
<div className="flex pl-3 gap-2 flex-wrap">
{message.fileList.map((file: any, fileIndex: number) => (
<motion.div
key={`file-${message.id}-${file.name}-${fileIndex}`}
@ -345,11 +281,10 @@ export const UserQueryGroup: React.FC<UserQueryGroupProps> = ({
chatState.setSelectedFile(activeTaskId as string, file);
chatState.setActiveWorkSpace(activeTaskId as string, "documentWorkSpace");
}}
className="flex items-center gap-2 bg-message-fill-default rounded-2xl px-2 py-1 w-[120px] cursor-pointer hover:bg-message-fill-hover transition-colors"
className="flex items-center gap-2 bg-message-fill-default rounded-sm px-2 py-1 w-[140px] cursor-pointer hover:bg-message-fill-hover transition-colors"
>
<FileText size={16} className="text-icon-primary flex-shrink-0" />
<div className="flex flex-col">
<div className="max-w-48 font-bold text-sm text-body text-text-body overflow-hidden text-ellipsis whitespace-nowrap">
<div className="max-w-[100px] font-bold text-sm text-body text-text-body overflow-hidden text-ellipsis whitespace-nowrap">
{file.name.split(".")[0]}
</div>
<div className="font-medium leading-29 text-xs text-text-body">
@ -362,30 +297,122 @@ export const UserQueryGroup: React.FC<UserQueryGroupProps> = ({
)}
</motion.div>
);
} else if (message.content === "skip") {
return (
<motion.div
key={`skip-${message.id}`}
initial={{ opacity: 0, y: 10 }}
animate={{ opacity: 1, y: 0 }}
transition={{ delay: 0.2 }}
className="flex flex-col pl-3 gap-4"
>
<AgentMessageCard
key={message.id}
id={message.id}
content="No reply received, task continues..."
onTyping={() => { }}
/>
</motion.div>
);
} else {
return (
<motion.div
key={`message-${message.id}`}
initial={{ opacity: 0, y: 10 }}
animate={{ opacity: 1, y: 0 }}
transition={{ delay: 0.2 }}
className="flex flex-col pl-3 gap-4"
>
<AgentMessageCard
key={message.id}
typewriter={
task?.type !== "replay" ||
(task?.type === "replay" && task?.delayTime !== 0)
}
id={message.id}
content={message.content}
onTyping={() => { }}
attaches={message.attaches}
/>
</motion.div>
);
}
} else if (message.step === "end" && message.content === "") {
return (
<motion.div
key={`end-empty-${message.id}`}
initial={{ opacity: 0 }}
animate={{ opacity: 1 }}
transition={{ delay: 0.2 }}
className="flex flex-col pl-3 gap-4"
>
{message.fileList && (
<div className="flex gap-2 flex-wrap">
{message.fileList.map((file: any, fileIndex: number) => (
<motion.div
key={`file-${message.id}-${file.name}-${fileIndex}`}
initial={{ opacity: 0, scale: 0.9 }}
animate={{ opacity: 1, scale: 1 }}
transition={{ delay: 0.3 }}
onClick={() => {
chatState.setSelectedFile(activeTaskId as string, file);
chatState.setActiveWorkSpace(activeTaskId as string, "documentWorkSpace");
}}
className="flex items-center gap-2 bg-message-fill-default rounded-2xl px-2 py-1 w-[120px] cursor-pointer hover:bg-message-fill-hover transition-colors"
>
<FileText size={16} className="text-icon-primary flex-shrink-0" />
<div className="flex flex-col">
<div className="max-w-48 font-bold text-sm text-body text-text-body overflow-hidden text-ellipsis whitespace-nowrap">
{file.name.split(".")[0]}
</div>
<div className="font-medium leading-29 text-xs text-text-body">
{file.type}
</div>
</div>
</motion.div>
))}
</div>
)}
</motion.div>
);
}
// Notice Card
if (
message.step === "notice_card" &&
!task?.isTakeControl &&
task?.cotList && task.cotList.length > 0
) {
return <NoticeCard key={`notice-${message.id}`} />;
}
// Notice Card
if (
message.step === "notice_card" &&
!task?.isTakeControl &&
task?.cotList && task.cotList.length > 0
) {
return <NoticeCard key={`notice-${message.id}`} />;
}
return null;
})}
return null;
})}
{/* Skeleton for loading state */}
{isSkeletonPhase && (
<motion.div
initial={{ opacity: 0, y: 10 }}
animate={{ opacity: 1, y: 0 }}
transition={{ delay: 0.4 }}
>
<TypeCardSkeleton isTakeControl={task?.isTakeControl || false} />
</motion.div>
)}
{/* Streaming Decompose Text - rendered separately to avoid flickering */}
{isLastUserQuery && streamingDecomposeText && (
<div className="flex flex-col pl-3 gap-4">
<div className="flex items-center gap-2 text-text-label text-sm font-medium">
<span className="animate-pulse">Splitting Tasks</span>
</div>
<div className="bg-message-fill-default/50 rounded-lg p-3 overflow-hidden">
<pre className="whitespace-pre-wrap break-all font-mono text-xs text-text-secondary leading-relaxed">
{streamingDecomposeText}
</pre>
</div>
</div>
)}
{/* Skeleton for loading state */}
{isSkeletonPhase && (
<motion.div
initial={{ opacity: 0, y: 10 }}
animate={{ opacity: 1, y: 0 }}
transition={{ delay: 0.4 }}
>
<TypeCardSkeleton isTakeControl={task?.isTakeControl || false} />
</motion.div>
)}
</motion.div>
);
};

View file

@ -42,6 +42,8 @@ interface Task {
isTakeControl: boolean;
isTaskEdit: boolean;
isContextExceeded?: boolean;
// Streaming decompose text - stored separately to avoid frequent re-renders
streamingDecomposeText: string;
}
export interface ChatStore {
@ -56,9 +58,10 @@ export interface ChatStore {
setActiveTaskId: (taskId: string) => void;
replay: (taskId: string, question: string, time: number) => Promise<void>;
startTask: (taskId: string, type?: string, shareToken?: string, delayTime?: number, messageContent?: string, messageAttaches?: File[]) => Promise<void>;
handleConfirmTask: (project_id:string, taskId: string, type?: string) => void;
handleConfirmTask: (project_id: string, taskId: string, type?: string) => void;
addMessages: (taskId: string, messages: Message) => void;
setMessages: (taskId: string, messages: Message[]) => void;
updateMessage: (taskId: string, messageId: string, message: Message) => void;
removeMessage: (taskId: string, messageId: string) => void;
setAttaches: (taskId: string, attaches: File[]) => void;
setSummaryTask: (taskId: string, summaryTask: string) => void;
@ -102,6 +105,8 @@ export interface ChatStore {
clearTasks: () => void,
setIsContextExceeded: (taskId: string, isContextExceeded: boolean) => void;
setNextTaskId: (taskId: string | null) => void;
setStreamingDecomposeText: (taskId: string, text: string) => void;
clearStreamingDecomposeText: (taskId: string) => void;
}
export type VanillaChatStore = {
@ -118,6 +123,10 @@ const autoConfirmTimers: Record<string, ReturnType<typeof setTimeout>> = {};
// Track active SSE connections for proper cleanup
const activeSSEControllers: Record<string, AbortController> = {};
// Throttle streaming decompose text updates to prevent excessive re-renders
const streamingDecomposeTextBuffer: Record<string, string> = {};
const streamingDecomposeTextTimers: Record<string, ReturnType<typeof setTimeout>> = {};
const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
(set, get) => ({
activeTaskId: null,
@ -162,6 +171,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
snapshotsTemp: [],
isTakeControl: false,
isTaskEdit: false,
streamingDecomposeText: '',
},
}
}))
@ -212,6 +222,27 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
})
})
},
updateMessage(taskId: string, messageId: string, message: Message) {
set((state) => {
const task = state.tasks[taskId];
if (!task) return state;
const messages = task.messages.map((m) => {
if (m.id === messageId) {
return message;
}
return m;
});
return {
tasks: {
...state.tasks,
[taskId]: {
...task,
messages,
},
},
};
});
},
stopTask(taskId: string) {
// Abort the SSE connection for this task
try {
@ -305,7 +336,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
/**
* Replay creates its own chatStore for each task with replayProject
*/
if(project_id && type !== "replay") {
if (project_id && type !== "replay") {
console.log("Creating a new Chat Instance for current project on end")
const newChatResult = projectStore.appendInitChatStore(project_id);
@ -313,7 +344,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
newTaskId = newChatResult.taskId;
targetChatStore = newChatResult.chatStore;
targetChatStore.getState().setIsPending(newTaskId, true);
//From handleSend if message is given
// Add the message to the new chatStore if provided
if (messageContent) {
@ -329,11 +360,11 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
}
const base_Url = import.meta.env.DEV ? import.meta.env.VITE_PROXY_URL : import.meta.env.VITE_BASE_URL
const api = type == 'share' ?
`${base_Url}/api/chat/share/playback/${shareToken}?delay_time=${delayTime}`
: type == 'replay' ?
`${base_Url}/api/chat/steps/playback/${newTaskId}?delay_time=${delayTime}`
: `${baseURL}/chat`
const api = type == 'share' ?
`${base_Url}/api/chat/share/playback/${shareToken}?delay_time=${delayTime}`
: type == 'replay' ?
`${base_Url}/api/chat/steps/playback/${newTaskId}?delay_time=${delayTime}`
: `${baseURL}/chat`
const { tasks } = get()
let historyId: string | null = projectStore.getHistoryId(project_id);
@ -387,9 +418,9 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
apiModel = {
api_key: res.value,
model_type: cloud_model_type,
model_platform: cloud_model_type.includes('gpt') ? 'openai' :
cloud_model_type.includes('claude') ? 'anthropic' :
cloud_model_type.includes('gemini') ? 'gemini' : 'openai-compatible-model',
model_platform: cloud_model_type.includes('gpt') ? 'openai' :
cloud_model_type.includes('claude') ? 'anthropic' :
cloud_model_type.includes('gemini') ? 'gemini' : 'openai-compatible-model',
api_url: res.api_url,
extra_params: {}
}
@ -451,7 +482,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
} catch (error) {
console.log('get-env-path error', error)
}
// create history
if (!type) {
const authStore = getAuthStore();
@ -478,11 +509,11 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
* TODO(history): Remove historyId handling to support per projectId
* instead in history api
*/
if(project_id && historyId) projectStore.setHistoryId(project_id, historyId);
if (project_id && historyId) projectStore.setHistoryId(project_id, historyId);
})
}
const browser_port = await window.ipcRenderer.invoke('get-browser-port');
// Lock the chatStore reference at the start of SSE session to prevent focus changes
// during active message processing
let lockedChatStore = targetChatStore;
@ -577,8 +608,8 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
// - Task switching: confirmed, new_task_state, end
// - Multi-turn simple answer: wait_confirm
const isTaskSwitchingEvent = agentMessages.step === "confirmed" ||
agentMessages.step === "new_task_state" ||
agentMessages.step === "end";
agentMessages.step === "new_task_state" ||
agentMessages.step === "end";
const isMultiTurnSimpleAnswer = agentMessages.step === "wait_confirm";
@ -612,78 +643,78 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
*/
let currentTaskId = getCurrentTaskId();
const previousChatStore = getCurrentChatStore()
if(agentMessages.step === "confirmed") {
if (agentMessages.step === "confirmed") {
const { question } = agentMessages.data;
const shouldCreateNewChat = project_id && (question || messageContent);
//All except first confirmed event to reuse the existing chatStore
if(shouldCreateNewChat && !skipFirstConfirm) {
/**
* For Tasks where appended to existing project by
* reusing same projectId. Need to create new chatStore
* as it has been skipped earlier in startTask.
*/
const nextTaskId = previousChatStore.nextTaskId || undefined;
const newChatResult = projectStore.appendInitChatStore(project_id || projectStore.activeProjectId!, nextTaskId);
if (newChatResult) {
const { taskId: newTaskId, chatStore: newChatStore } = newChatResult;
// Update references for both scenarios
updateLockedReferences(newChatStore, newTaskId);
newChatStore.getState().setIsPending(newTaskId, false);
if(type === "replay") {
newChatStore.getState().setDelayTime(newTaskId, delayTime as number);
newChatStore.getState().setType(newTaskId, "replay");
}
if (shouldCreateNewChat && !skipFirstConfirm) {
/**
* For Tasks where appended to existing project by
* reusing same projectId. Need to create new chatStore
* as it has been skipped earlier in startTask.
*/
const nextTaskId = previousChatStore.nextTaskId || undefined;
const newChatResult = projectStore.appendInitChatStore(project_id || projectStore.activeProjectId!, nextTaskId);
const lastMessage = previousChatStore.tasks[currentTaskId]?.messages.at(-1);
if(lastMessage?.role === "user" && lastMessage?.id) {
previousChatStore.removeMessage(currentTaskId, lastMessage.id);
}
//Trick: by the time the question is retrieved from event,
//the last message from previous chatStore is at display
newChatStore.getState().addMessages(newTaskId, {
id: generateUniqueId(),
role: "user",
content: question || messageContent as string,
//TODO: The attaches that reach here (when Improve API is called) doesn't reach the backend
attaches: [...(previousChatStore.tasks[currentTaskId]?.attaches || []), ...(messageAttaches || [])],
});
console.log("[NEW CHATSTORE] Created for ", project_id);
if (newChatResult) {
const { taskId: newTaskId, chatStore: newChatStore } = newChatResult;
//Create a new history point
if (!type) {
const authStore = getAuthStore();
// Update references for both scenarios
updateLockedReferences(newChatStore, newTaskId);
newChatStore.getState().setIsPending(newTaskId, false);
const obj = {
"project_id": project_id,
"task_id": newTaskId,
"user_id": authStore.user_id,
"question": question || messageContent || (targetChatStore.getState().tasks[newTaskId]?.messages[0]?.content ?? ''),
"language": systemLanguage,
"model_platform": apiModel.model_platform,
"model_type": apiModel.model_type,
"api_url": modelType === 'cloud' ? "cloud" : apiModel.api_url,
"max_retries": 3,
"file_save_path": "string",
"installed_mcp": "string",
"status": 1,
"tokens": 0
}
await proxyFetchPost(`/api/chat/history`, obj).then(res => {
historyId = res.id;
/**Save history id for replay reuse purposes.
* TODO(history): Remove historyId handling to support per projectId
* instead in history api
*/
if(project_id && historyId) projectStore.setHistoryId(project_id, historyId);
})
}
if (type === "replay") {
newChatStore.getState().setDelayTime(newTaskId, delayTime as number);
newChatStore.getState().setType(newTaskId, "replay");
}
const lastMessage = previousChatStore.tasks[currentTaskId]?.messages.at(-1);
if (lastMessage?.role === "user" && lastMessage?.id) {
previousChatStore.removeMessage(currentTaskId, lastMessage.id);
}
//Trick: by the time the question is retrieved from event,
//the last message from previous chatStore is at display
newChatStore.getState().addMessages(newTaskId, {
id: generateUniqueId(),
role: "user",
content: question || messageContent as string,
//TODO: The attaches that reach here (when Improve API is called) doesn't reach the backend
attaches: [...(previousChatStore.tasks[currentTaskId]?.attaches || []), ...(messageAttaches || [])],
});
console.log("[NEW CHATSTORE] Created for ", project_id);
//Create a new history point
if (!type) {
const authStore = getAuthStore();
const obj = {
"project_id": project_id,
"task_id": newTaskId,
"user_id": authStore.user_id,
"question": question || messageContent || (targetChatStore.getState().tasks[newTaskId]?.messages[0]?.content ?? ''),
"language": systemLanguage,
"model_platform": apiModel.model_platform,
"model_type": apiModel.model_type,
"api_url": modelType === 'cloud' ? "cloud" : apiModel.api_url,
"max_retries": 3,
"file_save_path": "string",
"installed_mcp": "string",
"status": 1,
"tokens": 0
}
await proxyFetchPost(`/api/chat/history`, obj).then(res => {
historyId = res.id;
/**Save history id for replay reuse purposes.
* TODO(history): Remove historyId handling to support per projectId
* instead in history api
*/
if (project_id && historyId) projectStore.setHistoryId(project_id, historyId);
})
}
}
} else {
//NOTE: Triggered only with first "confirmed" in the project
//Handle Original cases - with old chatStore
@ -696,18 +727,18 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
return
}
const {
setNuwFileNum,
setCotList,
getTokens,
setUpdateCount,
addTokens,
setStatus,
addWebViewUrl,
setIsPending,
addMessages,
setHasWaitComfirm,
setSummaryTask,
const {
setNuwFileNum,
setCotList,
getTokens,
setUpdateCount,
addTokens,
setStatus,
addWebViewUrl,
setIsPending,
addMessages,
setHasWaitComfirm,
setSummaryTask,
setTaskAssigning,
setTaskInfo,
setTaskRunning,
@ -721,11 +752,50 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
setElapsed,
setActiveTaskId,
setIsContextExceeded,
setIsTaskEdit} = getCurrentChatStore()
setStreamingDecomposeText,
clearStreamingDecomposeText,
setIsTaskEdit } = getCurrentChatStore()
currentTaskId = getCurrentTaskId();
// if (tasks[currentTaskId].status === 'finished') return
if (agentMessages.step === "decompose_text") {
const { content } = agentMessages.data;
const text = content;
const currentId = getCurrentTaskId();
// Get current buffer or task state
const currentContent = streamingDecomposeTextBuffer[currentId] ||
getCurrentChatStore().tasks[currentId]?.streamingDecomposeText || "";
const newContent = text || "";
let updatedContent = newContent;
if (newContent.startsWith(currentContent)) {
// Accumulated format: new content contains old content -> Replace
updatedContent = newContent;
} else {
// Delta format: new content is a chunk -> Append
updatedContent = currentContent + newContent;
}
// Store in buffer immediately
streamingDecomposeTextBuffer[currentId] = updatedContent;
// Throttle store updates to every 50ms for smoother streaming display
if (!streamingDecomposeTextTimers[currentId]) {
streamingDecomposeTextTimers[currentId] = setTimeout(() => {
const bufferedText = streamingDecomposeTextBuffer[currentId];
if (bufferedText !== undefined) {
setStreamingDecomposeText(currentId, bufferedText);
}
delete streamingDecomposeTextTimers[currentId];
}, 16);
}
return;
}
if (agentMessages.step === "to_sub_tasks") {
// Clear streaming decompose text when task splitting is done
clearStreamingDecomposeText(currentTaskId);
// Check if this is a multi-turn scenario after task completion
const isMultiTurnAfterCompletion = tasks[currentTaskId].status === 'finished';
@ -863,18 +933,18 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
return;
}
if (agentMessages.step === "wait_confirm") {
const {content, question} = agentMessages.data;
const { content, question } = agentMessages.data;
setHasWaitComfirm(currentTaskId, true)
setIsPending(currentTaskId, false)
const currentChatStore = getCurrentChatStore();
//Make sure to add user Message on replay and avoid duplication of first msg
if(question && !(currentChatStore.tasks[currentTaskId].messages.length === 1)) {
if (question && !(currentChatStore.tasks[currentTaskId].messages.length === 1)) {
//Replace the optimistic update if existent.
const lastMessage = currentChatStore.tasks[currentTaskId]?.messages.at(-1);
if(lastMessage?.role === "user" && lastMessage.id && lastMessage.content === question) {
if (lastMessage?.role === "user" && lastMessage.id && lastMessage.content === question) {
currentChatStore.removeMessage(currentTaskId, lastMessage.id)
}
}
addMessages(currentTaskId, {
id: generateUniqueId(),
role: "user",
@ -1075,7 +1145,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
if (target) {
const { agentIndex, taskIndex } = target
const agentName = taskAssigning.find((agent: Agent) => agent.agent_id === assignee_id)?.name
if(agentName!==taskAssigning[agentIndex].name){
if (agentName !== taskAssigning[agentIndex].name) {
taskAssigning[agentIndex].tasks[taskIndex].reAssignTo = agentName
}
}
@ -1126,7 +1196,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
// Only update or add to taskRunning, never duplicate
if (taskRunningIndex === -1) {
// Task not in taskRunning, add it
if(task){
if (task) {
task.status = taskState === "waiting" ? "waiting" : "running";
}
taskRunning!.push(
@ -1322,27 +1392,27 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
}
if (agentMessages.step === "context_too_long") {
console.error('Context too long:', agentMessages.data)
const currentLength = agentMessages.data.current_length || 0;
const maxLength = agentMessages.data.max_length || 100000;
// Show toast notification
toast.dismiss();
toast.error(
`⚠️ Context Limit Exceeded\n\nThe conversation history is too long (${currentLength.toLocaleString()} / ${maxLength.toLocaleString()} characters).\n\nPlease create a new project to continue your work.`,
{
duration: Infinity,
closeButton: true,
}
);
if (agentMessages.step === "context_too_long") {
console.error('Context too long:', agentMessages.data)
const currentLength = agentMessages.data.current_length || 0;
const maxLength = agentMessages.data.max_length || 100000;
// Set flag to block input and set status to pause
setIsContextExceeded(currentTaskId, true);
setStatus(currentTaskId, "pause");
uploadLog(currentTaskId, type);
return
}
// Show toast notification
toast.dismiss();
toast.error(
`⚠️ Context Limit Exceeded\n\nThe conversation history is too long (${currentLength.toLocaleString()} / ${maxLength.toLocaleString()} characters).\n\nPlease create a new project to continue your work.`,
{
duration: Infinity,
closeButton: true,
}
);
// Set flag to block input and set status to pause
setIsContextExceeded(currentTaskId, true);
setStatus(currentTaskId, "pause");
uploadLog(currentTaskId, type);
return
}
if (agentMessages.step === "error") {
try {
@ -1355,8 +1425,8 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
// Safely extract error message with fallback chain
const errorMessage = agentMessages.data?.message ||
(typeof agentMessages.data === 'string' ? agentMessages.data : null) ||
'An error occurred while processing your request';
(typeof agentMessages.data === 'string' ? agentMessages.data : null) ||
'An error occurred while processing your request';
// Mark all incomplete tasks as failed
let taskRunning = [...tasks[currentTaskId].taskRunning];
@ -1438,10 +1508,10 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
const taskIdToRemove = agentMessages.data.task_id as string;
const projectStore = useProjectStore.getState();
//Remove the task from the queue on error
if(project_id) {
if (project_id) {
const project = projectStore.getProjectById(project_id);
if (project && project.queuedMessages) {
const messageToRemove = project.queuedMessages.find(msg =>
const messageToRemove = project.queuedMessages.find(msg =>
msg.task_id === taskIdToRemove || msg.content.includes(taskIdToRemove)
);
if (messageToRemove) {
@ -1467,7 +1537,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
// Find and remove the message with matching task ID
const project = projectStore.getProjectById(project_id);
if (project && project.queuedMessages) {
const messageToRemove = project.queuedMessages.find(msg =>
const messageToRemove = project.queuedMessages.find(msg =>
msg.task_id === taskIdToRemove || msg.content.includes(taskIdToRemove)
);
if (messageToRemove) {
@ -1773,7 +1843,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
const { create, setHasMessages, addMessages, startTask, setActiveTaskId, handleConfirmTask } = get();
//get project id
const project_id = useProjectStore.getState().activeProjectId
if(!project_id) {
if (!project_id) {
console.error("Can't replay task because no project id provided")
return;
}
@ -1996,7 +2066,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
},
}))
},
handleConfirmTask: async (project_id:string, taskId: string, type?: string) => {
handleConfirmTask: async (project_id: string, taskId: string, type?: string) => {
const { tasks, setMessages, setActiveWorkSpace, setStatus, setTaskTime, setTaskInfo, setTaskRunning, setIsTaskEdit } = get();
if (!taskId) return;
@ -2022,7 +2092,7 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
task: taskInfo,
});
await fetchPost(`/task/${project_id}/start`, {});
setActiveWorkSpace(taskId, 'workflow')
setStatus(taskId, 'running')
}
@ -2435,6 +2505,43 @@ const chatStore = (initial?: Partial<ChatStore>) => createStore<ChatStore>()(
...state,
nextTaskId: taskId,
}))
},
setStreamingDecomposeText: (taskId, text) => {
set((state) => {
if (!state.tasks[taskId]) return state;
return {
...state,
tasks: {
...state.tasks,
[taskId]: {
...state.tasks[taskId],
streamingDecomposeText: text,
},
},
};
});
},
clearStreamingDecomposeText: (taskId) => {
// Clear buffer and any pending timer
delete streamingDecomposeTextBuffer[taskId];
if (streamingDecomposeTextTimers[taskId]) {
clearTimeout(streamingDecomposeTextTimers[taskId]);
delete streamingDecomposeTextTimers[taskId];
}
set((state) => {
if (!state.tasks[taskId]) return state;
return {
...state,
tasks: {
...state.tasks,
[taskId]: {
...state.tasks[taskId],
streamingDecomposeText: '',
},
},
};
});
}
})
);

View file

@ -38,7 +38,7 @@ declare global {
toolkitStatus?: AgentStatus;
}[];
failure_count?: number;
reAssignTo?:string;
reAssignTo?: string;
}
interface File {
@ -66,7 +66,7 @@ declare global {
log: AgentMessage[];
img?: string[];
activeWebviewIds?: ActiveWebView[];
tools?:string[];
tools?: string[];
workerInfo?: {
name: string;
description: string;
@ -94,13 +94,13 @@ declare global {
task_id?: string;
summary?: string;
agent_name?: string;
attaches?:File[]
attaches?: File[]
}
interface AgentMessage {
step: string;
data: {
project_id?:string;
project_id?: string;
failure_count?: number;
tokens?: number;
sub_tasks?: TaskInfo[];
@ -125,7 +125,8 @@ declare global {
tools?: string[];
//Context Length
current_length?: number;
max_length?: number
max_length?: number;
text?: string;
};
status?: 'running' | 'filled' | 'completed';
}