# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. from __future__ import annotations import os from typing import Any, Union, Mapping from typing_extensions import Self, override import httpx from . import _exceptions from ._qs import Querystring from ._types import ( NOT_GIVEN, Omit, Timeout, NotGiven, Transport, ProxiesTypes, RequestOptions, ) from ._utils import is_given, get_async_library from ._version import __version__ from .resources import app, file, find, event, config, session from ._streaming import Stream as Stream, AsyncStream as AsyncStream from ._exceptions import APIStatusError from ._base_client import ( DEFAULT_MAX_RETRIES, SyncAPIClient, AsyncAPIClient, ) __all__ = [ "Timeout", "Transport", "ProxiesTypes", "RequestOptions", "Opencode", "AsyncOpencode", "Client", "AsyncClient", ] class Opencode(SyncAPIClient): event: event.EventResource app: app.AppResource find: find.FindResource file: file.FileResource config: config.ConfigResource session: session.SessionResource with_raw_response: OpencodeWithRawResponse with_streaming_response: OpencodeWithStreamedResponse # client options def __init__( self, *, base_url: str | httpx.URL | None = None, timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN, max_retries: int = DEFAULT_MAX_RETRIES, default_headers: Mapping[str, str] | None = None, default_query: Mapping[str, object] | None = None, # Configure a custom httpx client. # We provide a `DefaultHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`. # See the [httpx documentation](https://www.python-httpx.org/api/#client) for more details. http_client: httpx.Client | None = None, # Enable or disable schema validation for data returned by the API. # When enabled an error APIResponseValidationError is raised # if the API responds with invalid data for the expected schema. # # This parameter may be removed or changed in the future. # If you rely on this feature, please open a GitHub issue # outlining your use-case to help us decide if it should be # part of our public interface in the future. _strict_response_validation: bool = False, ) -> None: """Construct a new synchronous Opencode client instance.""" if base_url is None: base_url = os.environ.get("OPENCODE_BASE_URL") if base_url is None: base_url = f"http://localhost:54321" super().__init__( version=__version__, base_url=base_url, max_retries=max_retries, timeout=timeout, http_client=http_client, custom_headers=default_headers, custom_query=default_query, _strict_response_validation=_strict_response_validation, ) self._default_stream_cls = Stream self.event = event.EventResource(self) self.app = app.AppResource(self) self.find = find.FindResource(self) self.file = file.FileResource(self) self.config = config.ConfigResource(self) self.session = session.SessionResource(self) self.with_raw_response = OpencodeWithRawResponse(self) self.with_streaming_response = OpencodeWithStreamedResponse(self) @property @override def qs(self) -> Querystring: return Querystring(array_format="comma") @property @override def default_headers(self) -> dict[str, str | Omit]: return { **super().default_headers, "X-Stainless-Async": "false", **self._custom_headers, } def copy( self, *, base_url: str | httpx.URL | None = None, timeout: float | Timeout | None | NotGiven = NOT_GIVEN, http_client: httpx.Client | None = None, max_retries: int | NotGiven = NOT_GIVEN, default_headers: Mapping[str, str] | None = None, set_default_headers: Mapping[str, str] | None = None, default_query: Mapping[str, object] | None = None, set_default_query: Mapping[str, object] | None = None, _extra_kwargs: Mapping[str, Any] = {}, ) -> Self: """ Create a new client instance re-using the same options given to the current client with optional overriding. """ if default_headers is not None and set_default_headers is not None: raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive") if default_query is not None and set_default_query is not None: raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive") headers = self._custom_headers if default_headers is not None: headers = {**headers, **default_headers} elif set_default_headers is not None: headers = set_default_headers params = self._custom_query if default_query is not None: params = {**params, **default_query} elif set_default_query is not None: params = set_default_query http_client = http_client or self._client return self.__class__( base_url=base_url or self.base_url, timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, **_extra_kwargs, ) # Alias for `copy` for nicer inline usage, e.g. # client.with_options(timeout=10).foo.create(...) with_options = copy @override def _make_status_error( self, err_msg: str, *, body: object, response: httpx.Response, ) -> APIStatusError: if response.status_code == 400: return _exceptions.BadRequestError(err_msg, response=response, body=body) if response.status_code == 401: return _exceptions.AuthenticationError(err_msg, response=response, body=body) if response.status_code == 403: return _exceptions.PermissionDeniedError(err_msg, response=response, body=body) if response.status_code == 404: return _exceptions.NotFoundError(err_msg, response=response, body=body) if response.status_code == 409: return _exceptions.ConflictError(err_msg, response=response, body=body) if response.status_code == 422: return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body) if response.status_code == 429: return _exceptions.RateLimitError(err_msg, response=response, body=body) if response.status_code >= 500: return _exceptions.InternalServerError(err_msg, response=response, body=body) return APIStatusError(err_msg, response=response, body=body) class AsyncOpencode(AsyncAPIClient): event: event.AsyncEventResource app: app.AsyncAppResource find: find.AsyncFindResource file: file.AsyncFileResource config: config.AsyncConfigResource session: session.AsyncSessionResource with_raw_response: AsyncOpencodeWithRawResponse with_streaming_response: AsyncOpencodeWithStreamedResponse # client options def __init__( self, *, base_url: str | httpx.URL | None = None, timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN, max_retries: int = DEFAULT_MAX_RETRIES, default_headers: Mapping[str, str] | None = None, default_query: Mapping[str, object] | None = None, # Configure a custom httpx client. # We provide a `DefaultAsyncHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`. # See the [httpx documentation](https://www.python-httpx.org/api/#asyncclient) for more details. http_client: httpx.AsyncClient | None = None, # Enable or disable schema validation for data returned by the API. # When enabled an error APIResponseValidationError is raised # if the API responds with invalid data for the expected schema. # # This parameter may be removed or changed in the future. # If you rely on this feature, please open a GitHub issue # outlining your use-case to help us decide if it should be # part of our public interface in the future. _strict_response_validation: bool = False, ) -> None: """Construct a new async AsyncOpencode client instance.""" if base_url is None: base_url = os.environ.get("OPENCODE_BASE_URL") if base_url is None: base_url = f"http://localhost:54321" super().__init__( version=__version__, base_url=base_url, max_retries=max_retries, timeout=timeout, http_client=http_client, custom_headers=default_headers, custom_query=default_query, _strict_response_validation=_strict_response_validation, ) self._default_stream_cls = AsyncStream self.event = event.AsyncEventResource(self) self.app = app.AsyncAppResource(self) self.find = find.AsyncFindResource(self) self.file = file.AsyncFileResource(self) self.config = config.AsyncConfigResource(self) self.session = session.AsyncSessionResource(self) self.with_raw_response = AsyncOpencodeWithRawResponse(self) self.with_streaming_response = AsyncOpencodeWithStreamedResponse(self) @property @override def qs(self) -> Querystring: return Querystring(array_format="comma") @property @override def default_headers(self) -> dict[str, str | Omit]: return { **super().default_headers, "X-Stainless-Async": f"async:{get_async_library()}", **self._custom_headers, } def copy( self, *, base_url: str | httpx.URL | None = None, timeout: float | Timeout | None | NotGiven = NOT_GIVEN, http_client: httpx.AsyncClient | None = None, max_retries: int | NotGiven = NOT_GIVEN, default_headers: Mapping[str, str] | None = None, set_default_headers: Mapping[str, str] | None = None, default_query: Mapping[str, object] | None = None, set_default_query: Mapping[str, object] | None = None, _extra_kwargs: Mapping[str, Any] = {}, ) -> Self: """ Create a new client instance re-using the same options given to the current client with optional overriding. """ if default_headers is not None and set_default_headers is not None: raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive") if default_query is not None and set_default_query is not None: raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive") headers = self._custom_headers if default_headers is not None: headers = {**headers, **default_headers} elif set_default_headers is not None: headers = set_default_headers params = self._custom_query if default_query is not None: params = {**params, **default_query} elif set_default_query is not None: params = set_default_query http_client = http_client or self._client return self.__class__( base_url=base_url or self.base_url, timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, **_extra_kwargs, ) # Alias for `copy` for nicer inline usage, e.g. # client.with_options(timeout=10).foo.create(...) with_options = copy @override def _make_status_error( self, err_msg: str, *, body: object, response: httpx.Response, ) -> APIStatusError: if response.status_code == 400: return _exceptions.BadRequestError(err_msg, response=response, body=body) if response.status_code == 401: return _exceptions.AuthenticationError(err_msg, response=response, body=body) if response.status_code == 403: return _exceptions.PermissionDeniedError(err_msg, response=response, body=body) if response.status_code == 404: return _exceptions.NotFoundError(err_msg, response=response, body=body) if response.status_code == 409: return _exceptions.ConflictError(err_msg, response=response, body=body) if response.status_code == 422: return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body) if response.status_code == 429: return _exceptions.RateLimitError(err_msg, response=response, body=body) if response.status_code >= 500: return _exceptions.InternalServerError(err_msg, response=response, body=body) return APIStatusError(err_msg, response=response, body=body) class OpencodeWithRawResponse: def __init__(self, client: Opencode) -> None: self.event = event.EventResourceWithRawResponse(client.event) self.app = app.AppResourceWithRawResponse(client.app) self.find = find.FindResourceWithRawResponse(client.find) self.file = file.FileResourceWithRawResponse(client.file) self.config = config.ConfigResourceWithRawResponse(client.config) self.session = session.SessionResourceWithRawResponse(client.session) class AsyncOpencodeWithRawResponse: def __init__(self, client: AsyncOpencode) -> None: self.event = event.AsyncEventResourceWithRawResponse(client.event) self.app = app.AsyncAppResourceWithRawResponse(client.app) self.find = find.AsyncFindResourceWithRawResponse(client.find) self.file = file.AsyncFileResourceWithRawResponse(client.file) self.config = config.AsyncConfigResourceWithRawResponse(client.config) self.session = session.AsyncSessionResourceWithRawResponse(client.session) class OpencodeWithStreamedResponse: def __init__(self, client: Opencode) -> None: self.event = event.EventResourceWithStreamingResponse(client.event) self.app = app.AppResourceWithStreamingResponse(client.app) self.find = find.FindResourceWithStreamingResponse(client.find) self.file = file.FileResourceWithStreamingResponse(client.file) self.config = config.ConfigResourceWithStreamingResponse(client.config) self.session = session.SessionResourceWithStreamingResponse(client.session) class AsyncOpencodeWithStreamedResponse: def __init__(self, client: AsyncOpencode) -> None: self.event = event.AsyncEventResourceWithStreamingResponse(client.event) self.app = app.AsyncAppResourceWithStreamingResponse(client.app) self.find = find.AsyncFindResourceWithStreamingResponse(client.find) self.file = file.AsyncFileResourceWithStreamingResponse(client.file) self.config = config.AsyncConfigResourceWithStreamingResponse(client.config) self.session = session.AsyncSessionResourceWithStreamingResponse(client.session) Client = Opencode AsyncClient = AsyncOpencode