remove workflow_permanent_id from projects table + add use_cache and cache_project_id to workflows table (#3090)
Some checks failed
Run tests and pre-commit / Run tests and pre-commit hooks (push) Has been cancelled
Run tests and pre-commit / Frontend Lint and Build (push) Has been cancelled
Publish Fern Docs / run (push) Has been cancelled

This commit is contained in:
Shuchang Zheng 2025-08-01 17:07:08 -07:00 committed by GitHub
parent 12ee2bf9b0
commit d4bdca174f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 67 additions and 26 deletions

View file

@ -0,0 +1,41 @@
"""Remove workflow_permanent_id from projects table and add project_id to workflows table
Revision ID: 1eedd7a957d1
Revises: 2e58ef1b3d8b
Create Date: 2025-08-01 23:06:18.433869+00:00
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "1eedd7a957d1"
down_revision: Union[str, None] = "2e58ef1b3d8b"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("project_org_wpid_index"), table_name="projects")
op.drop_column("projects", "workflow_permanent_id")
op.add_column("workflows", sa.Column("use_cache", sa.Boolean(), nullable=False, server_default=sa.false()))
op.execute("UPDATE workflows SET use_cache = FALSE")
op.alter_column("workflows", "use_cache", server_default=None)
op.add_column("workflows", sa.Column("cache_project_id", sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("workflows", "cache_project_id")
op.drop_column("workflows", "use_cache")
op.add_column("projects", sa.Column("workflow_permanent_id", sa.VARCHAR(), autoincrement=False, nullable=True))
op.create_index(
op.f("project_org_wpid_index"), "projects", ["organization_id", "workflow_permanent_id"], unique=False
)
# ### end Alembic commands ###

View file

@ -1313,6 +1313,8 @@ class AgentDB:
version: int | None = None, version: int | None = None,
is_saved_task: bool = False, is_saved_task: bool = False,
status: WorkflowStatus = WorkflowStatus.published, status: WorkflowStatus = WorkflowStatus.published,
use_cache: bool = False,
cache_project_id: str | None = None,
) -> Workflow: ) -> Workflow:
async with self.Session() as session: async with self.Session() as session:
workflow = WorkflowModel( workflow = WorkflowModel(
@ -1330,6 +1332,8 @@ class AgentDB:
model=model, model=model,
is_saved_task=is_saved_task, is_saved_task=is_saved_task,
status=status, status=status,
use_cache=use_cache,
cache_project_id=cache_project_id,
) )
if workflow_permanent_id: if workflow_permanent_id:
workflow.workflow_permanent_id = workflow_permanent_id workflow.workflow_permanent_id = workflow_permanent_id
@ -1508,6 +1512,8 @@ class AgentDB:
description: str | None = None, description: str | None = None,
workflow_definition: dict[str, Any] | None = None, workflow_definition: dict[str, Any] | None = None,
version: int | None = None, version: int | None = None,
use_cache: bool | None = None,
cache_project_id: str | None = None,
) -> Workflow: ) -> Workflow:
try: try:
async with self.Session() as session: async with self.Session() as session:
@ -1517,14 +1523,18 @@ class AgentDB:
if organization_id: if organization_id:
get_workflow_query = get_workflow_query.filter_by(organization_id=organization_id) get_workflow_query = get_workflow_query.filter_by(organization_id=organization_id)
if workflow := (await session.scalars(get_workflow_query)).first(): if workflow := (await session.scalars(get_workflow_query)).first():
if title: if title is not None:
workflow.title = title workflow.title = title
if description: if description is not None:
workflow.description = description workflow.description = description
if workflow_definition: if workflow_definition is not None:
workflow.workflow_definition = workflow_definition workflow.workflow_definition = workflow_definition
if version: if version is not None:
workflow.version = version workflow.version = version
if use_cache is not None:
workflow.use_cache = use_cache
if cache_project_id is not None:
workflow.cache_project_id = cache_project_id
await session.commit() await session.commit()
await session.refresh(workflow) await session.refresh(workflow)
return convert_to_workflow(workflow, self.debug_enabled) return convert_to_workflow(workflow, self.debug_enabled)
@ -3489,7 +3499,6 @@ class AgentDB:
async def create_project( async def create_project(
self, self,
organization_id: str, organization_id: str,
workflow_permanent_id: str | None = None,
run_id: str | None = None, run_id: str | None = None,
project_id: str | None = None, project_id: str | None = None,
version: int | None = None, version: int | None = None,
@ -3498,7 +3507,6 @@ class AgentDB:
async with self.Session() as session: async with self.Session() as session:
project = ProjectModel( project = ProjectModel(
organization_id=organization_id, organization_id=organization_id,
workflow_permanent_id=workflow_permanent_id,
run_id=run_id, run_id=run_id,
) )
if project_id: if project_id:
@ -3518,7 +3526,6 @@ class AgentDB:
project_revision_id: str, project_revision_id: str,
organization_id: str, organization_id: str,
artifact_id: str | None = None, artifact_id: str | None = None,
workflow_permanent_id: str | None = None,
run_id: str | None = None, run_id: str | None = None,
version: int | None = None, version: int | None = None,
) -> Project: ) -> Project:
@ -3532,8 +3539,6 @@ class AgentDB:
if project := (await session.scalars(get_project_query)).first(): if project := (await session.scalars(get_project_query)).first():
if artifact_id: if artifact_id:
project.artifact_id = artifact_id project.artifact_id = artifact_id
if workflow_permanent_id:
project.workflow_permanent_id = workflow_permanent_id
if run_id: if run_id:
project.run_id = run_id project.run_id = run_id
if version: if version:

View file

@ -236,6 +236,8 @@ class WorkflowModel(Base):
persist_browser_session = Column(Boolean, default=False, nullable=False) persist_browser_session = Column(Boolean, default=False, nullable=False)
model = Column(JSON, nullable=True) model = Column(JSON, nullable=True)
status = Column(String, nullable=False, default="published") status = Column(String, nullable=False, default="published")
use_cache = Column(Boolean, default=False, nullable=False)
cache_project_id = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column( modified_at = Column(
@ -777,7 +779,6 @@ class ProjectModel(Base):
__tablename__ = "projects" __tablename__ = "projects"
__table_args__ = ( __table_args__ = (
Index("project_org_created_at_index", "organization_id", "created_at"), Index("project_org_created_at_index", "organization_id", "created_at"),
Index("project_org_wpid_index", "organization_id", "workflow_permanent_id"),
Index("project_org_run_id_index", "organization_id", "run_id"), Index("project_org_run_id_index", "organization_id", "run_id"),
UniqueConstraint("organization_id", "project_id", "version", name="uc_org_project_version"), UniqueConstraint("organization_id", "project_id", "version", name="uc_org_project_version"),
) )
@ -785,8 +786,6 @@ class ProjectModel(Base):
project_revision_id = Column(String, primary_key=True, default=generate_project_revision_id) project_revision_id = Column(String, primary_key=True, default=generate_project_revision_id)
project_id = Column(String, default=generate_project_id, nullable=False) # User-facing, consistent across versions project_id = Column(String, default=generate_project_id, nullable=False) # User-facing, consistent across versions
organization_id = Column(String, nullable=False) organization_id = Column(String, nullable=False)
# the wpid that this project is associated with
workflow_permanent_id = Column(String, nullable=True)
# The workflow run or task run id that this project is generated # The workflow run or task run id that this project is generated
run_id = Column(String, nullable=True) run_id = Column(String, nullable=True)
version = Column(Integer, default=1, nullable=False) version = Column(Integer, default=1, nullable=False)

View file

@ -499,7 +499,6 @@ def convert_to_project(project_model: ProjectModel) -> Project:
project_revision_id=project_model.project_revision_id, project_revision_id=project_model.project_revision_id,
project_id=project_model.project_id, project_id=project_model.project_id,
organization_id=project_model.organization_id, organization_id=project_model.organization_id,
workflow_id=project_model.workflow_permanent_id,
run_id=project_model.run_id, run_id=project_model.run_id,
version=project_model.version, version=project_model.version,
created_at=project_model.created_at, created_at=project_model.created_at,

View file

@ -4,7 +4,6 @@ import hashlib
import structlog import structlog
from fastapi import Depends, HTTPException, Path, Query from fastapi import Depends, HTTPException, Path, Query
from skyvern.exceptions import WorkflowNotFound
from skyvern.forge import app from skyvern.forge import app
from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router
from skyvern.forge.sdk.schemas.organizations import Organization from skyvern.forge.sdk.schemas.organizations import Organization
@ -41,12 +40,6 @@ async def create_project(
organization_id=organization_id, organization_id=organization_id,
file_count=len(data.files) if data.files else 0, file_count=len(data.files) if data.files else 0,
) )
# validate workflow_id and run_id
if data.workflow_id:
if not await app.DATABASE.get_workflow_by_permanent_id(
workflow_permanent_id=data.workflow_id, organization_id=organization_id
):
raise WorkflowNotFound(workflow_permanent_id=data.workflow_id)
if data.run_id: if data.run_id:
if not await app.DATABASE.get_run(run_id=data.run_id, organization_id=organization_id): if not await app.DATABASE.get_run(run_id=data.run_id, organization_id=organization_id):
raise HTTPException(status_code=404, detail=f"Run_id {data.run_id} not found") raise HTTPException(status_code=404, detail=f"Run_id {data.run_id} not found")
@ -54,7 +47,6 @@ async def create_project(
# Create the project in the database # Create the project in the database
project = await app.DATABASE.create_project( project = await app.DATABASE.create_project(
organization_id=organization_id, organization_id=organization_id,
workflow_permanent_id=data.workflow_id,
run_id=data.run_id, run_id=data.run_id,
) )
# Process files if provided # Process files if provided
@ -72,7 +64,6 @@ async def create_project(
return CreateProjectResponse( return CreateProjectResponse(
project_id=project.project_id, project_id=project.project_id,
version=project.version, version=project.version,
workflow_id=project.workflow_id,
run_id=project.run_id, run_id=project.run_id,
file_count=file_count, file_count=file_count,
created_at=project.created_at, created_at=project.created_at,
@ -221,7 +212,6 @@ async def deploy_project(
new_version = latest_project.version + 1 new_version = latest_project.version + 1
new_project_revision = await app.DATABASE.create_project( new_project_revision = await app.DATABASE.create_project(
organization_id=current_org.organization_id, organization_id=current_org.organization_id,
workflow_permanent_id=latest_project.workflow_id,
run_id=latest_project.run_id, run_id=latest_project.run_id,
project_id=project_id, # Use the same project_id for versioning project_id=project_id, # Use the same project_id for versioning
version=new_version, version=new_version,
@ -265,7 +255,6 @@ async def deploy_project(
return CreateProjectResponse( return CreateProjectResponse(
project_id=new_project_revision.project_id, project_id=new_project_revision.project_id,
version=new_project_revision.version, version=new_project_revision.version,
workflow_id=new_project_revision.workflow_id,
run_id=new_project_revision.run_id, run_id=new_project_revision.run_id,
file_count=file_count, file_count=file_count,
created_at=new_project_revision.created_at, created_at=new_project_revision.created_at,

View file

@ -443,3 +443,5 @@ class WorkflowCreateYAMLRequest(BaseModel):
max_screenshot_scrolls: int | None = None max_screenshot_scrolls: int | None = None
extra_http_headers: dict[str, str] | None = None extra_http_headers: dict[str, str] | None = None
status: WorkflowStatus = WorkflowStatus.published status: WorkflowStatus = WorkflowStatus.published
use_cache: bool = False
cache_project_id: str | None = None

View file

@ -635,6 +635,8 @@ class WorkflowService:
is_saved_task: bool = False, is_saved_task: bool = False,
status: WorkflowStatus = WorkflowStatus.published, status: WorkflowStatus = WorkflowStatus.published,
extra_http_headers: dict[str, str] | None = None, extra_http_headers: dict[str, str] | None = None,
use_cache: bool = False,
cache_project_id: str | None = None,
) -> Workflow: ) -> Workflow:
return await app.DATABASE.create_workflow( return await app.DATABASE.create_workflow(
title=title, title=title,
@ -653,6 +655,8 @@ class WorkflowService:
is_saved_task=is_saved_task, is_saved_task=is_saved_task,
status=status, status=status,
extra_http_headers=extra_http_headers, extra_http_headers=extra_http_headers,
use_cache=use_cache,
cache_project_id=cache_project_id,
) )
async def get_workflow(self, workflow_id: str, organization_id: str | None = None) -> Workflow: async def get_workflow(self, workflow_id: str, organization_id: str | None = None) -> Workflow:
@ -1534,6 +1538,8 @@ class WorkflowService:
version=existing_version + 1, version=existing_version + 1,
is_saved_task=request.is_saved_task, is_saved_task=request.is_saved_task,
status=request.status, status=request.status,
use_cache=request.use_cache,
cache_project_id=request.cache_project_id,
) )
else: else:
workflow = await self.create_workflow( workflow = await self.create_workflow(
@ -1551,6 +1557,8 @@ class WorkflowService:
extra_http_headers=request.extra_http_headers, extra_http_headers=request.extra_http_headers,
is_saved_task=request.is_saved_task, is_saved_task=request.is_saved_task,
status=request.status, status=request.status,
use_cache=request.use_cache,
cache_project_id=request.cache_project_id,
) )
# Keeping track of the new workflow id to delete it if an error occurs during the creation process # Keeping track of the new workflow id to delete it if an error occurs during the creation process
new_workflow_id = workflow.workflow_id new_workflow_id = workflow.workflow_id

View file

@ -75,7 +75,6 @@ class DeployProjectRequest(BaseModel):
class CreateProjectResponse(BaseModel): class CreateProjectResponse(BaseModel):
project_id: str = Field(..., description="Unique project identifier", examples=["proj_abc123"]) project_id: str = Field(..., description="Unique project identifier", examples=["proj_abc123"])
version: int = Field(..., description="Project version number", examples=[1]) version: int = Field(..., description="Project version number", examples=[1])
workflow_id: str | None = Field(default=None, description="ID of the workflow this project is associated with")
run_id: str | None = Field( run_id: str | None = Field(
default=None, description="ID of the workflow run or task run that generated this project" default=None, description="ID of the workflow run or task run that generated this project"
) )
@ -90,7 +89,6 @@ class Project(BaseModel):
project_revision_id: str = Field(description="Unique identifier for this specific project revision") project_revision_id: str = Field(description="Unique identifier for this specific project revision")
project_id: str = Field(description="User-facing project identifier, consistent across versions") project_id: str = Field(description="User-facing project identifier, consistent across versions")
organization_id: str = Field(description="ID of the organization that owns this project") organization_id: str = Field(description="ID of the organization that owns this project")
workflow_id: str | None = Field(default=None, description="ID of the workflow this project is associated with")
run_id: str | None = Field( run_id: str | None = Field(
default=None, description="ID of the workflow run or task run that generated this project" default=None, description="ID of the workflow run or task run that generated this project"
) )