Implement FileURLParserBlock and FILE_URL WorkflowParameterType (#559)

Kerem Yilmaz 2024-07-05 17:08:20 -07:00 committed by GitHub
parent 8be94d7928
commit 6929a1d24d
7 changed files with 135 additions and 19 deletions

skyvern/forge/sdk/api/files.py

@@ -9,10 +9,18 @@ import structlog
from skyvern.constants import REPO_ROOT_DIR
from skyvern.exceptions import DownloadFileMaxSizeExceeded
from skyvern.forge.sdk.api.aws import AsyncAWSClient

LOG = structlog.get_logger()


async def download_from_s3(client: AsyncAWSClient, s3_uri: str) -> str:
    downloaded_bytes = await client.download_file(uri=s3_uri)
    file_path = tempfile.NamedTemporaryFile(delete=False)
    file_path.write(downloaded_bytes)
    return file_path.name


async def download_file(url: str, max_size_mb: int | None = None) -> str:
    try:
        async with aiohttp.ClientSession(raise_for_status=True) as session:
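For orientation, a minimal sketch of how the two downloaders might be called from async code. The AsyncAWSClient construction and the example URL are assumptions for illustration, not taken from this commit:

import asyncio

from skyvern.forge.sdk.api.aws import AsyncAWSClient
from skyvern.forge.sdk.api.files import download_file, download_from_s3


async def fetch_to_local_path(url: str) -> str:
    # Mirror the scheme-based branching the workflow blocks use further down:
    # s3:// URIs go through the AWS client, anything else through the HTTP downloader.
    if url.startswith("s3://"):
        return await download_from_s3(AsyncAWSClient(), url)  # assumes default client construction
    return await download_file(url)


# Hypothetical URL; both helpers return the path of a local temp file.
print(asyncio.run(fetch_to_local_path("https://example.com/data.csv")))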

skyvern/forge/sdk/workflow/context_manager.py

@@ -255,16 +255,21 @@ class WorkflowRunContext:
                         old_value=parameter.value,
                         new_value=value,
                     )
-                if not isinstance(value, dict):
+                if not isinstance(value, dict) and not isinstance(value, list):
                     raise ValueError(
-                        f"ContextParameter can't depend on an OutputParameter with a non-dict value. "
+                        f"ContextParameter can only depend on an OutputParameter with a dict or list value. "
                         f"ContextParameter key: {parameter.key}, "
                         f"OutputParameter key: {output_parameter.key}, "
                         f"OutputParameter value: {value}"
                     )
+                if isinstance(value, dict):
+                    parameter.value = value.get(parameter.key)
+                    self.parameters[parameter.key] = parameter
+                    self.values[parameter.key] = parameter.value
+                else:
+                    parameter.value = value
+                    self.parameters[parameter.key] = parameter
+                    self.values[parameter.key] = parameter.value

     async def register_block_parameters(
         self,
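Restated outside the class for clarity: a ContextParameter may now depend on an OutputParameter whose value is either a dict (indexed by the context parameter's key) or a list (passed through whole). A minimal standalone sketch with a made-up helper name, not the actual WorkflowRunContext method:

from typing import Any


def resolve_context_parameter_value(context_key: str, output_value: Any) -> Any:
    # Illustrative restatement of the branch above.
    if not isinstance(output_value, (dict, list)):
        raise ValueError("ContextParameter can only depend on an OutputParameter with a dict or list value.")
    if isinstance(output_value, dict):
        return output_value.get(context_key)
    return output_value  # e.g. the list of CSV rows produced by the new FileParserBlock


assert resolve_context_parameter_value("rows", {"rows": [1, 2, 3]}) == [1, 2, 3]
assert resolve_context_parameter_value("rows", [{"name": "Ada"}]) == [{"name": "Ada"}]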

skyvern/forge/sdk/workflow/exceptions.py

@@ -75,3 +75,11 @@ class ContextParameterSourceNotDefined(BaseWorkflowHTTPException):
            f"Source parameter key {source_key} for context parameter {context_parameter_key} does not exist.",
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        )


class InvalidFileType(BaseWorkflowHTTPException):
    def __init__(self, file_url: str, file_type: str, error: str) -> None:
        super().__init__(
            f"File URL {file_url} is not a valid {file_type} file. Error: {error}",
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        )

skyvern/forge/sdk/workflow/models/block.py

@@ -1,4 +1,5 @@
import abc
import csv
import json
import os
import smtplib
@@ -25,12 +26,16 @@ from skyvern.exceptions import (
 from skyvern.forge import app
 from skyvern.forge.prompts import prompt_engine
 from skyvern.forge.sdk.api.aws import AsyncAWSClient
-from skyvern.forge.sdk.api.files import download_file, get_path_for_workflow_download_directory
+from skyvern.forge.sdk.api.files import download_file, download_from_s3, get_path_for_workflow_download_directory
 from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory
 from skyvern.forge.sdk.schemas.tasks import TaskOutput, TaskStatus
 from skyvern.forge.sdk.settings_manager import SettingsManager
 from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
-from skyvern.forge.sdk.workflow.exceptions import InvalidEmailClientConfiguration, NoValidEmailRecipient
+from skyvern.forge.sdk.workflow.exceptions import (
+    InvalidEmailClientConfiguration,
+    InvalidFileType,
+    NoValidEmailRecipient,
+)
 from skyvern.forge.sdk.workflow.models.parameter import (
     PARAMETER_TYPE,
     AWSSecretParameter,
@@ -50,6 +55,7 @@ class BlockType(StrEnum):
    DOWNLOAD_TO_S3 = "download_to_s3"
    UPLOAD_TO_S3 = "upload_to_s3"
    SEND_EMAIL = "send_email"
    FILE_URL_PARSER = "file_url_parser"


@dataclass(frozen=True)
@@ -353,9 +359,8 @@ class ForLoopBlock(Block):
         return list(parameters)

     def get_loop_block_context_parameters(self, workflow_run_id: str, loop_data: Any) -> list[ContextParameter]:
-        if not isinstance(loop_data, dict):
-            # TODO (kerem): Should we add support for other types?
-            raise ValueError("loop_data should be a dict")
+        if not isinstance(loop_data, dict) and not isinstance(loop_data, list):
+            raise ValueError("loop_data should be a dict or a list.")

         context_parameters = []
         for loop_block in self.loop_blocks:
@@ -369,13 +374,19 @@
         for context_parameter in context_parameters:
             if context_parameter.source.key != self.loop_over.key:
                 continue
-            if context_parameter.key not in loop_data:
+            # If the loop_data is a dict, we need to check if the key exists in the loop_data
+            if isinstance(loop_data, dict):
+                if context_parameter.key in loop_data:
+                    context_parameter.value = loop_data[context_parameter.key]
+                else:
+                    raise ContextParameterValueNotFound(
+                        parameter_key=context_parameter.key,
+                        existing_keys=list(loop_data.keys()),
+                        workflow_run_id=workflow_run_id,
+                    )
-            context_parameter.value = loop_data[context_parameter.key]
+            else:
+                # If the loop_data is a list, we can directly assign the loop_data to the context_parameter value
+                context_parameter.value = loop_data

         return context_parameters
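The same dict-or-list convention now applies per loop iteration: when each loop_data item is a dict (for example a parsed CSV row), context parameters are filled by key, and when it is a list the item is assigned as-is. A small illustration with made-up data and a made-up helper, not the real ForLoopBlock code:

from typing import Any

rows = [
    {"invoice_url": "https://example.com/1.pdf", "amount": "10"},
    {"invoice_url": "https://example.com/2.pdf", "amount": "25"},
]


def value_for(context_parameter_key: str, loop_data: Any) -> Any:
    # Mirrors the branching above; the real code raises ContextParameterValueNotFound on a missing dict key.
    if isinstance(loop_data, dict):
        return loop_data[context_parameter_key]
    return loop_data


for loop_data in rows:
    print(value_for("invoice_url", loop_data))  # one URL per iteration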
@@ -859,7 +870,7 @@ class SendEmailBlock(Block):
             path = None
             try:
                 if filename.startswith("s3://"):
-                    path = await self._download_from_s3(filename)
+                    path = await download_from_s3(self.get_async_aws_client(), filename)
                 elif filename.startswith("http://") or filename.startswith("https://"):
                     path = await download_file(filename)
                 else:
@@ -947,6 +958,69 @@
        return self.build_block_result(success=True, output_parameter_value=result_dict)


class FileType(StrEnum):
    CSV = "csv"


class FileParserBlock(Block):
    block_type: Literal[BlockType.FILE_URL_PARSER] = BlockType.FILE_URL_PARSER

    file_url: str
    file_type: FileType

    def get_all_parameters(
        self,
        workflow_run_id: str,
    ) -> list[PARAMETER_TYPE]:
        workflow_run_context = self.get_workflow_run_context(workflow_run_id)
        if self.file_url and workflow_run_context.has_parameter(self.file_url):
            return [workflow_run_context.get_parameter(self.file_url)]
        return []

    def validate_file_type(self, file_url_used: str, file_path: str) -> None:
        if self.file_type == FileType.CSV:
            try:
                with open(file_path, "r") as file:
                    csv.Sniffer().sniff(file.read(1024))
            except csv.Error as e:
                raise InvalidFileType(file_url=file_url_used, file_type=self.file_type, error=str(e))

    async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
        workflow_run_context = self.get_workflow_run_context(workflow_run_id)
        file_url_to_use = self.file_url
        if (
            self.file_url
            and workflow_run_context.has_parameter(self.file_url)
            and workflow_run_context.has_value(self.file_url)
        ):
            file_url_parameter_value = workflow_run_context.get_value(self.file_url)
            if file_url_parameter_value:
                LOG.info(
                    "FileParserBlock: File URL is parameterized, using parameter value",
                    file_url_parameter_value=file_url_parameter_value,
                    file_url_parameter_key=self.file_url,
                )
                file_url_to_use = file_url_parameter_value

        # Download the file
        if file_url_to_use.startswith("s3://"):
            file_path = await download_from_s3(self.get_async_aws_client(), file_url_to_use)
        else:
            file_path = await download_file(file_url_to_use)

        # Validate the file type
        self.validate_file_type(file_url_to_use, file_path)

        # Parse the file into a list of dictionaries where each dictionary represents a row in the file
        parsed_data = []
        with open(file_path, "r") as file:
            if self.file_type == FileType.CSV:
                reader = csv.DictReader(file)
                for row in reader:
                    parsed_data.append(row)

        # Record the parsed data
        await self.record_output_parameter_value(workflow_run_context, workflow_run_id, parsed_data)
        return self.build_block_result(success=True, output_parameter_value=parsed_data)


BlockSubclasses = Union[
    ForLoopBlock,
    TaskBlock,
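Standalone sketch of what FileParserBlock.execute does once the file is on disk: sniff the content to confirm it looks like CSV (a csv.Error becomes InvalidFileType), then parse every row into a dict keyed by the header. The sample CSV and temp-file handling here are illustrative only:

import csv
import tempfile

csv_text = "name,email\nAda,ada@example.com\nGrace,grace@example.com\n"
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as handle:
    handle.write(csv_text)
    file_path = handle.name

with open(file_path, "r") as file:
    csv.Sniffer().sniff(file.read(1024))  # raises csv.Error for content that does not look like CSV

with open(file_path, "r") as file:
    parsed_data = [row for row in csv.DictReader(file)]

print(parsed_data)
# [{'name': 'Ada', 'email': 'ada@example.com'}, {'name': 'Grace', 'email': 'grace@example.com'}]

The resulting list of row dicts is what gets recorded as the block's output parameter value, which is what allows a ForLoopBlock or a ContextParameter to consume it as a list.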
@@ -955,5 +1029,6 @@ BlockSubclasses = Union[
    DownloadToS3Block,
    UploadToS3Block,
    SendEmailBlock,
    FileParserBlock,
]
BlockTypeVar = Annotated[BlockSubclasses, Field(discriminator="block_type")]

skyvern/forge/sdk/workflow/models/parameter.py

@@ -67,6 +67,7 @@ class WorkflowParameterType(StrEnum):
    FLOAT = "float"
    BOOLEAN = "boolean"
    JSON = "json"
    FILE_URL = "file_url"

    def convert_value(self, value: str | None) -> str | int | float | bool | dict | list | None:
        if value is None:
@@ -81,6 +82,8 @@
            return value.lower() in ["true", "1"]
        elif self == WorkflowParameterType.JSON:
            return json.loads(value)
        elif self == WorkflowParameterType.FILE_URL:
            return value


class WorkflowParameter(Parameter):
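Unlike JSON, a FILE_URL workflow parameter is not parsed: convert_value returns the string untouched, and the URL is only resolved when a block downloads it. A quick check, using the module path shown in the imports above:

from skyvern.forge.sdk.workflow.models.parameter import WorkflowParameterType

assert WorkflowParameterType.FILE_URL.convert_value("https://example.com/data.csv") == "https://example.com/data.csv"
assert WorkflowParameterType.JSON.convert_value('{"a": 1}') == {"a": 1}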

skyvern/forge/sdk/workflow/models/yaml.py

@@ -4,7 +4,7 @@ from typing import Annotated, Any, Literal
 from pydantic import BaseModel, Field

 from skyvern.forge.sdk.schemas.tasks import ProxyLocation
-from skyvern.forge.sdk.workflow.models.block import BlockType
+from skyvern.forge.sdk.workflow.models.block import BlockType, FileType
 from skyvern.forge.sdk.workflow.models.parameter import ParameterType, WorkflowParameterType
@@ -162,6 +162,13 @@ class SendEmailBlockYAML(BlockYAML):
    file_attachments: list[str] | None = None


class FileParserBlockYAML(BlockYAML):
    block_type: Literal[BlockType.FILE_URL_PARSER] = BlockType.FILE_URL_PARSER  # type: ignore

    file_url: str
    file_type: FileType


PARAMETER_YAML_SUBCLASSES = (
    AWSSecretParameterYAML
    | BitwardenLoginCredentialParameterYAML
@@ -179,6 +186,7 @@ BLOCK_YAML_SUBCLASSES = (
    | DownloadToS3BlockYAML
    | UploadToS3BlockYAML
    | SendEmailBlockYAML
    | FileParserBlockYAML
)

BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_type")]
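A sketch of how the new block could be declared when building a workflow definition programmatically. The module path for the YAML models and the field values are assumptions, not taken from this commit:

from skyvern.forge.sdk.workflow.models.block import FileType
from skyvern.forge.sdk.workflow.models.yaml import FileParserBlockYAML  # assumed module path

# Hypothetical block definition: file_url may be a literal http(s):// or s3:// URL,
# or the key of a FILE_URL workflow parameter that is resolved at run time.
file_parser_block = FileParserBlockYAML(
    label="parse_invoice_rows",
    file_url="invoice_csv_url",
    file_type=FileType.CSV,
)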

skyvern/forge/sdk/workflow/service.py

@@ -25,6 +25,7 @@ from skyvern.forge.sdk.workflow.models.block import (
    BlockTypeVar,
    CodeBlock,
    DownloadToS3Block,
    FileParserBlock,
    ForLoopBlock,
    SendEmailBlock,
    TaskBlock,
@@ -1052,4 +1053,12 @@ class WorkflowService:
                file_attachments=block_yaml.file_attachments or [],
                continue_on_failure=block_yaml.continue_on_failure,
            )
        elif block_yaml.block_type == BlockType.FILE_URL_PARSER:
            return FileParserBlock(
                label=block_yaml.label,
                output_parameter=output_parameter,
                file_url=block_yaml.file_url,
                file_type=block_yaml.file_type,
                continue_on_failure=block_yaml.continue_on_failure,
            )
        raise ValueError(f"Invalid block type {block_yaml.block_type}")