mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-30 20:25:33 +00:00
update chat_snpshot.py
This commit is contained in:
parent
4a9f26a8a4
commit
d8367fe4e9
1 changed files with 74 additions and 56 deletions
|
|
@ -1,56 +1,74 @@
|
|||
from typing import Optional
|
||||
from sqlalchemy import Column, Integer, text
|
||||
from sqlmodel import Field
|
||||
from app.model.abstract.model import AbstractModel, DefaultTimes
|
||||
from pydantic import BaseModel
|
||||
import os
|
||||
import base64
|
||||
import time
|
||||
|
||||
from app.component.sqids import encode_user_id
|
||||
|
||||
|
||||
class ChatSnapshot(AbstractModel, DefaultTimes, table=True):
|
||||
id: int = Field(default=None, primary_key=True)
|
||||
user_id: int = Field(sa_column=(Column(Integer, server_default=text("0"))))
|
||||
api_task_id: str = Field(index=True)
|
||||
camel_task_id: str = Field(index=True)
|
||||
browser_url: str
|
||||
image_path: str
|
||||
|
||||
@classmethod
|
||||
def get_user_dir(cls, user_id: int) -> str:
|
||||
return os.path.join("app", "public", "upload", encode_user_id(user_id))
|
||||
|
||||
@classmethod
|
||||
def caclDir(cls, path: str) -> float:
|
||||
"""Return disk usage of path directory (in MB, rounded to 2 decimal places)"""
|
||||
total_size = 0
|
||||
for dirpath, dirnames, filenames in os.walk(path):
|
||||
for f in filenames:
|
||||
fp = os.path.join(dirpath, f)
|
||||
if os.path.isfile(fp):
|
||||
total_size += os.path.getsize(fp)
|
||||
size_mb = total_size / (1024 * 1024)
|
||||
return round(size_mb, 2)
|
||||
|
||||
|
||||
class ChatSnapshotIn(BaseModel):
|
||||
api_task_id: str
|
||||
user_id: Optional[int] = None
|
||||
camel_task_id: str
|
||||
browser_url: str
|
||||
image_base64: str
|
||||
|
||||
@staticmethod
|
||||
def save_image(user_id: int, api_task_id: str, image_base64: str) -> str:
|
||||
if "," in image_base64:
|
||||
image_base64 = image_base64.split(",", 1)[1]
|
||||
user_dir = encode_user_id(user_id)
|
||||
folder = os.path.join("app", "public", "upload", user_dir, api_task_id)
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
filename = f"{int(time.time() * 1000)}.jpg"
|
||||
file_path = os.path.join(folder, filename)
|
||||
with open(file_path, "wb") as f:
|
||||
f.write(base64.b64decode(image_base64))
|
||||
return f"/public/upload/{user_dir}/{api_task_id}/{filename}"
|
||||
from typing import Optional
|
||||
from sqlalchemy import Column, Integer, text
|
||||
from sqlmodel import Field
|
||||
from app.model.abstract.model import AbstractModel, DefaultTimes
|
||||
from pydantic import BaseModel
|
||||
import os
|
||||
import base64
|
||||
import time
|
||||
import re
|
||||
from pathlib import Path
|
||||
from uuid import uuid4
|
||||
|
||||
from app.component.sqids import encode_user_id
|
||||
|
||||
|
||||
class ChatSnapshot(AbstractModel, DefaultTimes, table=True):
|
||||
id: int = Field(default=None, primary_key=True)
|
||||
user_id: int = Field(sa_column=(Column(Integer, server_default=text("0"))))
|
||||
api_task_id: str = Field(index=True)
|
||||
camel_task_id: str = Field(index=True)
|
||||
browser_url: str
|
||||
image_path: str
|
||||
|
||||
@classmethod
|
||||
def get_user_dir(cls, user_id: int) -> str:
|
||||
return os.path.join("app", "public", "upload", encode_user_id(user_id))
|
||||
|
||||
@classmethod
|
||||
def caclDir(cls, path: str) -> float:
|
||||
"""Return disk usage of path directory (in MB, rounded to 2 decimal places)"""
|
||||
total_size = 0
|
||||
for dirpath, dirnames, filenames in os.walk(path):
|
||||
for f in filenames:
|
||||
fp = os.path.join(dirpath, f)
|
||||
if os.path.isfile(fp):
|
||||
total_size += os.path.getsize(fp)
|
||||
size_mb = total_size / (1024 * 1024)
|
||||
return round(size_mb, 2)
|
||||
|
||||
|
||||
class ChatSnapshotIn(BaseModel):
|
||||
api_task_id: str
|
||||
user_id: Optional[int] = None
|
||||
camel_task_id: str
|
||||
browser_url: str
|
||||
image_base64: str
|
||||
|
||||
@staticmethod
|
||||
def save_image(user_id: int, api_task_id: str, image_base64: str) -> str:
|
||||
if "," in image_base64:
|
||||
image_base64 = image_base64.split(",", 1)[1]
|
||||
|
||||
user_dir = encode_user_id(user_id)
|
||||
if os.path.basename(user_dir) != user_dir or not re.fullmatch(r"[A-Za-z0-9._-]{1,128}", user_dir or ""):
|
||||
raise ValueError("Invalid user_id")
|
||||
|
||||
base_dir = os.path.abspath(os.path.join("app", "public", "upload"))
|
||||
|
||||
# Keep api_task_id as part of the path but ensure it cannot traverse
|
||||
safe_api_task_id = os.path.basename(api_task_id)
|
||||
if safe_api_task_id != api_task_id or not re.fullmatch(r"[A-Za-z0-9._-]{1,128}", safe_api_task_id or ""):
|
||||
raise ValueError("Invalid api_task_id")
|
||||
|
||||
folder = os.path.normpath(os.path.join(base_dir, user_dir, safe_api_task_id))
|
||||
# Directory traversal guard: ensure final path stays under base_dir
|
||||
if not folder.startswith(base_dir + os.sep):
|
||||
raise ValueError("Unsafe upload path detected")
|
||||
|
||||
Path(folder).mkdir(parents=True, exist_ok=True)
|
||||
filename = f"{int(time.time() * 1000)}_{uuid4().hex}.jpg"
|
||||
file_path = os.path.join(folder, filename)
|
||||
with open(file_path, "wb") as f:
|
||||
f.write(base64.b64decode(image_base64))
|
||||
return f"/public/upload/{user_dir}/{safe_api_task_id}/{filename}"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue