mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-22 19:47:28 +00:00
Co-authored-by: Douglas <douglas.ym.lai@gmail.com> Co-authored-by: Douglas Lai <115660088+Douglasymlai@users.noreply.github.com>
665 lines
22 KiB
Python
665 lines
22 KiB
Python
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at
|
||
#
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
||
|
||
import asyncio
|
||
import inspect
|
||
import logging
|
||
import os
|
||
import re
|
||
import time
|
||
from pathlib import Path
|
||
|
||
from dotenv import load_dotenv
|
||
from fastapi import APIRouter, Request, Response
|
||
from fastapi.responses import StreamingResponse
|
||
|
||
from app.component import code
|
||
from app.component.environment import sanitize_env_path, set_user_env_path
|
||
from app.exception.exception import UserException
|
||
from app.model.chat import (
|
||
AddTaskRequest,
|
||
Chat,
|
||
HumanReply,
|
||
McpServers,
|
||
Status,
|
||
SupplementChat,
|
||
sse_json,
|
||
)
|
||
from app.service.chat_service import step_solve
|
||
from app.service.task import (
|
||
Action,
|
||
ActionAddTaskData,
|
||
ActionImproveData,
|
||
ActionInstallMcpData,
|
||
ActionRemoveTaskData,
|
||
ActionSkipTaskData,
|
||
ActionStopData,
|
||
ActionSupplementData,
|
||
ImprovePayload,
|
||
delete_task_lock,
|
||
get_or_create_task_lock,
|
||
get_task_lock,
|
||
get_task_lock_if_exists,
|
||
set_current_task_id,
|
||
task_locks,
|
||
)
|
||
from app.utils.browser_launcher import (
|
||
ensure_cdp_browser_endpoint,
|
||
is_cdp_url_available,
|
||
normalize_cdp_url,
|
||
)
|
||
|
||
router = APIRouter()
|
||
|
||
# Logger for chat controller
|
||
chat_logger = logging.getLogger("chat_controller")
|
||
|
||
# SSE timeout configuration (60 minutes in seconds)
|
||
SSE_TIMEOUT_SECONDS = 60 * 60
|
||
|
||
|
||
def _is_remote_browser_hands(request: Request | None) -> bool:
|
||
hands = getattr(getattr(request, "state", None), "hands", None)
|
||
if hands is None:
|
||
return False
|
||
get_manifest = getattr(hands, "get_capability_manifest", None)
|
||
if get_manifest is None or inspect.iscoroutinefunction(get_manifest):
|
||
return False
|
||
try:
|
||
manifest = get_manifest()
|
||
except Exception:
|
||
return False
|
||
if inspect.isawaitable(manifest):
|
||
if hasattr(manifest, "close"):
|
||
manifest.close()
|
||
return False
|
||
if not isinstance(manifest, dict):
|
||
return False
|
||
return manifest.get("deployment") == "remote_cluster"
|
||
|
||
|
||
async def _prepare_browser_for_request(
|
||
request: Request | None,
|
||
port: int,
|
||
) -> bool:
|
||
existing_cdp_url = os.environ.get("EIGENT_CDP_URL", "").strip()
|
||
if existing_cdp_url:
|
||
is_available = await asyncio.to_thread(
|
||
is_cdp_url_available, existing_cdp_url
|
||
)
|
||
if is_available:
|
||
normalized_endpoint, _, selected_port = normalize_cdp_url(
|
||
existing_cdp_url
|
||
)
|
||
os.environ["EIGENT_CDP_URL"] = normalized_endpoint
|
||
os.environ["browser_port"] = str(selected_port)
|
||
if request is not None:
|
||
request.state.browser_available = True
|
||
return True
|
||
os.environ.pop("EIGENT_CDP_URL", None)
|
||
|
||
if _is_remote_browser_hands(request):
|
||
if request is not None:
|
||
request.state.browser_available = True
|
||
return True
|
||
|
||
try:
|
||
endpoint = await asyncio.to_thread(ensure_cdp_browser_endpoint, port)
|
||
except Exception as e:
|
||
os.environ.pop("EIGENT_CDP_URL", None)
|
||
chat_logger.warning(
|
||
"Could not ensure CDP browser for web mode",
|
||
extra={"error": str(e), "port": port},
|
||
)
|
||
if request is not None:
|
||
request.state.browser_available = False
|
||
return False
|
||
|
||
if endpoint:
|
||
os.environ["EIGENT_CDP_URL"] = endpoint
|
||
_, _, selected_port = normalize_cdp_url(endpoint)
|
||
os.environ["browser_port"] = str(selected_port)
|
||
if request is not None:
|
||
request.state.browser_available = True
|
||
return True
|
||
|
||
os.environ.pop("EIGENT_CDP_URL", None)
|
||
chat_logger.warning(
|
||
"CDP browser not available after ensure attempt",
|
||
extra={"port": port},
|
||
)
|
||
if request is not None:
|
||
request.state.browser_available = False
|
||
return False
|
||
|
||
|
||
async def _cleanup_task_lock_safe(task_lock, reason: str) -> bool:
|
||
"""Safely cleanup task lock with existence check.
|
||
|
||
Args:
|
||
task_lock: The task lock to cleanup
|
||
reason: Reason for cleanup (for logging)
|
||
|
||
Returns:
|
||
True if cleanup was performed, False otherwise
|
||
"""
|
||
if not task_lock:
|
||
return False
|
||
|
||
# Check if task_lock still exists before attempting cleanup
|
||
if task_lock.id not in task_locks:
|
||
chat_logger.debug(
|
||
f"[{reason}] Task lock already removed, skipping cleanup",
|
||
extra={"task_id": task_lock.id},
|
||
)
|
||
return False
|
||
|
||
try:
|
||
task_lock.status = Status.done
|
||
await delete_task_lock(task_lock.id)
|
||
chat_logger.info(
|
||
f"[{reason}] Task lock cleanup completed",
|
||
extra={"task_id": task_lock.id},
|
||
)
|
||
return True
|
||
except Exception as e:
|
||
chat_logger.error(
|
||
f"[{reason}] Failed to cleanup task lock",
|
||
extra={"task_id": task_lock.id, "error": str(e)},
|
||
exc_info=True,
|
||
)
|
||
return False
|
||
|
||
|
||
async def timeout_stream_wrapper(
|
||
stream_generator,
|
||
timeout_seconds: int = SSE_TIMEOUT_SECONDS,
|
||
task_lock=None,
|
||
):
|
||
"""Wraps a stream generator with timeout handling.
|
||
|
||
Closes the SSE connection if no data is received within the timeout period.
|
||
Triggers cleanup if timeout occurs to prevent resource leaks.
|
||
"""
|
||
last_data_time = time.time()
|
||
generator = stream_generator.__aiter__()
|
||
cleanup_triggered = False
|
||
|
||
try:
|
||
while True:
|
||
elapsed = time.time() - last_data_time
|
||
remaining_timeout = timeout_seconds - elapsed
|
||
|
||
try:
|
||
data = await asyncio.wait_for(
|
||
generator.__anext__(), timeout=remaining_timeout
|
||
)
|
||
last_data_time = time.time()
|
||
yield data
|
||
except TimeoutError:
|
||
chat_logger.warning(
|
||
"SSE timeout: No data received, closing connection",
|
||
extra={"timeout_seconds": timeout_seconds},
|
||
)
|
||
timeout_min = timeout_seconds // 60
|
||
yield sse_json(
|
||
"error",
|
||
{
|
||
"message": "Connection timeout: No data"
|
||
f" received for {timeout_min}"
|
||
" minutes"
|
||
},
|
||
)
|
||
cleanup_triggered = await _cleanup_task_lock_safe(
|
||
task_lock, "TIMEOUT"
|
||
)
|
||
break
|
||
except StopAsyncIteration:
|
||
break
|
||
|
||
except asyncio.CancelledError:
|
||
chat_logger.info(
|
||
"[STREAM-CANCELLED] Stream cancelled, triggering cleanup"
|
||
)
|
||
if not cleanup_triggered:
|
||
await _cleanup_task_lock_safe(task_lock, "CANCELLED")
|
||
raise
|
||
except Exception as e:
|
||
chat_logger.error(
|
||
"[STREAM-ERROR] Unexpected error in stream wrapper",
|
||
extra={"error": str(e)},
|
||
exc_info=True,
|
||
)
|
||
if not cleanup_triggered:
|
||
await _cleanup_task_lock_safe(task_lock, "ERROR")
|
||
raise
|
||
|
||
|
||
async def start_chat_stream(data: Chat, request: Request):
|
||
"""
|
||
Setup and start chat stream. Used by POST /chat and Message Router.
|
||
Returns async generator of SSE chunks.
|
||
"""
|
||
chat_logger.info(
|
||
"Starting new chat session",
|
||
extra={
|
||
"project_id": data.project_id,
|
||
"task_id": data.task_id,
|
||
"user": data.email,
|
||
},
|
||
)
|
||
|
||
task_lock = get_or_create_task_lock(data.project_id)
|
||
|
||
# Set user-specific environment path for this thread
|
||
set_user_env_path(data.env_path)
|
||
# Load environment with validated path
|
||
safe_env_path = sanitize_env_path(data.env_path)
|
||
if safe_env_path:
|
||
load_dotenv(dotenv_path=safe_env_path)
|
||
|
||
# TODO(multi-tenant): os.environ is global – concurrent sessions overwrite
|
||
# each other's API keys, file paths, and browser ports. Pass these values
|
||
# through Chat / request context instead of mutating the process environment.
|
||
os.environ["file_save_path"] = data.file_save_path()
|
||
os.environ["browser_port"] = str(data.browser_port)
|
||
# Web mode: reuse an existing CDP endpoint first, otherwise acquire browser
|
||
# through RemoteHands or launch a local browser when available.
|
||
if not data.cdp_browsers:
|
||
await _prepare_browser_for_request(request, data.browser_port)
|
||
os.environ["OPENAI_API_KEY"] = data.api_key
|
||
os.environ["OPENAI_API_BASE_URL"] = (
|
||
data.api_url or "https://api.openai.com/v1"
|
||
)
|
||
os.environ["CAMEL_MODEL_LOG_ENABLED"] = "true"
|
||
|
||
# Set user-specific search engine configuration if provided
|
||
if data.search_config:
|
||
for key, value in data.search_config.items():
|
||
if value:
|
||
os.environ[key] = value
|
||
chat_logger.debug(
|
||
f"Set search config: {key}",
|
||
extra={"project_id": data.project_id},
|
||
)
|
||
|
||
email_sanitized = re.sub(
|
||
r'[\\/*?:"<>|\s]', "_", data.email.split("@")[0]
|
||
).strip(".")
|
||
camel_log = (
|
||
Path.home()
|
||
/ ".eigent"
|
||
/ email_sanitized
|
||
/ ("project_" + data.project_id)
|
||
/ ("task_" + data.task_id)
|
||
/ "camel_logs"
|
||
)
|
||
camel_log.mkdir(parents=True, exist_ok=True)
|
||
|
||
os.environ["CAMEL_LOG_DIR"] = str(camel_log)
|
||
|
||
if data.is_cloud():
|
||
os.environ["cloud_api_key"] = data.api_key
|
||
|
||
# Set the initial current_task_id in task_lock
|
||
set_current_task_id(data.project_id, data.task_id)
|
||
|
||
# Put initial action in queue to start processing
|
||
await task_lock.put_queue(
|
||
ActionImproveData(
|
||
data=ImprovePayload(
|
||
question=data.question,
|
||
attaches=data.attaches or [],
|
||
),
|
||
new_task_id=data.task_id,
|
||
)
|
||
)
|
||
|
||
chat_logger.info(
|
||
"Chat session initialized",
|
||
extra={
|
||
"project_id": data.project_id,
|
||
"task_id": data.task_id,
|
||
"log_dir": str(camel_log),
|
||
},
|
||
)
|
||
return timeout_stream_wrapper(
|
||
step_solve(data, request, task_lock), task_lock=task_lock
|
||
)
|
||
|
||
|
||
@router.post("/chat", name="start chat")
|
||
async def post(data: Chat, request: Request):
|
||
stream = await start_chat_stream(data, request)
|
||
return StreamingResponse(
|
||
stream,
|
||
media_type="text/event-stream",
|
||
)
|
||
|
||
|
||
@router.post("/chat/{id}", name="improve chat")
|
||
def improve(id: str, data: SupplementChat, request: Request):
|
||
chat_logger.info(
|
||
"Chat improvement requested",
|
||
extra={"task_id": id, "question_length": len(data.question)},
|
||
)
|
||
task_lock = get_task_lock(id)
|
||
|
||
# Reuse an existing endpoint when possible to avoid tearing down
|
||
# a browser that was manually connected through the Browser page.
|
||
port = int(os.environ.get("browser_port", "9222"))
|
||
asyncio.run(_prepare_browser_for_request(request, port))
|
||
|
||
# Allow continuing conversation even after task is done
|
||
# This supports multi-turn conversation after complex task completion
|
||
if task_lock.status == Status.done:
|
||
# Reset status to allow processing new messages
|
||
task_lock.status = Status.confirming
|
||
# Clear any existing background tasks since workforce was stopped
|
||
if hasattr(task_lock, "background_tasks"):
|
||
task_lock.background_tasks.clear()
|
||
# Note: conversation_history and last_task_result are preserved
|
||
|
||
# Log context preservation
|
||
if hasattr(task_lock, "conversation_history"):
|
||
hist_len = len(task_lock.conversation_history)
|
||
chat_logger.info(
|
||
f"[CONTEXT] Preserved {hist_len} conversation entries"
|
||
)
|
||
if hasattr(task_lock, "last_task_result"):
|
||
result_len = len(task_lock.last_task_result)
|
||
chat_logger.info(
|
||
f"[CONTEXT] Preserved task result: {result_len} chars"
|
||
)
|
||
|
||
# If task_id is provided, optimistically update
|
||
# file_save_path (will be destroyed if task is
|
||
# not complex)
|
||
# this is because a NEW workforce instance may be created for this task
|
||
new_folder_path = None
|
||
if data.task_id:
|
||
try:
|
||
# Get current environment values needed to construct new path
|
||
current_email = None
|
||
|
||
# Extract email from current file_save_path if available
|
||
current_file_save_path = os.environ.get("file_save_path", "")
|
||
if current_file_save_path:
|
||
path_parts = Path(current_file_save_path).parts
|
||
if len(path_parts) >= 3 and "eigent" in path_parts:
|
||
eigent_index = path_parts.index("eigent")
|
||
if eigent_index + 1 < len(path_parts):
|
||
current_email = path_parts[eigent_index + 1]
|
||
|
||
# If we have the necessary info, update
|
||
# the file_save_path
|
||
if current_email and id:
|
||
# Create new path using the existing
|
||
# pattern: email/project_{id}/task_{id}
|
||
new_folder_path = (
|
||
Path.home()
|
||
/ "eigent"
|
||
/ current_email
|
||
/ f"project_{id}"
|
||
/ f"task_{data.task_id}"
|
||
)
|
||
new_folder_path.mkdir(parents=True, exist_ok=True)
|
||
os.environ["file_save_path"] = str(new_folder_path)
|
||
chat_logger.info(
|
||
f"Updated file_save_path to: {new_folder_path}"
|
||
)
|
||
|
||
# Store the new folder path in task_lock
|
||
# for potential cleanup and persistence
|
||
task_lock.new_folder_path = new_folder_path
|
||
else:
|
||
chat_logger.warning(
|
||
"Could not update"
|
||
" file_save_path -"
|
||
f" email: {current_email},"
|
||
f" project_id: {id}"
|
||
)
|
||
|
||
except Exception as e:
|
||
chat_logger.error(
|
||
"Error updating file path for"
|
||
f" project_id: {id},"
|
||
f" task_id: {data.task_id}:"
|
||
f" {e}"
|
||
)
|
||
|
||
asyncio.run(
|
||
task_lock.put_queue(
|
||
ActionImproveData(
|
||
data=ImprovePayload(
|
||
question=data.question,
|
||
attaches=data.attaches or [],
|
||
),
|
||
new_task_id=data.task_id,
|
||
)
|
||
)
|
||
)
|
||
chat_logger.info(
|
||
"Improvement request queued with preserved context",
|
||
extra={"project_id": id},
|
||
)
|
||
return Response(status_code=201)
|
||
|
||
|
||
@router.put("/chat/{id}", name="supplement task")
|
||
def supplement(id: str, data: SupplementChat):
|
||
chat_logger.info("Chat supplement requested", extra={"task_id": id})
|
||
task_lock = get_task_lock(id)
|
||
if task_lock.status != Status.done:
|
||
raise UserException(code.error, "Please wait task done")
|
||
asyncio.run(task_lock.put_queue(ActionSupplementData(data=data)))
|
||
chat_logger.debug("Supplement data queued", extra={"task_id": id})
|
||
return Response(status_code=201)
|
||
|
||
|
||
@router.delete("/chat/{id}", name="stop chat")
|
||
def stop(id: str):
|
||
"""stop the task"""
|
||
chat_logger.info("=" * 80)
|
||
chat_logger.info(
|
||
"🛑 [STOP-BUTTON] DELETE /chat/{id} request received from frontend"
|
||
)
|
||
chat_logger.info(f"[STOP-BUTTON] project_id/task_id: {id}")
|
||
chat_logger.info("=" * 80)
|
||
task_lock = get_task_lock_if_exists(id)
|
||
if task_lock is not None:
|
||
chat_logger.info(
|
||
"[STOP-BUTTON] Task lock retrieved,"
|
||
f" task_lock.id: {task_lock.id},"
|
||
f" task_lock.status: {task_lock.status}"
|
||
)
|
||
chat_logger.info(
|
||
"[STOP-BUTTON] Queueing"
|
||
" ActionStopData(Action.stop)"
|
||
" to task_lock queue"
|
||
)
|
||
try:
|
||
asyncio.run(
|
||
task_lock.put_queue(ActionStopData(action=Action.stop))
|
||
)
|
||
chat_logger.info(
|
||
"[STOP-BUTTON] ActionStopData queued"
|
||
" successfully, this will trigger"
|
||
" workforce.stop_gracefully()"
|
||
)
|
||
except Exception as e:
|
||
chat_logger.warning(
|
||
"[STOP-BUTTON] Failed to queue ActionStopData",
|
||
extra={"task_id": id, "error": str(e)},
|
||
)
|
||
else:
|
||
chat_logger.warning(
|
||
"[STOP-BUTTON] Task lock not found, task may already be stopped",
|
||
extra={"task_id": id},
|
||
)
|
||
return Response(status_code=204)
|
||
|
||
|
||
@router.post("/chat/{id}/human-reply")
|
||
def human_reply(id: str, data: HumanReply):
|
||
chat_logger.info(
|
||
"Human reply received",
|
||
extra={"task_id": id, "reply_length": len(data.reply)},
|
||
)
|
||
task_lock = get_task_lock(id)
|
||
asyncio.run(task_lock.put_human_input(data.agent, data.reply))
|
||
chat_logger.debug("Human reply processed", extra={"task_id": id})
|
||
return Response(status_code=201)
|
||
|
||
|
||
@router.post("/chat/{id}/install-mcp")
|
||
def install_mcp(id: str, data: McpServers):
|
||
chat_logger.info(
|
||
"Installing MCP servers",
|
||
extra={
|
||
"task_id": id,
|
||
"servers_count": len(data.get("mcpServers", {})),
|
||
},
|
||
)
|
||
task_lock = get_task_lock(id)
|
||
asyncio.run(
|
||
task_lock.put_queue(
|
||
ActionInstallMcpData(action=Action.install_mcp, data=data)
|
||
)
|
||
)
|
||
chat_logger.info("MCP installation queued", extra={"task_id": id})
|
||
return Response(status_code=201)
|
||
|
||
|
||
@router.post("/chat/{id}/add-task", name="add task to workforce")
|
||
def add_task(id: str, data: AddTaskRequest):
|
||
"""Add a new task to the workforce"""
|
||
chat_logger.info(
|
||
"Adding task to workforce for"
|
||
f" task_id: {id},"
|
||
f" content: {data.content[:100]}..."
|
||
)
|
||
task_lock = get_task_lock(id)
|
||
|
||
try:
|
||
# Queue the add task action
|
||
add_task_action = ActionAddTaskData(
|
||
content=data.content,
|
||
project_id=data.project_id,
|
||
task_id=data.task_id,
|
||
additional_info=data.additional_info,
|
||
insert_position=data.insert_position,
|
||
)
|
||
asyncio.run(task_lock.put_queue(add_task_action))
|
||
return Response(status_code=201)
|
||
|
||
except Exception as e:
|
||
chat_logger.error(f"Error adding task for task_id: {id}: {e}")
|
||
raise UserException(code.error, f"Failed to add task: {str(e)}")
|
||
|
||
|
||
@router.delete(
|
||
"/chat/{project_id}/remove-task/{task_id}",
|
||
name="remove task from workforce",
|
||
)
|
||
def remove_task(project_id: str, task_id: str):
|
||
"""Remove a task from the workforce"""
|
||
chat_logger.info(
|
||
f"Removing task {task_id} from workforce for project_id: {project_id}"
|
||
)
|
||
task_lock = get_task_lock(project_id)
|
||
|
||
try:
|
||
# Queue the remove task action
|
||
remove_task_action = ActionRemoveTaskData(
|
||
task_id=task_id, project_id=project_id
|
||
)
|
||
asyncio.run(task_lock.put_queue(remove_task_action))
|
||
|
||
chat_logger.info(
|
||
"Task removal request queued for"
|
||
f" project_id: {project_id},"
|
||
f" removing task: {task_id}"
|
||
)
|
||
return Response(status_code=204)
|
||
|
||
except Exception as e:
|
||
chat_logger.error(
|
||
f"Error removing task {task_id} for project_id: {project_id}: {e}"
|
||
)
|
||
raise UserException(code.error, f"Failed to remove task: {str(e)}")
|
||
|
||
|
||
@router.post("/chat/{project_id}/skip-task", name="skip task in workforce")
|
||
def skip_task(project_id: str):
|
||
"""
|
||
Skip/Stop current task execution while preserving context.
|
||
This endpoint is called when user clicks the Stop button.
|
||
|
||
Behavior:
|
||
- Stops workforce gracefully
|
||
- Marks task as done
|
||
- Preserves conversation_history and last_task_result in task_lock
|
||
- Sends 'end' event to frontend
|
||
- Keeps SSE connection alive for multi-turn conversation
|
||
"""
|
||
chat_logger.info("=" * 80)
|
||
chat_logger.info(
|
||
"[STOP-BUTTON] SKIP-TASK request"
|
||
" received from frontend"
|
||
" (User clicked Stop)"
|
||
)
|
||
chat_logger.info(f"[STOP-BUTTON] project_id: {project_id}")
|
||
chat_logger.info("=" * 80)
|
||
task_lock = get_task_lock_if_exists(project_id)
|
||
if task_lock is None:
|
||
chat_logger.warning(
|
||
"[STOP-BUTTON] Task lock not found, task may already be stopped",
|
||
extra={"project_id": project_id},
|
||
)
|
||
return Response(status_code=204)
|
||
chat_logger.info(
|
||
"[STOP-BUTTON] Task lock retrieved,"
|
||
f" task_lock.id: {task_lock.id},"
|
||
" task_lock.status:"
|
||
f" {task_lock.status}"
|
||
)
|
||
|
||
try:
|
||
# Queue the skip task action - this will
|
||
# preserve context for multi-turn
|
||
skip_task_action = ActionSkipTaskData(project_id=project_id)
|
||
chat_logger.info(
|
||
"[STOP-BUTTON] Queueing"
|
||
" ActionSkipTaskData"
|
||
" (preserves context,"
|
||
" marks as done)"
|
||
)
|
||
asyncio.run(task_lock.put_queue(skip_task_action))
|
||
|
||
chat_logger.info(
|
||
"[STOP-BUTTON] Skip request"
|
||
" queued - task will stop"
|
||
" gracefully and preserve context"
|
||
)
|
||
return Response(status_code=201)
|
||
|
||
except Exception as e:
|
||
chat_logger.error(
|
||
"[STOP-BUTTON] Error skipping"
|
||
" task for"
|
||
f" project_id: {project_id}:"
|
||
f" {e}"
|
||
)
|
||
raise UserException(code.error, f"Failed to skip task: {str(e)}")
|