do not auto publish workflow with observer (#1640)

This commit is contained in:
Shuchang Zheng 2025-01-25 04:08:51 +08:00 committed by GitHub
parent d41bae2383
commit 4db5906d04
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 82 additions and 3 deletions

View file

@ -0,0 +1,33 @@
"""Add status to workflow table
Revision ID: 26dc22efaf0b
Revises: 3a37869686bd
Create Date: 2025-01-24 20:03:14.509740+00:00
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "26dc22efaf0b"
down_revision: Union[str, None] = "3a37869686bd"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("workflows", sa.Column("status", sa.String(), nullable=True))
op.execute("UPDATE workflows SET status = 'published'")
op.alter_column("workflows", "status", nullable=False)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("workflows", "status")
# ### end Alembic commands ###

View file

@ -81,6 +81,7 @@ from skyvern.forge.sdk.workflow.models.workflow import (
WorkflowRunOutputParameter, WorkflowRunOutputParameter,
WorkflowRunParameter, WorkflowRunParameter,
WorkflowRunStatus, WorkflowRunStatus,
WorkflowStatus,
) )
from skyvern.webeye.actions.actions import Action from skyvern.webeye.actions.actions import Action
from skyvern.webeye.actions.models import AgentStepOutput from skyvern.webeye.actions.models import AgentStepOutput
@ -1090,6 +1091,7 @@ class AgentDB:
workflow_permanent_id: str | None = None, workflow_permanent_id: str | None = None,
version: int | None = None, version: int | None = None,
is_saved_task: bool = False, is_saved_task: bool = False,
status: WorkflowStatus = WorkflowStatus.published,
) -> Workflow: ) -> Workflow:
async with self.Session() as session: async with self.Session() as session:
workflow = WorkflowModel( workflow = WorkflowModel(
@ -1103,6 +1105,7 @@ class AgentDB:
totp_identifier=totp_identifier, totp_identifier=totp_identifier,
persist_browser_session=persist_browser_session, persist_browser_session=persist_browser_session,
is_saved_task=is_saved_task, is_saved_task=is_saved_task,
status=status,
) )
if workflow_permanent_id: if workflow_permanent_id:
workflow.workflow_permanent_id = workflow_permanent_id workflow.workflow_permanent_id = workflow_permanent_id
@ -1177,6 +1180,7 @@ class AgentDB:
only_saved_tasks: bool = False, only_saved_tasks: bool = False,
only_workflows: bool = False, only_workflows: bool = False,
title: str = "", title: str = "",
statuses: list[WorkflowStatus] | None = None,
) -> list[Workflow]: ) -> list[Workflow]:
""" """
Get all workflows with the latest version for the organization. Get all workflows with the latest version for the organization.
@ -1212,6 +1216,8 @@ class AgentDB:
main_query = main_query.where(WorkflowModel.is_saved_task.is_(False)) main_query = main_query.where(WorkflowModel.is_saved_task.is_(False))
if title: if title:
main_query = main_query.where(WorkflowModel.title.ilike(f"%{title}%")) main_query = main_query.where(WorkflowModel.title.ilike(f"%{title}%"))
if statuses:
main_query = main_query.where(WorkflowModel.status.in_(statuses))
main_query = ( main_query = (
main_query.order_by(WorkflowModel.created_at.desc()).limit(page_size).offset(db_page * page_size) main_query.order_by(WorkflowModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
) )

View file

@ -207,6 +207,7 @@ class WorkflowModel(Base):
totp_verification_url = Column(String) totp_verification_url = Column(String)
totp_identifier = Column(String) totp_identifier = Column(String)
persist_browser_session = Column(Boolean, default=False, nullable=False) persist_browser_session = Column(Boolean, default=False, nullable=False)
status = Column(String, nullable=False, default="published")
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column( modified_at = Column(

View file

@ -43,6 +43,7 @@ from skyvern.forge.sdk.workflow.models.workflow import (
WorkflowRunOutputParameter, WorkflowRunOutputParameter,
WorkflowRunParameter, WorkflowRunParameter,
WorkflowRunStatus, WorkflowRunStatus,
WorkflowStatus,
) )
LOG = structlog.get_logger() LOG = structlog.get_logger()
@ -187,6 +188,7 @@ def convert_to_workflow(workflow_model: WorkflowModel, debug_enabled: bool = Fal
created_at=workflow_model.created_at, created_at=workflow_model.created_at,
modified_at=workflow_model.modified_at, modified_at=workflow_model.modified_at,
deleted_at=workflow_model.deleted_at, deleted_at=workflow_model.deleted_at,
status=WorkflowStatus(workflow_model.status),
) )

View file

@ -69,6 +69,7 @@ from skyvern.forge.sdk.workflow.models.workflow import (
WorkflowRun, WorkflowRun,
WorkflowRunStatus, WorkflowRunStatus,
WorkflowRunStatusResponse, WorkflowRunStatusResponse,
WorkflowStatus,
) )
from skyvern.forge.sdk.workflow.models.yaml import WorkflowCreateYAMLRequest from skyvern.forge.sdk.workflow.models.yaml import WorkflowCreateYAMLRequest
from skyvern.webeye.actions.actions import Action from skyvern.webeye.actions.actions import Action
@ -932,6 +933,7 @@ async def get_workflows(
only_saved_tasks=only_saved_tasks, only_saved_tasks=only_saved_tasks,
only_workflows=only_workflows, only_workflows=only_workflows,
title=title, title=title,
statuses=[WorkflowStatus.published, WorkflowStatus.draft],
) )
@ -1158,6 +1160,7 @@ async def observer_task(
totp_verification_url=data.totp_verification_url, totp_verification_url=data.totp_verification_url,
webhook_callback_url=data.webhook_callback_url, webhook_callback_url=data.webhook_callback_url,
proxy_location=data.proxy_location, proxy_location=data.proxy_location,
publish_workflow=data.publish_workflow,
) )
except LLMProviderError: except LLMProviderError:
LOG.error("LLM failure to initialize observer cruise", exc_info=True) LOG.error("LLM failure to initialize observer cruise", exc_info=True)

View file

@ -113,6 +113,7 @@ class ObserverTaskRequest(BaseModel):
totp_verification_url: str | None = None totp_verification_url: str | None = None
totp_identifier: str | None = None totp_identifier: str | None = None
proxy_location: ProxyLocation | None = None proxy_location: ProxyLocation | None = None
publish_workflow: bool = False
@field_validator("url", "webhook_callback_url", "totp_verification_url") @field_validator("url", "webhook_callback_url", "totp_verification_url")
@classmethod @classmethod

View file

@ -36,7 +36,13 @@ from skyvern.forge.sdk.workflow.models.block import (
TaskBlock, TaskBlock,
) )
from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, ContextParameter from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, ContextParameter
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRequestBody, WorkflowRun, WorkflowRunStatus from skyvern.forge.sdk.workflow.models.workflow import (
Workflow,
WorkflowRequestBody,
WorkflowRun,
WorkflowRunStatus,
WorkflowStatus,
)
from skyvern.forge.sdk.workflow.models.yaml import ( from skyvern.forge.sdk.workflow.models.yaml import (
BLOCK_YAML_TYPES, BLOCK_YAML_TYPES,
PARAMETER_YAML_TYPES, PARAMETER_YAML_TYPES,
@ -87,6 +93,7 @@ async def initialize_observer_cruise(
totp_identifier: str | None = None, totp_identifier: str | None = None,
totp_verification_url: str | None = None, totp_verification_url: str | None = None,
webhook_callback_url: str | None = None, webhook_callback_url: str | None = None,
publish_workflow: bool = False,
) -> ObserverTask: ) -> ObserverTask:
observer_cruise = await app.DATABASE.create_observer_cruise( observer_cruise = await app.DATABASE.create_observer_cruise(
prompt=user_prompt, prompt=user_prompt,
@ -127,8 +134,12 @@ async def initialize_observer_cruise(
# create workflow and workflow run # create workflow and workflow run
max_steps_override = 10 max_steps_override = 10
try: try:
workflow_status = WorkflowStatus.published if publish_workflow else WorkflowStatus.auto_generated
new_workflow = await app.WORKFLOW_SERVICE.create_empty_workflow( new_workflow = await app.WORKFLOW_SERVICE.create_empty_workflow(
organization, metadata.workflow_title, proxy_location=proxy_location organization,
metadata.workflow_title,
proxy_location=proxy_location,
status=workflow_status,
) )
workflow_run = await app.WORKFLOW_SERVICE.setup_workflow_run( workflow_run = await app.WORKFLOW_SERVICE.setup_workflow_run(
request_id=None, request_id=None,
@ -519,6 +530,7 @@ async def run_observer_cruise_helper(
description=workflow.description, description=workflow.description,
proxy_location=observer_cruise.proxy_location or ProxyLocation.RESIDENTIAL, proxy_location=observer_cruise.proxy_location or ProxyLocation.RESIDENTIAL,
workflow_definition=workflow_definition_yaml, workflow_definition=workflow_definition_yaml,
status=workflow.status,
) )
LOG.info("Creating workflow from request", workflow_create_request=workflow_create_request) LOG.info("Creating workflow from request", workflow_create_request=workflow_create_request)
workflow = await app.WORKFLOW_SERVICE.create_workflow_from_request( workflow = await app.WORKFLOW_SERVICE.create_workflow_from_request(

View file

@ -50,6 +50,12 @@ class WorkflowDefinition(BaseModel):
raise WorkflowDefinitionHasDuplicateBlockLabels(duplicate_labels) raise WorkflowDefinitionHasDuplicateBlockLabels(duplicate_labels)
class WorkflowStatus(StrEnum):
published = "published"
draft = "draft"
auto_generated = "auto_generated"
class Workflow(BaseModel): class Workflow(BaseModel):
workflow_id: str workflow_id: str
organization_id: str organization_id: str
@ -64,6 +70,7 @@ class Workflow(BaseModel):
totp_verification_url: str | None = None totp_verification_url: str | None = None
totp_identifier: str | None = None totp_identifier: str | None = None
persist_browser_session: bool = False persist_browser_session: bool = False
status: WorkflowStatus = WorkflowStatus.published
created_at: datetime created_at: datetime
modified_at: datetime modified_at: datetime

View file

@ -7,6 +7,7 @@ from skyvern.config import settings
from skyvern.forge.sdk.schemas.tasks import ProxyLocation from skyvern.forge.sdk.schemas.tasks import ProxyLocation
from skyvern.forge.sdk.workflow.models.block import BlockType, FileType from skyvern.forge.sdk.workflow.models.block import BlockType, FileType
from skyvern.forge.sdk.workflow.models.parameter import ParameterType, WorkflowParameterType from skyvern.forge.sdk.workflow.models.parameter import ParameterType, WorkflowParameterType
from skyvern.forge.sdk.workflow.models.workflow import WorkflowStatus
class ParameterYAML(BaseModel, abc.ABC): class ParameterYAML(BaseModel, abc.ABC):
@ -370,3 +371,4 @@ class WorkflowCreateYAMLRequest(BaseModel):
persist_browser_session: bool = False persist_browser_session: bool = False
workflow_definition: WorkflowDefinitionYAML workflow_definition: WorkflowDefinitionYAML
is_saved_task: bool = False is_saved_task: bool = False
status: WorkflowStatus = WorkflowStatus.published

View file

@ -73,6 +73,7 @@ from skyvern.forge.sdk.workflow.models.workflow import (
WorkflowRunParameter, WorkflowRunParameter,
WorkflowRunStatus, WorkflowRunStatus,
WorkflowRunStatusResponse, WorkflowRunStatusResponse,
WorkflowStatus,
) )
from skyvern.forge.sdk.workflow.models.yaml import ( from skyvern.forge.sdk.workflow.models.yaml import (
BLOCK_YAML_TYPES, BLOCK_YAML_TYPES,
@ -478,6 +479,7 @@ class WorkflowService:
workflow_permanent_id: str | None = None, workflow_permanent_id: str | None = None,
version: int | None = None, version: int | None = None,
is_saved_task: bool = False, is_saved_task: bool = False,
status: WorkflowStatus = WorkflowStatus.published,
) -> Workflow: ) -> Workflow:
return await app.DATABASE.create_workflow( return await app.DATABASE.create_workflow(
title=title, title=title,
@ -492,6 +494,7 @@ class WorkflowService:
workflow_permanent_id=workflow_permanent_id, workflow_permanent_id=workflow_permanent_id,
version=version, version=version,
is_saved_task=is_saved_task, is_saved_task=is_saved_task,
status=status,
) )
async def get_workflow(self, workflow_id: str, organization_id: str | None = None) -> Workflow: async def get_workflow(self, workflow_id: str, organization_id: str | None = None) -> Workflow:
@ -525,6 +528,7 @@ class WorkflowService:
only_saved_tasks: bool = False, only_saved_tasks: bool = False,
only_workflows: bool = False, only_workflows: bool = False,
title: str = "", title: str = "",
statuses: list[WorkflowStatus] | None = None,
) -> list[Workflow]: ) -> list[Workflow]:
""" """
Get all workflows with the latest version for the organization. Get all workflows with the latest version for the organization.
@ -536,6 +540,7 @@ class WorkflowService:
only_saved_tasks=only_saved_tasks, only_saved_tasks=only_saved_tasks,
only_workflows=only_workflows, only_workflows=only_workflows,
title=title, title=title,
statuses=statuses,
) )
async def update_workflow( async def update_workflow(
@ -1203,6 +1208,7 @@ class WorkflowService:
workflow_permanent_id=workflow_permanent_id, workflow_permanent_id=workflow_permanent_id,
version=existing_version + 1, version=existing_version + 1,
is_saved_task=request.is_saved_task, is_saved_task=request.is_saved_task,
status=request.status,
) )
else: else:
workflow = await self.create_workflow( workflow = await self.create_workflow(
@ -1216,6 +1222,7 @@ class WorkflowService:
totp_identifier=request.totp_identifier, totp_identifier=request.totp_identifier,
persist_browser_session=request.persist_browser_session, persist_browser_session=request.persist_browser_session,
is_saved_task=request.is_saved_task, is_saved_task=request.is_saved_task,
status=request.status,
) )
# Keeping track of the new workflow id to delete it if an error occurs during the creation process # Keeping track of the new workflow id to delete it if an error occurs during the creation process
new_workflow_id = workflow.workflow_id new_workflow_id = workflow.workflow_id
@ -1707,7 +1714,11 @@ class WorkflowService:
raise ValueError(f"Invalid block type {block_yaml.block_type}") raise ValueError(f"Invalid block type {block_yaml.block_type}")
async def create_empty_workflow( async def create_empty_workflow(
self, organization: Organization, title: str, proxy_location: ProxyLocation | None = None self,
organization: Organization,
title: str,
proxy_location: ProxyLocation | None = None,
status: WorkflowStatus = WorkflowStatus.published,
) -> Workflow: ) -> Workflow:
""" """
Create a blank workflow with no blocks Create a blank workflow with no blocks
@ -1720,6 +1731,7 @@ class WorkflowService:
blocks=[], blocks=[],
), ),
proxy_location=proxy_location, proxy_location=proxy_location,
status=status,
) )
return await app.WORKFLOW_SERVICE.create_workflow_from_request( return await app.WORKFLOW_SERVICE.create_workflow_from_request(
organization=organization, organization=organization,