Add workflow_title to WorkflowRun (#1731)

This commit is contained in:
Shuchang Zheng 2025-02-06 03:10:17 +08:00 committed by GitHub
parent cc449dc9de
commit c330c6a455
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 30 additions and 11 deletions

View file

@ -1398,15 +1398,18 @@ class AgentDB:
limit = page * page_size limit = page * page_size
workflow_run_query = select(WorkflowRunModel).filter( workflow_run_query = (
WorkflowRunModel.organization_id == organization_id select(WorkflowRunModel, WorkflowModel.title)
.join(WorkflowModel, WorkflowModel.workflow_id == WorkflowRunModel.workflow_id)
.filter(WorkflowRunModel.organization_id == organization_id)
) )
if status: if status:
workflow_run_query = workflow_run_query.filter(WorkflowRunModel.status.in_(status)) workflow_run_query = workflow_run_query.filter(WorkflowRunModel.status.in_(status))
workflow_run_query = workflow_run_query.order_by(WorkflowRunModel.created_at.desc()).limit(limit) workflow_run_query = workflow_run_query.order_by(WorkflowRunModel.created_at.desc()).limit(limit)
workflow_run_query_result = (await session.scalars(workflow_run_query)).all() workflow_run_query_result = (await session.execute(workflow_run_query)).all()
workflow_runs = [ workflow_runs = [
convert_to_workflow_run(run, debug_enabled=self.debug_enabled) for run in workflow_run_query_result convert_to_workflow_run(run, workflow_title=title, debug_enabled=self.debug_enabled)
for run, title in workflow_run_query_result
] ]
task_query = ( task_query = (
@ -1453,15 +1456,19 @@ class AgentDB:
async with self.Session() as session: async with self.Session() as session:
db_page = page - 1 # offset logic is 0 based db_page = page - 1 # offset logic is 0 based
query = ( query = (
select(WorkflowRunModel) select(WorkflowRunModel, WorkflowModel.title)
.join(WorkflowModel, WorkflowModel.workflow_id == WorkflowRunModel.workflow_id)
.filter(WorkflowRunModel.organization_id == organization_id) .filter(WorkflowRunModel.organization_id == organization_id)
.filter(WorkflowRunModel.parent_workflow_run_id.is_(None)) .filter(WorkflowRunModel.parent_workflow_run_id.is_(None))
) )
if status: if status:
query = query.filter(WorkflowRunModel.status.in_(status)) query = query.filter(WorkflowRunModel.status.in_(status))
query = query.order_by(WorkflowRunModel.created_at.desc()).limit(page_size).offset(db_page * page_size) query = query.order_by(WorkflowRunModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
workflow_runs = (await session.scalars(query)).all() workflow_runs = (await session.execute(query)).all()
return [convert_to_workflow_run(run) for run in workflow_runs] return [
convert_to_workflow_run(run, workflow_title=title, debug_enabled=self.debug_enabled)
for run, title in workflow_runs
]
except SQLAlchemyError: except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True) LOG.error("SQLAlchemyError", exc_info=True)
raise raise
@ -1478,15 +1485,21 @@ class AgentDB:
async with self.Session() as session: async with self.Session() as session:
db_page = page - 1 # offset logic is 0 based db_page = page - 1 # offset logic is 0 based
query = ( query = (
select(WorkflowRunModel) select(WorkflowRunModel, WorkflowModel.title)
.join(WorkflowModel, WorkflowModel.workflow_id == WorkflowRunModel.workflow_id)
.filter(WorkflowRunModel.workflow_permanent_id == workflow_permanent_id) .filter(WorkflowRunModel.workflow_permanent_id == workflow_permanent_id)
.filter(WorkflowRunModel.organization_id == organization_id) .filter(WorkflowRunModel.organization_id == organization_id)
) )
if status: if status:
query = query.filter(WorkflowRunModel.status.in_(status)) query = query.filter(WorkflowRunModel.status.in_(status))
query = query.order_by(WorkflowRunModel.created_at.desc()).limit(page_size).offset(db_page * page_size) query = query.order_by(WorkflowRunModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
workflow_runs = (await session.scalars(query)).all() workflow_runs_and_titles_tuples = (await session.execute(query)).all()
return [convert_to_workflow_run(run) for run in workflow_runs] workflow_runs = [
convert_to_workflow_run(run, workflow_title=title, debug_enabled=self.debug_enabled)
for run, title in workflow_runs_and_titles_tuples
]
return workflow_runs
except SQLAlchemyError: except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True) LOG.error("SQLAlchemyError", exc_info=True)
raise raise

View file

@ -192,7 +192,9 @@ def convert_to_workflow(workflow_model: WorkflowModel, debug_enabled: bool = Fal
) )
def convert_to_workflow_run(workflow_run_model: WorkflowRunModel, debug_enabled: bool = False) -> WorkflowRun: def convert_to_workflow_run(
workflow_run_model: WorkflowRunModel, workflow_title: str | None = None, debug_enabled: bool = False
) -> WorkflowRun:
if debug_enabled: if debug_enabled:
LOG.debug( LOG.debug(
"Converting WorkflowRunModel to WorkflowRun", "Converting WorkflowRunModel to WorkflowRun",
@ -215,6 +217,7 @@ def convert_to_workflow_run(workflow_run_model: WorkflowRunModel, debug_enabled:
totp_identifier=workflow_run_model.totp_identifier, totp_identifier=workflow_run_model.totp_identifier,
created_at=workflow_run_model.created_at, created_at=workflow_run_model.created_at,
modified_at=workflow_run_model.modified_at, modified_at=workflow_run_model.modified_at,
workflow_title=workflow_title,
) )

View file

@ -109,6 +109,7 @@ class WorkflowRun(BaseModel):
totp_identifier: str | None = None totp_identifier: str | None = None
failure_reason: str | None = None failure_reason: str | None = None
parent_workflow_run_id: str | None = None parent_workflow_run_id: str | None = None
workflow_title: str | None = None
created_at: datetime created_at: datetime
modified_at: datetime modified_at: datetime
@ -147,3 +148,4 @@ class WorkflowRunStatusResponse(BaseModel):
total_steps: int | None = None total_steps: int | None = None
total_cost: float | None = None total_cost: float | None = None
observer_task: ObserverTask | None = None observer_task: ObserverTask | None = None
workflow_title: str | None = None

View file

@ -1050,6 +1050,7 @@ class WorkflowService:
outputs=outputs, outputs=outputs,
total_steps=total_steps, total_steps=total_steps,
total_cost=total_cost, total_cost=total_cost,
workflow_title=workflow.title,
) )
async def clean_up_workflow( async def clean_up_workflow(