mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-30 11:55:12 +00:00
Co-authored-by: 4pmtong <web_chentong@163.com> Co-authored-by: Wendong-Fan <133094783+Wendong-Fan@users.noreply.github.com> Co-authored-by: Wendong-Fan <w3ndong.fan@gmail.com>
90 lines
3.4 KiB
Python
90 lines
3.4 KiB
Python
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
|
|
import time
|
|
import httpx
|
|
import asyncio
|
|
import os
|
|
import json
|
|
from app.service.chat_service import Chat
|
|
from app.component.environment import env
|
|
from app.service.task import get_task_lock_if_exists
|
|
from utils import traceroot_wrapper as traceroot
|
|
|
|
logger = traceroot.get_logger("sync_step")
|
|
|
|
|
|
def sync_step(func):
|
|
async def wrapper(*args, **kwargs):
|
|
server_url = env("SERVER_URL")
|
|
sync_url = server_url + "/chat/steps" if server_url else None
|
|
async for value in func(*args, **kwargs):
|
|
if not server_url:
|
|
yield value
|
|
continue
|
|
|
|
if isinstance(value, str) and value.startswith("data: "):
|
|
value_json_str = value[len("data: ") :].strip()
|
|
else:
|
|
value_json_str = value
|
|
|
|
try:
|
|
json_data = json.loads(value_json_str)
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Failed to parse JSON in sync_step: {e}. Value: {value_json_str}")
|
|
yield value
|
|
continue
|
|
|
|
if "step" not in json_data or "data" not in json_data:
|
|
logger.error(f"Missing 'step' or 'data' key in sync_step JSON. Keys: {list(json_data.keys())}")
|
|
yield value
|
|
continue
|
|
|
|
# Dynamic task_id extraction - prioritize runtime data over static args
|
|
chat: Chat = args[0] if args and hasattr(args[0], 'task_id') else None
|
|
task_id = None
|
|
|
|
if chat is not None:
|
|
task_lock = get_task_lock_if_exists(chat.project_id)
|
|
if task_lock is not None:
|
|
task_id = task_lock.current_task_id \
|
|
if hasattr(task_lock, 'current_task_id') and task_lock.current_task_id else chat.task_id
|
|
else:
|
|
logger.warning(f"Task lock not found for project_id {chat.project_id}, using chat.task_id")
|
|
task_id = chat.task_id
|
|
|
|
if task_id:
|
|
asyncio.create_task(
|
|
send_to_api(
|
|
sync_url,
|
|
{
|
|
"task_id": task_id,
|
|
"step": json_data["step"],
|
|
"data": json_data["data"],
|
|
"timestamp": time.time_ns() / 1_000_000_000,
|
|
},
|
|
)
|
|
)
|
|
yield value
|
|
|
|
return wrapper
|
|
|
|
|
|
async def send_to_api(url, data):
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
res = await client.post(url, json=data)
|
|
# logger.info(res)
|
|
except Exception as e:
|
|
logger.error(f"Failed to sync step to {url}: {type(e).__name__}: {e}")
|