Implement SendEmailBlock (#137)

This commit is contained in:
Kerem Yilmaz 2024-03-31 01:58:11 -07:00 committed by GitHub
parent 3d1b146470
commit 7562cd9c25
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 309 additions and 56 deletions

View file

@ -1,12 +1,16 @@
import abc
import json
import os
import smtplib
import uuid
from email.message import EmailMessage
from enum import StrEnum
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Annotated, Any, Literal, Union
from urllib.parse import urlparse
import aiohttp
import filetype
import structlog
from pydantic import BaseModel, Field
@ -19,13 +23,15 @@ from skyvern.exceptions import (
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.aws import AsyncAWSClient
from skyvern.forge.sdk.api.files import download_file
from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory
from skyvern.forge.sdk.schemas.tasks import TaskStatus
from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
from skyvern.forge.sdk.workflow.exceptions import DownloadFileMaxSizeExceeded
from skyvern.forge.sdk.workflow.exceptions import InvalidEmailClientConfiguration
from skyvern.forge.sdk.workflow.models.parameter import (
PARAMETER_TYPE,
AWSSecretParameter,
ContextParameter,
OutputParameter,
WorkflowParameter,
@ -40,6 +46,7 @@ class BlockType(StrEnum):
CODE = "code"
TEXT_PROMPT = "text_prompt"
DOWNLOAD_TO_S3 = "download_to_s3"
SEND_EMAIL = "send_email"
class Block(BaseModel, abc.ABC):
@ -439,29 +446,6 @@ class DownloadToS3Block(Block):
) -> list[PARAMETER_TYPE]:
return []
async def _download_file(self, max_size_mb: int = 5) -> str:
async with aiohttp.ClientSession() as session:
LOG.info("Downloading file", url=self.url)
async with session.get(self.url) as response:
# Check the content length if available
if response.content_length and response.content_length > max_size_mb * 1024 * 1024:
raise DownloadFileMaxSizeExceeded(max_size_mb)
# Don't forget to delete the temporary file after we're done with it
temp_file = NamedTemporaryFile(delete=False)
total_bytes_downloaded = 0
async for chunk in response.content.iter_chunked(8192):
temp_file.write(chunk)
total_bytes_downloaded += len(chunk)
if total_bytes_downloaded > max_size_mb * 1024 * 1024:
raise DownloadFileMaxSizeExceeded(max_size_mb)
# Seek back to the start of the file
temp_file.seek(0)
return temp_file.name
async def _upload_file_to_s3(self, uri: str, file_path: str) -> None:
try:
client = self.get_async_aws_client()
@ -485,7 +469,7 @@ class DownloadToS3Block(Block):
self.url = task_url_parameter_value
try:
file_path = await self._download_file()
file_path = await download_file(self.url, max_size_mb=10)
except Exception as e:
LOG.error("DownloadToS3Block: Failed to download file", url=self.url, error=str(e))
raise e
@ -516,5 +500,200 @@ class DownloadToS3Block(Block):
return None
BlockSubclasses = Union[ForLoopBlock, TaskBlock, CodeBlock, TextPromptBlock, DownloadToS3Block]
class SendEmailBlock(Block):
block_type: Literal[BlockType.SEND_EMAIL] = BlockType.SEND_EMAIL
smtp_host: AWSSecretParameter
smtp_port: AWSSecretParameter
smtp_username: AWSSecretParameter
# if you're using a Gmail account, you need to pass in an app password instead of your regular password
smtp_password: AWSSecretParameter
sender: str
recipients: list[str]
subject: str
body: str
file_attachments: list[str] = []
def get_all_parameters(
self,
) -> list[PARAMETER_TYPE]:
return [self.smtp_host, self.smtp_port, self.smtp_username, self.smtp_password]
def _decrypt_smtp_parameters(self, workflow_run_context: WorkflowRunContext) -> tuple[str, int, str, str]:
obfuscated_smtp_host_value = workflow_run_context.get_value(self.smtp_host.key)
obfuscated_smtp_port_value = workflow_run_context.get_value(self.smtp_port.key)
obfuscated_smtp_username_value = workflow_run_context.get_value(self.smtp_username.key)
obfuscated_smtp_password_value = workflow_run_context.get_value(self.smtp_password.key)
smtp_host_value = workflow_run_context.get_original_secret_value_or_none(obfuscated_smtp_host_value)
smtp_port_value = workflow_run_context.get_original_secret_value_or_none(obfuscated_smtp_port_value)
smtp_username_value = workflow_run_context.get_original_secret_value_or_none(obfuscated_smtp_username_value)
smtp_password_value = workflow_run_context.get_original_secret_value_or_none(obfuscated_smtp_password_value)
email_config_problems = []
if smtp_host_value is None:
email_config_problems.append("Missing SMTP server")
if smtp_port_value is None:
email_config_problems.append("Missing SMTP port")
elif not smtp_port_value.isdigit():
email_config_problems.append("SMTP port should be a number")
if smtp_username_value is None:
email_config_problems.append("Missing SMTP username")
if smtp_password_value is None:
email_config_problems.append("Missing SMTP password")
if email_config_problems:
raise InvalidEmailClientConfiguration(email_config_problems)
return smtp_host_value, smtp_port_value, smtp_username_value, smtp_password_value
def _get_file_paths(self, workflow_run_context: WorkflowRunContext) -> list[str]:
file_paths = []
for file_path in self.file_attachments:
if not workflow_run_context.has_parameter(file_path):
file_paths.append(file_path)
continue
file_path_parameter_value = workflow_run_context.get_value(file_path)
file_path_parameter_secret_value = workflow_run_context.get_original_secret_value_or_none(
file_path_parameter_value
)
if file_path_parameter_secret_value:
file_paths.append(file_path_parameter_secret_value)
else:
file_paths.append(file_path_parameter_value)
return file_paths
async def _download_from_s3(self, s3_uri: str) -> str:
client = self.get_async_aws_client()
downloaded_bytes = await client.download_file(uri=s3_uri)
file_path = NamedTemporaryFile(delete=False)
file_path.write(downloaded_bytes)
return file_path.name
async def _build_email_message(
self, workflow_run_context: WorkflowRunContext, workflow_run_id: str
) -> EmailMessage:
msg = EmailMessage()
msg["Subject"] = self.subject + f" - Workflow Run ID: {workflow_run_id}"
msg["To"] = ", ".join(self.recipients)
msg["From"] = self.sender
msg.set_content(self.body)
for filename in self._get_file_paths(workflow_run_context):
path = None
try:
if filename.startswith("s3://"):
path = await self._download_from_s3(filename)
elif filename.startswith("http://") or filename.startswith("https://"):
path = await download_file(filename)
else:
LOG.error("SendEmailBlock: Looking for file locally", filename=filename)
if not os.path.exists(filename):
raise FileNotFoundError(f"File not found: {filename}")
if not os.path.isfile(filename):
raise IsADirectoryError(f"Path is a directory: {filename}")
LOG.info("SendEmailBlock: Found file locally", path=path)
path = filename
if not path:
raise FileNotFoundError(f"File not found: {filename}")
# Guess the content type based on the file's extension. Encoding
# will be ignored, although we should check for simple things like
# gzip'd or compressed files.
kind = filetype.guess(path)
if kind:
ctype = kind.mime
extension = kind.extension
else:
# No guess could be made, or the file is encoded (compressed), so
# use a generic bag-of-bits type.
ctype = "application/octet-stream"
extension = None
maintype, subtype = ctype.split("/", 1)
attachment_filename = urlparse(filename).path.replace("/", "_")
# Check if the filename has an extension
if not Path(attachment_filename).suffix:
# If no extension, guess it based on the MIME type
if extension:
attachment_filename += f".{extension}"
LOG.info(
"SendEmailBlock: Adding attachment",
filename=attachment_filename,
maintype=maintype,
subtype=subtype,
)
with open(path, "rb") as fp:
msg.add_attachment(fp.read(), maintype=maintype, subtype=subtype, filename=attachment_filename)
finally:
if path:
os.unlink(path)
return msg
async def execute(self, workflow_run_id: str, **kwargs: dict) -> OutputParameter | None:
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
smtp_host_value, smtp_port_value, smtp_username_value, smtp_password_value = self._decrypt_smtp_parameters(
workflow_run_context
)
smtp_host = None
try:
smtp_host = smtplib.SMTP(smtp_host_value, smtp_port_value)
LOG.info("SendEmailBlock: Connected to SMTP server")
smtp_host.starttls()
smtp_host.login(smtp_username_value, smtp_password_value)
LOG.info("SendEmailBlock: Logged in to SMTP server")
message = await self._build_email_message(workflow_run_context, workflow_run_id)
smtp_host.send_message(message)
LOG.info("SendEmailBlock: Email sent")
except Exception as e:
LOG.error("SendEmailBlock: Failed to send email", error=str(e))
if self.output_parameter:
await workflow_run_context.register_output_parameter_value_post_execution(
parameter=self.output_parameter,
value={
"success": False,
"error": str(e),
},
)
await app.DATABASE.create_workflow_run_output_parameter(
workflow_run_id=workflow_run_id,
output_parameter_id=self.output_parameter.output_parameter_id,
value={
"success": False,
"error": str(e),
},
)
return self.output_parameter
raise e
finally:
if smtp_host:
smtp_host.quit()
if self.output_parameter:
await workflow_run_context.register_output_parameter_value_post_execution(
parameter=self.output_parameter,
value={
"success": True,
},
)
await app.DATABASE.create_workflow_run_output_parameter(
workflow_run_id=workflow_run_id,
output_parameter_id=self.output_parameter.output_parameter_id,
value={
"success": True,
},
)
return self.output_parameter
return None
BlockSubclasses = Union[ForLoopBlock, TaskBlock, CodeBlock, TextPromptBlock, DownloadToS3Block, SendEmailBlock]
BlockTypeVar = Annotated[BlockSubclasses, Field(discriminator="block_type")]