fix: remove skyvern/cli from pyupgrade exclude list and fix violations (#5236)

This commit is contained in:
Celal Zamanoğlu 2026-03-25 19:05:41 +03:00 committed by GitHub
parent 9d2c2543c1
commit e909d3e4a1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 413 additions and 24 deletions

View file

@ -190,6 +190,11 @@ function ActionNode({ id, data, type }: NodeProps<ActionNode>) {
onParametersChange={(parameterKeys) => {
update({ parameterKeys });
}}
onCredentialTotpIdentifier={(totpIdentifier) => {
if (!data.totpIdentifier?.trim()) {
update({ totpIdentifier });
}
}}
/>
</div>
<div className="flex items-center justify-between">

View file

@ -202,6 +202,11 @@ function FileDownloadNode({ id, data }: NodeProps<FileDownloadNode>) {
onParametersChange={(parameterKeys) => {
update({ parameterKeys });
}}
onCredentialTotpIdentifier={(totpIdentifier) => {
if (!data.totpIdentifier?.trim()) {
update({ totpIdentifier });
}
}}
/>
</div>
<div className="flex items-center justify-between">

View file

@ -36,6 +36,10 @@ type Props = {
onUrlAutoFill?: (url: string) => void;
/** Current URL value of the login block — skip auto-fill if already set */
currentUrl?: string;
/** Called when a credential with a totp_identifier is selected to auto-fill the 2FA identifier */
onTotpIdentifierAutoFill?: (totpIdentifier: string) => void;
/** Current totp_identifier value — skip auto-fill if already set */
currentTotpIdentifier?: string;
};
// Function to generate a unique credential parameter key
@ -62,6 +66,8 @@ function LoginBlockCredentialSelector({
onChange,
onUrlAutoFill,
currentUrl,
onTotpIdentifierAutoFill,
currentTotpIdentifier,
}: Props) {
const { setIsOpen, setType } = useCredentialModalState();
const nodes = useNodes<AppNode>();
@ -131,6 +137,11 @@ function LoginBlockCredentialSelector({
type: "credential" as const,
hasBrowserProfile: !!credential.browser_profile_id,
browserProfileUrl: credential.tested_url ?? null,
totpIdentifier:
credential.credential_type === "password" &&
"totp_identifier" in credential.credential
? credential.credential.totp_identifier ?? null
: null,
}));
// Only show non-Skyvern credential parameters (Bitwarden, 1Password, Azure Vault)
@ -250,6 +261,16 @@ function LoginBlockCredentialSelector({
) {
onUrlAutoFill?.(selectedCredential.browserProfileUrl);
}
// Auto-fill the 2FA identifier from the credential's totp_identifier
// when the field is empty.
if (
selectedCredential &&
!currentTotpIdentifier?.trim() &&
selectedCredential.totpIdentifier
) {
onTotpIdentifierAutoFill?.(selectedCredential.totpIdentifier);
}
}}
>
<SelectTrigger

View file

@ -162,6 +162,12 @@ function LoginNode({ id, data, type }: NodeProps<LoginNode>) {
update({ url });
}
}}
currentTotpIdentifier={data.totpIdentifier ?? undefined}
onTotpIdentifierAutoFill={(totpIdentifier) => {
if (editable) {
update({ totpIdentifier });
}
}}
/>
</div>
</div>
@ -191,6 +197,11 @@ function LoginNode({ id, data, type }: NodeProps<LoginNode>) {
onParametersChange={(parameterKeys) => {
update({ parameterKeys });
}}
onCredentialTotpIdentifier={(totpIdentifier) => {
if (!data.totpIdentifier?.trim()) {
update({ totpIdentifier });
}
}}
/>
</div>
<div className="space-y-2">

View file

@ -347,6 +347,11 @@ function NavigationNode({ id, data, type }: NodeProps<NavigationNode>) {
onParametersChange={(parameterKeys) => {
update({ parameterKeys });
}}
onCredentialTotpIdentifier={(totpIdentifier) => {
if (!data.totpIdentifier?.trim()) {
update({ totpIdentifier });
}
}}
/>
</div>
<div className="space-y-2">

View file

@ -3,7 +3,7 @@ import { useWorkflowParametersStore } from "@/store/WorkflowParametersStore";
import { HelpTooltip } from "@/components/HelpTooltip";
import { helpTooltips } from "../../helpContent";
import { useCredentialsQuery } from "@/routes/workflows/hooks/useCredentialsQuery";
import { useContext, useMemo } from "react";
import { useCallback, useContext, useMemo } from "react";
import CloudContext from "@/store/CloudContext";
import { parameterIsSkyvernCredential } from "../../types";
@ -11,12 +11,15 @@ type Props = {
availableOutputParameters: Array<string>;
parameters: Array<string>;
onParametersChange: (parameters: Array<string>) => void;
/** Called when a credential parameter with a totp_identifier is added */
onCredentialTotpIdentifier?: (totpIdentifier: string) => void;
};
function ParametersMultiSelect({
availableOutputParameters,
parameters,
onParametersChange,
onCredentialTotpIdentifier,
}: Props) {
const isCloud = useContext(CloudContext);
const { parameters: workflowParameters } = useWorkflowParametersStore();
@ -57,6 +60,47 @@ function ParametersMultiSelect({
});
}, [keys, workflowParameters, isSuccess, credentialIdsInVault]);
const handleValueChange = useCallback(
(newParameters: Array<string>) => {
onParametersChange(newParameters);
// Check if a credential parameter was newly added
if (onCredentialTotpIdentifier) {
const addedKeys = newParameters.filter(
(key) => !parameters.includes(key),
);
for (const key of addedKeys) {
const param = workflowParameters.find((p) => p.key === key);
if (
param &&
param.parameterType === "credential" &&
parameterIsSkyvernCredential(param)
) {
const credential = credentials.find(
(c) => c.credential_id === param.credentialId,
);
if (
credential &&
credential.credential_type === "password" &&
"totp_identifier" in credential.credential &&
credential.credential.totp_identifier
) {
onCredentialTotpIdentifier(credential.credential.totp_identifier);
break;
}
}
}
}
},
[
onParametersChange,
onCredentialTotpIdentifier,
parameters,
workflowParameters,
credentials,
],
);
return (
<div className="space-y-2">
<header className="flex gap-2">
@ -65,7 +109,7 @@ function ParametersMultiSelect({
</header>
<MultiSelect
value={parameters}
onValueChange={onParametersChange}
onValueChange={handleValueChange}
options={options}
maxCount={50}
/>

View file

@ -159,6 +159,11 @@ function TaskNode({ id, data, type }: NodeProps<TaskNode>) {
onParametersChange={(parameterKeys) => {
update({ parameterKeys });
}}
onCredentialTotpIdentifier={(totpIdentifier) => {
if (!data.totpIdentifier?.trim()) {
update({ totpIdentifier });
}
}}
/>
</div>
</div>

View file

@ -42,7 +42,9 @@ EXTRACT_ACTION_SCROLL_AMOUNT = 500 # pixels per scroll action from extract-acti
# Text input constants
TEXT_INPUT_DELAY = 10 # 10ms between each character input
TEXT_PRESS_MAX_LENGTH = 20
# Number of trailing characters typed keystroke-by-keystroke (the rest use fill()).
# 10 chars yield 9 inter-key intervals, balancing speed with realistic input cadence.
TEXT_PRESS_MAX_LENGTH = 10
# Script generation constants
DEFAULT_SCRIPT_RUN_ID = "default"

View file

@ -23,6 +23,7 @@ from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import download_file as download_file_from_url
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.event.factory import EventStrategyFactory
from skyvern.library.ai_locator import AILocator
from skyvern.webeye.actions import handler_utils
from skyvern.webeye.actions.action_types import ActionType
@ -3269,7 +3270,7 @@ class SkyvernPage(Page):
y: int,
**kwargs: Any,
) -> None:
await self.page.mouse.move(x, y)
await EventStrategyFactory.move_cursor(self.page, x, y)
@action_wrap(ActionType.DRAG)
async def drag(

View file

View file

@ -0,0 +1,49 @@
from abc import ABC, abstractmethod
from playwright.async_api import Locator, Page
class CursorEventStrategy(ABC):
"""Strategy for dispatching cursor events to a page."""
@abstractmethod
async def move_to(self, page: Page, x: float, y: float) -> None:
pass
def sync_position(self, page: Page, x: float, y: float) -> None:
"""Notify the strategy that the cursor is now at (x, y) without generating movement.
Override in stateful strategies that track cursor position.
"""
@abstractmethod
async def move_to_element(self, page: Page, locator: Locator) -> tuple[float, float]:
pass
@abstractmethod
async def click(self, page: Page, locator: Locator) -> None:
pass
class InputEventStrategy(ABC):
"""Strategy for dispatching keyboard input events to a page."""
@abstractmethod
async def type_text(self, page: Page, locator: Locator | None, text: str) -> None:
pass
@abstractmethod
async def clear_field(self, page: Page, locator: Locator, char_count: int) -> None:
pass
class ScrollEventStrategy(ABC):
"""Strategy for dispatching scroll events to a page."""
@abstractmethod
async def scroll_to_element(self, page: Page, locator: Locator) -> None:
pass
@abstractmethod
async def scroll_by(self, page: Page, delta_y: float) -> None:
pass

View file

@ -0,0 +1,62 @@
"""Default event strategies that delegate to standard Playwright calls.
These are used as fallbacks when no custom strategy is registered or when
the feature flag disables custom strategies at runtime.
"""
import structlog
from playwright.async_api import Locator, Page
from skyvern.config import settings
from skyvern.constants import TEXT_INPUT_DELAY
from skyvern.forge.sdk.event.base import CursorEventStrategy, InputEventStrategy, ScrollEventStrategy
LOG = structlog.get_logger(__name__)
class DefaultCursorStrategy(CursorEventStrategy):
"""Cursor strategy that uses plain Playwright mouse movement."""
async def move_to(self, page: Page, x: float, y: float) -> None:
await page.mouse.move(x, y)
async def move_to_element(self, page: Page, locator: Locator) -> tuple[float, float]:
try:
bbox = await locator.bounding_box()
if bbox is None:
LOG.debug("move_to_element: element has no bounding box, skipping")
return 0.0, 0.0
x = bbox["x"] + bbox["width"] / 2
y = bbox["y"] + bbox["height"] / 2
await page.mouse.move(x, y)
return x, y
except Exception:
LOG.debug("move_to_element failed", exc_info=True)
return 0.0, 0.0
async def click(self, page: Page, locator: Locator) -> None:
await locator.click()
class DefaultInputStrategy(InputEventStrategy):
"""Input strategy that uses plain Playwright typing."""
async def type_text(self, page: Page, locator: Locator | None, text: str) -> None:
if locator is not None:
for char in text:
await locator.type(char, delay=TEXT_INPUT_DELAY, timeout=settings.BROWSER_ACTION_TIMEOUT_MS)
else:
await page.keyboard.type(text)
async def clear_field(self, page: Page, locator: Locator, char_count: int) -> None:
await locator.clear(timeout=settings.BROWSER_ACTION_TIMEOUT_MS)
class DefaultScrollStrategy(ScrollEventStrategy):
"""Scroll strategy that uses plain Playwright wheel events."""
async def scroll_to_element(self, page: Page, locator: Locator) -> None:
await locator.scroll_into_view_if_needed()
async def scroll_by(self, page: Page, delta_y: float) -> None:
await page.mouse.wheel(0, delta_y)

View file

@ -0,0 +1,102 @@
import structlog
from playwright.async_api import Locator, Page
from skyvern.forge.sdk.event.base import CursorEventStrategy, InputEventStrategy, ScrollEventStrategy
from skyvern.forge.sdk.event.default import DefaultCursorStrategy, DefaultInputStrategy, DefaultScrollStrategy
LOG = structlog.get_logger(__name__)
_default_cursor = DefaultCursorStrategy()
_default_input = DefaultInputStrategy()
_default_scroll = DefaultScrollStrategy()
class EventStrategyFactory:
__cursor: CursorEventStrategy | None = None
__input: InputEventStrategy | None = None
__scroll: ScrollEventStrategy | None = None
# -- setters ----------------------------------------------------------------
@staticmethod
def set_cursor_strategy(strategy: CursorEventStrategy) -> None:
EventStrategyFactory.__cursor = strategy
@staticmethod
def set_input_strategy(strategy: InputEventStrategy) -> None:
EventStrategyFactory.__input = strategy
@staticmethod
def set_scroll_strategy(strategy: ScrollEventStrategy) -> None:
EventStrategyFactory.__scroll = strategy
@staticmethod
def reset() -> None:
"""Clear all custom strategies, reverting to defaults."""
EventStrategyFactory.__cursor = None
EventStrategyFactory.__input = None
EventStrategyFactory.__scroll = None
# -- getters (always return a non-None strategy) ----------------------------
@staticmethod
def get_cursor_strategy() -> CursorEventStrategy:
return EventStrategyFactory.__cursor or _default_cursor
@staticmethod
def get_input_strategy() -> InputEventStrategy:
return EventStrategyFactory.__input or _default_input
@staticmethod
def get_scroll_strategy() -> ScrollEventStrategy:
return EventStrategyFactory.__scroll or _default_scroll
# -- cursor convenience methods ---------------------------------------------
@staticmethod
async def move_cursor(page: Page, x: float, y: float) -> None:
"""Move cursor using the active strategy."""
await EventStrategyFactory.get_cursor_strategy().move_to(page, x, y)
@staticmethod
async def move_to_element(page: Page, locator: Locator) -> None:
"""Move cursor to element. Failures are logged and swallowed."""
try:
await EventStrategyFactory.get_cursor_strategy().move_to_element(page, locator)
except Exception:
LOG.debug("Cursor move_to_element failed, proceeding with action", exc_info=True)
@staticmethod
def sync_cursor_position(page: Page, x: float, y: float) -> None:
"""Update cursor position without generating movement."""
EventStrategyFactory.get_cursor_strategy().sync_position(page, x, y)
# -- input convenience methods ----------------------------------------------
@staticmethod
async def type_text(page: Page, locator: Locator | None, text: str) -> None:
"""Type text using the active input strategy."""
await EventStrategyFactory.get_input_strategy().type_text(page, locator, text)
@staticmethod
async def clear_field(page: Page, locator: Locator, char_count: int) -> None:
"""Clear field using the active input strategy."""
await EventStrategyFactory.get_input_strategy().clear_field(page, locator, char_count)
# -- scroll convenience methods ---------------------------------------------
@staticmethod
async def scroll_by(page: Page, scroll_x: float, scroll_y: float) -> None:
"""Scroll using the active strategy for vertical-only, raw wheel for horizontal."""
if scroll_x == 0:
await EventStrategyFactory.get_scroll_strategy().scroll_by(page, scroll_y)
else:
await page.mouse.wheel(scroll_x, scroll_y)
@staticmethod
async def scroll_to_element(page: Page, locator: Locator) -> None:
"""Scroll to element using the active strategy. Failures are logged and swallowed."""
try:
await EventStrategyFactory.get_scroll_strategy().scroll_to_element(page, locator)
except Exception:
LOG.debug("scroll_to_element failed, proceeding with action", exc_info=True)

View file

@ -313,6 +313,7 @@ async def create_credential(
credential_response = PasswordCredentialResponse(
username=data.credential.username,
totp_type=data.credential.totp_type if hasattr(data.credential, "totp_type") else "none",
totp_identifier=data.credential.totp_identifier if hasattr(data.credential, "totp_identifier") else None,
)
return CredentialResponse(
credential=credential_response,
@ -1860,6 +1861,7 @@ def _convert_to_response(credential: Credential) -> CredentialResponse:
credential_response = PasswordCredentialResponse(
username=credential.username or credential.credential_id,
totp_type=credential.totp_type,
totp_identifier=credential.totp_identifier,
)
return CredentialResponse(
credential=credential_response,

View file

@ -35,7 +35,7 @@ class TotpType(StrEnum):
class PasswordCredentialResponse(BaseModel):
"""Response model for password credentials — non-sensitive fields only.
SECURITY: Must NEVER include password, TOTP secret, or TOTP identifier.
SECURITY: Must NEVER include password or TOTP secret.
"""
username: str = Field(..., description="The username associated with the credential", examples=["user@example.com"])

View file

@ -70,6 +70,7 @@ from skyvern.forge.sdk.api.llm.schema_validator import validate_and_fill_extract
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.core.skyvern_context import current as skyvern_current
from skyvern.forge.sdk.core.skyvern_context import ensure_context
from skyvern.forge.sdk.event.factory import EventStrategyFactory
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.tasks import Task
from skyvern.forge.sdk.services.bitwarden import BitwardenConstants
@ -730,6 +731,7 @@ async def handle_click_action(
if await skyvern_element.navigate_to_a_href(page=page):
return [ActionSuccess()]
await EventStrategyFactory.move_cursor(page, action.x, action.y)
if action.repeat == 1:
await page.mouse.click(x=action.x, y=action.y, button=action.button)
elif action.repeat == 2:
@ -1124,7 +1126,7 @@ async def handle_input_text_action(
) -> list[ActionResult]:
if not action.element_id:
# This is a CUA type action
await page.keyboard.type(action.text)
await EventStrategyFactory.type_text(page, None, action.text)
return [ActionSuccess()]
dom = DomUtil(scraped_page, page)
@ -1997,6 +1999,7 @@ async def handle_select_option_action(
await skyvern_element.scroll_into_view()
try:
await EventStrategyFactory.move_to_element(page, skyvern_element.get_locator())
await skyvern_element.get_locator().click(timeout=timeout)
except Exception:
LOG.info(
@ -2106,6 +2109,7 @@ async def handle_hover_action(
try:
await skyvern_element.hover_to_reveal()
await skyvern_element.get_locator().scroll_into_view_if_needed()
await EventStrategyFactory.move_to_element(page, skyvern_element.get_locator())
await skyvern_element.get_locator().hover(timeout=settings.BROWSER_ACTION_TIMEOUT_MS)
if action.hold_seconds and action.hold_seconds > 0:
@ -2274,7 +2278,7 @@ async def handle_scroll_action(
viewport = page.viewport_size
center_x = viewport["width"] // 2 if viewport else 640
center_y = viewport["height"] // 2 if viewport else 360
await page.mouse.move(center_x, center_y)
await EventStrategyFactory.move_cursor(page, center_x, center_y)
wheel_delta = 500 if scroll_direction == "down" else -500
# Dynamically compute iterations based on remaining scrollable distance
# so we reach the bottom even on very long T&C pages.
@ -2293,6 +2297,11 @@ async def handle_scroll_action(
iterations=iterations,
wheel_delta=wheel_delta,
)
# Scroll per-iteration with page-reaction pauses between each chunk
# (e.g. lazy-load, infinite scroll, dynamically enabled buttons).
# Use raw page.mouse.wheel() here — the chunking + 100ms pauses already
# provide a natural pattern, and applying the custom event strategy
# per-iteration would add excessive latency per chunk.
for _ in range(iterations):
await page.mouse.wheel(0, wheel_delta)
await page.wait_for_timeout(100)
@ -2314,13 +2323,13 @@ async def handle_scroll_action(
"Could not find scrollable container near element, falling back to mouse wheel",
element_id=action.element_id,
)
await page.mouse.wheel(action.scroll_x, action.scroll_y)
await EventStrategyFactory.scroll_by(page, action.scroll_x, action.scroll_y)
elif action.x and action.y:
# Coordinate-based scrolling from CUA/UI-TARS agents
await page.mouse.move(action.x, action.y)
await page.mouse.wheel(action.scroll_x, action.scroll_y)
await EventStrategyFactory.move_cursor(page, action.x, action.y)
await EventStrategyFactory.scroll_by(page, action.scroll_x, action.scroll_y)
else:
await page.mouse.wheel(action.scroll_x, action.scroll_y)
await EventStrategyFactory.scroll_by(page, action.scroll_x, action.scroll_y)
return [ActionSuccess()]
@ -2344,7 +2353,7 @@ async def handle_move_action(
task: Task,
step: Step,
) -> list[ActionResult]:
await page.mouse.move(action.x, action.y)
await EventStrategyFactory.move_cursor(page, action.x, action.y)
return [ActionSuccess()]
@ -2509,6 +2518,7 @@ async def chain_click(
"""
try:
if not await skyvern_element.navigate_to_a_href(page=page):
await EventStrategyFactory.move_to_element(page, locator)
await locator.click(timeout=timeout)
LOG.info("Chain click: main element click succeeded", action=action, locator=locator)
return [ActionSuccess()]
@ -3497,6 +3507,7 @@ async def select_from_dropdown(
"Find an alternative option with the same value. Try to select the option.",
value=value,
)
await EventStrategyFactory.move_to_element(page, locator)
await locator.click(timeout=timeout)
single_select_result.action_result = ActionSuccess()
return single_select_result

View file

@ -5,8 +5,9 @@ import structlog
from playwright.async_api import Locator, Page
from skyvern.config import settings
from skyvern.constants import TEXT_INPUT_DELAY, TEXT_PRESS_MAX_LENGTH
from skyvern.constants import TEXT_PRESS_MAX_LENGTH
from skyvern.forge.sdk.api.files import download_file as download_file_api
from skyvern.forge.sdk.event.factory import EventStrategyFactory
LOG = structlog.get_logger()
@ -35,8 +36,7 @@ async def input_sequentially(locator: Locator, text: str, timeout: float = setti
await locator.fill(text[: length - TEXT_PRESS_MAX_LENGTH], timeout=timeout)
text = text[length - TEXT_PRESS_MAX_LENGTH :]
for char in text:
await locator.type(char, delay=TEXT_INPUT_DELAY, timeout=timeout)
await EventStrategyFactory.type_text(locator.page, locator, text)
async def keypress(page: Page, keys: list[str], hold: bool = False, duration: float = 0) -> None:
@ -95,18 +95,24 @@ async def drag(
page: Page, start_x: int | None = None, start_y: int | None = None, path: list[tuple[int, int]] | None = None
) -> None:
if start_x and start_y:
await page.mouse.move(start_x, start_y)
await EventStrategyFactory.move_cursor(page, start_x, start_y)
await page.mouse.down()
path = path or []
last_x: float = start_x if start_x is not None else 0.0
last_y: float = start_y if start_y is not None else 0.0
for point in path:
x, y = point[0], point[1]
await page.mouse.move(x, y)
last_x, last_y = point[0], point[1]
await EventStrategyFactory.move_cursor(page, last_x, last_y)
await page.mouse.up()
# Sync cursor strategy position to the final drag point so the next
# move_to() starts from the correct location.
if start_x is not None or path:
EventStrategyFactory.sync_cursor_position(page, last_x, last_y)
async def left_mouse(page: Page, x: int | None, y: int | None, direction: Literal["down", "up"]) -> None:
if x and y:
await page.mouse.move(x, y)
await EventStrategyFactory.move_cursor(page, x, y)
if direction == "down":
await page.mouse.down()
elif direction == "up":

View file

@ -11,7 +11,7 @@ import structlog
from playwright.async_api import ElementHandle, FloatRect, Frame, FrameLocator, Locator, Page, TimeoutError
from skyvern.config import settings
from skyvern.constants import SKYVERN_ID_ATTR, TEXT_INPUT_DELAY
from skyvern.constants import SKYVERN_ID_ATTR
from skyvern.exceptions import (
ElementIsNotLabel,
ElementOutOfCurrentViewport,
@ -26,6 +26,7 @@ from skyvern.exceptions import (
SkyvernException,
)
from skyvern.experimentation.wait_utils import get_or_create_wait_config, get_wait_time, scroll_into_view_wait
from skyvern.forge.sdk.event.factory import EventStrategyFactory
from skyvern.webeye.actions import handler_utils
from skyvern.webeye.scraper.scraped_page import ScrapedPage, json_to_html
from skyvern.webeye.scraper.scraper import IncrementalScrapePage, trim_element
@ -702,8 +703,8 @@ class SkyvernElement:
await self.get_locator().press(key=key, timeout=timeout)
async def press_fill(self, text: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
for char in text:
await self.get_locator().type(char, delay=TEXT_INPUT_DELAY, timeout=timeout)
locator = self.get_locator()
await EventStrategyFactory.type_text(locator.page, locator, text)
async def input(self, text: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
if self.get_tag_name().lower() not in COMMON_INPUT_TAGS:
@ -715,7 +716,8 @@ class SkyvernElement:
await self.get_locator().fill(text, timeout=timeout)
async def input_clear(self, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
await self.get_locator().clear(timeout=timeout)
locator = self.get_locator()
await EventStrategyFactory.clear_field(locator.page, locator, char_count=0)
async def check(
self,
@ -819,7 +821,7 @@ class SkyvernElement:
if dest_x < 0 or dest_y < 0:
raise ElementOutOfCurrentViewport(element_id=self.get_id())
await page.mouse.move(dest_x, dest_y)
await EventStrategyFactory.move_cursor(page, dest_x, dest_y)
return dest_x, dest_y
@ -833,6 +835,7 @@ class SkyvernElement:
if await self.is_disabled(dynamic=True):
raise InteractWithDisabledElement(element_id=self.get_id())
await EventStrategyFactory.move_to_element(page, self.get_locator())
try:
await self.get_locator().click(timeout=timeout)
return

View file

@ -0,0 +1,55 @@
"""Tests for _convert_to_response in credentials routes."""
from datetime import datetime
from skyvern.forge.sdk.routes.credentials import _convert_to_response
from skyvern.forge.sdk.schemas.credentials import (
Credential,
CredentialType,
CredentialVaultType,
TotpType,
)
def _make_credential(**overrides: object) -> Credential:
defaults = {
"credential_id": "cred_test",
"organization_id": "org_test",
"name": "Test Credential",
"vault_type": CredentialVaultType.BITWARDEN,
"item_id": "item_test",
"credential_type": CredentialType.PASSWORD,
"username": "user@example.com",
"totp_type": TotpType.AUTHENTICATOR,
"totp_identifier": None,
"card_last4": None,
"card_brand": None,
"secret_label": None,
"browser_profile_id": None,
"tested_url": None,
"user_context": None,
"save_browser_session_intent": False,
"created_at": datetime(2026, 1, 1),
"modified_at": datetime(2026, 1, 1),
"deleted_at": None,
}
defaults.update(overrides)
return Credential(**defaults)
def test_convert_to_response_includes_totp_identifier() -> None:
credential = _make_credential(totp_identifier="login_otp")
response = _convert_to_response(credential)
assert response.credential.totp_identifier == "login_otp"
def test_convert_to_response_totp_identifier_none_when_not_set() -> None:
credential = _make_credential(totp_identifier=None)
response = _convert_to_response(credential)
assert response.credential.totp_identifier is None
def test_convert_to_response_includes_totp_type() -> None:
credential = _make_credential(totp_type=TotpType.EMAIL)
response = _convert_to_response(credential)
assert response.credential.totp_type == TotpType.EMAIL