Mirror of https://github.com/eigent-ai/eigent.git (synced 2026-05-17 04:01:18 +00:00)
Co-authored-by: bytecii <bytecii@users.noreply.github.com>
Co-authored-by: Tong Chen <web_chentong@163.com>
139 lines · 4.9 KiB · Python
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
|
|
import asyncio
|
|
import json
|
|
|
|
import httpx
|
|
|
|
from benchmark.environment import BenchmarkData, ModelKwargs
|
|
|
|
|
|
class BenchmarkClient:
    """Client for running benchmarks against the Eigent API.

    Posts a chat request to ``{base_url}/chat`` and consumes the
    server-sent-event stream it returns. While consuming, it drives the run
    forward: it starts task execution when the ``to_sub_tasks`` step arrives
    and, when ``auto_reply_human`` is set, answers ``ask`` events.

    Usable as an async context manager; the underlying ``httpx.AsyncClient``
    is closed on exit.
    """

    # Steps that stream token-by-token and should be grouped on one line.
    _STREAMING_STEPS = frozenset({"decompose_text", "agent_step_text"})

    def __init__(
        self,
        *,
        base_url: str = "http://localhost:5001",
        timeout: int = 600,
        auto_reply_human: str | None = "yes, proceed",
        verbose: bool = False,
    ):
        """Create a benchmark client.

        Args:
            base_url: Root URL of the Eigent API; a trailing "/" is stripped.
            timeout: Timeout passed to ``httpx.AsyncClient``.
            auto_reply_human: Reply sent to agents that emit an ``ask`` event;
                ``None`` or an empty string disables auto-replies.
            verbose: Default for printing events as they arrive.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.auto_reply_human = auto_reply_human
        self.verbose = verbose
        self.client = httpx.AsyncClient(timeout=timeout)

    async def run(
        self,
        data: BenchmarkData,
        model_kwargs: ModelKwargs | None = None,
        verbose: bool | None = None,
    ) -> list[dict]:
        """Run a single benchmark and return all events.

        Args:
            data: Benchmark definition, converted to a chat request via
                ``data.to_chat``.
            model_kwargs: Model settings for the chat; ``ModelKwargs()`` is
                used when omitted.
            verbose: Overrides ``self.verbose`` for this run when not None.

        Returns:
            Every decoded event dict in arrival order, up to and including
            the ``end`` event (or until the stream closes).

        Raises:
            httpx.HTTPStatusError: If the ``/chat`` request fails.
        """
        chat = data.to_chat(model_kwargs or ModelKwargs())
        payload = chat.model_dump()

        should_print = verbose if verbose is not None else self.verbose
        events: list[dict] = []
        task_started = False
        last_step = None
        # Strong references to the fire-and-forget requests spawned below.
        # The event loop keeps only weak references to tasks, so an
        # unreferenced task can be garbage-collected before it finishes.
        background: set[asyncio.Task] = set()

        try:
            async with self.client.stream(
                "POST", f"{self.base_url}/chat", json=payload
            ) as response:
                response.raise_for_status()

                async for line in response.aiter_lines():
                    event = self._parse_sse_line(line)
                    if event is None:
                        continue

                    events.append(event)
                    step = event.get("step")
                    if should_print:
                        last_step = self._print_event(event, last_step)

                    if step == "to_sub_tasks" and not task_started:
                        task_started = True
                        background.add(
                            asyncio.create_task(
                                self._start_task(chat.project_id)
                            )
                        )

                    if step == "ask" and self.auto_reply_human:
                        # "data" may be null or a plain value; only a dict
                        # can name the asking agent.
                        event_data = event.get("data")
                        agent = (
                            event_data.get("agent", "")
                            if isinstance(event_data, dict)
                            else ""
                        )
                        if agent:
                            background.add(
                                asyncio.create_task(
                                    self._human_reply(chat.project_id, agent)
                                )
                            )

                    if step == "end":
                        break
        except BaseException:
            # The run is aborting: don't leave orphaned requests running.
            for task in background:
                task.cancel()
            await asyncio.gather(*background, return_exceptions=True)
            raise

        if should_print and last_step in self._STREAMING_STEPS:
            print(flush=True)  # terminate a trailing streaming line

        # Wait for outstanding requests so that their failures are reported
        # here instead of being lost as "Task exception was never retrieved".
        results = await asyncio.gather(*background, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(f"[background_error] {result!r}", flush=True)

        return events

    @staticmethod
    def _parse_sse_line(line: str) -> dict | None:
        """Decode one server-sent-events line into an event dict.

        Returns None for blank lines, non-``data`` fields, payloads that are
        not valid JSON, and JSON values that are not objects.
        """
        if not line.startswith("data:"):
            return None
        body = line[len("data:"):]
        # In the SSE format, a single space after the colon is optional.
        if body.startswith(" "):
            body = body[1:]
        if not body.strip():
            return None
        try:
            event = json.loads(body)
        except json.JSONDecodeError:
            return None
        return event if isinstance(event, dict) else None

    @staticmethod
    def _print_event(event: dict, last_step: str | None) -> str:
        """Print event, grouping streaming tokens on the same line.

        Args:
            event: Decoded event. ``step`` labels it; ``data`` is either a
                dict carrying ``content`` or a value printed via ``str``.
            last_step: Step of the previously printed event. It decides
                whether to continue the current streaming line or close it.

        Returns:
            The current step, to be passed back as ``last_step`` next time.
        """
        step = event.get("step", "")
        data = event.get("data", "")
        if isinstance(data, dict):
            content = data.get("content", "")
            # content may be null or a non-string JSON value; never slice it raw.
            content = "" if content is None else str(content)
        else:
            content = str(data)

        streaming = BenchmarkClient._STREAMING_STEPS
        if step in streaming and last_step == step:
            # Continuation of the current streaming group.
            print(content, end="", flush=True)
            return step

        if last_step in streaming:
            print(flush=True)  # close previous streaming line
        if step in streaming:
            # New streaming group: print the label, then content inline.
            print(f"[{step}] {content}", end="", flush=True)
        else:
            print(f"[{step}] {content[:200]}", flush=True)
        return step

    async def _start_task(self, project_id: str):
        """Start workforce task execution after planning.

        POSTs to ``{base_url}/task/{project_id}/start``.

        Raises:
            httpx.HTTPStatusError: If the server returns an error status.
        """
        url = f"{self.base_url}/task/{project_id}/start"
        response = await self.client.post(url)
        response.raise_for_status()

    async def _human_reply(self, project_id: str, agent: str):
        """Auto-reply to an agent's question with ``self.auto_reply_human``.

        POSTs ``{"agent": agent, "reply": ...}`` to
        ``{base_url}/chat/{project_id}/human-reply``.

        Raises:
            httpx.HTTPStatusError: If the server returns an error status.
        """
        url = f"{self.base_url}/chat/{project_id}/human-reply"
        payload = {"agent": agent, "reply": self.auto_reply_human}
        response = await self.client.post(url, json=payload)
        response.raise_for_status()

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
|