eigent/backend/app/controller/chat_controller.py
bytecraftii 49e148a2f9
Add langfuse and update logger (#952)
Co-authored-by: bytecraftii <bytecraftii@users.noreply.github.com>
Co-authored-by: Wendong-Fan <w3ndong.fan@gmail.com>
2026-01-25 08:13:07 +08:00

369 lines
15 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
import asyncio
import os
import re
import time
from pathlib import Path
from dotenv import load_dotenv
from fastapi import APIRouter, HTTPException, Request, Response
from fastapi.responses import StreamingResponse
import logging
from app.component import code
from app.exception.exception import UserException
from app.model.chat import Chat, HumanReply, McpServers, Status, SupplementChat, AddTaskRequest, sse_json
from app.service.chat_service import step_solve
from app.service.task import (
Action,
ActionImproveData,
ActionInstallMcpData,
ActionStopData,
ActionSupplementData,
ActionAddTaskData,
ActionRemoveTaskData,
ActionSkipTaskData,
get_or_create_task_lock,
get_task_lock,
set_current_task_id,
delete_task_lock,
task_locks,
)
from app.component.environment import set_user_env_path, sanitize_env_path
from app.utils.workforce import Workforce
from camel.tasks.task import Task
# Router on which the chat endpoints defined below register themselves.
router = APIRouter()
# Module-wide logger for the chat controller, registered under the name "chat_controller".
chat_logger = logging.getLogger("chat_controller")
# Default SSE idle timeout: 60 minutes, expressed in seconds.
# Serves as the default `timeout_seconds` of `timeout_stream_wrapper`.
SSE_TIMEOUT_SECONDS = 60 * 60
async def _cleanup_task_lock_safe(task_lock, reason: str) -> bool:
"""Safely cleanup task lock with existence check.
Args:
task_lock: The task lock to cleanup
reason: Reason for cleanup (for logging)
Returns:
True if cleanup was performed, False otherwise
"""
if not task_lock:
return False
# Check if task_lock still exists before attempting cleanup
if task_lock.id not in task_locks:
chat_logger.debug(f"[{reason}] Task lock already removed, skipping cleanup",
extra={"task_id": task_lock.id})
return False
try:
task_lock.status = Status.done
await delete_task_lock(task_lock.id)
chat_logger.info(f"[{reason}] Task lock cleanup completed",
extra={"task_id": task_lock.id})
return True
except Exception as e:
chat_logger.error(f"[{reason}] Failed to cleanup task lock",
extra={"task_id": task_lock.id, "error": str(e)}, exc_info=True)
return False
async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TIMEOUT_SECONDS, task_lock=None):
    """Relay items from ``stream_generator`` while enforcing an idle timeout.

    Each item produced by the wrapped stream is yielded unchanged. If the
    wrapped stream produces nothing within ``timeout_seconds`` of the previous
    item, an SSE ``error`` event is yielded, the task lock is cleaned up, and
    the stream ends. The task lock is also cleaned up when the wrapper is
    cancelled or fails with an unexpected exception.

    Args:
        stream_generator: Async iterable producing the stream payloads.
        timeout_seconds: Maximum number of seconds allowed between two items.
        task_lock: Optional task lock to clean up on timeout, cancellation,
            or error.

    Yields:
        The items of ``stream_generator``, plus a final ``error`` event when
        the idle timeout fires.

    Raises:
        asyncio.CancelledError: Re-raised after cleanup when cancelled.
        Exception: Any unexpected error is re-raised after cleanup.
    """
    # Use a monotonic clock: wall-clock time (time.time) can jump when the
    # system clock is adjusted, which would fire the timeout early or late.
    last_data_time = time.monotonic()
    generator = stream_generator.__aiter__()
    cleanup_triggered = False
    try:
        while True:
            # Time already spent since the last item, including the time the
            # consumer took before asking for the next one.
            elapsed = time.monotonic() - last_data_time
            remaining_timeout = timeout_seconds - elapsed
            try:
                data = await asyncio.wait_for(generator.__anext__(), timeout=remaining_timeout)
                last_data_time = time.monotonic()
                yield data
            except asyncio.TimeoutError:
                chat_logger.warning(
                    "SSE timeout: No data received, closing connection",
                    extra={"timeout_seconds": timeout_seconds},
                )
                yield sse_json("error", {"message": f"Connection timeout: No data received for {timeout_seconds // 60} minutes"})
                cleanup_triggered = await _cleanup_task_lock_safe(task_lock, "TIMEOUT")
                break
            except StopAsyncIteration:
                # Wrapped stream finished normally.
                break
    except asyncio.CancelledError:
        chat_logger.info("[STREAM-CANCELLED] Stream cancelled, triggering cleanup")
        if not cleanup_triggered:
            await _cleanup_task_lock_safe(task_lock, "CANCELLED")
        raise
    except Exception as e:
        chat_logger.error(
            "[STREAM-ERROR] Unexpected error in stream wrapper",
            extra={"error": str(e)},
            exc_info=True,
        )
        if not cleanup_triggered:
            await _cleanup_task_lock_safe(task_lock, "ERROR")
        raise
@router.post("/chat", name="start chat")
async def post(data: Chat, request: Request):
    """Start a chat session and stream its events back to the client.

    Configures process environment variables from the request, creates the
    CAMEL log directory, queues the initial question on the project's task
    lock, and returns the ``step_solve`` stream wrapped with the idle timeout.

    Args:
        data: Chat request payload (project/task ids, credentials, question).
        request: Incoming request, forwarded to ``step_solve``.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    # NOTE(review): settings below are written to os.environ, which is
    # process-wide - confirm concurrent sessions cannot clobber each other.
    chat_logger.info(
        "Starting new chat session",
        extra={"project_id": data.project_id, "task_id": data.task_id, "user": data.email},
    )
    task_lock = get_or_create_task_lock(data.project_id)

    # Register the caller's env path, then load the file only if it validates.
    set_user_env_path(data.env_path)
    validated_env_path = sanitize_env_path(data.env_path)
    if validated_env_path:
        load_dotenv(dotenv_path=validated_env_path)

    os.environ["file_save_path"] = data.file_save_path()
    os.environ["browser_port"] = str(data.browser_port)
    os.environ["OPENAI_API_KEY"] = data.api_key
    os.environ["OPENAI_API_BASE_URL"] = data.api_url or "https://api.openai.com/v1"
    os.environ["CAMEL_MODEL_LOG_ENABLED"] = "true"

    # Export only the search-engine settings that carry a non-empty value.
    if data.search_config:
        for env_name, env_value in data.search_config.items():
            if not env_value:
                continue
            os.environ[env_name] = env_value
            chat_logger.debug(f"Set search config: {env_name}", extra={"project_id": data.project_id})

    # Log directory: ~/.eigent/<email local part>/project_<id>/task_<id>/camel_logs
    mailbox_name = data.email.split("@")[0]
    email_sanitized = re.sub(r'[\\/*?:"<>|\s]', "_", mailbox_name).strip(".")
    camel_log = Path.home().joinpath(
        ".eigent",
        email_sanitized,
        "project_" + data.project_id,
        "task_" + data.task_id,
        "camel_logs",
    )
    camel_log.mkdir(parents=True, exist_ok=True)
    os.environ["CAMEL_LOG_DIR"] = str(camel_log)

    if data.is_cloud():
        os.environ["cloud_api_key"] = data.api_key

    # Record the initial task id, then enqueue the first action to start processing.
    set_current_task_id(data.project_id, data.task_id)
    first_action = ActionImproveData(data=data.question, new_task_id=data.task_id)
    await task_lock.put_queue(first_action)

    chat_logger.info(
        "Chat session initialized",
        extra={"project_id": data.project_id, "task_id": data.task_id, "log_dir": str(camel_log)},
    )
    event_stream = timeout_stream_wrapper(step_solve(data, request, task_lock), task_lock=task_lock)
    return StreamingResponse(event_stream, media_type="text/event-stream")
@router.post("/chat/{id}", name="improve chat")
def improve(id: str, data: SupplementChat):
    """Queue a follow-up question on an existing chat.

    A lock whose status is ``Status.done`` is reset to ``Status.confirming``
    so the conversation can continue. When the request carries a ``task_id``,
    ``file_save_path`` is optimistically repointed at a new per-task folder.

    Args:
        id: Identifier used to look up the task lock (logged as project id).
        data: Follow-up payload with the question and an optional task id.

    Returns:
        An empty response with status 201.
    """
    # NOTE(review): asyncio.run() below runs the coroutine on a fresh event
    # loop - confirm put_queue is safe to call from outside the server loop.
    chat_logger.info("Chat improvement requested", extra={"task_id": id, "question_length": len(data.question)})
    task_lock = get_task_lock(id)

    # A finished task may be continued: reopen it for multi-turn conversation.
    if task_lock.status == Status.done:
        task_lock.status = Status.confirming
        # Drop stale background tasks; conversation_history and
        # last_task_result are intentionally left untouched.
        if hasattr(task_lock, "background_tasks"):
            task_lock.background_tasks.clear()
        if hasattr(task_lock, "conversation_history"):
            history_size = len(task_lock.conversation_history)
            chat_logger.info(f"[CONTEXT] Preserved {history_size} conversation entries")
        if hasattr(task_lock, "last_task_result"):
            result_size = len(task_lock.last_task_result)
            chat_logger.info(f"[CONTEXT] Preserved task result: {result_size} chars")

    # With a task_id, optimistically move file_save_path to a new task folder,
    # because a new workforce instance may be created for this task.
    new_folder_path = None
    if data.task_id:
        try:
            # Recover the email segment that follows "eigent" in the current path.
            current_email = None
            saved_path = os.environ.get("file_save_path", "")
            segments = Path(saved_path).parts if saved_path else ()
            if len(segments) >= 3 and "eigent" in segments:
                email_pos = segments.index("eigent") + 1
                if email_pos < len(segments):
                    current_email = segments[email_pos]

            if not (current_email and id):
                chat_logger.warning(f"Could not update file_save_path - email: {current_email}, project_id: {id}")
            else:
                # Layout: ~/eigent/<email>/project_<project_id>/task_<task_id>
                new_folder_path = Path.home().joinpath(
                    "eigent", current_email, f"project_{id}", f"task_{data.task_id}"
                )
                new_folder_path.mkdir(parents=True, exist_ok=True)
                os.environ["file_save_path"] = str(new_folder_path)
                chat_logger.info(f"Updated file_save_path to: {new_folder_path}")
                # Kept on the lock for potential cleanup and persistence.
                task_lock.new_folder_path = new_folder_path
        except Exception as exc:
            # Best effort: a path problem must not block queueing the question.
            chat_logger.error(f"Error updating file path for project_id: {id}, task_id: {data.task_id}: {exc}")

    follow_up = ActionImproveData(data=data.question, new_task_id=data.task_id)
    asyncio.run(task_lock.put_queue(follow_up))
    chat_logger.info("Improvement request queued with preserved context", extra={"project_id": id})
    return Response(status_code=201)
@router.put("/chat/{id}", name="supplement task")
def supplement(id: str, data: SupplementChat):
    """Queue supplementary data for a task that has already finished.

    Args:
        id: Identifier used to look up the task lock.
        data: Supplement payload, forwarded as ``ActionSupplementData``.

    Returns:
        An empty response with status 201.

    Raises:
        UserException: If the task lock's status is not ``Status.done``.
    """
    chat_logger.info("Chat supplement requested", extra={"task_id": id})
    task_lock = get_task_lock(id)

    # Supplements are only accepted once the task is done.
    if task_lock.status != Status.done:
        raise UserException(code.error, "Please wait task done")

    supplement_action = ActionSupplementData(data=data)
    asyncio.run(task_lock.put_queue(supplement_action))
    chat_logger.debug("Supplement data queued", extra={"task_id": id})
    return Response(status_code=201)
@router.delete("/chat/{id}", name="stop chat")
def stop(id: str):
    """Request that a task stop by queueing a stop action on its lock.

    This is best effort: any failure while looking up the lock or queueing
    the action is logged as a warning and the endpoint still answers 204.

    Args:
        id: Identifier used to look up the task lock.

    Returns:
        An empty response with status 204.
    """
    banner = "=" * 80
    chat_logger.info(banner)
    chat_logger.info("🛑 [STOP-BUTTON] DELETE /chat/{id} request received from frontend")
    chat_logger.info(f"[STOP-BUTTON] project_id/task_id: {id}")
    chat_logger.info(banner)

    try:
        task_lock = get_task_lock(id)
        chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}")
        chat_logger.info(f"[STOP-BUTTON] Queueing ActionStopData(Action.stop) to task_lock queue")
        stop_action = ActionStopData(action=Action.stop)
        asyncio.run(task_lock.put_queue(stop_action))
        chat_logger.info(f"[STOP-BUTTON] ✅ ActionStopData queued successfully, this will trigger workforce.stop_gracefully()")
    except Exception as exc:
        # The lock may be missing when the task already finished or never started.
        chat_logger.warning(f"[STOP-BUTTON] ⚠️ Task lock not found or already stopped, task_id: {id}, error: {str(exc)}")

    return Response(status_code=204)
@router.post("/chat/{id}/human-reply")
def human_reply(id: str, data: HumanReply):
    """Hand a human's reply for a given agent to the task lock.

    Args:
        id: Identifier used to look up the task lock.
        data: Reply payload carrying the target ``agent`` and the ``reply``.

    Returns:
        An empty response with status 201.
    """
    chat_logger.info("Human reply received", extra={"task_id": id, "reply_length": len(data.reply)})
    task_lock = get_task_lock(id)

    pending_input = task_lock.put_human_input(data.agent, data.reply)
    asyncio.run(pending_input)

    chat_logger.debug("Human reply processed", extra={"task_id": id})
    return Response(status_code=201)
@router.post("/chat/{id}/install-mcp")
def install_mcp(id: str, data: McpServers):
    """Queue an MCP server installation on the task lock.

    Args:
        id: Identifier used to look up the task lock.
        data: Mapping read via ``data.get("mcpServers", {})`` and forwarded
            unchanged in the queued action.

    Returns:
        An empty response with status 201.
    """
    servers = data.get("mcpServers", {})
    chat_logger.info("Installing MCP servers", extra={"task_id": id, "servers_count": len(servers)})

    task_lock = get_task_lock(id)
    install_action = ActionInstallMcpData(action=Action.install_mcp, data=data)
    asyncio.run(task_lock.put_queue(install_action))

    chat_logger.info("MCP installation queued", extra={"task_id": id})
    return Response(status_code=201)
@router.post("/chat/{id}/add-task", name="add task to workforce")
def add_task(id: str, data: AddTaskRequest):
    """Queue a new task for the workforce.

    Args:
        id: Identifier used to look up the task lock.
        data: Description of the task to add (content, ids, extra info,
            insert position).

    Returns:
        An empty response with status 201.

    Raises:
        UserException: If building or queueing the action fails.
    """
    content_preview = data.content[:100]
    chat_logger.info(f"Adding task to workforce for task_id: {id}, content: {content_preview}...")
    task_lock = get_task_lock(id)

    try:
        # Build the action and hand it to the task lock's queue.
        pending_action = ActionAddTaskData(
            content=data.content,
            project_id=data.project_id,
            task_id=data.task_id,
            additional_info=data.additional_info,
            insert_position=data.insert_position,
        )
        asyncio.run(task_lock.put_queue(pending_action))
        return Response(status_code=201)
    except Exception as exc:
        chat_logger.error(f"Error adding task for task_id: {id}: {exc}")
        raise UserException(code.error, f"Failed to add task: {str(exc)}")
@router.delete("/chat/{project_id}/remove-task/{task_id}", name="remove task from workforce")
def remove_task(project_id: str, task_id: str):
    """Queue the removal of a task from the workforce.

    Args:
        project_id: Identifier used to look up the task lock.
        task_id: Identifier of the task to remove.

    Returns:
        An empty response with status 204.

    Raises:
        UserException: If building or queueing the action fails.
    """
    chat_logger.info(f"Removing task {task_id} from workforce for project_id: {project_id}")
    task_lock = get_task_lock(project_id)

    try:
        # Build the action and hand it to the task lock's queue.
        pending_action = ActionRemoveTaskData(task_id=task_id, project_id=project_id)
        asyncio.run(task_lock.put_queue(pending_action))
        chat_logger.info(f"Task removal request queued for project_id: {project_id}, removing task: {task_id}")
        return Response(status_code=204)
    except Exception as exc:
        chat_logger.error(f"Error removing task {task_id} for project_id: {project_id}: {exc}")
        raise UserException(code.error, f"Failed to remove task: {str(exc)}")
@router.post("/chat/{project_id}/skip-task", name="skip task in workforce")
def skip_task(project_id: str):
    """Queue a skip action so the current task stops while keeping context.

    Called when the user clicks the Stop button. This endpoint only queues
    ``ActionSkipTaskData``. Per the original notes, processing that action is
    expected to:
    - stop the workforce gracefully
    - mark the task as done
    - preserve conversation_history and last_task_result in task_lock
    - send an 'end' event to the frontend
    - keep the SSE connection alive for multi-turn conversation

    Args:
        project_id: Identifier used to look up the task lock.

    Returns:
        An empty response with status 201.

    Raises:
        UserException: If building or queueing the action fails.
    """
    banner = "=" * 80
    chat_logger.info(banner)
    chat_logger.info(f"🛑 [STOP-BUTTON] SKIP-TASK request received from frontend (User clicked Stop)")
    chat_logger.info(f"[STOP-BUTTON] project_id: {project_id}")
    chat_logger.info(banner)

    task_lock = get_task_lock(project_id)
    chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}")

    try:
        # Skipping, unlike a plain stop, is meant to keep context for multi-turn use.
        pending_action = ActionSkipTaskData(project_id=project_id)
        chat_logger.info(f"[STOP-BUTTON] Queueing ActionSkipTaskData (preserves context, marks as done)")
        asyncio.run(task_lock.put_queue(pending_action))
        chat_logger.info(f"[STOP-BUTTON] ✅ Skip request queued - task will stop gracefully and preserve context")
        return Response(status_code=201)
    except Exception as exc:
        chat_logger.error(f"[STOP-BUTTON] Error skipping task for project_id: {project_id}: {exc}")
        raise UserException(code.error, f"Failed to skip task: {str(exc)}")