eigent/backend/app/controller/file_controller.py
Tong Chen 6c827a3d06
refactor: establish Brain-centered architecture and frontend/backend separation foundations (#1597)
Co-authored-by: Douglas <douglas.ym.lai@gmail.com>
Co-authored-by: Douglas Lai <115660088+Douglasymlai@users.noreply.github.com>
2026-05-01 17:03:33 +08:00

319 lines
10 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
import logging
import mimetypes
import re
import time
from pathlib import Path
from typing import Annotated
from urllib.parse import quote
from fastapi import APIRouter, File, Header, HTTPException, Query, UploadFile
from fastapi.responses import FileResponse
from app.component.environment import env
from app.utils.file_utils import list_files, resolve_under_base
router = APIRouter()
file_logger = logging.getLogger("file_controller")
# Config
MAX_FILE_SIZE_BYTES = 50 * 1024 * 1024 # 50MB
MAX_FILES_PER_SESSION = 20
WORKSPACE_ROOT = env("EIGENT_WORKSPACE", "~/.eigent/workspace")
SESSION_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$")
def _get_eigent_root() -> Path:
"""Base root for eigent storage (~/eigent). Do NOT use env file_save_path
here: chat overwrites it to task path, which would break list/stream."""
eigent = Path.home() / "eigent"
if eigent.exists():
return eigent
dot_eigent = Path.home() / ".eigent"
if dot_eigent.exists():
return dot_eigent
return eigent # default to ~/eigent
def _get_workspace_root() -> Path:
return Path(WORKSPACE_ROOT).expanduser()
def _validate_session_id(session_id: str) -> str:
normalized = (session_id or "").strip()
if not SESSION_ID_PATTERN.fullmatch(normalized):
raise ValueError("Invalid X-Session-ID")
return normalized
def _get_session_uploads_dir(session_id: str) -> Path:
root = _get_workspace_root().resolve()
validated = _validate_session_id(session_id)
uploads_dir = (root / validated / "uploads").resolve()
try:
uploads_dir.relative_to(root)
except ValueError as exc:
raise ValueError("Invalid X-Session-ID") from exc
return uploads_dir
def _count_session_uploads(session_id: str) -> int:
uploads_dir = _get_session_uploads_dir(session_id)
if not uploads_dir.exists():
return 0
return len(list(uploads_dir.iterdir()))
@router.post("/files")
async def upload_file(
file: Annotated[UploadFile, File()],
x_session_id: Annotated[str | None, Header(alias="X-Session-ID")] = None,
) -> dict:
"""
Upload file. Requires X-Session-ID header.
Returns file_id for message attachments reference.
"""
if not x_session_id:
raise HTTPException(
status_code=400,
detail="X-Session-ID header is required for file upload",
)
try:
validated_session_id = _validate_session_id(x_session_id)
except ValueError as exc:
raise HTTPException(
status_code=400, detail="Invalid X-Session-ID"
) from exc
# Check session file count limit
count = _count_session_uploads(validated_session_id)
if count >= MAX_FILES_PER_SESSION:
raise HTTPException(
status_code=429,
detail=f"Maximum {MAX_FILES_PER_SESSION} files per session",
)
# Read and validate size
content = await file.read()
if len(content) > MAX_FILE_SIZE_BYTES:
raise HTTPException(
status_code=413,
detail=f"File size exceeds {MAX_FILE_SIZE_BYTES // (1024 * 1024)}MB limit",
)
# Generate safe filename
timestamp = int(time.time() * 1000)
safe_name = "".join(
c if c.isalnum() or c in "._-" else "_"
for c in (file.filename or "file")
)
stored_name = f"{safe_name}_{timestamp}"
# Write to disk
uploads_dir = _get_session_uploads_dir(validated_session_id)
uploads_dir.mkdir(parents=True, exist_ok=True)
target_path = uploads_dir / stored_name
target_path.write_bytes(content)
file_id = f"upload://{stored_name}"
file_logger.info(
f"File uploaded: session={validated_session_id}, file_id={file_id}, size={len(content)}"
)
return {
"file_id": file_id,
"filename": file.filename or "file",
"size": len(content),
}
def _sanitize_email(email: str) -> str:
"""Sanitize email for use in path (match chat_controller logic)."""
return re.sub(r'[\\/*?:"<>|\s]', "_", email.split("@")[0]).strip(".")
def _normalize_relative_path(path: str) -> str:
"""Normalize relative path to URL-safe POSIX style."""
return path.replace("\\", "/")
def _get_project_root(email: str, project_id: str) -> Path:
"""Get project root path: ~/eigent/{email}/project_{project_id}/."""
root = _get_eigent_root()
email_sanitized = _sanitize_email(email)
return root / email_sanitized / f"project_{project_id}"
def _resolve_project_root(email: str, project_id: str) -> Path:
"""
Resolve project root, preferring the email-scoped path but falling back to
any local project_{project_id} directory when the stored email differs from
the current login identity.
"""
preferred = _get_project_root(email, project_id)
if preferred.exists():
return preferred
root = _get_eigent_root()
candidate_name = f"project_{project_id}"
try:
for child in root.iterdir():
if not child.is_dir():
continue
candidate = child / candidate_name
if candidate.exists():
file_logger.info(
"Resolved project root via fallback lookup: %s -> %s",
preferred,
candidate,
)
return candidate
except FileNotFoundError:
pass
except Exception as e:
file_logger.warning("project root fallback lookup failed: %s", e)
return preferred
@router.get("/files")
async def list_project_files(
project_id: str = Query(..., description="Project ID"),
email: str = Query(..., description="User email"),
task_id: str | None = Query(
None, description="Optional task ID to scope listing"
),
) -> list[dict]:
"""
List files in project working directory (Brain storage).
Used by Web mode when ipcRenderer is unavailable.
Returns [{filename, url}] where url can be used to fetch file content.
"""
if not project_id or not email:
raise HTTPException(
status_code=400,
detail="project_id and email are required",
)
project_root = _resolve_project_root(email, project_id)
list_dir = str(project_root)
if task_id:
list_dir = str(project_root / f"task_{task_id}")
if not Path(list_dir).exists():
file_logger.debug(
"list_project_files: path does not exist: %s",
list_dir,
)
return []
base_path = str(project_root.resolve())
try:
paths = list_files(list_dir, base=base_path, max_entries=500)
except Exception as e:
file_logger.warning("list_project_files failed: %s", e)
return []
result: list[dict] = []
for abs_path in paths:
try:
rel = _normalize_relative_path(
Path(abs_path).relative_to(base_path).as_posix()
)
# URL-encode the relative path for stream endpoint
path_param = quote(rel, safe="")
result.append(
{
"filename": Path(abs_path).name,
"url": f"/files/stream?path={path_param}&project_id={quote(project_id)}&email={quote(email)}",
"relativePath": rel,
}
)
except (ValueError, OSError):
continue
return result
@router.get("/files/stream")
async def stream_file(
path: str = Query(..., description="Relative path from project root"),
project_id: str = Query(..., description="Project ID"),
email: str = Query(..., description="User email"),
):
"""
Stream file content. Path must be relative to project root.
Used by Web mode to fetch file content for display.
"""
if not path or not project_id or not email:
raise HTTPException(
status_code=400,
detail="path, project_id and email are required",
)
project_root = _resolve_project_root(email, project_id)
# Resolve path and ensure it stays under project root (security)
try:
resolved = resolve_under_base(path, str(project_root.resolve()))
except Exception as e:
file_logger.warning("stream_file path validation failed: %s", e)
raise HTTPException(status_code=400, detail="Invalid path") from e
p = Path(resolved)
if not p.is_file():
raise HTTPException(status_code=404, detail="File not found")
media_type, _ = mimetypes.guess_type(str(p))
if not media_type:
media_type = "application/octet-stream"
# content_disposition_type=inline: display in iframe instead of triggering download
return FileResponse(
path=str(p),
filename=p.name,
media_type=media_type,
content_disposition_type="inline",
)
@router.get("/files/preview/{email}/{project_id}/{file_path:path}")
async def preview_file(
email: str,
project_id: str,
file_path: str,
):
"""
Preview file content with a path-based URL so relative references inside
HTML/CSS/JS resolve against the project directory structure.
"""
if not file_path or not project_id or not email:
raise HTTPException(
status_code=400,
detail="file_path, project_id and email are required",
)
project_root = _resolve_project_root(email, project_id)
try:
resolved = resolve_under_base(file_path, str(project_root.resolve()))
except Exception as e:
file_logger.warning("preview_file path validation failed: %s", e)
raise HTTPException(status_code=400, detail="Invalid path") from e
p = Path(resolved)
if not p.is_file():
raise HTTPException(status_code=404, detail="File not found")
media_type, _ = mimetypes.guess_type(str(p))
if not media_type:
media_type = "application/octet-stream"
return FileResponse(
path=str(p),
filename=p.name,
media_type=media_type,
content_disposition_type="inline",
)