mirror of
https://github.com/Skyvern-AI/skyvern.git
synced 2025-09-04 11:40:37 +00:00
438 lines
16 KiB
Python
438 lines
16 KiB
Python
# This file was auto-generated by Fern from our API Definition.
|
|
|
|
import typing
|
|
from ..core.client_wrapper import SyncClientWrapper
|
|
from ..core.request_options import RequestOptions
|
|
from .types.agent_get_run_response import AgentGetRunResponse
|
|
from ..core.jsonable_encoder import jsonable_encoder
|
|
from ..core.pydantic_utilities import parse_obj_as
|
|
from ..errors.unprocessable_entity_error import UnprocessableEntityError
|
|
from ..types.http_validation_error import HttpValidationError
|
|
from json.decoder import JSONDecodeError
|
|
from ..core.api_error import ApiError
|
|
from ..types.run_engine import RunEngine
|
|
from ..types.proxy_location import ProxyLocation
|
|
from ..types.task_run_request_data_extraction_schema import TaskRunRequestDataExtractionSchema
|
|
from ..types.task_run_response import TaskRunResponse
|
|
from ..core.serialization import convert_and_respect_annotation_metadata
|
|
from ..errors.bad_request_error import BadRequestError
|
|
from ..core.client_wrapper import AsyncClientWrapper
|
|
|
|
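# Sentinel used as the default for optional request parameters, so that "not provided" can be told apart from an explicit None; it is also handed to the HTTP client as `omit=`.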
|
|
OMIT = typing.cast(typing.Any, Ellipsis)  # `Ellipsis` is the same singleton the `...` literal evaluates to
|
|
|
|
|
|
class AgentClient:
    """Synchronous client for the agent endpoints (``v1/runs/{run_id}`` and ``v1/tasks``)."""

    def __init__(self, *, client_wrapper: SyncClientWrapper):
        # Every request issued by this client goes through the wrapper's ``httpx_client``.
        self._client_wrapper = client_wrapper

    def get_run(
        self,
        run_id: str,
        *,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> AgentGetRunResponse:
        """
        Issue ``GET v1/runs/{run_id}`` and parse the response.

        Parameters
        ----------
        run_id : str
            Inserted into the request path after being passed through ``jsonable_encoder``.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        AgentGetRunResponse
            Successful Response

        Raises
        ------
        UnprocessableEntityError
            On HTTP 422; carries the body parsed as ``HttpValidationError``.
        ApiError
            On any other non-2xx status (body is the decoded JSON), or whenever the
            response body cannot be decoded as JSON (body is the raw response text).

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )
        client.agent.get_run(
            run_id="run_id",
        )
        """
        endpoint = f"v1/runs/{jsonable_encoder(run_id)}"
        http_response = self._client_wrapper.httpx_client.request(
            endpoint,
            method="GET",
            request_options=request_options,
        )
        try:
            if 200 <= http_response.status_code < 300:
                run = parse_obj_as(
                    type_=AgentGetRunResponse,  # type: ignore
                    object_=http_response.json(),
                )
                return typing.cast(AgentGetRunResponse, run)
            if http_response.status_code == 422:
                validation_error = parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=http_response.json(),
                )
                raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
            error_body = http_response.json()
        except JSONDecodeError:
            # The body was not valid JSON: report the raw text instead.
            raise ApiError(status_code=http_response.status_code, body=http_response.text)
        else:
            # Reached only for a status with no dedicated error type above.
            raise ApiError(status_code=http_response.status_code, body=error_body)

    def run_task(
        self,
        *,
        prompt: str,
        url: typing.Optional[str] = OMIT,
        title: typing.Optional[str] = OMIT,
        engine: typing.Optional[RunEngine] = OMIT,
        proxy_location: typing.Optional[ProxyLocation] = OMIT,
        data_extraction_schema: typing.Optional[TaskRunRequestDataExtractionSchema] = OMIT,
        error_code_mapping: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
        max_steps: typing.Optional[int] = OMIT,
        webhook_url: typing.Optional[str] = OMIT,
        totp_identifier: typing.Optional[str] = OMIT,
        totp_url: typing.Optional[str] = OMIT,
        browser_session_id: typing.Optional[str] = OMIT,
        publish_workflow: typing.Optional[bool] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> TaskRunResponse:
        """
        Run a task by issuing ``POST v1/tasks`` with the given fields as the JSON body.

        Parameters
        ----------
        prompt : str
            The goal or task description for Skyvern to accomplish

        url : typing.Optional[str]
            The starting URL for the task. If not provided, Skyvern will attempt to determine an appropriate URL

        title : typing.Optional[str]
            Optional title for the task

        engine : typing.Optional[RunEngine]
            The Skyvern engine version to use for this task

        proxy_location : typing.Optional[ProxyLocation]
            Geographic Proxy location to route the browser traffic through

        data_extraction_schema : typing.Optional[TaskRunRequestDataExtractionSchema]
            Schema defining what data should be extracted from the webpage

        error_code_mapping : typing.Optional[typing.Dict[str, typing.Optional[str]]]
            Custom mapping of error codes to error messages if Skyvern encounters an error

        max_steps : typing.Optional[int]
            Maximum number of steps the task can take before timing out

        webhook_url : typing.Optional[str]
            URL to send task status updates to after a run is finished

        totp_identifier : typing.Optional[str]
            Identifier for TOTP (Time-based One-Time Password) authentication if codes are being pushed to Skyvern

        totp_url : typing.Optional[str]
            URL for TOTP authentication setup if Skyvern should be polling endpoint for 2FA codes

        browser_session_id : typing.Optional[str]
            ID of an existing browser session to reuse, having it continue from the current screen state

        publish_workflow : typing.Optional[bool]
            Whether to publish this task as a reusable workflow.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        TaskRunResponse
            Successfully run task

        Raises
        ------
        BadRequestError
            On HTTP 400; carries the decoded JSON body.
        UnprocessableEntityError
            On HTTP 422; carries the body parsed as ``HttpValidationError``.
        ApiError
            On any other non-2xx status (body is the decoded JSON), or whenever the
            response body cannot be decoded as JSON (body is the raw response text).

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )
        client.agent.run_task(
            prompt="prompt",
        )
        """
        extraction_schema = convert_and_respect_annotation_metadata(
            object_=data_extraction_schema, annotation=TaskRunRequestDataExtractionSchema, direction="write"
        )
        payload = {
            "prompt": prompt,
            "url": url,
            "title": title,
            "engine": engine,
            "proxy_location": proxy_location,
            "data_extraction_schema": extraction_schema,
            "error_code_mapping": error_code_mapping,
            "max_steps": max_steps,
            "webhook_url": webhook_url,
            "totp_identifier": totp_identifier,
            "totp_url": totp_url,
            "browser_session_id": browser_session_id,
            "publish_workflow": publish_workflow,
        }
        # `omit=OMIT` hands the sentinel to the HTTP client (presumably so that fields
        # left at their default are dropped from the body -- confirm in the client wrapper).
        http_response = self._client_wrapper.httpx_client.request(
            "v1/tasks",
            method="POST",
            json=payload,
            request_options=request_options,
            omit=OMIT,
        )
        try:
            if 200 <= http_response.status_code < 300:
                task_run = parse_obj_as(
                    type_=TaskRunResponse,  # type: ignore
                    object_=http_response.json(),
                )
                return typing.cast(TaskRunResponse, task_run)
            if http_response.status_code == 400:
                bad_request_body = parse_obj_as(
                    type_=typing.Optional[typing.Any],  # type: ignore
                    object_=http_response.json(),
                )
                raise BadRequestError(typing.cast(typing.Optional[typing.Any], bad_request_body))
            if http_response.status_code == 422:
                validation_error = parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=http_response.json(),
                )
                raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
            error_body = http_response.json()
        except JSONDecodeError:
            # The body was not valid JSON: report the raw text instead.
            raise ApiError(status_code=http_response.status_code, body=http_response.text)
        else:
            # Reached only for a status with no dedicated error type above.
            raise ApiError(status_code=http_response.status_code, body=error_body)
|
|
|
|
|
|
class AsyncAgentClient:
    """Asynchronous client for the agent endpoints (``v1/runs/{run_id}`` and ``v1/tasks``)."""

    def __init__(self, *, client_wrapper: AsyncClientWrapper):
        # Every request issued by this client goes through the wrapper's ``httpx_client``.
        self._client_wrapper = client_wrapper

    async def get_run(
        self,
        run_id: str,
        *,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> AgentGetRunResponse:
        """
        Issue ``GET v1/runs/{run_id}`` and parse the response.

        Parameters
        ----------
        run_id : str
            Inserted into the request path after being passed through ``jsonable_encoder``.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        AgentGetRunResponse
            Successful Response

        Raises
        ------
        UnprocessableEntityError
            On HTTP 422; carries the body parsed as ``HttpValidationError``.
        ApiError
            On any other non-2xx status (body is the decoded JSON), or whenever the
            response body cannot be decoded as JSON (body is the raw response text).

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )


        async def main() -> None:
            await client.agent.get_run(
                run_id="run_id",
            )


        asyncio.run(main())
        """
        endpoint = f"v1/runs/{jsonable_encoder(run_id)}"
        http_response = await self._client_wrapper.httpx_client.request(
            endpoint,
            method="GET",
            request_options=request_options,
        )
        try:
            if 200 <= http_response.status_code < 300:
                run = parse_obj_as(
                    type_=AgentGetRunResponse,  # type: ignore
                    object_=http_response.json(),
                )
                return typing.cast(AgentGetRunResponse, run)
            if http_response.status_code == 422:
                validation_error = parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=http_response.json(),
                )
                raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
            error_body = http_response.json()
        except JSONDecodeError:
            # The body was not valid JSON: report the raw text instead.
            raise ApiError(status_code=http_response.status_code, body=http_response.text)
        else:
            # Reached only for a status with no dedicated error type above.
            raise ApiError(status_code=http_response.status_code, body=error_body)

    async def run_task(
        self,
        *,
        prompt: str,
        url: typing.Optional[str] = OMIT,
        title: typing.Optional[str] = OMIT,
        engine: typing.Optional[RunEngine] = OMIT,
        proxy_location: typing.Optional[ProxyLocation] = OMIT,
        data_extraction_schema: typing.Optional[TaskRunRequestDataExtractionSchema] = OMIT,
        error_code_mapping: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
        max_steps: typing.Optional[int] = OMIT,
        webhook_url: typing.Optional[str] = OMIT,
        totp_identifier: typing.Optional[str] = OMIT,
        totp_url: typing.Optional[str] = OMIT,
        browser_session_id: typing.Optional[str] = OMIT,
        publish_workflow: typing.Optional[bool] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> TaskRunResponse:
        """
        Run a task by issuing ``POST v1/tasks`` with the given fields as the JSON body.

        Parameters
        ----------
        prompt : str
            The goal or task description for Skyvern to accomplish

        url : typing.Optional[str]
            The starting URL for the task. If not provided, Skyvern will attempt to determine an appropriate URL

        title : typing.Optional[str]
            Optional title for the task

        engine : typing.Optional[RunEngine]
            The Skyvern engine version to use for this task

        proxy_location : typing.Optional[ProxyLocation]
            Geographic Proxy location to route the browser traffic through

        data_extraction_schema : typing.Optional[TaskRunRequestDataExtractionSchema]
            Schema defining what data should be extracted from the webpage

        error_code_mapping : typing.Optional[typing.Dict[str, typing.Optional[str]]]
            Custom mapping of error codes to error messages if Skyvern encounters an error

        max_steps : typing.Optional[int]
            Maximum number of steps the task can take before timing out

        webhook_url : typing.Optional[str]
            URL to send task status updates to after a run is finished

        totp_identifier : typing.Optional[str]
            Identifier for TOTP (Time-based One-Time Password) authentication if codes are being pushed to Skyvern

        totp_url : typing.Optional[str]
            URL for TOTP authentication setup if Skyvern should be polling endpoint for 2FA codes

        browser_session_id : typing.Optional[str]
            ID of an existing browser session to reuse, having it continue from the current screen state

        publish_workflow : typing.Optional[bool]
            Whether to publish this task as a reusable workflow.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        TaskRunResponse
            Successfully run task

        Raises
        ------
        BadRequestError
            On HTTP 400; carries the decoded JSON body.
        UnprocessableEntityError
            On HTTP 422; carries the body parsed as ``HttpValidationError``.
        ApiError
            On any other non-2xx status (body is the decoded JSON), or whenever the
            response body cannot be decoded as JSON (body is the raw response text).

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )


        async def main() -> None:
            await client.agent.run_task(
                prompt="prompt",
            )


        asyncio.run(main())
        """
        extraction_schema = convert_and_respect_annotation_metadata(
            object_=data_extraction_schema, annotation=TaskRunRequestDataExtractionSchema, direction="write"
        )
        payload = {
            "prompt": prompt,
            "url": url,
            "title": title,
            "engine": engine,
            "proxy_location": proxy_location,
            "data_extraction_schema": extraction_schema,
            "error_code_mapping": error_code_mapping,
            "max_steps": max_steps,
            "webhook_url": webhook_url,
            "totp_identifier": totp_identifier,
            "totp_url": totp_url,
            "browser_session_id": browser_session_id,
            "publish_workflow": publish_workflow,
        }
        # `omit=OMIT` hands the sentinel to the HTTP client (presumably so that fields
        # left at their default are dropped from the body -- confirm in the client wrapper).
        http_response = await self._client_wrapper.httpx_client.request(
            "v1/tasks",
            method="POST",
            json=payload,
            request_options=request_options,
            omit=OMIT,
        )
        try:
            if 200 <= http_response.status_code < 300:
                task_run = parse_obj_as(
                    type_=TaskRunResponse,  # type: ignore
                    object_=http_response.json(),
                )
                return typing.cast(TaskRunResponse, task_run)
            if http_response.status_code == 400:
                bad_request_body = parse_obj_as(
                    type_=typing.Optional[typing.Any],  # type: ignore
                    object_=http_response.json(),
                )
                raise BadRequestError(typing.cast(typing.Optional[typing.Any], bad_request_body))
            if http_response.status_code == 422:
                validation_error = parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=http_response.json(),
                )
                raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
            error_body = http_response.json()
        except JSONDecodeError:
            # The body was not valid JSON: report the raw text instead.
            raise ApiError(status_code=http_response.status_code, body=http_response.text)
        else:
            # Reached only for a status with no dedicated error type above.
            raise ApiError(status_code=http_response.status_code, body=error_body)
|