mirror of
https://github.com/anomalyco/opencode-sdk-python.git
synced 2026-04-28 12:39:54 +00:00
feat(api): update via SDK Studio
This commit is contained in:
parent
604017133e
commit
ff05a4adf0
130 changed files with 17166 additions and 1 deletions
248
tests/test_streaming.py
Normal file
248
tests/test_streaming.py
Normal file
|
|
@ -0,0 +1,248 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Iterator, AsyncIterator
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from opencode import Opencode, AsyncOpencode
|
||||
from opencode._streaming import Stream, AsyncStream, ServerSentEvent
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||||
async def test_basic(sync: bool, client: Opencode, async_client: AsyncOpencode) -> None:
|
||||
def body() -> Iterator[bytes]:
|
||||
yield b"event: completion\n"
|
||||
yield b'data: {"foo":true}\n'
|
||||
yield b"\n"
|
||||
|
||||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event == "completion"
|
||||
assert sse.json() == {"foo": True}
|
||||
|
||||
await assert_empty_iter(iterator)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||||
async def test_data_missing_event(sync: bool, client: Opencode, async_client: AsyncOpencode) -> None:
|
||||
def body() -> Iterator[bytes]:
|
||||
yield b'data: {"foo":true}\n'
|
||||
yield b"\n"
|
||||
|
||||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event is None
|
||||
assert sse.json() == {"foo": True}
|
||||
|
||||
await assert_empty_iter(iterator)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||||
async def test_event_missing_data(sync: bool, client: Opencode, async_client: AsyncOpencode) -> None:
|
||||
def body() -> Iterator[bytes]:
|
||||
yield b"event: ping\n"
|
||||
yield b"\n"
|
||||
|
||||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event == "ping"
|
||||
assert sse.data == ""
|
||||
|
||||
await assert_empty_iter(iterator)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||||
async def test_multiple_events(sync: bool, client: Opencode, async_client: AsyncOpencode) -> None:
|
||||
def body() -> Iterator[bytes]:
|
||||
yield b"event: ping\n"
|
||||
yield b"\n"
|
||||
yield b"event: completion\n"
|
||||
yield b"\n"
|
||||
|
||||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event == "ping"
|
||||
assert sse.data == ""
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event == "completion"
|
||||
assert sse.data == ""
|
||||
|
||||
await assert_empty_iter(iterator)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||||
async def test_multiple_events_with_data(sync: bool, client: Opencode, async_client: AsyncOpencode) -> None:
|
||||
def body() -> Iterator[bytes]:
|
||||
yield b"event: ping\n"
|
||||
yield b'data: {"foo":true}\n'
|
||||
yield b"\n"
|
||||
yield b"event: completion\n"
|
||||
yield b'data: {"bar":false}\n'
|
||||
yield b"\n"
|
||||
|
||||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event == "ping"
|
||||
assert sse.json() == {"foo": True}
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event == "completion"
|
||||
assert sse.json() == {"bar": False}
|
||||
|
||||
await assert_empty_iter(iterator)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||||
async def test_multiple_data_lines_with_empty_line(sync: bool, client: Opencode, async_client: AsyncOpencode) -> None:
|
||||
def body() -> Iterator[bytes]:
|
||||
yield b"event: ping\n"
|
||||
yield b"data: {\n"
|
||||
yield b'data: "foo":\n'
|
||||
yield b"data: \n"
|
||||
yield b"data:\n"
|
||||
yield b"data: true}\n"
|
||||
yield b"\n\n"
|
||||
|
||||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event == "ping"
|
||||
assert sse.json() == {"foo": True}
|
||||
assert sse.data == '{\n"foo":\n\n\ntrue}'
|
||||
|
||||
await assert_empty_iter(iterator)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||||
async def test_data_json_escaped_double_new_line(sync: bool, client: Opencode, async_client: AsyncOpencode) -> None:
|
||||
def body() -> Iterator[bytes]:
|
||||
yield b"event: ping\n"
|
||||
yield b'data: {"foo": "my long\\n\\ncontent"}'
|
||||
yield b"\n\n"
|
||||
|
||||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event == "ping"
|
||||
assert sse.json() == {"foo": "my long\n\ncontent"}
|
||||
|
||||
await assert_empty_iter(iterator)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||||
async def test_multiple_data_lines(sync: bool, client: Opencode, async_client: AsyncOpencode) -> None:
|
||||
def body() -> Iterator[bytes]:
|
||||
yield b"event: ping\n"
|
||||
yield b"data: {\n"
|
||||
yield b'data: "foo":\n'
|
||||
yield b"data: true}\n"
|
||||
yield b"\n\n"
|
||||
|
||||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event == "ping"
|
||||
assert sse.json() == {"foo": True}
|
||||
|
||||
await assert_empty_iter(iterator)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||||
async def test_special_new_line_character(
|
||||
sync: bool,
|
||||
client: Opencode,
|
||||
async_client: AsyncOpencode,
|
||||
) -> None:
|
||||
def body() -> Iterator[bytes]:
|
||||
yield b'data: {"content":" culpa"}\n'
|
||||
yield b"\n"
|
||||
yield b'data: {"content":" \xe2\x80\xa8"}\n'
|
||||
yield b"\n"
|
||||
yield b'data: {"content":"foo"}\n'
|
||||
yield b"\n"
|
||||
|
||||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event is None
|
||||
assert sse.json() == {"content": " culpa"}
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event is None
|
||||
assert sse.json() == {"content": "
"}
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event is None
|
||||
assert sse.json() == {"content": "foo"}
|
||||
|
||||
await assert_empty_iter(iterator)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||||
async def test_multi_byte_character_multiple_chunks(
|
||||
sync: bool,
|
||||
client: Opencode,
|
||||
async_client: AsyncOpencode,
|
||||
) -> None:
|
||||
def body() -> Iterator[bytes]:
|
||||
yield b'data: {"content":"'
|
||||
# bytes taken from the string 'известни' and arbitrarily split
|
||||
# so that some multi-byte characters span multiple chunks
|
||||
yield b"\xd0"
|
||||
yield b"\xb8\xd0\xb7\xd0"
|
||||
yield b"\xb2\xd0\xb5\xd1\x81\xd1\x82\xd0\xbd\xd0\xb8"
|
||||
yield b'"}\n'
|
||||
yield b"\n"
|
||||
|
||||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||||
|
||||
sse = await iter_next(iterator)
|
||||
assert sse.event is None
|
||||
assert sse.json() == {"content": "известни"}
|
||||
|
||||
|
||||
async def to_aiter(iter: Iterator[bytes]) -> AsyncIterator[bytes]:
|
||||
for chunk in iter:
|
||||
yield chunk
|
||||
|
||||
|
||||
async def iter_next(iter: Iterator[ServerSentEvent] | AsyncIterator[ServerSentEvent]) -> ServerSentEvent:
|
||||
if isinstance(iter, AsyncIterator):
|
||||
return await iter.__anext__()
|
||||
|
||||
return next(iter)
|
||||
|
||||
|
||||
async def assert_empty_iter(iter: Iterator[ServerSentEvent] | AsyncIterator[ServerSentEvent]) -> None:
|
||||
with pytest.raises((StopAsyncIteration, RuntimeError)):
|
||||
await iter_next(iter)
|
||||
|
||||
|
||||
def make_event_iterator(
|
||||
content: Iterator[bytes],
|
||||
*,
|
||||
sync: bool,
|
||||
client: Opencode,
|
||||
async_client: AsyncOpencode,
|
||||
) -> Iterator[ServerSentEvent] | AsyncIterator[ServerSentEvent]:
|
||||
if sync:
|
||||
return Stream(cast_to=object, client=client, response=httpx.Response(200, content=content))._iter_events()
|
||||
|
||||
return AsyncStream(
|
||||
cast_to=object, client=async_client, response=httpx.Response(200, content=to_aiter(content))
|
||||
)._iter_events()
|
||||
Loading…
Add table
Add a link
Reference in a new issue