Support 2FA in Bitwarden (#178)

This commit is contained in:
Kerem Yilmaz 2024-04-10 23:31:17 -07:00 committed by GitHub
parent 6ea649d30b
commit e8b42c9a4f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 113 additions and 48 deletions

View file

@ -207,6 +207,11 @@ class BitwardenListItemsError(BitwardenBaseError):
super().__init__(f"Error listing items in Bitwarden: {message}") super().__init__(f"Error listing items in Bitwarden: {message}")
class BitwardenTOTPError(BitwardenBaseError):
def __init__(self, message: str) -> None:
super().__init__(f"Error generating TOTP in Bitwarden: {message}")
class BitwardenLogoutError(BitwardenBaseError): class BitwardenLogoutError(BitwardenBaseError):
def __init__(self, message: str) -> None: def __init__(self, message: str) -> None:
super().__init__(f"Error logging out of Bitwarden: {message}") super().__init__(f"Error logging out of Bitwarden: {message}")

View file

@ -1,14 +1,32 @@
import json import json
import os import os
import subprocess import subprocess
from enum import StrEnum
import structlog import structlog
from skyvern.exceptions import BitwardenListItemsError, BitwardenLoginError, BitwardenLogoutError, BitwardenUnlockError from skyvern.exceptions import (
BitwardenListItemsError,
BitwardenLoginError,
BitwardenLogoutError,
BitwardenTOTPError,
BitwardenUnlockError,
)
LOG = structlog.get_logger() LOG = structlog.get_logger()
class BitwardenConstants(StrEnum):
CLIENT_ID = "BW_CLIENT_ID"
CLIENT_SECRET = "BW_CLIENT_SECRET"
MASTER_PASSWORD = "BW_MASTER_PASSWORD"
URL = "BW_URL"
USERNAME = "BW_USERNAME"
PASSWORD = "BW_PASSWORD"
TOTP = "BW_TOTP"
class BitwardenService: class BitwardenService:
@staticmethod @staticmethod
def run_command(command: list[str], additional_env: dict[str, str] | None = None) -> subprocess.CompletedProcess: def run_command(command: list[str], additional_env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
@ -34,57 +52,72 @@ class BitwardenService:
Get the secret value from the Bitwarden CLI. Get the secret value from the Bitwarden CLI.
""" """
# Step 1: Set up environment variables and log in # Step 1: Set up environment variables and log in
env = {"BW_CLIENTID": client_id, "BW_CLIENTSECRET": client_secret, "BW_PASSWORD": master_password}
login_command = ["bw", "login", "--apikey"]
login_result = BitwardenService.run_command(login_command, env)
# Print both stdout and stderr for debugging
if login_result.stderr:
raise BitwardenLoginError(login_result.stderr)
# Step 2: Unlock the vault
unlock_command = ["bw", "unlock", "--passwordenv", "BW_PASSWORD"]
unlock_result = BitwardenService.run_command(unlock_command, env)
if unlock_result.stderr:
raise BitwardenUnlockError(unlock_result.stderr)
# Extract session key
try: try:
session_key = unlock_result.stdout.split('"')[1] env = {"BW_CLIENTID": client_id, "BW_CLIENTSECRET": client_secret, "BW_PASSWORD": master_password}
except IndexError: login_command = ["bw", "login", "--apikey"]
raise BitwardenUnlockError("Unable to extract session key.") login_result = BitwardenService.run_command(login_command, env)
if not session_key: # Print both stdout and stderr for debugging
raise BitwardenUnlockError("Session key is empty.") if login_result.stderr:
raise BitwardenLoginError(login_result.stderr)
# Step 3: Retrieve the items # Step 2: Unlock the vault
list_command = ["bw", "list", "items", "--url", url, "--session", session_key] unlock_command = ["bw", "unlock", "--passwordenv", "BW_PASSWORD"]
items_result = BitwardenService.run_command(list_command) unlock_result = BitwardenService.run_command(unlock_command, env)
if items_result.stderr: # This is a part of Bitwarden's client-side telemetry
raise BitwardenListItemsError(items_result.stderr) # TODO -- figure out how to disable this telemetry so we never get this error
# https://github.com/bitwarden/clients/blob/9d10825dbd891c0f41fe1b4c4dd3ca4171f63be5/libs/common/src/services/api.service.ts#L1473
if unlock_result.stderr and "Event post failed" not in unlock_result.stderr:
raise BitwardenUnlockError(unlock_result.stderr)
# Parse the items and extract credentials # Extract session key
try: try:
items = json.loads(items_result.stdout) session_key = unlock_result.stdout.split('"')[1]
except json.JSONDecodeError: except IndexError:
raise BitwardenListItemsError("Failed to parse items JSON. Output: " + items_result.stdout) raise BitwardenUnlockError("Unable to extract session key.")
if not items: if not session_key:
raise BitwardenListItemsError("No items found in Bitwarden.") raise BitwardenUnlockError("Session key is empty.")
credentials = [ # Step 3: Retrieve the items
{"username": item["login"]["username"], "password": item["login"]["password"]} list_command = ["bw", "list", "items", "--url", url, "--session", session_key]
for item in items items_result = BitwardenService.run_command(list_command)
if "login" in item
]
# Step 4: Log out if items_result.stderr and "Event post failed" not in items_result.stderr:
BitwardenService.logout() raise BitwardenListItemsError(items_result.stderr)
# Todo: Handle multiple credentials, for now just return the last one # Parse the items and extract credentials
return credentials[-1] if credentials else {} try:
items = json.loads(items_result.stdout)
except json.JSONDecodeError:
raise BitwardenListItemsError("Failed to parse items JSON. Output: " + items_result.stdout)
if not items:
raise BitwardenListItemsError("No items found in Bitwarden.")
totp_command = ["bw", "get", "totp", url, "--session", session_key]
totp_result = BitwardenService.run_command(totp_command)
if totp_result.stderr and "Event post failed" not in totp_result.stderr:
LOG.warning("Bitwarden TOTP Error", error=totp_result.stderr, e=BitwardenTOTPError(totp_result.stderr))
totp_code = totp_result.stdout
credentials: list[dict[str, str]] = [
{
BitwardenConstants.USERNAME: item["login"]["username"],
BitwardenConstants.PASSWORD: item["login"]["password"],
BitwardenConstants.TOTP: totp_code,
}
for item in items
if "login" in item
]
# Todo: Handle multiple credentials, for now just return the last one
return credentials[-1] if credentials else {}
finally:
# Step 4: Log out
BitwardenService.logout()
@staticmethod @staticmethod
def logout() -> None: def logout() -> None:

View file

@ -5,7 +5,7 @@ import structlog
from skyvern.exceptions import BitwardenBaseError, WorkflowRunContextNotInitialized from skyvern.exceptions import BitwardenBaseError, WorkflowRunContextNotInitialized
from skyvern.forge.sdk.api.aws import AsyncAWSClient from skyvern.forge.sdk.api.aws import AsyncAWSClient
from skyvern.forge.sdk.services.bitwarden import BitwardenService from skyvern.forge.sdk.services.bitwarden import BitwardenConstants, BitwardenService
from skyvern.forge.sdk.workflow.exceptions import OutputParameterKeyCollisionError from skyvern.forge.sdk.workflow.exceptions import OutputParameterKeyCollisionError
from skyvern.forge.sdk.workflow.models.parameter import ( from skyvern.forge.sdk.workflow.models.parameter import (
PARAMETER_TYPE, PARAMETER_TYPE,
@ -88,6 +88,18 @@ class WorkflowRunContext:
return self.secrets.get(secret_id_or_value) return self.secrets.get(secret_id_or_value)
return None return None
def get_secrets_from_password_manager(self) -> dict[str, Any]:
"""
Get the secrets from the password manager. The secrets dict will contain the actual secret values.
"""
secret_credentials = BitwardenService.get_secret_value_from_url(
url=self.secrets[BitwardenConstants.URL],
client_secret=self.secrets[BitwardenConstants.CLIENT_SECRET],
client_id=self.secrets[BitwardenConstants.CLIENT_ID],
master_password=self.secrets[BitwardenConstants.MASTER_PASSWORD],
)
return secret_credentials
@staticmethod @staticmethod
def generate_random_secret_id() -> str: def generate_random_secret_id() -> str:
return f"secret_{uuid.uuid4()}" return f"secret_{uuid.uuid4()}"
@ -138,17 +150,26 @@ class WorkflowRunContext:
url, url,
) )
if secret_credentials: if secret_credentials:
self.secrets[BitwardenConstants.URL] = url
self.secrets[BitwardenConstants.CLIENT_SECRET] = client_secret
self.secrets[BitwardenConstants.CLIENT_ID] = client_id
self.secrets[BitwardenConstants.MASTER_PASSWORD] = master_password
random_secret_id = self.generate_random_secret_id() random_secret_id = self.generate_random_secret_id()
# username secret # username secret
username_secret_id = f"{random_secret_id}_username" username_secret_id = f"{random_secret_id}_username"
self.secrets[username_secret_id] = secret_credentials["username"] self.secrets[username_secret_id] = secret_credentials[BitwardenConstants.USERNAME]
# password secret # password secret
password_secret_id = f"{random_secret_id}_password" password_secret_id = f"{random_secret_id}_password"
self.secrets[password_secret_id] = secret_credentials["password"] self.secrets[password_secret_id] = secret_credentials[BitwardenConstants.PASSWORD]
totp_secret_id = f"{random_secret_id}_totp"
self.secrets[totp_secret_id] = BitwardenConstants.TOTP
self.values[parameter.key] = { self.values[parameter.key] = {
"username": username_secret_id, "username": username_secret_id,
"password": password_secret_id, "password": password_secret_id,
"totp": totp_secret_id,
} }
except BitwardenBaseError as e: except BitwardenBaseError as e:
BitwardenService.logout() BitwardenService.logout()

View file

@ -15,6 +15,7 @@ from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import download_file from skyvern.forge.sdk.api.files import download_file
from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.tasks import Task from skyvern.forge.sdk.schemas.tasks import Task
from skyvern.forge.sdk.services.bitwarden import BitwardenConstants
from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.webeye.actions import actions from skyvern.webeye.actions import actions
from skyvern.webeye.actions.actions import Action, ActionType, ClickAction, ScrapeResult, UploadFileAction, WebAction from skyvern.webeye.actions.actions import Action, ActionType, ClickAction, ScrapeResult, UploadFileAction, WebAction
@ -434,6 +435,10 @@ def get_actual_value_of_parameter_if_secret(task: Task, parameter: str) -> Any:
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(task.workflow_run_id) workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(task.workflow_run_id)
secret_value = workflow_run_context.get_original_secret_value_or_none(parameter) secret_value = workflow_run_context.get_original_secret_value_or_none(parameter)
if secret_value == BitwardenConstants.TOTP:
secrets = workflow_run_context.get_secrets_from_password_manager()
secret_value = secrets[BitwardenConstants.TOTP]
return secret_value if secret_value is not None else parameter return secret_value if secret_value is not None else parameter

View file

@ -1,4 +1,5 @@
// Commands for manipulating rects. // Commands for manipulating rects.
// Want to debug this? Run chromium, go to sources, and create a new snippet with the code in domUtils.js
class Rect { class Rect {
// Create a rect given the top left and bottom right corners. // Create a rect given the top left and bottom right corners.
static create(x1, y1, x2, y2) { static create(x1, y1, x2, y2) {
@ -273,8 +274,8 @@ function hasWidgetRole(element) {
function isInteractableInput(element) { function isInteractableInput(element) {
const tagName = element.tagName.toLowerCase(); const tagName = element.tagName.toLowerCase();
const type = element.getAttribute("type"); const type = element.getAttribute("type") ?? "text"; // Default is text: https://www.w3schools.com/html/html_form_input_types.asp
if (tagName !== "input" || !type) { if (tagName !== "input") {
// let other checks decide // let other checks decide
return false; return false;
} }