mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-23 12:44:31 +00:00
Fixes agent0ai/agent-zero#1609. Issue: "Unauthenticated Path-Containment Bypass in Agent Zero `/api/image_get`" https://github.com/agent0ai/agent-zero/issues/1609 Resolve the path-containment bypass in /api/image_get by resolving requested images against the Agent Zero base directory before serving them, including symlink-aware validation and the development RFC fallback path. Harden SVG and SVGZ responses with nosniff and a sandboxed CSP so uploaded SVGs cannot execute scripts in the Agent Zero origin. Add focused regressions for outside paths, symlink escapes, SVG headers, and development-mode remote validation.
200 lines
6.5 KiB
Python
200 lines
6.5 KiB
Python
import base64
|
|
import os
|
|
from pathlib import Path
|
|
from urllib.parse import quote
|
|
from helpers.api import ApiHandler, Request, Response, send_file
|
|
from helpers import files, runtime
|
|
import io
|
|
from mimetypes import guess_type
|
|
|
|
|
|
IMAGE_EXTENSIONS = (
|
|
".jpg",
|
|
".jpeg",
|
|
".png",
|
|
".gif",
|
|
".bmp",
|
|
".webp",
|
|
".svg",
|
|
".ico",
|
|
".svgz",
|
|
)
|
|
SVG_EXTENSIONS = (".svg", ".svgz")
|
|
SVG_CONTENT_SECURITY_POLICY = (
|
|
"sandbox; default-src 'none'; script-src 'none'; "
|
|
"img-src 'self' data:; style-src 'unsafe-inline'"
|
|
)
|
|
|
|
|
|
class ImageGet(ApiHandler):
|
|
|
|
@classmethod
|
|
def get_methods(cls) -> list[str]:
|
|
return ["GET"]
|
|
|
|
async def process(self, input: dict, request: Request) -> dict | Response:
|
|
# input data
|
|
path = input.get("path", request.args.get("path", ""))
|
|
|
|
if not path:
|
|
raise ValueError("No path provided")
|
|
|
|
# get file extension and info
|
|
file_ext = os.path.splitext(path)[1].lower()
|
|
filename = os.path.basename(path)
|
|
|
|
if file_ext in IMAGE_EXTENSIONS:
|
|
try:
|
|
local_path = _resolve_allowed_image_path(path)
|
|
except ValueError as exc:
|
|
return Response(str(exc), status=403, mimetype="text/plain")
|
|
|
|
# in development environment, try to serve the image from local file system if exists, otherwise from docker
|
|
if runtime.is_development():
|
|
if files.exists(local_path):
|
|
response = send_file(local_path)
|
|
else:
|
|
# Try fetching from Docker via RFC as fallback
|
|
try:
|
|
remote_path = await runtime.call_development_function(
|
|
_resolve_allowed_image_path, path
|
|
)
|
|
if await runtime.call_development_function(
|
|
files.exists, remote_path
|
|
):
|
|
b64_content = await runtime.call_development_function(
|
|
files.read_file_base64, remote_path
|
|
)
|
|
file_content = base64.b64decode(b64_content)
|
|
mime_type, _ = guess_type(filename)
|
|
if not mime_type:
|
|
mime_type = "application/octet-stream"
|
|
response = send_file(
|
|
io.BytesIO(file_content),
|
|
mimetype=mime_type,
|
|
as_attachment=False,
|
|
download_name=filename,
|
|
)
|
|
else:
|
|
response = _send_fallback_icon("image")
|
|
except Exception:
|
|
response = _send_fallback_icon("image")
|
|
else:
|
|
if files.exists(local_path):
|
|
response = send_file(local_path)
|
|
else:
|
|
response = _send_fallback_icon("image")
|
|
|
|
_set_image_headers(response, filename, file_ext)
|
|
return response
|
|
else:
|
|
# Handle non-image files with fallback icons
|
|
return _send_file_type_icon(file_ext, filename)
|
|
|
|
|
|
def _resolve_allowed_image_path(path: str) -> str:
|
|
"""Resolve a requested image path and keep it inside Agent Zero's base dir."""
|
|
|
|
if runtime.is_development():
|
|
candidate = Path(files.fix_dev_path(path))
|
|
else:
|
|
candidate = Path(files.get_abs_path(path))
|
|
|
|
if not candidate.is_absolute():
|
|
candidate = Path(files.get_base_dir()) / candidate
|
|
|
|
base_dir = Path(files.get_base_dir()).resolve()
|
|
resolved = candidate.resolve(strict=False)
|
|
|
|
try:
|
|
resolved.relative_to(base_dir)
|
|
except ValueError as exc:
|
|
raise ValueError("Path is outside of allowed directory") from exc
|
|
|
|
return str(resolved)
|
|
|
|
|
|
def _set_image_headers(response: Response, filename: str, file_ext: str) -> None:
|
|
# Add cache headers for better device sync performance.
|
|
response.headers["Cache-Control"] = "public, max-age=3600"
|
|
response.headers["X-File-Type"] = "image"
|
|
response.headers["X-File-Name"] = quote(filename)
|
|
response.headers["X-Content-Type-Options"] = "nosniff"
|
|
if file_ext in SVG_EXTENSIONS:
|
|
response.headers["Content-Security-Policy"] = SVG_CONTENT_SECURITY_POLICY
|
|
|
|
|
|
def _send_file_type_icon(file_ext, filename=None):
|
|
"""Return appropriate icon for file type"""
|
|
|
|
# Map file extensions to icon names
|
|
icon_mapping = {
|
|
# Archive files
|
|
".zip": "archive",
|
|
".rar": "archive",
|
|
".7z": "archive",
|
|
".tar": "archive",
|
|
".gz": "archive",
|
|
# Document files
|
|
".pdf": "document",
|
|
".doc": "document",
|
|
".docx": "document",
|
|
".txt": "document",
|
|
".rtf": "document",
|
|
".odt": "document",
|
|
# Code files
|
|
".py": "code",
|
|
".js": "code",
|
|
".html": "code",
|
|
".css": "code",
|
|
".json": "code",
|
|
".xml": "code",
|
|
".md": "code",
|
|
".yml": "code",
|
|
".yaml": "code",
|
|
".sql": "code",
|
|
".sh": "code",
|
|
".bat": "code",
|
|
# Spreadsheet files
|
|
".xls": "document",
|
|
".xlsx": "document",
|
|
".csv": "document",
|
|
# Presentation files
|
|
".ppt": "document",
|
|
".pptx": "document",
|
|
".odp": "document",
|
|
}
|
|
|
|
# Get icon name, default to 'file' if not found
|
|
icon_name = icon_mapping.get(file_ext, "file")
|
|
|
|
response = _send_fallback_icon(icon_name)
|
|
|
|
# Add headers for device sync
|
|
if hasattr(response, "headers"):
|
|
response.headers["Cache-Control"] = (
|
|
"public, max-age=86400" # Cache icons for 24 hours
|
|
)
|
|
response.headers["X-File-Type"] = "icon"
|
|
response.headers["X-Icon-Type"] = icon_name
|
|
if filename:
|
|
response.headers["X-File-Name"] = quote(filename)
|
|
|
|
return response
|
|
|
|
|
|
def _send_fallback_icon(icon_name):
|
|
"""Return fallback icon from public directory"""
|
|
|
|
# Path to public icons
|
|
icon_path = files.get_abs_path(f"webui/public/{icon_name}.svg")
|
|
|
|
# Check if specific icon exists, fallback to generic file icon
|
|
if not os.path.exists(icon_path):
|
|
icon_path = files.get_abs_path("webui/public/file.svg")
|
|
|
|
# Final fallback if file.svg doesn't exist
|
|
if not os.path.exists(icon_path):
|
|
raise ValueError(f"Fallback icon not found: {icon_path}")
|
|
|
|
return send_file(icon_path, mimetype="image/svg+xml")
|