support global workflow (#1664)

This commit is contained in:
Shuchang Zheng 2025-01-28 15:04:18 +08:00 committed by GitHub
parent 833cd8194e
commit 1b79ef9ca3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 154 additions and 26 deletions

View file

@@ -156,7 +156,7 @@ class ForgeAgent:
navigation_goal=task_block.navigation_goal, navigation_goal=task_block.navigation_goal,
data_extraction_goal=task_block.data_extraction_goal, data_extraction_goal=task_block.data_extraction_goal,
navigation_payload=navigation_payload, navigation_payload=navigation_payload,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
proxy_location=workflow_run.proxy_location, proxy_location=workflow_run.proxy_location,
extracted_information_schema=task_block.data_schema, extracted_information_schema=task_block.data_schema,
workflow_run_id=workflow_run.workflow_run_id, workflow_run_id=workflow_run.workflow_run_id,

View file

@@ -40,6 +40,10 @@ class BaseStorage(ABC):
def build_uri(self, artifact_id: str, step: Step, artifact_type: ArtifactType) -> str: def build_uri(self, artifact_id: str, step: Step, artifact_type: ArtifactType) -> str:
pass pass
@abstractmethod
async def retrieve_global_workflows(self) -> list[str]:
    """Return the permanent IDs of workflows registered as global (template) workflows.

    Implementations read the ID list from their backing storage; an empty
    list means no global workflows are configured.
    """
    pass
@abstractmethod @abstractmethod
def build_log_uri(self, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType) -> str: def build_log_uri(self, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType) -> str:
pass pass

View file

@@ -26,6 +26,17 @@ class LocalStorage(BaseStorage):
file_ext = FILE_EXTENTSION_MAP[artifact_type] file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"file://{self.artifact_path}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" return f"file://{self.artifact_path}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
async def retrieve_global_workflows(self) -> list[str]:
    """Read global (template) workflow permanent IDs from local storage.

    The IDs are stored one per line in
    ``<artifact_path>/<ENV>/global_workflows.txt``. Blank lines and
    surrounding whitespace are ignored.

    Returns:
        The list of non-empty, stripped lines; an empty list when the file
        is missing or cannot be read (best-effort semantics).
    """
    file_path = Path(f"{self.artifact_path}/{settings.ENV}/global_workflows.txt")
    # Ensure the parent directory exists so the file can be provisioned later.
    self._create_directories_if_not_exists(file_path)
    if not file_path.exists():
        return []
    try:
        # Explicit encoding; iterate the file directly instead of readlines().
        with open(file_path, "r", encoding="utf-8") as f:
            return [line.strip() for line in f if line.strip()]
    except (OSError, UnicodeDecodeError):
        # Best-effort read: treat an unreadable file the same as a missing one,
        # instead of swallowing every exception type.
        return []
def build_log_uri(self, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType) -> str: def build_log_uri(self, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type] file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"file://{self.artifact_path}/logs/{log_entity_type}/{log_entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}" return f"file://{self.artifact_path}/logs/{log_entity_type}/{log_entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}"

View file

@@ -29,6 +29,13 @@ class S3Storage(BaseStorage):
file_ext = FILE_EXTENTSION_MAP[artifact_type] file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"s3://{self.bucket}/{settings.ENV}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" return f"s3://{self.bucket}/{settings.ENV}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
async def retrieve_global_workflows(self) -> list[str]:
    """Fetch global (template) workflow permanent IDs from S3, one per line.

    Returns an empty list when the object is missing or empty; download
    failures are not logged (``log_exception=False``).
    """
    object_uri = f"s3://{self.bucket}/{settings.ENV}/global_workflows.txt"
    raw = await self.async_client.download_file(object_uri, log_exception=False)
    if not raw:
        return []
    # Keep only non-empty lines, stripped of surrounding whitespace.
    workflow_ids: list[str] = []
    for raw_line in raw.decode("utf-8").split("\n"):
        candidate = raw_line.strip()
        if candidate:
            workflow_ids.append(candidate)
    return workflow_ids
def build_log_uri(self, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType) -> str: def build_log_uri(self, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type] file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"s3://{self.bucket}/{settings.ENV}/logs/{log_entity_type}/{log_entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}" return f"s3://{self.bucket}/{settings.ENV}/logs/{log_entity_type}/{log_entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}"

View file

@@ -1172,6 +1172,55 @@ class AgentDB:
LOG.error("SQLAlchemyError", exc_info=True) LOG.error("SQLAlchemyError", exc_info=True)
raise raise
async def get_workflows_by_permanent_ids(
    self,
    workflow_permanent_ids: list[str],
    organization_id: str | None = None,
    page: int = 1,
    page_size: int = 10,
    title: str = "",
    statuses: list[WorkflowStatus] | None = None,
) -> list[Workflow]:
    """
    Get the latest (highest-version, non-deleted) workflow for each of the
    given permanent IDs, optionally filtered by organization, case-insensitive
    title substring, and status; ordered newest-first and paginated with
    1-indexed pages.

    Raises:
        ValueError: if ``page`` is less than 1.
        SQLAlchemyError: re-raised after logging on database failure.
    """
    if page < 1:
        raise ValueError(f"Page must be greater than 0, got {page}")
    # Convert the 1-indexed page into a 0-indexed offset multiplier.
    db_page = page - 1
    try:
        async with self.Session() as session:
            # Resolve the max (latest) non-deleted version per permanent id.
            subquery = (
                select(
                    WorkflowModel.workflow_permanent_id,
                    func.max(WorkflowModel.version).label("max_version"),
                )
                .where(WorkflowModel.workflow_permanent_id.in_(workflow_permanent_ids))
                .where(WorkflowModel.deleted_at.is_(None))
                .group_by(
                    WorkflowModel.workflow_permanent_id,
                )
                .subquery()
            )
            # Join back to fetch the full row for each latest version.
            main_query = select(WorkflowModel).join(
                subquery,
                (WorkflowModel.workflow_permanent_id == subquery.c.workflow_permanent_id)
                & (WorkflowModel.version == subquery.c.max_version),
            )
            if organization_id:
                main_query = main_query.where(WorkflowModel.organization_id == organization_id)
            if title:
                main_query = main_query.where(WorkflowModel.title.ilike(f"%{title}%"))
            if statuses:
                main_query = main_query.where(WorkflowModel.status.in_(statuses))
            main_query = (
                main_query.order_by(WorkflowModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
            )
            workflows = (await session.scalars(main_query)).all()
            return [convert_to_workflow(workflow, self.debug_enabled) for workflow in workflows]
    except SQLAlchemyError:
        LOG.error("SQLAlchemyError", exc_info=True)
        raise
async def get_workflows_by_organization_id( async def get_workflows_by_organization_id(
self, self,
organization_id: str, organization_id: str,

View file

@ -60,6 +60,7 @@ from skyvern.forge.sdk.services import observer_service, org_auth_service
from skyvern.forge.sdk.workflow.exceptions import ( from skyvern.forge.sdk.workflow.exceptions import (
FailedToCreateWorkflow, FailedToCreateWorkflow,
FailedToUpdateWorkflow, FailedToUpdateWorkflow,
InvalidTemplateWorkflowPermanentId,
WorkflowParameterMissingRequiredValue, WorkflowParameterMissingRequiredValue,
) )
from skyvern.forge.sdk.workflow.models.workflow import ( from skyvern.forge.sdk.workflow.models.workflow import (
@ -635,12 +636,18 @@ async def execute_workflow(
workflow_request: WorkflowRequestBody, workflow_request: WorkflowRequestBody,
version: int | None = None, version: int | None = None,
current_org: Organization = Depends(org_auth_service.get_current_org), current_org: Organization = Depends(org_auth_service.get_current_org),
template: bool = Query(False),
x_api_key: Annotated[str | None, Header()] = None, x_api_key: Annotated[str | None, Header()] = None,
x_max_steps_override: Annotated[int | None, Header()] = None, x_max_steps_override: Annotated[int | None, Header()] = None,
) -> RunWorkflowResponse: ) -> RunWorkflowResponse:
analytics.capture("skyvern-oss-agent-workflow-execute") analytics.capture("skyvern-oss-agent-workflow-execute")
context = skyvern_context.ensure_context() context = skyvern_context.ensure_context()
request_id = context.request_id request_id = context.request_id
if template:
if workflow_id not in await app.STORAGE.retrieve_global_workflows():
raise InvalidTemplateWorkflowPermanentId(workflow_permanent_id=workflow_id)
workflow_run = await app.WORKFLOW_SERVICE.setup_workflow_run( workflow_run = await app.WORKFLOW_SERVICE.setup_workflow_run(
request_id=request_id, request_id=request_id,
workflow_request=workflow_request, workflow_request=workflow_request,
@ -648,6 +655,7 @@ async def execute_workflow(
organization_id=current_org.organization_id, organization_id=current_org.organization_id,
version=version, version=version,
max_steps_override=x_max_steps_override, max_steps_override=x_max_steps_override,
is_template_workflow=template,
) )
if x_max_steps_override: if x_max_steps_override:
LOG.info("Overriding max steps per run", max_steps_override=x_max_steps_override) LOG.info("Overriding max steps per run", max_steps_override=x_max_steps_override)
@ -914,12 +922,26 @@ async def get_workflows(
only_workflows: bool = Query(False), only_workflows: bool = Query(False),
title: str = Query(""), title: str = Query(""),
current_org: Organization = Depends(org_auth_service.get_current_org), current_org: Organization = Depends(org_auth_service.get_current_org),
template: bool = Query(False),
) -> list[Workflow]: ) -> list[Workflow]:
""" """
Get all workflows with the latest version for the organization. Get all workflows with the latest version for the organization.
""" """
analytics.capture("skyvern-oss-agent-workflows-get") analytics.capture("skyvern-oss-agent-workflows-get")
if template:
global_workflows_permanent_ids = await app.STORAGE.retrieve_global_workflows()
if not global_workflows_permanent_ids:
return []
workflows = await app.WORKFLOW_SERVICE.get_workflows_by_permanent_ids(
workflow_permanent_ids=global_workflows_permanent_ids,
page=page,
page_size=page_size,
title=title,
statuses=[WorkflowStatus.published, WorkflowStatus.draft],
)
return workflows
if only_saved_tasks and only_workflows: if only_saved_tasks and only_workflows:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,
@ -943,11 +965,16 @@ async def get_workflow(
workflow_permanent_id: str, workflow_permanent_id: str,
version: int | None = None, version: int | None = None,
current_org: Organization = Depends(org_auth_service.get_current_org), current_org: Organization = Depends(org_auth_service.get_current_org),
template: bool = Query(False),
) -> Workflow: ) -> Workflow:
analytics.capture("skyvern-oss-agent-workflows-get") analytics.capture("skyvern-oss-agent-workflows-get")
if template:
if workflow_permanent_id not in await app.STORAGE.retrieve_global_workflows():
raise InvalidTemplateWorkflowPermanentId(workflow_permanent_id=workflow_permanent_id)
return await app.WORKFLOW_SERVICE.get_workflow_by_permanent_id( return await app.WORKFLOW_SERVICE.get_workflow_by_permanent_id(
workflow_permanent_id=workflow_permanent_id, workflow_permanent_id=workflow_permanent_id,
organization_id=current_org.organization_id, organization_id=None if template else current_org.organization_id,
version=version, version=version,
) )

View file

@ -704,7 +704,7 @@ async def handle_block_result(
# refresh workflow run model # refresh workflow run model
return await app.WORKFLOW_SERVICE.get_workflow_run( return await app.WORKFLOW_SERVICE.get_workflow_run(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
) )

View file

@@ -124,3 +124,11 @@ class FailedToFormatJinjaStyleParameter(SkyvernException):
class NoIterableValueFound(SkyvernException): class NoIterableValueFound(SkyvernException):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__("No iterable value found for the loop block") super().__init__("No iterable value found for the loop block")
class InvalidTemplateWorkflowPermanentId(SkyvernHTTPException):
    """HTTP 400 error: the given permanent id is not a registered global template workflow."""

    def __init__(self, workflow_permanent_id: str) -> None:
        super().__init__(
            message=f"Invalid template workflow permanent id: {workflow_permanent_id}. Please make sure the workflow is a valid template.",
            status_code=status.HTTP_400_BAD_REQUEST,
        )

View file

@ -441,9 +441,8 @@ class BaseTaskBlock(Block):
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
organization_id=organization_id, organization_id=organization_id,
) )
workflow = await app.WORKFLOW_SERVICE.get_workflow( workflow = await app.WORKFLOW_SERVICE.get_workflow_by_permanent_id(
workflow_id=workflow_run.workflow_id, workflow_permanent_id=workflow_run.workflow_permanent_id,
organization_id=organization_id,
) )
# if the task url is parameterized, we need to get the value from the workflow run context # if the task url is parameterized, we need to get the value from the workflow run context
if self.url and workflow_run_context.has_parameter(self.url) and workflow_run_context.has_value(self.url): if self.url and workflow_run_context.has_parameter(self.url) and workflow_run_context.has_value(self.url):
@ -512,12 +511,12 @@ class BaseTaskBlock(Block):
workflow_run_block = await app.DATABASE.update_workflow_run_block( workflow_run_block = await app.DATABASE.update_workflow_run_block(
workflow_run_block_id=workflow_run_block_id, workflow_run_block_id=workflow_run_block_id,
task_id=task.task_id, task_id=task.task_id,
organization_id=workflow.organization_id, organization_id=organization_id,
) )
current_running_task = task current_running_task = task
organization = await app.DATABASE.get_organization(organization_id=workflow.organization_id) organization = await app.DATABASE.get_organization(organization_id=workflow_run.organization_id)
if not organization: if not organization:
raise Exception(f"Organization is missing organization_id={workflow.organization_id}") raise Exception(f"Organization is missing organization_id={workflow_run.organization_id}")
browser_state: BrowserState | None = None browser_state: BrowserState | None = None
if is_first_task: if is_first_task:
@ -544,7 +543,7 @@ class BaseTaskBlock(Block):
await app.DATABASE.update_task( await app.DATABASE.update_task(
task.task_id, task.task_id,
status=TaskStatus.failed, status=TaskStatus.failed,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
failure_reason=str(e), failure_reason=str(e),
) )
raise e raise e
@ -569,7 +568,7 @@ class BaseTaskBlock(Block):
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
task_id=task.task_id, task_id=task.task_id,
workflow_id=workflow.workflow_id, workflow_id=workflow.workflow_id,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
step_id=step.step_id, step_id=step.step_id,
) )
try: try:
@ -578,7 +577,7 @@ class BaseTaskBlock(Block):
await app.DATABASE.update_task( await app.DATABASE.update_task(
task.task_id, task.task_id,
status=TaskStatus.failed, status=TaskStatus.failed,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
failure_reason=str(e), failure_reason=str(e),
) )
raise e raise e
@ -597,13 +596,15 @@ class BaseTaskBlock(Block):
await app.DATABASE.update_task( await app.DATABASE.update_task(
task.task_id, task.task_id,
status=TaskStatus.failed, status=TaskStatus.failed,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
failure_reason=str(e), failure_reason=str(e),
) )
raise e raise e
# Check task status # Check task status
updated_task = await app.DATABASE.get_task(task_id=task.task_id, organization_id=workflow.organization_id) updated_task = await app.DATABASE.get_task(
task_id=task.task_id, organization_id=workflow_run.organization_id
)
if not updated_task: if not updated_task:
raise TaskNotFound(task.task_id) raise TaskNotFound(task.task_id)
if not updated_task.status.is_final(): if not updated_task.status.is_final():
@ -624,7 +625,7 @@ class BaseTaskBlock(Block):
task_status=updated_task.status, task_status=updated_task.status,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_id=workflow.workflow_id, workflow_id=workflow.workflow_id,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
) )
success = updated_task.status == TaskStatus.completed success = updated_task.status == TaskStatus.completed
task_output = TaskOutput.from_task(updated_task) task_output = TaskOutput.from_task(updated_task)
@ -645,7 +646,7 @@ class BaseTaskBlock(Block):
task_status=updated_task.status, task_status=updated_task.status,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_id=workflow.workflow_id, workflow_id=workflow.workflow_id,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
) )
return await self.build_block_result( return await self.build_block_result(
success=False, success=False,
@ -662,7 +663,7 @@ class BaseTaskBlock(Block):
task_status=updated_task.status, task_status=updated_task.status,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_id=workflow.workflow_id, workflow_id=workflow.workflow_id,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
) )
return await self.build_block_result( return await self.build_block_result(
success=False, success=False,
@ -683,7 +684,7 @@ class BaseTaskBlock(Block):
status=updated_task.status, status=updated_task.status,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_id=workflow.workflow_id, workflow_id=workflow.workflow_id,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
current_retry=current_retry, current_retry=current_retry,
max_retries=self.max_retries, max_retries=self.max_retries,
task_output=task_output.model_dump_json(), task_output=task_output.model_dump_json(),

View file

@@ -93,6 +93,7 @@ class WorkflowService:
workflow_request: WorkflowRequestBody, workflow_request: WorkflowRequestBody,
workflow_permanent_id: str, workflow_permanent_id: str,
organization_id: str, organization_id: str,
is_template_workflow: bool = False,
version: int | None = None, version: int | None = None,
max_steps_override: int | None = None, max_steps_override: int | None = None,
) -> WorkflowRun: ) -> WorkflowRun:
@ -109,7 +110,7 @@ class WorkflowService:
# Validate the workflow and the organization # Validate the workflow and the organization
workflow = await self.get_workflow_by_permanent_id( workflow = await self.get_workflow_by_permanent_id(
workflow_permanent_id=workflow_permanent_id, workflow_permanent_id=workflow_permanent_id,
organization_id=organization_id, organization_id=None if is_template_workflow else organization_id,
version=version, version=version,
) )
if workflow is None: if workflow is None:
@ -125,7 +126,7 @@ class WorkflowService:
workflow_request=workflow_request, workflow_request=workflow_request,
workflow_permanent_id=workflow_permanent_id, workflow_permanent_id=workflow_permanent_id,
workflow_id=workflow_id, workflow_id=workflow_id,
organization_id=workflow.organization_id, organization_id=organization_id,
) )
LOG.info( LOG.info(
f"Created workflow run {workflow_run.workflow_run_id} for workflow {workflow.workflow_id}", f"Created workflow run {workflow_run.workflow_run_id} for workflow {workflow.workflow_id}",
@ -202,7 +203,7 @@ class WorkflowService:
browser_session_id=browser_session_id, browser_session_id=browser_session_id,
) )
workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id, organization_id=organization_id) workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id, organization_id=organization_id)
workflow = await self.get_workflow(workflow_id=workflow_run.workflow_id, organization_id=organization_id) workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id=workflow_run.workflow_permanent_id)
# Set workflow run status to running, create workflow run parameters # Set workflow run status to running, create workflow run parameters
await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id)
@ -520,6 +521,24 @@ class WorkflowService:
raise WorkflowNotFound(workflow_permanent_id=workflow_permanent_id, version=version) raise WorkflowNotFound(workflow_permanent_id=workflow_permanent_id, version=version)
return workflow return workflow
async def get_workflows_by_permanent_ids(
    self,
    workflow_permanent_ids: list[str],
    organization_id: str | None = None,
    page: int = 1,
    page_size: int = 10,
    title: str = "",
    statuses: list[WorkflowStatus] | None = None,
) -> list[Workflow]:
    """Thin pass-through to the database layer: fetch the latest workflow
    version for each given permanent id, with optional organization, title,
    and status filters plus 1-indexed pagination.
    """
    return await app.DATABASE.get_workflows_by_permanent_ids(
        workflow_permanent_ids,
        organization_id=organization_id,
        page=page,
        page_size=page_size,
        title=title,
        statuses=statuses,
    )
async def get_workflows_by_organization_id( async def get_workflows_by_organization_id(
self, self,
organization_id: str, organization_id: str,
@ -864,7 +883,7 @@ class WorkflowService:
organization_id: str, organization_id: str,
include_cost: bool = False, include_cost: bool = False,
) -> WorkflowRunStatusResponse: ) -> WorkflowRunStatusResponse:
workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id, organization_id=organization_id) workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id)
if workflow is None: if workflow is None:
LOG.error(f"Workflow {workflow_permanent_id} not found") LOG.error(f"Workflow {workflow_permanent_id} not found")
raise WorkflowNotFound(workflow_permanent_id=workflow_permanent_id) raise WorkflowNotFound(workflow_permanent_id=workflow_permanent_id)
@ -903,7 +922,9 @@ class WorkflowService:
try: try:
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT): async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
downloaded_file_urls = await app.STORAGE.get_downloaded_files( downloaded_file_urls = await app.STORAGE.get_downloaded_files(
organization_id=workflow.organization_id, task_id=None, workflow_run_id=workflow_run.workflow_run_id organization_id=workflow_run.organization_id,
task_id=None,
workflow_run_id=workflow_run.workflow_run_id,
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
LOG.warning( LOG.warning(
@ -989,7 +1010,7 @@ class WorkflowService:
await self.persist_debug_artifacts(browser_state, tasks[-1], workflow, workflow_run) await self.persist_debug_artifacts(browser_state, tasks[-1], workflow, workflow_run)
if workflow.persist_browser_session and browser_state.browser_artifacts.browser_session_dir: if workflow.persist_browser_session and browser_state.browser_artifacts.browser_session_dir:
await app.STORAGE.store_browser_session( await app.STORAGE.store_browser_session(
workflow.organization_id, workflow_run.organization_id,
workflow.workflow_permanent_id, workflow.workflow_permanent_id,
browser_state.browser_artifacts.browser_session_dir, browser_state.browser_artifacts.browser_session_dir,
) )
@ -1000,7 +1021,7 @@ class WorkflowService:
try: try:
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT): async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
await app.STORAGE.save_downloaded_files( await app.STORAGE.save_downloaded_files(
workflow.organization_id, task_id=None, workflow_run_id=workflow_run.workflow_run_id workflow_run.organization_id, task_id=None, workflow_run_id=workflow_run.workflow_run_id
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
LOG.warning( LOG.warning(
@ -1106,7 +1127,7 @@ class WorkflowService:
for video_artifact in video_artifacts: for video_artifact in video_artifacts:
await app.ARTIFACT_MANAGER.update_artifact_data( await app.ARTIFACT_MANAGER.update_artifact_data(
artifact_id=video_artifact.video_artifact_id, artifact_id=video_artifact.video_artifact_id,
organization_id=workflow.organization_id, organization_id=workflow_run.organization_id,
data=video_artifact.video_data, data=video_artifact.video_data,
) )