opencode-sdk-python/src/opencode_ai/_client.py
2025-06-27 14:47:09 +00:00

393 lines
15 KiB
Python

# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
from __future__ import annotations
import os
from typing import Any, Union, Mapping
from typing_extensions import Self, override
import httpx
from . import _exceptions
from ._qs import Querystring
from ._types import (
NOT_GIVEN,
Omit,
Timeout,
NotGiven,
Transport,
ProxiesTypes,
RequestOptions,
)
from ._utils import is_given, get_async_library
from ._version import __version__
from .resources import app, file, event, config, session
from ._streaming import Stream as Stream, AsyncStream as AsyncStream
from ._exceptions import APIStatusError
from ._base_client import (
DEFAULT_MAX_RETRIES,
SyncAPIClient,
AsyncAPIClient,
)
__all__ = [
"Timeout",
"Transport",
"ProxiesTypes",
"RequestOptions",
"Opencode",
"AsyncOpencode",
"Client",
"AsyncClient",
]
class Opencode(SyncAPIClient):
event: event.EventResource
app: app.AppResource
file: file.FileResource
config: config.ConfigResource
session: session.SessionResource
with_raw_response: OpencodeWithRawResponse
with_streaming_response: OpencodeWithStreamedResponse
# client options
def __init__(
self,
*,
base_url: str | httpx.URL | None = None,
timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN,
max_retries: int = DEFAULT_MAX_RETRIES,
default_headers: Mapping[str, str] | None = None,
default_query: Mapping[str, object] | None = None,
# Configure a custom httpx client.
# We provide a `DefaultHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`.
# See the [httpx documentation](https://www.python-httpx.org/api/#client) for more details.
http_client: httpx.Client | None = None,
# Enable or disable schema validation for data returned by the API.
# When enabled an error APIResponseValidationError is raised
# if the API responds with invalid data for the expected schema.
#
# This parameter may be removed or changed in the future.
# If you rely on this feature, please open a GitHub issue
# outlining your use-case to help us decide if it should be
# part of our public interface in the future.
_strict_response_validation: bool = False,
) -> None:
"""Construct a new synchronous Opencode client instance."""
if base_url is None:
base_url = os.environ.get("OPENCODE_BASE_URL")
if base_url is None:
base_url = f"http://localhost:54321"
super().__init__(
version=__version__,
base_url=base_url,
max_retries=max_retries,
timeout=timeout,
http_client=http_client,
custom_headers=default_headers,
custom_query=default_query,
_strict_response_validation=_strict_response_validation,
)
self.event = event.EventResource(self)
self.app = app.AppResource(self)
self.file = file.FileResource(self)
self.config = config.ConfigResource(self)
self.session = session.SessionResource(self)
self.with_raw_response = OpencodeWithRawResponse(self)
self.with_streaming_response = OpencodeWithStreamedResponse(self)
@property
@override
def qs(self) -> Querystring:
return Querystring(array_format="comma")
@property
@override
def default_headers(self) -> dict[str, str | Omit]:
return {
**super().default_headers,
"X-Stainless-Async": "false",
**self._custom_headers,
}
def copy(
self,
*,
base_url: str | httpx.URL | None = None,
timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
http_client: httpx.Client | None = None,
max_retries: int | NotGiven = NOT_GIVEN,
default_headers: Mapping[str, str] | None = None,
set_default_headers: Mapping[str, str] | None = None,
default_query: Mapping[str, object] | None = None,
set_default_query: Mapping[str, object] | None = None,
_extra_kwargs: Mapping[str, Any] = {},
) -> Self:
"""
Create a new client instance re-using the same options given to the current client with optional overriding.
"""
if default_headers is not None and set_default_headers is not None:
raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive")
if default_query is not None and set_default_query is not None:
raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive")
headers = self._custom_headers
if default_headers is not None:
headers = {**headers, **default_headers}
elif set_default_headers is not None:
headers = set_default_headers
params = self._custom_query
if default_query is not None:
params = {**params, **default_query}
elif set_default_query is not None:
params = set_default_query
http_client = http_client or self._client
return self.__class__(
base_url=base_url or self.base_url,
timeout=self.timeout if isinstance(timeout, NotGiven) else timeout,
http_client=http_client,
max_retries=max_retries if is_given(max_retries) else self.max_retries,
default_headers=headers,
default_query=params,
**_extra_kwargs,
)
# Alias for `copy` for nicer inline usage, e.g.
# client.with_options(timeout=10).foo.create(...)
with_options = copy
@override
def _make_status_error(
self,
err_msg: str,
*,
body: object,
response: httpx.Response,
) -> APIStatusError:
if response.status_code == 400:
return _exceptions.BadRequestError(err_msg, response=response, body=body)
if response.status_code == 401:
return _exceptions.AuthenticationError(err_msg, response=response, body=body)
if response.status_code == 403:
return _exceptions.PermissionDeniedError(err_msg, response=response, body=body)
if response.status_code == 404:
return _exceptions.NotFoundError(err_msg, response=response, body=body)
if response.status_code == 409:
return _exceptions.ConflictError(err_msg, response=response, body=body)
if response.status_code == 422:
return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body)
if response.status_code == 429:
return _exceptions.RateLimitError(err_msg, response=response, body=body)
if response.status_code >= 500:
return _exceptions.InternalServerError(err_msg, response=response, body=body)
return APIStatusError(err_msg, response=response, body=body)
class AsyncOpencode(AsyncAPIClient):
event: event.AsyncEventResource
app: app.AsyncAppResource
file: file.AsyncFileResource
config: config.AsyncConfigResource
session: session.AsyncSessionResource
with_raw_response: AsyncOpencodeWithRawResponse
with_streaming_response: AsyncOpencodeWithStreamedResponse
# client options
def __init__(
self,
*,
base_url: str | httpx.URL | None = None,
timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN,
max_retries: int = DEFAULT_MAX_RETRIES,
default_headers: Mapping[str, str] | None = None,
default_query: Mapping[str, object] | None = None,
# Configure a custom httpx client.
# We provide a `DefaultAsyncHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`.
# See the [httpx documentation](https://www.python-httpx.org/api/#asyncclient) for more details.
http_client: httpx.AsyncClient | None = None,
# Enable or disable schema validation for data returned by the API.
# When enabled an error APIResponseValidationError is raised
# if the API responds with invalid data for the expected schema.
#
# This parameter may be removed or changed in the future.
# If you rely on this feature, please open a GitHub issue
# outlining your use-case to help us decide if it should be
# part of our public interface in the future.
_strict_response_validation: bool = False,
) -> None:
"""Construct a new async AsyncOpencode client instance."""
if base_url is None:
base_url = os.environ.get("OPENCODE_BASE_URL")
if base_url is None:
base_url = f"http://localhost:54321"
super().__init__(
version=__version__,
base_url=base_url,
max_retries=max_retries,
timeout=timeout,
http_client=http_client,
custom_headers=default_headers,
custom_query=default_query,
_strict_response_validation=_strict_response_validation,
)
self.event = event.AsyncEventResource(self)
self.app = app.AsyncAppResource(self)
self.file = file.AsyncFileResource(self)
self.config = config.AsyncConfigResource(self)
self.session = session.AsyncSessionResource(self)
self.with_raw_response = AsyncOpencodeWithRawResponse(self)
self.with_streaming_response = AsyncOpencodeWithStreamedResponse(self)
@property
@override
def qs(self) -> Querystring:
return Querystring(array_format="comma")
@property
@override
def default_headers(self) -> dict[str, str | Omit]:
return {
**super().default_headers,
"X-Stainless-Async": f"async:{get_async_library()}",
**self._custom_headers,
}
def copy(
self,
*,
base_url: str | httpx.URL | None = None,
timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
http_client: httpx.AsyncClient | None = None,
max_retries: int | NotGiven = NOT_GIVEN,
default_headers: Mapping[str, str] | None = None,
set_default_headers: Mapping[str, str] | None = None,
default_query: Mapping[str, object] | None = None,
set_default_query: Mapping[str, object] | None = None,
_extra_kwargs: Mapping[str, Any] = {},
) -> Self:
"""
Create a new client instance re-using the same options given to the current client with optional overriding.
"""
if default_headers is not None and set_default_headers is not None:
raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive")
if default_query is not None and set_default_query is not None:
raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive")
headers = self._custom_headers
if default_headers is not None:
headers = {**headers, **default_headers}
elif set_default_headers is not None:
headers = set_default_headers
params = self._custom_query
if default_query is not None:
params = {**params, **default_query}
elif set_default_query is not None:
params = set_default_query
http_client = http_client or self._client
return self.__class__(
base_url=base_url or self.base_url,
timeout=self.timeout if isinstance(timeout, NotGiven) else timeout,
http_client=http_client,
max_retries=max_retries if is_given(max_retries) else self.max_retries,
default_headers=headers,
default_query=params,
**_extra_kwargs,
)
# Alias for `copy` for nicer inline usage, e.g.
# client.with_options(timeout=10).foo.create(...)
with_options = copy
@override
def _make_status_error(
self,
err_msg: str,
*,
body: object,
response: httpx.Response,
) -> APIStatusError:
if response.status_code == 400:
return _exceptions.BadRequestError(err_msg, response=response, body=body)
if response.status_code == 401:
return _exceptions.AuthenticationError(err_msg, response=response, body=body)
if response.status_code == 403:
return _exceptions.PermissionDeniedError(err_msg, response=response, body=body)
if response.status_code == 404:
return _exceptions.NotFoundError(err_msg, response=response, body=body)
if response.status_code == 409:
return _exceptions.ConflictError(err_msg, response=response, body=body)
if response.status_code == 422:
return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body)
if response.status_code == 429:
return _exceptions.RateLimitError(err_msg, response=response, body=body)
if response.status_code >= 500:
return _exceptions.InternalServerError(err_msg, response=response, body=body)
return APIStatusError(err_msg, response=response, body=body)
class OpencodeWithRawResponse:
def __init__(self, client: Opencode) -> None:
self.event = event.EventResourceWithRawResponse(client.event)
self.app = app.AppResourceWithRawResponse(client.app)
self.file = file.FileResourceWithRawResponse(client.file)
self.config = config.ConfigResourceWithRawResponse(client.config)
self.session = session.SessionResourceWithRawResponse(client.session)
class AsyncOpencodeWithRawResponse:
def __init__(self, client: AsyncOpencode) -> None:
self.event = event.AsyncEventResourceWithRawResponse(client.event)
self.app = app.AsyncAppResourceWithRawResponse(client.app)
self.file = file.AsyncFileResourceWithRawResponse(client.file)
self.config = config.AsyncConfigResourceWithRawResponse(client.config)
self.session = session.AsyncSessionResourceWithRawResponse(client.session)
class OpencodeWithStreamedResponse:
def __init__(self, client: Opencode) -> None:
self.event = event.EventResourceWithStreamingResponse(client.event)
self.app = app.AppResourceWithStreamingResponse(client.app)
self.file = file.FileResourceWithStreamingResponse(client.file)
self.config = config.ConfigResourceWithStreamingResponse(client.config)
self.session = session.SessionResourceWithStreamingResponse(client.session)
class AsyncOpencodeWithStreamedResponse:
def __init__(self, client: AsyncOpencode) -> None:
self.event = event.AsyncEventResourceWithStreamingResponse(client.event)
self.app = app.AsyncAppResourceWithStreamingResponse(client.app)
self.file = file.AsyncFileResourceWithStreamingResponse(client.file)
self.config = config.AsyncConfigResourceWithStreamingResponse(client.config)
self.session = session.AsyncSessionResourceWithStreamingResponse(client.session)
Client = Opencode
AsyncClient = AsyncOpencode