update server logs

This commit is contained in:
Saedbhati 2025-10-16 21:05:09 +05:30
parent 6eed287dcb
commit 40779bf1e5
43 changed files with 2354 additions and 1808 deletions

View file

@ -6,7 +6,7 @@ from fastapi_babel import _
from sqlmodel import Session, select, desc
from app.component.auth import Auth, auth_must
from app.component.database import session
import traceroot
from utils import traceroot_wrapper as traceroot
logger = traceroot.get_logger("server_chat_history")
@ -16,38 +16,59 @@ router = APIRouter(prefix="/chat", tags=["Chat History"])
@router.post("/history", name="save chat history", response_model=ChatHistoryOut)
@traceroot.trace()
def create_chat_history(data: ChatHistoryIn, session: Session = Depends(session), auth: Auth = Depends(auth_must)):
logger.info(f"Creating chat history for user {auth.user.id}, task_id: {data.task_id}")
data.user_id = auth.user.id
chat_history = ChatHistory(**data.model_dump())
session.add(chat_history)
session.commit()
session.refresh(chat_history)
logger.info(f"Chat history created: {chat_history.id}")
return chat_history
"""Save new chat history."""
user_id = auth.user.id
try:
data.user_id = user_id
chat_history = ChatHistory(**data.model_dump())
session.add(chat_history)
session.commit()
session.refresh(chat_history)
logger.info("Chat history created", extra={"user_id": user_id, "history_id": chat_history.id, "task_id": data.task_id})
return chat_history
except Exception as e:
session.rollback()
logger.error("Chat history creation failed", extra={"user_id": user_id, "task_id": data.task_id, "error": str(e)}, exc_info=True)
raise
@router.get("/histories", name="get chat history")
@traceroot.trace()
def list_chat_history(session: Session = Depends(session), auth: Auth = Depends(auth_must)) -> Page[ChatHistoryOut]:
logger.info(f"Listing chat histories for user {auth.user.id}")
stmt = select(ChatHistory).where(ChatHistory.user_id == auth.user.id).order_by(desc(ChatHistory.created_at))
"""List chat histories for current user."""
user_id = auth.user.id
stmt = select(ChatHistory).where(ChatHistory.user_id == user_id).order_by(desc(ChatHistory.created_at))
result = paginate(session, stmt)
logger.debug(f"Found {result.total if hasattr(result, 'total') else 'N/A'} chat histories")
total = result.total if hasattr(result, 'total') else 0
logger.debug("Chat histories listed", extra={"user_id": user_id, "total": total})
return result
@router.delete("/history/{history_id}", name="delete chat history")
@traceroot.trace()
def delete_chat_history(history_id: str, session: Session = Depends(session)):
logger.info(f"Deleting chat history: {history_id}")
def delete_chat_history(history_id: str, session: Session = Depends(session), auth: Auth = Depends(auth_must)):
"""Delete chat history."""
user_id = auth.user.id
history = session.exec(select(ChatHistory).where(ChatHistory.id == history_id)).first()
if not history:
logger.warning(f"Chat history not found: {history_id}")
logger.warning("Chat history not found for deletion", extra={"user_id": user_id, "history_id": history_id})
raise HTTPException(status_code=404, detail="Chat History not found")
session.delete(history)
session.commit()
logger.info(f"Chat history deleted: {history_id}")
return Response(status_code=204)
if history.user_id != user_id:
logger.warning("Unauthorized deletion attempt", extra={"user_id": user_id, "history_id": history_id, "owner_id": history.user_id})
raise HTTPException(status_code=403, detail="You are not allowed to delete this chat history")
try:
session.delete(history)
session.commit()
logger.info("Chat history deleted", extra={"user_id": user_id, "history_id": history_id})
return Response(status_code=204)
except Exception as e:
session.rollback()
logger.error("Chat history deletion failed", extra={"user_id": user_id, "history_id": history_id, "error": str(e)}, exc_info=True)
raise
@router.put("/history/{history_id}", name="update chat history", response_model=ChatHistoryOut)
@ -55,18 +76,25 @@ def delete_chat_history(history_id: str, session: Session = Depends(session)):
def update_chat_history(
history_id: int, data: ChatHistoryUpdate, session: Session = Depends(session), auth: Auth = Depends(auth_must)
):
logger.info(f"Updating chat history: {history_id} for user {auth.user.id}")
"""Update chat history."""
user_id = auth.user.id
history = session.exec(select(ChatHistory).where(ChatHistory.id == history_id)).first()
if not history:
logger.warning(f"Chat history not found: {history_id}")
logger.warning("Chat history not found for update", extra={"user_id": user_id, "history_id": history_id})
raise HTTPException(status_code=404, detail="Chat History not found")
if history.user_id != auth.user.id:
logger.warning(f"Unauthorized update attempt on history {history_id} by user {auth.user.id}")
if history.user_id != user_id:
logger.warning("Unauthorized update attempt", extra={"user_id": user_id, "history_id": history_id, "owner_id": history.user_id})
raise HTTPException(status_code=403, detail="You are not allowed to update this chat history")
update_data = data.model_dump(exclude_unset=True)
logger.debug(f"Update data: {list(update_data.keys())}")
history.update_fields(update_data)
history.save(session)
session.refresh(history)
logger.info(f"Chat history updated: {history_id}")
return history
try:
update_data = data.model_dump(exclude_unset=True)
history.update_fields(update_data)
history.save(session)
session.refresh(history)
logger.info("Chat history updated", extra={"user_id": user_id, "history_id": history_id, "fields_updated": list(update_data.keys())})
return history
except Exception as e:
logger.error("Chat history update failed", extra={"user_id": user_id, "history_id": history_id, "error": str(e)}, exc_info=True)
raise

View file

@ -1,78 +1,107 @@
from fastapi import APIRouter, Depends, HTTPException, Response
from sqlmodel import Session, asc, select
from app.component.database import session
import json
import asyncio
from itsdangerous import SignatureExpired, BadTimeSignature
from starlette.responses import StreamingResponse
from app.model.chat.chat_share import ChatHistoryShareOut, ChatShare, ChatShareIn
from app.model.chat.chat_step import ChatStep
from app.model.chat.chat_history import ChatHistory
router = APIRouter(prefix="/chat", tags=["Chat Share"])
@router.get("/share/info/{token}", name="Get shared chat info", response_model=ChatHistoryShareOut)
def get_share_info(token: str, session: Session = Depends(session)):
"""
Get shared chat history info by token, excluding sensitive data.
"""
try:
task_id = ChatShare.verify_token(token, False)
except (SignatureExpired, BadTimeSignature):
raise HTTPException(status_code=400, detail="Share link is invalid or has expired.")
stmt = select(ChatHistory).where(ChatHistory.task_id == task_id)
history = session.exec(stmt).one_or_none()
if not history:
raise HTTPException(status_code=404, detail="Chat history not found.")
return history
@router.get("/share/playback/{token}", name="Playback shared chat via SSE")
async def share_playback(token: str, session: Session = Depends(session), delay_time: float = 0):
"""
Playbacks the chat history via a sharing token (SSE).
delay_time: control sse interval, max 5 seconds
"""
if delay_time > 5:
delay_time = 5
try:
task_id = ChatShare.verify_token(token, False)
except SignatureExpired:
raise HTTPException(status_code=400, detail="Share link has expired.")
except BadTimeSignature:
raise HTTPException(status_code=400, detail="Share link is invalid.")
async def event_generator():
stmt = select(ChatStep).where(ChatStep.task_id == task_id).order_by(asc(ChatStep.id))
steps = session.exec(stmt).all()
if not steps:
yield f"data: {json.dumps({'error': 'No steps found for this task.'})}\n\n"
return
for step in steps:
step_data = {
"id": step.id,
"task_id": step.task_id,
"step": step.step,
"data": step.data,
"created_at": step.created_at.isoformat() if step.created_at else None,
}
yield f"data: {json.dumps(step_data)}\n\n"
if delay_time > 0 and step.step != "create_agent":
await asyncio.sleep(delay_time)
return StreamingResponse(event_generator(), media_type="text/event-stream")
@router.post("/share", name="Generate sharable link for a task(1 day expiration)")
def create_share_link(data: ChatShareIn):
"""
Generates a sharing token with an expiration time for the specified task_id.
"""
share_token = ChatShare.generate_token(data.task_id)
return {"share_token": share_token}
from fastapi import APIRouter, Depends, HTTPException, Response
from sqlmodel import Session, asc, select
from app.component.database import session
import json
import asyncio
from itsdangerous import SignatureExpired, BadTimeSignature
from starlette.responses import StreamingResponse
from app.model.chat.chat_share import ChatHistoryShareOut, ChatShare, ChatShareIn
from app.model.chat.chat_step import ChatStep
from app.model.chat.chat_history import ChatHistory
from utils import traceroot_wrapper as traceroot
logger = traceroot.get_logger("server_chat_share")
router = APIRouter(prefix="/chat", tags=["Chat Share"])
@router.get("/share/info/{token}", name="Get shared chat info", response_model=ChatHistoryShareOut)
@traceroot.trace()
def get_share_info(token: str, session: Session = Depends(session)):
"""
Get shared chat history info by token, excluding sensitive data.
"""
try:
task_id = ChatShare.verify_token(token, False)
except SignatureExpired:
logger.warning("Shared chat access failed: token expired", extra={"token_prefix": token[:10]})
raise HTTPException(status_code=400, detail="Share link is invalid or has expired.")
except BadTimeSignature:
logger.warning("Shared chat access failed: invalid token", extra={"token_prefix": token[:10]})
raise HTTPException(status_code=400, detail="Share link is invalid or has expired.")
stmt = select(ChatHistory).where(ChatHistory.task_id == task_id)
history = session.exec(stmt).one_or_none()
if not history:
logger.warning("Shared chat not found", extra={"task_id": task_id})
raise HTTPException(status_code=404, detail="Chat history not found.")
logger.info("Shared chat info accessed", extra={"task_id": task_id})
return history
@router.get("/share/playback/{token}", name="Playback shared chat via SSE")
@traceroot.trace()
async def share_playback(token: str, session: Session = Depends(session), delay_time: float = 0):
"""
Playbacks the chat history via a sharing token (SSE).
delay_time: control sse interval, max 5 seconds
"""
if delay_time > 5:
logger.debug("Delay time capped", extra={"requested": delay_time, "capped": 5})
delay_time = 5
try:
task_id = ChatShare.verify_token(token, False)
except SignatureExpired:
logger.warning("Shared chat playback failed: token expired", extra={"token_prefix": token[:10]})
raise HTTPException(status_code=400, detail="Share link has expired.")
except BadTimeSignature:
logger.warning("Shared chat playback failed: invalid token", extra={"token_prefix": token[:10]})
raise HTTPException(status_code=400, detail="Share link is invalid.")
async def event_generator():
try:
stmt = select(ChatStep).where(ChatStep.task_id == task_id).order_by(asc(ChatStep.id))
steps = session.exec(stmt).all()
if not steps:
logger.warning("No steps found for playback", extra={"task_id": task_id})
yield f"data: {json.dumps({'error': 'No steps found for this task.'})}\n\n"
return
logger.info("Shared chat playback started", extra={"task_id": task_id, "step_count": len(steps), "delay_time": delay_time})
for idx, step in enumerate(steps, start=1):
step_data = {
"id": step.id,
"task_id": step.task_id,
"step": step.step,
"data": step.data,
"created_at": step.created_at.isoformat() if step.created_at else None,
}
yield f"data: {json.dumps(step_data)}\n\n"
if delay_time > 0 and step.step != "create_agent":
await asyncio.sleep(delay_time)
logger.info("Shared chat playback completed", extra={"task_id": task_id, "step_count": len(steps)})
except Exception as e:
logger.error("Shared chat playback error", extra={"task_id": task_id, "error": str(e)}, exc_info=True)
yield f"data: {json.dumps({'error': 'Playback error occurred.'})}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
@router.post("/share", name="Generate sharable link for a task(1 day expiration)")
@traceroot.trace()
def create_share_link(data: ChatShareIn):
"""Generate sharing token with 1-day expiration for task."""
try:
share_token = ChatShare.generate_token(data.task_id)
logger.info("Share link created", extra={"task_id": data.task_id, "token_prefix": share_token[:10]})
return {"share_token": share_token}
except Exception as e:
logger.error("Share link creation failed", extra={"task_id": data.task_id, "error": str(e)}, exc_info=True)
raise

View file

@ -1,81 +1,138 @@
from app.model.chat.chat_snpshot import ChatSnapshot, ChatSnapshotIn
from typing import List, Optional
from fastapi import Depends, HTTPException, Response, APIRouter
from sqlmodel import Session, select
from app.component.database import session
from app.component.auth import Auth, auth_must
from fastapi_babel import _
router = APIRouter(prefix="/chat", tags=["Chat Snapshot Management"])
@router.get("/snapshots", name="list chat snapshots", response_model=List[ChatSnapshot])
async def list_chat_snapshots(
api_task_id: Optional[str] = None,
camel_task_id: Optional[str] = None,
browser_url: Optional[str] = None,
session: Session = Depends(session),
):
query = select(ChatSnapshot)
if api_task_id is not None:
query = query.where(ChatSnapshot.api_task_id == api_task_id)
if camel_task_id is not None:
query = query.where(ChatSnapshot.camel_task_id == camel_task_id)
if browser_url is not None:
query = query.where(ChatSnapshot.browser_url == browser_url)
snapshots = session.exec(query).all()
return snapshots
@router.get("/snapshots/{snapshot_id}", name="get chat snapshot", response_model=ChatSnapshot)
async def get_chat_snapshot(snapshot_id: int, session: Session = Depends(session), auth: Auth = Depends(auth_must)):
snapshot = session.get(ChatSnapshot, snapshot_id)
if not snapshot:
raise HTTPException(status_code=404, detail=_("Chat snapshot not found"))
return snapshot
@router.post("/snapshots", name="create chat snapshot", response_model=ChatSnapshot)
async def create_chat_snapshot(
snapshot: ChatSnapshotIn, auth: Auth = Depends(auth_must), session: Session = Depends(session)
):
image_path = ChatSnapshotIn.save_image(auth.user.id, snapshot.api_task_id, snapshot.image_base64)
chat_snapshot = ChatSnapshot(
user_id=auth.user.id,
api_task_id=snapshot.api_task_id,
camel_task_id=snapshot.camel_task_id,
browser_url=snapshot.browser_url,
image_path=image_path,
)
session.add(chat_snapshot)
session.commit()
session.refresh(chat_snapshot)
return Response(status_code=200)
@router.put("/snapshots/{snapshot_id}", name="update chat snapshot", response_model=ChatSnapshot)
async def update_chat_snapshot(
snapshot_id: int,
snapshot_update: ChatSnapshot,
session: Session = Depends(session),
auth: Auth = Depends(auth_must),
):
db_snapshot = session.get(ChatSnapshot, snapshot_id)
if not db_snapshot:
raise HTTPException(status_code=404, detail=_("Chat snapshot not found"))
for key, value in snapshot_update.dict(exclude_unset=True).items():
setattr(db_snapshot, key, value)
session.add(db_snapshot)
session.commit()
session.refresh(db_snapshot)
return db_snapshot
@router.delete("/snapshots/{snapshot_id}", name="delete chat snapshot")
async def delete_chat_snapshot(snapshot_id: int, session: Session = Depends(session), auth: Auth = Depends(auth_must)):
db_snapshot = session.get(ChatSnapshot, snapshot_id)
if not db_snapshot:
raise HTTPException(status_code=404, detail=_("Chat snapshot not found"))
session.delete(db_snapshot)
session.commit()
return Response(status_code=204)
from app.model.chat.chat_snpshot import ChatSnapshot, ChatSnapshotIn
from typing import List, Optional
from fastapi import Depends, HTTPException, Response, APIRouter
from sqlmodel import Session, select
from app.component.database import session
from app.component.auth import Auth, auth_must
from fastapi_babel import _
from utils import traceroot_wrapper as traceroot
logger = traceroot.get_logger("server_chat_snapshot")
router = APIRouter(prefix="/chat", tags=["Chat Snapshot Management"])
@router.get("/snapshots", name="list chat snapshots", response_model=List[ChatSnapshot])
@traceroot.trace()
async def list_chat_snapshots(
api_task_id: Optional[str] = None,
camel_task_id: Optional[str] = None,
browser_url: Optional[str] = None,
session: Session = Depends(session),
):
"""List chat snapshots with optional filtering."""
query = select(ChatSnapshot)
if api_task_id is not None:
query = query.where(ChatSnapshot.api_task_id == api_task_id)
if camel_task_id is not None:
query = query.where(ChatSnapshot.camel_task_id == camel_task_id)
if browser_url is not None:
query = query.where(ChatSnapshot.browser_url == browser_url)
snapshots = session.exec(query).all()
logger.debug("Snapshots listed", extra={"api_task_id": api_task_id, "camel_task_id": camel_task_id, "count": len(snapshots)})
return snapshots
@router.get("/snapshots/{snapshot_id}", name="get chat snapshot", response_model=ChatSnapshot)
@traceroot.trace()
async def get_chat_snapshot(snapshot_id: int, session: Session = Depends(session), auth: Auth = Depends(auth_must)):
"""Get specific chat snapshot."""
user_id = auth.user.id
snapshot = session.get(ChatSnapshot, snapshot_id)
if not snapshot:
logger.warning("Snapshot not found", extra={"user_id": user_id, "snapshot_id": snapshot_id})
raise HTTPException(status_code=404, detail=_("Chat snapshot not found"))
logger.debug("Snapshot retrieved", extra={"user_id": user_id, "snapshot_id": snapshot_id, "api_task_id": snapshot.api_task_id})
return snapshot
@router.post("/snapshots", name="create chat snapshot", response_model=ChatSnapshot)
@traceroot.trace()
async def create_chat_snapshot(
snapshot: ChatSnapshotIn, auth: Auth = Depends(auth_must), session: Session = Depends(session)
):
"""Create new chat snapshot from image."""
user_id = auth.user.id
try:
image_path = ChatSnapshotIn.save_image(user_id, snapshot.api_task_id, snapshot.image_base64)
chat_snapshot = ChatSnapshot(
user_id=user_id,
api_task_id=snapshot.api_task_id,
camel_task_id=snapshot.camel_task_id,
browser_url=snapshot.browser_url,
image_path=image_path,
)
session.add(chat_snapshot)
session.commit()
session.refresh(chat_snapshot)
logger.info("Snapshot created", extra={"user_id": user_id, "snapshot_id": chat_snapshot.id, "api_task_id": snapshot.api_task_id, "image_path": image_path})
return chat_snapshot
except Exception as e:
session.rollback()
logger.error("Snapshot creation failed", extra={"user_id": user_id, "api_task_id": snapshot.api_task_id, "error": str(e)}, exc_info=True)
raise
@router.put("/snapshots/{snapshot_id}", name="update chat snapshot", response_model=ChatSnapshot)
@traceroot.trace()
async def update_chat_snapshot(
snapshot_id: int,
snapshot_update: ChatSnapshot,
session: Session = Depends(session),
auth: Auth = Depends(auth_must),
):
"""Update chat snapshot."""
user_id = auth.user.id
db_snapshot = session.get(ChatSnapshot, snapshot_id)
if not db_snapshot:
logger.warning("Snapshot not found for update", extra={"user_id": user_id, "snapshot_id": snapshot_id})
raise HTTPException(status_code=404, detail=_("Chat snapshot not found"))
if db_snapshot.user_id != user_id:
logger.warning("Unauthorized snapshot update", extra={"user_id": user_id, "snapshot_id": snapshot_id, "owner_id": db_snapshot.user_id})
raise HTTPException(status_code=403, detail=_("You are not allowed to update this snapshot"))
try:
update_data = snapshot_update.dict(exclude_unset=True)
for key, value in update_data.items():
setattr(db_snapshot, key, value)
session.add(db_snapshot)
session.commit()
session.refresh(db_snapshot)
logger.info("Snapshot updated", extra={"user_id": user_id, "snapshot_id": snapshot_id, "fields_updated": list(update_data.keys())})
return db_snapshot
except Exception as e:
session.rollback()
logger.error("Snapshot update failed", extra={"user_id": user_id, "snapshot_id": snapshot_id, "error": str(e)}, exc_info=True)
raise
@router.delete("/snapshots/{snapshot_id}", name="delete chat snapshot")
@traceroot.trace()
async def delete_chat_snapshot(snapshot_id: int, session: Session = Depends(session), auth: Auth = Depends(auth_must)):
"""Delete chat snapshot."""
user_id = auth.user.id
db_snapshot = session.get(ChatSnapshot, snapshot_id)
if not db_snapshot:
logger.warning("Snapshot not found for deletion", extra={"user_id": user_id, "snapshot_id": snapshot_id})
raise HTTPException(status_code=404, detail=_("Chat snapshot not found"))
if db_snapshot.user_id != user_id:
logger.warning("Unauthorized snapshot deletion", extra={"user_id": user_id, "snapshot_id": snapshot_id, "owner_id": db_snapshot.user_id})
raise HTTPException(status_code=403, detail=_("You are not allowed to delete this snapshot"))
try:
session.delete(db_snapshot)
session.commit()
logger.info("Snapshot deleted", extra={"user_id": user_id, "snapshot_id": snapshot_id, "image_path": db_snapshot.image_path})
return Response(status_code=204)
except Exception as e:
session.rollback()
logger.error("Snapshot deletion failed", extra={"user_id": user_id, "snapshot_id": snapshot_id, "error": str(e)}, exc_info=True)
raise

View file

@ -1,105 +1,163 @@
import asyncio
import json
from typing import List, Optional
from fastapi import Depends, HTTPException, Query, Response, APIRouter
from fastapi.responses import StreamingResponse
from sqlmodel import Session, asc, select
from app.component.database import session
from app.component.auth import Auth, auth_must
from fastapi_babel import _
from app.model.chat.chat_step import ChatStep, ChatStepOut, ChatStepIn
router = APIRouter(prefix="/chat", tags=["Chat Step Management"])
@router.get("/steps", name="list chat steps", response_model=List[ChatStepOut])
async def list_chat_steps(
task_id: str, step: Optional[str] = None, session: Session = Depends(session), auth: Auth = Depends(auth_must)
):
query = select(ChatStep)
if task_id is not None:
query = query.where(ChatStep.task_id == task_id)
if step is not None:
query = query.where(ChatStep.step == step)
chat_steps = session.exec(query).all()
return chat_steps
@router.get("/steps/playback/{task_id}", name="Playback Chat Step via SSE")
async def share_playback(
task_id: str, delay_time: float = 0, session: Session = Depends(session), auth: Auth = Depends(auth_must)
):
"""
Playbacks the chat steps (SSE).
"""
if delay_time > 5:
delay_time = 5
async def event_generator():
stmt = select(ChatStep).where(ChatStep.task_id == task_id).order_by(asc(ChatStep.id))
steps = session.exec(stmt).all()
if not steps:
yield f"data: {json.dumps({'error': 'No steps found for this task.'})}\n\n"
return
for step in steps:
step_data = {
"id": step.id,
"task_id": step.task_id,
"step": step.step,
"data": step.data,
"created_at": step.created_at.isoformat() if step.created_at else None,
}
yield f"data: {json.dumps(step_data)}\n\n"
if delay_time > 0:
await asyncio.sleep(delay_time)
return StreamingResponse(event_generator(), media_type="text/event-stream")
@router.get("/steps/{step_id}", name="get chat step", response_model=ChatStepOut)
async def get_chat_step(step_id: int, session: Session = Depends(session), auth: Auth = Depends(auth_must)):
chat_step = session.get(ChatStep, step_id)
if not chat_step:
raise HTTPException(status_code=404, detail=_("Chat step not found"))
return chat_step
@router.post("/steps", name="create chat step")
# TODO Limit request sources
async def create_chat_step(step: ChatStepIn, session: Session = Depends(session)):
chat_step = ChatStep(
task_id=step.task_id,
step=step.step,
data=step.data,
)
session.add(chat_step)
session.commit()
session.refresh(chat_step)
return {"code": 200, "msg": "success"}
@router.put("/steps/{step_id}", name="update chat step", response_model=ChatStepOut)
async def update_chat_step(
step_id: int, chat_step_update: ChatStep, session: Session = Depends(session), auth: Auth = Depends(auth_must)
):
db_chat_step = session.get(ChatStep, step_id)
if not db_chat_step:
raise HTTPException(status_code=404, detail=_("Chat step not found"))
for key, value in chat_step_update.dict(exclude_unset=True).items():
setattr(db_chat_step, key, value)
session.add(db_chat_step)
session.commit()
session.refresh(db_chat_step)
return db_chat_step
@router.delete("/steps/{step_id}", name="delete chat step")
async def delete_chat_step(step_id: int, session: Session = Depends(session), auth: Auth = Depends(auth_must)):
db_chat_step = session.get(ChatStep, step_id)
if not db_chat_step:
raise HTTPException(status_code=404, detail=_("Chat step not found"))
session.delete(db_chat_step)
session.commit()
return Response(status_code=204)
import asyncio
import json
from typing import List, Optional
from fastapi import Depends, HTTPException, Query, Response, APIRouter
from fastapi.responses import StreamingResponse
from sqlmodel import Session, asc, select
from app.component.database import session
from app.component.auth import Auth, auth_must
from fastapi_babel import _
from app.model.chat.chat_step import ChatStep, ChatStepOut, ChatStepIn
from utils import traceroot_wrapper as traceroot
logger = traceroot.get_logger("server_chat_step")
router = APIRouter(prefix="/chat", tags=["Chat Step Management"])
@router.get("/steps", name="list chat steps", response_model=List[ChatStepOut])
@traceroot.trace()
async def list_chat_steps(
task_id: str, step: Optional[str] = None, session: Session = Depends(session), auth: Auth = Depends(auth_must)
):
"""List chat steps for a task with optional step type filtering."""
user_id = auth.user.id
query = select(ChatStep)
if task_id is not None:
query = query.where(ChatStep.task_id == task_id)
if step is not None:
query = query.where(ChatStep.step == step)
chat_steps = session.exec(query).all()
logger.debug("Chat steps listed", extra={"user_id": user_id, "task_id": task_id, "step_type": step, "count": len(chat_steps)})
return chat_steps
@router.get("/steps/playback/{task_id}", name="Playback Chat Step via SSE")
@traceroot.trace()
async def share_playback(
task_id: str, delay_time: float = 0, session: Session = Depends(session), auth: Auth = Depends(auth_must)
):
"""Playback chat steps via SSE stream."""
user_id = auth.user.id
if delay_time > 5:
logger.debug("Delay time capped", extra={"user_id": user_id, "task_id": task_id, "requested": delay_time, "capped": 5})
delay_time = 5
async def event_generator():
try:
stmt = select(ChatStep).where(ChatStep.task_id == task_id).order_by(asc(ChatStep.id))
steps = session.exec(stmt).all()
if not steps:
logger.warning("No steps found for playback", extra={"user_id": user_id, "task_id": task_id})
yield f"data: {json.dumps({'error': 'No steps found for this task.'})}\n\n"
return
logger.info("Chat step playback started", extra={"user_id": user_id, "task_id": task_id, "step_count": len(steps), "delay_time": delay_time})
for step in steps:
step_data = {
"id": step.id,
"task_id": step.task_id,
"step": step.step,
"data": step.data,
"created_at": step.created_at.isoformat() if step.created_at else None,
}
yield f"data: {json.dumps(step_data)}\n\n"
if delay_time > 0:
await asyncio.sleep(delay_time)
logger.info("Chat step playback completed", extra={"user_id": user_id, "task_id": task_id, "step_count": len(steps)})
except Exception as e:
logger.error("Chat step playback error", extra={"user_id": user_id, "task_id": task_id, "error": str(e)}, exc_info=True)
yield f"data: {json.dumps({'error': 'Playback error occurred.'})}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
@router.get("/steps/{step_id}", name="get chat step", response_model=ChatStepOut)
@traceroot.trace()
async def get_chat_step(step_id: int, session: Session = Depends(session), auth: Auth = Depends(auth_must)):
"""Get specific chat step."""
user_id = auth.user.id
chat_step = session.get(ChatStep, step_id)
if not chat_step:
logger.warning("Chat step not found", extra={"user_id": user_id, "step_id": step_id})
raise HTTPException(status_code=404, detail=_("Chat step not found"))
logger.debug("Chat step retrieved", extra={"user_id": user_id, "step_id": step_id, "task_id": chat_step.task_id})
return chat_step
@router.post("/steps", name="create chat step")
@traceroot.trace()
async def create_chat_step(step: ChatStepIn, session: Session = Depends(session)):
"""Create new chat step. TODO: Implement request source validation."""
try:
chat_step = ChatStep(
task_id=step.task_id,
step=step.step,
data=step.data,
)
session.add(chat_step)
session.commit()
session.refresh(chat_step)
logger.info("Chat step created", extra={"step_id": chat_step.id, "task_id": step.task_id, "step_type": step.step})
return {"code": 200, "msg": "success"}
except Exception as e:
session.rollback()
logger.error("Chat step creation failed", extra={"task_id": step.task_id, "step_type": step.step, "error": str(e)}, exc_info=True)
raise
@router.put("/steps/{step_id}", name="update chat step", response_model=ChatStepOut)
@traceroot.trace()
async def update_chat_step(
step_id: int, chat_step_update: ChatStep, session: Session = Depends(session), auth: Auth = Depends(auth_must)
):
"""Update chat step."""
user_id = auth.user.id
db_chat_step = session.get(ChatStep, step_id)
if not db_chat_step:
logger.warning("Chat step not found for update", extra={"user_id": user_id, "step_id": step_id})
raise HTTPException(status_code=404, detail=_("Chat step not found"))
try:
update_data = chat_step_update.dict(exclude_unset=True)
for key, value in update_data.items():
setattr(db_chat_step, key, value)
session.add(db_chat_step)
session.commit()
session.refresh(db_chat_step)
logger.info("Chat step updated", extra={"user_id": user_id, "step_id": step_id, "task_id": db_chat_step.task_id, "fields_updated": list(update_data.keys())})
return db_chat_step
except Exception as e:
session.rollback()
logger.error("Chat step update failed", extra={"user_id": user_id, "step_id": step_id, "error": str(e)}, exc_info=True)
raise
@router.delete("/steps/{step_id}", name="delete chat step")
@traceroot.trace()
async def delete_chat_step(step_id: int, session: Session = Depends(session), auth: Auth = Depends(auth_must)):
"""Delete chat step."""
user_id = auth.user.id
db_chat_step = session.get(ChatStep, step_id)
if not db_chat_step:
logger.warning("Chat step not found for deletion", extra={"user_id": user_id, "step_id": step_id})
raise HTTPException(status_code=404, detail=_("Chat step not found"))
try:
session.delete(db_chat_step)
session.commit()
logger.info("Chat step deleted", extra={"user_id": user_id, "step_id": step_id, "task_id": db_chat_step.task_id})
return Response(status_code=204)
except Exception as e:
session.rollback()
logger.error("Chat step deletion failed", extra={"user_id": user_id, "step_id": step_id, "error": str(e)}, exc_info=True)
raise