import json
from collections import Counter
from datetime import datetime

import requests
import structlog

from skyvern import analytics
from skyvern.exceptions import (
    FailedToSendWebhook,
    MissingValueForParameter,
    WorkflowNotFound,
    WorkflowOrganizationMismatch,
    WorkflowRunNotFound,
)
from skyvern.forge import app
from skyvern.forge.sdk.artifact.models import ArtifactType
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.core.security import generate_skyvern_signature
from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.tasks import ProxyLocation, Task, TaskStatus
from skyvern.forge.sdk.workflow.exceptions import (
    ContextParameterSourceNotDefined,
    InvalidWorkflowDefinition,
    WorkflowDefinitionHasDuplicateParameterKeys,
    WorkflowDefinitionHasReservedParameterKeys,
)
from skyvern.forge.sdk.workflow.models.block import (
    BlockType,
    BlockTypeVar,
    CodeBlock,
    DownloadToS3Block,
    ForLoopBlock,
    SendEmailBlock,
    TaskBlock,
    TextPromptBlock,
    UploadToS3Block,
)
from skyvern.forge.sdk.workflow.models.parameter import (
    PARAMETER_TYPE,
    AWSSecretParameter,
    ContextParameter,
    OutputParameter,
    Parameter,
    ParameterType,
    WorkflowParameter,
    WorkflowParameterType,
)
from skyvern.forge.sdk.workflow.models.workflow import (
    Workflow,
    WorkflowDefinition,
    WorkflowRequestBody,
    WorkflowRun,
    WorkflowRunOutputParameter,
    WorkflowRunParameter,
    WorkflowRunStatus,
    WorkflowRunStatusResponse,
)
from skyvern.forge.sdk.workflow.models.yaml import BLOCK_YAML_TYPES, ForLoopBlockYAML, WorkflowCreateYAMLRequest
from skyvern.webeye.browser_factory import BrowserState

LOG = structlog.get_logger()


class WorkflowService:
    async def setup_workflow_run(
        self,
        request_id: str | None,
        workflow_request: WorkflowRequestBody,
        workflow_id: str,
        organization_id: str,
        max_steps_override: int | None = None,
    ) -> WorkflowRun:
        """
        Create a workflow run and its parameters. Validate the workflow and the organization. If there are missing
        parameters with no default value, mark the workflow run as failed.
        :param request_id: The request id for the workflow run.
        :param workflow_request: The request body for the workflow run, containing the parameters and the config.
        :param workflow_id: The workflow id to run.
        :param organization_id: The organization id for the workflow.
        :param max_steps_override: The max steps override for the workflow run, if any.
        :return: The created workflow run.
""" # Validate the workflow and the organization workflow = await self.get_workflow(workflow_id=workflow_id, organization_id=organization_id) if workflow is None: LOG.error(f"Workflow {workflow_id} not found") raise WorkflowNotFound(workflow_id=workflow_id) if workflow.organization_id != organization_id: LOG.error(f"Workflow {workflow_id} does not belong to organization {organization_id}") raise WorkflowOrganizationMismatch(workflow_id=workflow_id, organization_id=organization_id) if workflow_request.proxy_location is None and workflow.proxy_location is not None: workflow_request.proxy_location = workflow.proxy_location if workflow_request.webhook_callback_url is None and workflow.webhook_callback_url is not None: workflow_request.webhook_callback_url = workflow.webhook_callback_url # Create the workflow run and set skyvern context workflow_run = await self.create_workflow_run(workflow_request=workflow_request, workflow_id=workflow_id) LOG.info( f"Created workflow run {workflow_run.workflow_run_id} for workflow {workflow.workflow_id}", request_id=request_id, workflow_run_id=workflow_run.workflow_run_id, workflow_id=workflow.workflow_id, proxy_location=workflow_request.proxy_location, webhook_callback_url=workflow_request.webhook_callback_url, ) skyvern_context.set( SkyvernContext( organization_id=organization_id, request_id=request_id, workflow_id=workflow_id, workflow_run_id=workflow_run.workflow_run_id, max_steps_override=max_steps_override, ) ) # Create all the workflow run parameters, AWSSecretParameter won't have workflow run parameters created. all_workflow_parameters = await self.get_workflow_parameters(workflow_id=workflow.workflow_id) workflow_run_parameters = [] for workflow_parameter in all_workflow_parameters: if workflow_request.data and workflow_parameter.key in workflow_request.data: request_body_value = workflow_request.data[workflow_parameter.key] workflow_run_parameter = await self.create_workflow_run_parameter( workflow_run_id=workflow_run.workflow_run_id, workflow_parameter_id=workflow_parameter.workflow_parameter_id, value=request_body_value, ) elif workflow_parameter.default_value is not None: workflow_run_parameter = await self.create_workflow_run_parameter( workflow_run_id=workflow_run.workflow_run_id, workflow_parameter_id=workflow_parameter.workflow_parameter_id, value=workflow_parameter.default_value, ) else: await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) raise MissingValueForParameter( parameter_key=workflow_parameter.key, workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, ) workflow_run_parameters.append(workflow_run_parameter) return workflow_run async def execute_workflow( self, workflow_run_id: str, api_key: str, organization_id: str | None = None, ) -> WorkflowRun: """Execute a workflow.""" workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) workflow = await self.get_workflow(workflow_id=workflow_run.workflow_id, organization_id=organization_id) # Set workflow run status to running, create workflow run parameters await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) # Get all context parameters from the workflow definition context_parameters = [ parameter for parameter in workflow.workflow_definition.parameters if isinstance(parameter, ContextParameter) ] # Get all tuples wp_wps_tuples = await self.get_workflow_run_parameter_tuples(workflow_run_id=workflow_run.workflow_run_id) workflow_output_parameters = await 
self.get_workflow_output_parameters(workflow_id=workflow.workflow_id) app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context( workflow_run_id, wp_wps_tuples, workflow_output_parameters, context_parameters ) # Execute workflow blocks blocks = workflow.workflow_definition.blocks block_result = None for block_idx, block in enumerate(blocks): try: parameters = block.get_all_parameters(workflow_run_id) await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run( workflow_run_id, parameters ) LOG.info( f"Executing root block {block.block_type} at index {block_idx} for workflow run {workflow_run_id}", block_type=block.block_type, workflow_run_id=workflow_run.workflow_run_id, block_idx=block_idx, ) block_result = await block.execute_safe(workflow_run_id=workflow_run_id) if not block_result.success: LOG.error( f"Block with type {block.block_type} at index {block_idx} failed for workflow run {workflow_run_id}", block_type=block.block_type, workflow_run_id=workflow_run.workflow_run_id, block_idx=block_idx, block_result=block_result, ) if block.continue_on_failure: LOG.warning( f"Block with type {block.block_type} at index {block_idx} failed but will continue executing the workflow run {workflow_run_id}", block_type=block.block_type, workflow_run_id=workflow_run.workflow_run_id, block_idx=block_idx, block_result=block_result, continue_on_failure=block.continue_on_failure, ) else: await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key) return workflow_run except Exception: LOG.exception( f"Error while executing workflow run {workflow_run.workflow_run_id}", workflow_run_id=workflow_run.workflow_run_id, ) await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key) return workflow_run await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id) await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key) return workflow_run async def handle_workflow_status(self, workflow_run: WorkflowRun, tasks: list[Task]) -> WorkflowRun: task_counts_by_status = Counter(task.status for task in tasks) # Create a mapping of status to (action, log_func, log_message) status_action_mapping = { TaskStatus.running: (None, LOG.error, "has running tasks, this should not happen"), TaskStatus.terminated: ( self.mark_workflow_run_as_terminated, LOG.warning, "has terminated tasks, marking as terminated", ), TaskStatus.failed: (self.mark_workflow_run_as_failed, LOG.warning, "has failed tasks, marking as failed"), TaskStatus.completed: ( self.mark_workflow_run_as_completed, LOG.info, "tasks are completed, marking as completed", ), } for status, (action, log_func, log_message) in status_action_mapping.items(): if task_counts_by_status.get(status, 0) > 0: if action is not None: await action(workflow_run_id=workflow_run.workflow_run_id) if log_func and log_message: log_func( f"Workflow run {workflow_run.workflow_run_id} {log_message}", workflow_run_id=workflow_run.workflow_run_id, task_counts_by_status=task_counts_by_status, ) return workflow_run # Handle unexpected state LOG.error( f"Workflow run {workflow_run.workflow_run_id} has tasks in an unexpected state, marking as failed", workflow_run_id=workflow_run.workflow_run_id, task_counts_by_status=task_counts_by_status, ) await 
self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) return workflow_run async def create_workflow( self, organization_id: str, title: str, workflow_definition: WorkflowDefinition, description: str | None = None, proxy_location: ProxyLocation | None = None, webhook_callback_url: str | None = None, workflow_permanent_id: str | None = None, version: int | None = None, ) -> Workflow: return await app.DATABASE.create_workflow( title=title, workflow_definition=workflow_definition.model_dump(), organization_id=organization_id, description=description, proxy_location=proxy_location, webhook_callback_url=webhook_callback_url, workflow_permanent_id=workflow_permanent_id, version=version, ) async def get_workflow(self, workflow_id: str, organization_id: str | None = None) -> Workflow: workflow = await app.DATABASE.get_workflow(workflow_id=workflow_id, organization_id=organization_id) if not workflow: raise WorkflowNotFound(workflow_id=workflow_id) return workflow async def get_workflow_by_permanent_id( self, workflow_permanent_id: str, organization_id: str | None = None, version: int | None = None, ) -> Workflow: workflow = await app.DATABASE.get_workflow_by_permanent_id( workflow_permanent_id, organization_id=organization_id, version=version, ) if not workflow: raise WorkflowNotFound(workflow_permanent_id=workflow_permanent_id, version=version) return workflow async def get_workflows_by_organization_id( self, organization_id: str, page: int = 1, page_size: int = 10, ) -> list[Workflow]: """ Get all workflows with the latest version for the organization. """ return await app.DATABASE.get_workflows_by_organization_id( organization_id=organization_id, page=page, page_size=page_size, ) async def update_workflow( self, workflow_id: str, organization_id: str | None = None, title: str | None = None, description: str | None = None, workflow_definition: WorkflowDefinition | None = None, ) -> Workflow: if workflow_definition: workflow_definition.validate() return await app.DATABASE.update_workflow( workflow_id=workflow_id, title=title, organization_id=organization_id, description=description, workflow_definition=workflow_definition.model_dump() if workflow_definition else None, ) async def delete_workflow_by_permanent_id( self, workflow_permanent_id: str, organization_id: str | None = None, ) -> None: await app.DATABASE.soft_delete_workflow_by_permanent_id( workflow_permanent_id=workflow_permanent_id, organization_id=organization_id, ) async def create_workflow_run(self, workflow_request: WorkflowRequestBody, workflow_id: str) -> WorkflowRun: return await app.DATABASE.create_workflow_run( workflow_id=workflow_id, proxy_location=workflow_request.proxy_location, webhook_callback_url=workflow_request.webhook_callback_url, ) async def mark_workflow_run_as_completed(self, workflow_run_id: str) -> None: LOG.info( f"Marking workflow run {workflow_run_id} as completed", workflow_run_id=workflow_run_id, workflow_status="completed", ) await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.completed, ) async def mark_workflow_run_as_failed(self, workflow_run_id: str) -> None: LOG.info( f"Marking workflow run {workflow_run_id} as failed", workflow_run_id=workflow_run_id, workflow_status="failed", ) await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.failed, ) async def mark_workflow_run_as_running(self, workflow_run_id: str) -> None: LOG.info( f"Marking workflow run {workflow_run_id} as running", 
workflow_run_id=workflow_run_id, workflow_status="running", ) await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.running, ) async def mark_workflow_run_as_terminated(self, workflow_run_id: str) -> None: LOG.info( f"Marking workflow run {workflow_run_id} as terminated", workflow_run_id=workflow_run_id, workflow_status="terminated", ) await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.terminated, ) async def get_workflow_runs(self, workflow_id: str) -> list[WorkflowRun]: return await app.DATABASE.get_workflow_runs(workflow_id=workflow_id) async def get_workflow_run(self, workflow_run_id: str) -> WorkflowRun: workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id=workflow_run_id) if not workflow_run: raise WorkflowRunNotFound(workflow_run_id) return workflow_run async def create_workflow_parameter( self, workflow_id: str, workflow_parameter_type: WorkflowParameterType, key: str, default_value: bool | int | float | str | dict | list | None = None, description: str | None = None, ) -> WorkflowParameter: return await app.DATABASE.create_workflow_parameter( workflow_id=workflow_id, workflow_parameter_type=workflow_parameter_type, key=key, description=description, default_value=default_value, ) async def create_aws_secret_parameter( self, workflow_id: str, aws_key: str, key: str, description: str | None = None ) -> AWSSecretParameter: return await app.DATABASE.create_aws_secret_parameter( workflow_id=workflow_id, aws_key=aws_key, key=key, description=description ) async def create_bitwarden_login_credential_parameter( self, workflow_id: str, bitwarden_client_id_aws_secret_key: str, bitwarden_client_secret_aws_secret_key: str, bitwarden_master_password_aws_secret_key: str, url_parameter_key: str, key: str, description: str | None = None, ) -> Parameter: return await app.DATABASE.create_bitwarden_login_credential_parameter( workflow_id=workflow_id, bitwarden_client_id_aws_secret_key=bitwarden_client_id_aws_secret_key, bitwarden_client_secret_aws_secret_key=bitwarden_client_secret_aws_secret_key, bitwarden_master_password_aws_secret_key=bitwarden_master_password_aws_secret_key, url_parameter_key=url_parameter_key, key=key, description=description, ) async def create_output_parameter( self, workflow_id: str, key: str, description: str | None = None ) -> OutputParameter: return await app.DATABASE.create_output_parameter(workflow_id=workflow_id, key=key, description=description) async def get_workflow_parameters(self, workflow_id: str) -> list[WorkflowParameter]: return await app.DATABASE.get_workflow_parameters(workflow_id=workflow_id) async def create_workflow_run_parameter( self, workflow_run_id: str, workflow_parameter_id: str, value: bool | int | float | str | dict | list, ) -> WorkflowRunParameter: return await app.DATABASE.create_workflow_run_parameter( workflow_run_id=workflow_run_id, workflow_parameter_id=workflow_parameter_id, value=json.dumps(value) if isinstance(value, (dict, list)) else value, ) async def get_workflow_run_parameter_tuples( self, workflow_run_id: str ) -> list[tuple[WorkflowParameter, WorkflowRunParameter]]: return await app.DATABASE.get_workflow_run_parameters(workflow_run_id=workflow_run_id) @staticmethod async def get_workflow_output_parameters(workflow_id: str) -> list[OutputParameter]: return await app.DATABASE.get_workflow_output_parameters(workflow_id=workflow_id) @staticmethod async def get_workflow_run_output_parameters( workflow_run_id: str, ) -> 
list[WorkflowRunOutputParameter]: return await app.DATABASE.get_workflow_run_output_parameters(workflow_run_id=workflow_run_id) @staticmethod async def get_output_parameter_workflow_run_output_parameter_tuples( workflow_id: str, workflow_run_id: str, ) -> list[tuple[OutputParameter, WorkflowRunOutputParameter]]: workflow_run_output_parameters = await app.DATABASE.get_workflow_run_output_parameters( workflow_run_id=workflow_run_id ) output_parameters = await app.DATABASE.get_workflow_output_parameters(workflow_id=workflow_id) return [ (output_parameter, workflow_run_output_parameter) for output_parameter in output_parameters for workflow_run_output_parameter in workflow_run_output_parameters if output_parameter.output_parameter_id == workflow_run_output_parameter.output_parameter_id ] async def get_last_task_for_workflow_run(self, workflow_run_id: str) -> Task | None: return await app.DATABASE.get_last_task_for_workflow_run(workflow_run_id=workflow_run_id) async def get_tasks_by_workflow_run_id(self, workflow_run_id: str) -> list[Task]: return await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id) async def build_workflow_run_status_response( self, workflow_id: str, workflow_run_id: str, organization_id: str, ) -> WorkflowRunStatusResponse: workflow = await self.get_workflow(workflow_id=workflow_id, organization_id=organization_id) if workflow is None: LOG.error(f"Workflow {workflow_id} not found") raise WorkflowNotFound(workflow_id=workflow_id) if workflow.organization_id != organization_id: LOG.error(f"Workflow {workflow_id} does not belong to organization {organization_id}") raise WorkflowOrganizationMismatch(workflow_id=workflow_id, organization_id=organization_id) workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) workflow_run_tasks = await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id) screenshot_artifacts = [] screenshot_urls: list[str] | None = None # get the last screenshot for the last 3 tasks of the workflow run for task in workflow_run_tasks[::-1]: screenshot_artifact = await app.DATABASE.get_latest_artifact( task_id=task.task_id, artifact_types=[ArtifactType.SCREENSHOT_ACTION, ArtifactType.SCREENSHOT_FINAL], organization_id=organization_id, ) if screenshot_artifact: screenshot_artifacts.append(screenshot_artifact) if len(screenshot_artifacts) >= 3: break if screenshot_artifacts: screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links(screenshot_artifacts) recording_url = None recording_artifact = await app.DATABASE.get_artifact_for_workflow_run( workflow_run_id=workflow_run_id, artifact_type=ArtifactType.RECORDING, organization_id=organization_id ) if recording_artifact: recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact) workflow_parameter_tuples = await app.DATABASE.get_workflow_run_parameters(workflow_run_id=workflow_run_id) parameters_with_value = {wfp.key: wfrp.value for wfp, wfrp in workflow_parameter_tuples} output_parameter_tuples: list[tuple[OutputParameter, WorkflowRunOutputParameter]] = ( await self.get_output_parameter_workflow_run_output_parameter_tuples( workflow_id=workflow_id, workflow_run_id=workflow_run_id ) ) if output_parameter_tuples: outputs = {output_parameter.key: output.value for output_parameter, output in output_parameter_tuples} else: LOG.error( "No output parameters found for workflow run", workflow_id=workflow_id, workflow_run_id=workflow_run_id, ) outputs = None return WorkflowRunStatusResponse( workflow_id=workflow_id, 
workflow_run_id=workflow_run_id, status=workflow_run.status, proxy_location=workflow_run.proxy_location, webhook_callback_url=workflow_run.webhook_callback_url, created_at=workflow_run.created_at, modified_at=workflow_run.modified_at, parameters=parameters_with_value, screenshot_urls=screenshot_urls, recording_url=recording_url, outputs=outputs, ) async def send_workflow_response( self, workflow: Workflow, workflow_run: WorkflowRun, api_key: str | None = None, close_browser_on_completion: bool = True, ) -> None: analytics.capture("skyvern-oss-agent-workflow-status", {"status": workflow_run.status}) tasks = await self.get_tasks_by_workflow_run_id(workflow_run.workflow_run_id) all_workflow_task_ids = [task.task_id for task in tasks] browser_state = await app.BROWSER_MANAGER.cleanup_for_workflow_run( workflow_run.workflow_run_id, all_workflow_task_ids, close_browser_on_completion ) if browser_state: await self.persist_video_data(browser_state, workflow, workflow_run) await self.persist_debug_artifacts(browser_state, tasks[-1], workflow, workflow_run) await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_tasks(all_workflow_task_ids) workflow_run_status_response = await self.build_workflow_run_status_response( workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, organization_id=workflow.organization_id, ) LOG.info("Built workflow run status response", workflow_run_status_response=workflow_run_status_response) if not workflow_run.webhook_callback_url: LOG.warning( "Workflow has no webhook callback url. Not sending workflow response", workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, ) return if not api_key: LOG.warning( "Request has no api key. Not sending workflow response", workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, ) return # send webhook to the webhook callback url # TODO: use async requests (httpx) timestamp = str(int(datetime.utcnow().timestamp())) payload = workflow_run_status_response.model_dump_json() signature = generate_skyvern_signature( payload=payload, api_key=api_key, ) headers = { "x-skyvern-timestamp": timestamp, "x-skyvern-signature": signature, "Content-Type": "application/json", } LOG.info( "Sending webhook run status to webhook callback url", workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, webhook_callback_url=workflow_run.webhook_callback_url, payload=payload, headers=headers, ) try: resp = requests.post(workflow_run.webhook_callback_url, data=payload, headers=headers) if resp.ok: LOG.info( "Webhook sent successfully", workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, resp_code=resp.status_code, resp_text=resp.text, ) else: LOG.info( "Webhook failed", workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, resp=resp, resp_code=resp.status_code, resp_text=resp.text, resp_json=resp.json(), ) except Exception as e: raise FailedToSendWebhook( workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id ) from e async def persist_video_data( self, browser_state: BrowserState, workflow: Workflow, workflow_run: WorkflowRun ) -> None: # Create recording artifact after closing the browser, so we can get an accurate recording video_data = await app.BROWSER_MANAGER.get_video_data( workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, browser_state=browser_state, ) if video_data: await app.ARTIFACT_MANAGER.update_artifact_data( 
artifact_id=browser_state.browser_artifacts.video_artifact_id, organization_id=workflow.organization_id, data=video_data, ) async def persist_har_data( self, browser_state: BrowserState, last_step: Step, workflow: Workflow, workflow_run: WorkflowRun ) -> None: har_data = await app.BROWSER_MANAGER.get_har_data( workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, browser_state=browser_state ) if har_data: await app.ARTIFACT_MANAGER.create_artifact( step=last_step, artifact_type=ArtifactType.HAR, data=har_data, ) async def persist_tracing_data( self, browser_state: BrowserState, last_step: Step, workflow_run: WorkflowRun ) -> None: if browser_state.browser_context is None or browser_state.browser_artifacts.traces_dir is None: return trace_path = f"{browser_state.browser_artifacts.traces_dir}/{workflow_run.workflow_run_id}.zip" await app.ARTIFACT_MANAGER.create_artifact(step=last_step, artifact_type=ArtifactType.TRACE, path=trace_path) async def persist_debug_artifacts( self, browser_state: BrowserState, last_task: Task, workflow: Workflow, workflow_run: WorkflowRun ) -> None: last_step = await app.DATABASE.get_latest_step( task_id=last_task.task_id, organization_id=last_task.organization_id ) if not last_step: return await self.persist_har_data(browser_state, last_step, workflow, workflow_run) await self.persist_tracing_data(browser_state, last_step, workflow_run) async def create_workflow_from_request( self, organization_id: str, request: WorkflowCreateYAMLRequest, workflow_permanent_id: str | None = None, ) -> Workflow: LOG.info("Creating workflow from request", organization_id=organization_id, title=request.title) try: if workflow_permanent_id: existing_latest_workflow = await self.get_workflow_by_permanent_id( workflow_permanent_id=workflow_permanent_id, organization_id=organization_id, ) existing_version = existing_latest_workflow.version workflow = await self.create_workflow( title=request.title, workflow_definition=WorkflowDefinition(parameters=[], blocks=[]), description=request.description, organization_id=organization_id, proxy_location=request.proxy_location, webhook_callback_url=request.webhook_callback_url, workflow_permanent_id=workflow_permanent_id, version=existing_version + 1, ) else: workflow = await self.create_workflow( title=request.title, workflow_definition=WorkflowDefinition(parameters=[], blocks=[]), description=request.description, organization_id=organization_id, proxy_location=request.proxy_location, webhook_callback_url=request.webhook_callback_url, ) # Create parameters from the request parameters: dict[str, PARAMETER_TYPE] = {} duplicate_parameter_keys = set() # Check if user's trying to manually create an output parameter if any( parameter.parameter_type == ParameterType.OUTPUT for parameter in request.workflow_definition.parameters ): raise InvalidWorkflowDefinition(message="Cannot manually create output parameters") # Check if any parameter keys collide with automatically created output parameter keys block_labels = [block.label for block in request.workflow_definition.blocks] # TODO (kerem): Check if block labels are unique output_parameter_keys = [f"{block_label}_output" for block_label in block_labels] parameter_keys = [parameter.key for parameter in request.workflow_definition.parameters] if any(key in output_parameter_keys for key in parameter_keys): raise WorkflowDefinitionHasReservedParameterKeys( reserved_keys=output_parameter_keys, parameter_keys=parameter_keys ) # Create output parameters for all blocks 
            block_output_parameters = await WorkflowService._create_all_output_parameters_for_workflow(
                workflow_id=workflow.workflow_id, block_yamls=request.workflow_definition.blocks
            )
            for block_output_parameter in block_output_parameters.values():
                parameters[block_output_parameter.key] = block_output_parameter

            # We're going to process context parameters after other parameters since they depend on the other parameters
            context_parameter_yamls = []

            for parameter in request.workflow_definition.parameters:
                if parameter.key in parameters:
                    LOG.error(f"Duplicate parameter key {parameter.key}")
                    duplicate_parameter_keys.add(parameter.key)
                    continue
                if parameter.parameter_type == ParameterType.AWS_SECRET:
                    parameters[parameter.key] = await self.create_aws_secret_parameter(
                        workflow_id=workflow.workflow_id,
                        aws_key=parameter.aws_key,
                        key=parameter.key,
                        description=parameter.description,
                    )
                elif parameter.parameter_type == ParameterType.BITWARDEN_LOGIN_CREDENTIAL:
                    parameters[parameter.key] = await self.create_bitwarden_login_credential_parameter(
                        workflow_id=workflow.workflow_id,
                        bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key,
                        bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key,
                        bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key,
                        url_parameter_key=parameter.url_parameter_key,
                        key=parameter.key,
                        description=parameter.description,
                    )
                elif parameter.parameter_type == ParameterType.WORKFLOW:
                    parameters[parameter.key] = await self.create_workflow_parameter(
                        workflow_id=workflow.workflow_id,
                        workflow_parameter_type=parameter.workflow_parameter_type,
                        key=parameter.key,
                        default_value=parameter.default_value,
                        description=parameter.description,
                    )
                elif parameter.parameter_type == ParameterType.OUTPUT:
                    parameters[parameter.key] = await self.create_output_parameter(
                        workflow_id=workflow.workflow_id,
                        key=parameter.key,
                        description=parameter.description,
                    )
                elif parameter.parameter_type == ParameterType.CONTEXT:
                    context_parameter_yamls.append(parameter)
                else:
                    LOG.error(f"Invalid parameter type {parameter.parameter_type}")

            # Now we can process the context parameters since all other parameters have been created
            for context_parameter in context_parameter_yamls:
                if context_parameter.source_parameter_key not in parameters:
                    raise ContextParameterSourceNotDefined(
                        context_parameter_key=context_parameter.key, source_key=context_parameter.source_parameter_key
                    )

                if context_parameter.key in parameters:
                    LOG.error(f"Duplicate parameter key {context_parameter.key}")
                    duplicate_parameter_keys.add(context_parameter.key)
                    continue

                # We're only adding the context parameter to the parameters dict, we're not creating it in the database
                # It'll only be stored in the `workflow.workflow_definition`
                # todo (kerem): should we have a database table for context parameters?
                parameters[context_parameter.key] = ContextParameter(
                    key=context_parameter.key,
                    description=context_parameter.description,
                    source=parameters[context_parameter.source_parameter_key],
                    # Context parameters don't have a default value, the value always depends on the source parameter
                    value=None,
                )

            if duplicate_parameter_keys:
                raise WorkflowDefinitionHasDuplicateParameterKeys(duplicate_keys=duplicate_parameter_keys)

            # Create blocks from the request
            block_label_mapping = {}
            blocks = []
            for block_yaml in request.workflow_definition.blocks:
                block = await self.block_yaml_to_block(workflow, block_yaml, parameters)
                blocks.append(block)
                block_label_mapping[block.label] = block

            # Set the blocks for the workflow definition
            workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks)
            workflow = await self.update_workflow(
                workflow_id=workflow.workflow_id,
                organization_id=organization_id,
                workflow_definition=workflow_definition,
            )
            LOG.info(
                f"Created workflow from request, title: {request.title}",
                parameter_keys=[parameter.key for parameter in parameters.values()],
                block_labels=[block.label for block in blocks],
                organization_id=organization_id,
                title=request.title,
                workflow_id=workflow.workflow_id,
            )
            return workflow
        except Exception as e:
            LOG.exception(f"Failed to create workflow from request, title: {request.title}")
            raise e

    @staticmethod
    async def _create_output_parameter_for_block(workflow_id: str, block_yaml: BLOCK_YAML_TYPES) -> OutputParameter:
        output_parameter_key = f"{block_yaml.label}_output"
        return await app.DATABASE.create_output_parameter(
            workflow_id=workflow_id,
            key=output_parameter_key,
            description=f"Output parameter for block {block_yaml.label}",
        )

    @staticmethod
    async def _create_all_output_parameters_for_workflow(
        workflow_id: str, block_yamls: list[BLOCK_YAML_TYPES]
    ) -> dict[str, OutputParameter]:
        output_parameters = {}
        for block_yaml in block_yamls:
            output_parameter = await WorkflowService._create_output_parameter_for_block(
                workflow_id=workflow_id, block_yaml=block_yaml
            )
            output_parameters[block_yaml.label] = output_parameter
            # Recursively create output parameters for for loop blocks
            if isinstance(block_yaml, ForLoopBlockYAML):
                output_parameters.update(
                    await WorkflowService._create_all_output_parameters_for_workflow(
                        workflow_id=workflow_id, block_yamls=block_yaml.loop_blocks
                    )
                )
        return output_parameters

    @staticmethod
    async def block_yaml_to_block(
        workflow: Workflow, block_yaml: BLOCK_YAML_TYPES, parameters: dict[str, Parameter]
    ) -> BlockTypeVar:
        output_parameter = parameters[f"{block_yaml.label}_output"]
        if block_yaml.block_type == BlockType.TASK:
            task_block_parameters = (
                [parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
                if block_yaml.parameter_keys
                else []
            )
            return TaskBlock(
                label=block_yaml.label,
                url=block_yaml.url,
                title=block_yaml.title,
                parameters=task_block_parameters,
                output_parameter=output_parameter,
                navigation_goal=block_yaml.navigation_goal,
                data_extraction_goal=block_yaml.data_extraction_goal,
                data_schema=block_yaml.data_schema,
                error_code_mapping=block_yaml.error_code_mapping,
                max_retries=block_yaml.max_retries,
            )
        elif block_yaml.block_type == BlockType.FOR_LOOP:
            loop_blocks = [
                await WorkflowService.block_yaml_to_block(workflow, loop_block, parameters)
                for loop_block in block_yaml.loop_blocks
            ]
            loop_over_parameter = parameters[block_yaml.loop_over_parameter_key]
            return ForLoopBlock(
                label=block_yaml.label,
                loop_over=loop_over_parameter,
                loop_blocks=loop_blocks,
                output_parameter=output_parameter,
            )
        elif block_yaml.block_type == BlockType.CODE:
            return CodeBlock(
                label=block_yaml.label,
                code=block_yaml.code,
                parameters=(
                    [parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
                    if block_yaml.parameter_keys
                    else []
                ),
                output_parameter=output_parameter,
            )
        elif block_yaml.block_type == BlockType.TEXT_PROMPT:
            return TextPromptBlock(
                label=block_yaml.label,
                llm_key=block_yaml.llm_key,
                prompt=block_yaml.prompt,
                parameters=(
                    [parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
                    if block_yaml.parameter_keys
                    else []
                ),
                json_schema=block_yaml.json_schema,
                output_parameter=output_parameter,
            )
        elif block_yaml.block_type == BlockType.DOWNLOAD_TO_S3:
            return DownloadToS3Block(
                label=block_yaml.label,
                output_parameter=output_parameter,
                url=block_yaml.url,
            )
        elif block_yaml.block_type == BlockType.UPLOAD_TO_S3:
            return UploadToS3Block(
                label=block_yaml.label,
                output_parameter=output_parameter,
                path=block_yaml.path,
            )
        elif block_yaml.block_type == BlockType.SEND_EMAIL:
            return SendEmailBlock(
                label=block_yaml.label,
                output_parameter=output_parameter,
                smtp_host=parameters[block_yaml.smtp_host_secret_parameter_key],
                smtp_port=parameters[block_yaml.smtp_port_secret_parameter_key],
                smtp_username=parameters[block_yaml.smtp_username_secret_parameter_key],
                smtp_password=parameters[block_yaml.smtp_password_secret_parameter_key],
                sender=block_yaml.sender,
                recipients=block_yaml.recipients,
                subject=block_yaml.subject,
                body=block_yaml.body,
                file_attachments=block_yaml.file_attachments or [],
            )
        raise ValueError(f"Invalid block type {block_yaml.block_type}")
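

# Hypothetical usage sketch (comments only, not executed): a minimal illustration of how
# the two main entry points above compose. The request id, workflow id, organization id,
# parameter values, and api key are made-up placeholders, and it assumes the forge `app`
# context (DATABASE, BROWSER_MANAGER, WORKFLOW_CONTEXT_MANAGER, etc.) is already initialized.
#
#   workflow_service = WorkflowService()
#   workflow_run = await workflow_service.setup_workflow_run(
#       request_id="request_example",
#       workflow_request=WorkflowRequestBody(data={"website_url": "https://example.com"}),
#       workflow_id="workflow_example",
#       organization_id="organization_example",
#   )
#   await workflow_service.execute_workflow(
#       workflow_run_id=workflow_run.workflow_run_id,
#       api_key="<skyvern api key>",
#   )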