qwen-code/packages/sdk-python/src/qwen_code_sdk/sync_query.py
jinye e384338145
feat(SDK) Add Python SDK implementation for #3010 (#3494)
* Codex worktree snapshot: startup-cleanup

Co-authored-by: Codex

* Add Python SDK real smoke test

Adds a repository-only real E2E smoke script for the Python SDK, plus npm and developer documentation entry points.

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(sdk-python): address review findings — bugs, type safety, and test coverage

- Fix prepare_spawn_info: JS files now use "node" instead of sys.executable
- Fix protocol.py: correct total=False misuse on 7 TypedDicts (required fields were optional)
- Fix query.py: add _closed guard in _ensure_started, suppress exceptions in close()
- Fix sync_query.py: prevent close() deadlock, add context manager, add timeouts
- Fix transport.py: handle malformed JSON lines, add _closed guard in start()
- Fix validation.py: use uuid.RFC_4122 instead of magic UUID
- Fix __init__.py: export TextBlock, widen query_sync signature
- Remove dead code: ensure_not_aborted, write_json_line, _thread_error
- Add 12 new tests (29 → 41): context managers, JSON skip, closed guards, spawn info, timeouts

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(sdk-python): address wenshao review — session_id, bool validation, debug stderr

- Fix continue_session=True generating a wrong random session_id
- Add _as_optional_bool helper for strict type validation on bool fields
- Default debug stderr to sys.stderr when no custom callback is provided

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(sdk-python): address remaining wenshao review feedback

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* test(cli): harden settings dialog restart prompt test

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(sdk-python): review fixes — UUID compat, stderr fallback, sync cleanup

- Remove UUID version restriction to support v6/v7/v8 (RFC 9562)
- Always write to sys.stderr when stderr callback raises (was silent when debug=False)
- Prevent duplicate _STOP sentinel in SyncQuery.close() via _stop_sent flag
- Add ruff format --check to CI workflow
- Fix smoke_real.py version guard: fail early before imports instead of NameError
- Apply ruff format to existing files

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(sdk-python): remaining review fixes — exit_code attr, guard strictness, sync timeout

- Add exit_code attribute to ProcessExitError for programmatic access
- Strengthen is_control_response/is_control_cancel guards to require
  payload fields, preventing misrouting of malformed messages
- Expose control_request_timeout property on Query so SyncQuery uses
  the configured timeout instead of a hardcoded 30s default
- Use dataclasses.replace() instead of direct mutation on frozen-style
  QueryOptions in query() factory
- Add ResourceWarning in SyncQuery.__del__ when not properly closed

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(sdk-python): add exit_code default and guard __del__ against partial GC

- Give ProcessExitError.exit_code a default value (-1) so user code can
  construct the exception with just a message string
- Wrap SyncQuery.__del__ in try/except AttributeError to prevent crashes
  when the object is partially garbage-collected

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(sdk-python): review fixes — resource leak, type safety, CI matrix, docs

- Fix SyncQuery.__del__ to call close() on GC instead of only warning
- Replace hasattr duck-type check with isinstance(prompt, AsyncIterable)
- Type-validate permission_mode/auth_type in QueryOptions.from_mapping
- Use TypeGuard return types on all is_sdk_*/is_control_* predicates
- Add 5s margin to sync wrapper timeouts to prevent error type masking
- Expand CI matrix to test Python 3.10, 3.11, 3.12
- Change ProcessExitError.exit_code default from -1 to None
- Add stderr to docs QueryOptions listing
- Update README sync example to use context manager pattern

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(sdk-python): preserve iterator exhaustion state and suppress detached task warning

- Add _exhausted flag to Query.__anext__ and SyncQuery.__next__ so
  repeated iteration after end-of-stream raises Stop(Async)Iteration
  instead of blocking forever.
- Remove re-raise in _initialize() to prevent asyncio
  "Task exception was never retrieved" warning on detached tasks;
  the error is already surfaced via _finish_with_error().

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(sdk-python): reject mcp_servers at validation time and add iterator/init tests

- Reject mcp_servers in validate_query_options() with a clear error
  instead of advertising MCP support to the CLI and then failing at
  runtime when mcp_message arrives.
- Remove dead mcp_servers branch from _initialize().
- Add tests for async/sync iterator exhaustion, detached init task
  warning suppression, and mcp_servers validation.

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix(sdk-python): fix ruff lint errors in new tests

- Use ControlRequestTimeoutError instead of bare Exception (B017)
- Fix import sorting for stdlib vs third-party (I001)
- Break long line to stay within 88-char limit (E501)

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* style(sdk-python): apply ruff format to new tests

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

---------

Co-authored-by: jinye.djy <jinye.djy@alibaba-inc.com>
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-04-25 07:02:58 +08:00


"""Synchronous wrapper around the async Query API."""

from __future__ import annotations

import asyncio
import threading
import warnings
from collections.abc import AsyncIterable, AsyncIterator, Iterable, Mapping
from queue import Queue
from typing import Any, cast

from .protocol import SDKMessage, SDKUserMessage
from .query import Query, query
from .types import QueryOptions, QueryOptionsDict

_STOP = object()
_SYNC_TIMEOUT_MARGIN = 5.0


class SyncQuery:
    def __init__(
        self,
        prompt: str | Iterable[SDKUserMessage] | AsyncIterable[SDKUserMessage],
        options: QueryOptions | QueryOptionsDict | Mapping[str, Any] | None = None,
    ) -> None:
        self._queue: Queue[SDKMessage | Exception | object] = Queue()
        self._ready = threading.Event()
        self._shutdown = threading.Event()
        self._stop_sent = threading.Event()
        self._exhausted = False
        self._query: Query | None = None
        self._consumer_task: asyncio.Task[None] | None = None
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._run_loop,
            name="qwen-sdk-sync-loop",
            daemon=True,
        )
        self._thread.start()

        if isinstance(prompt, str) or isinstance(prompt, AsyncIterable):
            source_prompt: str | AsyncIterable[SDKUserMessage] = prompt
        else:
            source_prompt = _iterable_to_async(prompt)

        future = asyncio.run_coroutine_threadsafe(
            self._bootstrap(source_prompt, options),
            self._loop,
        )
        try:
            future.result()
        except Exception:
            self._stop_loop()
            raise

    def _run_loop(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    async def _bootstrap(
        self,
        prompt: str | AsyncIterable[SDKUserMessage],
        options: QueryOptions | QueryOptionsDict | Mapping[str, Any] | None,
    ) -> None:
        self._query = query(prompt=prompt, options=options)
        self._ready.set()
        self._consumer_task = asyncio.create_task(self._consume())

    async def _consume(self) -> None:
        assert self._query is not None
        try:
            async for message in self._query:
                self._queue.put(message)
        except Exception as exc:
            self._queue.put(exc)
        finally:
            if not self._stop_sent.is_set():
                self._stop_sent.set()
                self._queue.put(_STOP)

    def _require_query(self) -> Query:
        self._ready.wait(timeout=30)
        if self._query is None:
            raise RuntimeError("SyncQuery failed to initialize")
        return self._query

    def __iter__(self) -> SyncQuery:
        return self

    def __next__(self) -> SDKMessage:
        if self._exhausted:
            raise StopIteration
        item = self._queue.get()
        if item is _STOP:
            self._exhausted = True
            raise StopIteration
        if isinstance(item, Exception):
            raise item
        return cast(SDKMessage, item)

    def __enter__(self) -> SyncQuery:
        return self

    def __exit__(self, *_args: object) -> None:
        self.close()

    def interrupt(self) -> None:
        q = self._require_query()
        asyncio.run_coroutine_threadsafe(q.interrupt(), self._loop).result(
            timeout=q.control_request_timeout + _SYNC_TIMEOUT_MARGIN
        )

    def set_model(self, model: str) -> None:
        q = self._require_query()
        asyncio.run_coroutine_threadsafe(q.set_model(model), self._loop).result(
            timeout=q.control_request_timeout + _SYNC_TIMEOUT_MARGIN
        )

    def set_permission_mode(self, mode: str) -> None:
        q = self._require_query()
        asyncio.run_coroutine_threadsafe(
            q.set_permission_mode(mode),
            self._loop,
        ).result(timeout=q.control_request_timeout + _SYNC_TIMEOUT_MARGIN)

    def supported_commands(self) -> Any:
        q = self._require_query()
        return asyncio.run_coroutine_threadsafe(
            q.supported_commands(),
            self._loop,
        ).result(timeout=q.control_request_timeout + _SYNC_TIMEOUT_MARGIN)

    def mcp_server_status(self) -> Any:
        q = self._require_query()
        return asyncio.run_coroutine_threadsafe(
            q.mcp_server_status(),
            self._loop,
        ).result(timeout=q.control_request_timeout + _SYNC_TIMEOUT_MARGIN)

    def get_session_id(self) -> str:
        q = self._require_query()
        return q.get_session_id()

    def is_closed(self) -> bool:
        q = self._require_query()
        return q.is_closed()

    def close(self) -> None:
        if self._shutdown.is_set():
            return
        self._shutdown.set()

        q = self._query
        if q is not None:
            try:
                asyncio.run_coroutine_threadsafe(q.close(), self._loop).result(
                    timeout=30
                )
            except Exception:
                pass

        # Wait for _consume() to put _STOP before stopping the loop,
        # otherwise consumers blocked on queue.get() will deadlock.
        if self._consumer_task is not None:
            try:
                asyncio.run_coroutine_threadsafe(
                    self._await_consumer(), self._loop
                ).result(timeout=5)
            except Exception:
                pass

        if not self._stop_sent.is_set():
            self._stop_sent.set()
            self._queue.put(_STOP)

        self._stop_loop()

    async def _await_consumer(self) -> None:
        if self._consumer_task is not None:
            try:
                await asyncio.wait_for(self._consumer_task, timeout=5.0)
            except Exception:
                pass

    def _stop_loop(self) -> None:
        if self._loop.is_running():
            self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=5)
        if not self._loop.is_closed():
            self._loop.close()

    def __del__(self) -> None:
        try:
            if not self._shutdown.is_set():
                warnings.warn(
                    "SyncQuery was not closed. "
                    "Use 'with SyncQuery(...) as q:' or call q.close() explicitly.",
                    ResourceWarning,
                    stacklevel=1,
                )
                try:
                    self.close()
                except Exception:
                    pass
        except AttributeError:
            pass


async def _iterable_to_async(
    messages: Iterable[SDKUserMessage],
) -> AsyncIterator[SDKUserMessage]:
    for message in messages:
        yield message
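
The bridging pattern used by this module (a private event loop on a daemon thread, a `Queue` carrying results to the caller, a `_STOP` sentinel, and an `_exhausted` flag) can be tried out without the SDK installed. The block below is only a minimal sketch of that pattern, not the SDK's implementation: `SyncBridge` and `fake_stream` are hypothetical names introduced for illustration, and `fake_stream` stands in for the real async `Query`.

```python
import asyncio
import threading
from collections.abc import AsyncIterator
from queue import Queue

_STOP = object()  # end-of-stream sentinel, mirroring the module's _STOP


async def fake_stream() -> AsyncIterator[str]:
    """Hypothetical stand-in for the async Query; yields two items."""
    for text in ("hello", "world"):
        await asyncio.sleep(0)  # give control back to the loop between items
        yield text


class SyncBridge:
    """Hypothetical sync iterator over fake_stream(), driven by a loop thread."""

    def __init__(self) -> None:
        self._queue: Queue[object] = Queue()
        self._exhausted = False
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()
        # Thread-safe scheduling: the coroutine runs on the background loop.
        asyncio.run_coroutine_threadsafe(self._consume(), self._loop)

    def _run_loop(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    async def _consume(self) -> None:
        try:
            async for item in fake_stream():
                self._queue.put(item)
        except Exception as exc:  # hand errors to the consuming thread
            self._queue.put(exc)
        finally:
            self._queue.put(_STOP)  # always unblock a waiting __next__

    def __iter__(self) -> "SyncBridge":
        return self

    def __next__(self) -> str:
        if self._exhausted:  # never block again once the stream has ended
            raise StopIteration
        item = self._queue.get()
        if item is _STOP:
            self._exhausted = True
            raise StopIteration
        if isinstance(item, Exception):
            raise item
        return str(item)

    def __enter__(self) -> "SyncBridge":
        return self

    def __exit__(self, *_args: object) -> None:
        self.close()

    def close(self) -> None:
        # Stop the loop from its own thread, then join and release it.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=5)
        if not self._loop.is_closed():
            self._loop.close()


if __name__ == "__main__":
    with SyncBridge() as bridge:
        for text in bridge:
            print(text)
```

Putting the sentinel in a `finally` block means the consuming thread is released whether the stream ends normally or with an error. The `_exhausted` flag makes a second pass over the iterator return immediately, because the queue holds no second sentinel to wake it.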