task v2 refactor part 6 - observer_cruise_id -> task_v2_id (#1817)

This commit is contained in:
Shuchang Zheng 2025-02-23 16:03:49 -08:00 committed by GitHub
parent 2d24055c36
commit ffbc95e1b4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 238 additions and 250 deletions

View file

@ -25,6 +25,7 @@ class TaskOutput(BaseModel):
class SkyvernClient: class SkyvernClient:
def __init__(self, base_url: str, credentials: str): def __init__(self, base_url: str, credentials: str):
self.base_url = base_url self.base_url = base_url
self.v2_base_url = base_url.replace("/api/v1", "/api/v2")
self.credentials = credentials self.credentials = credentials
def generate_curl_params(self, request_body: BaseModel, max_steps: int | None = None) -> tuple[dict, dict]: def generate_curl_params(self, request_body: BaseModel, max_steps: int | None = None) -> tuple[dict, dict]:
@ -54,11 +55,11 @@ class SkyvernClient:
assert "workflow_run_id" in response.json(), f"Failed to create workflow run: {response.text}" assert "workflow_run_id" in response.json(), f"Failed to create workflow run: {response.text}"
return response.json()["workflow_run_id"] return response.json()["workflow_run_id"]
def create_cruise(self, cruise_request: ObserverTaskRequest, max_steps: int | None = None) -> ObserverTask: def create_task_v2(self, task_v2_request: ObserverTaskRequest, max_steps: int | None = None) -> ObserverTask:
url = f"{self.base_url}/cruise" url = f"{self.v2_base_url}/tasks"
payload, headers = self.generate_curl_params(cruise_request, max_steps=max_steps) payload, headers = self.generate_curl_params(task_v2_request, max_steps=max_steps)
response = requests.post(url, headers=headers, data=payload) response = requests.post(url, headers=headers, data=payload)
assert "observer_cruise_id" in response.json(), f"Failed to create observer cruise: {response.text}" assert "task_id" in response.json(), f"Failed to create task v2: {response.text}"
return ObserverTask.model_validate(response.json()) return ObserverTask.model_validate(response.json())
def get_task(self, task_id: str) -> TaskResponse: def get_task(self, task_id: str) -> TaskResponse:
@ -213,7 +214,7 @@ class Evaluator:
return workflow_run_id return workflow_run_id
def queue_skyvern_cruise(self, cruise_request: ObserverTaskRequest, max_step: int | None = None) -> ObserverTask: def queue_skyvern_cruise(self, cruise_request: ObserverTaskRequest, max_step: int | None = None) -> ObserverTask:
cruise = self.client.create_cruise(cruise_request=cruise_request, max_steps=max_step) cruise = self.client.create_task_v2(task_v2_request=cruise_request, max_steps=max_step)
self._save_artifact("cruise.json", cruise.model_dump_json(indent=2).encode()) self._save_artifact("cruise.json", cruise.model_dump_json(indent=2).encode())
return cruise return cruise
@ -259,7 +260,7 @@ class Evaluator:
) )
extracted_information: list | dict[str, Any] | str | None = None extracted_information: list | dict[str, Any] | str | None = None
if workflow_run_response.observer_cruise is None: if workflow_run_response.observer_task is None:
assert workflow_run_response.outputs and len(workflow_run_response.outputs) > 0, ( assert workflow_run_response.outputs and len(workflow_run_response.outputs) > 0, (
f"Expected {workflow_pid + '/' + workflow_run_id} with output, but got empty output" f"Expected {workflow_pid + '/' + workflow_run_id} with output, but got empty output"
) )
@ -271,10 +272,10 @@ class Evaluator:
# FIXME: improve this when the last block is loop block # FIXME: improve this when the last block is loop block
extracted_information = result extracted_information = result
else: else:
workflow_run_response.observer_cruise.summary workflow_run_response.observer_task.summary
workflow_run_response.observer_cruise.output workflow_run_response.observer_task.output
summary = f"{('summary:' + workflow_run_response.observer_cruise.summary) if workflow_run_response.observer_cruise.summary else ''}" summary = f"{('summary:' + workflow_run_response.observer_task.summary) if workflow_run_response.observer_task.summary else ''}"
output = f"{('output: ' + json.dumps(workflow_run_response.observer_cruise.output)) if workflow_run_response.observer_cruise.output else ''}" output = f"{('output: ' + json.dumps(workflow_run_response.observer_task.output)) if workflow_run_response.observer_task.output else ''}"
extracted_information = "" extracted_information = ""
if summary: if summary:
extracted_information = summary extracted_information = summary

View file

@ -48,8 +48,8 @@ def main(
{ {
"workflow_permanent_id": workflow_pid, "workflow_permanent_id": workflow_pid,
"status": str(workflow_run_response.status), "status": str(workflow_run_response.status),
"summary": workflow_run_response.observer_cruise.summary, "summary": workflow_run_response.observer_task.summary,
"output": workflow_run_response.observer_cruise.output, "output": workflow_run_response.observer_task.output,
"assertion": workflow_run_response.status == WorkflowRunStatus.completed, "assertion": workflow_run_response.status == WorkflowRunStatus.completed,
"failure_reason": workflow_run_response.failure_reason or "", "failure_reason": workflow_run_response.failure_reason or "",
} }

View file

@ -12,7 +12,7 @@ from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.schemas.task_v2 import ObserverTaskRequest from skyvern.forge.sdk.schemas.task_v2 import ObserverTaskRequest
async def create_observer_cruise( async def create_task_v2(
base_url: str, base_url: str,
cred: str, cred: str,
) -> None: ) -> None:
@ -42,7 +42,7 @@ async def create_observer_cruise(
dumped_data = case_data.model_dump() dumped_data = case_data.model_dump()
dumped_data.update( dumped_data.update(
{ {
"observer_cruise_id": cruise.observer_cruise_id, "task_v2_id": cruise.observer_cruise_id,
"workflow_run_id": cruise.workflow_run_id, "workflow_run_id": cruise.workflow_run_id,
"workflow_permanent_id": cruise.workflow_permanent_id, "workflow_permanent_id": cruise.workflow_permanent_id,
"cruise_url": str(cruise.url) if cruise.url else cruise.url, "cruise_url": str(cruise.url) if cruise.url else cruise.url,
@ -59,7 +59,7 @@ def main(
base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"), base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"),
cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"), cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"),
) -> None: ) -> None:
asyncio.run(create_observer_cruise(base_url=base_url, cred=cred)) asyncio.run(create_task_v2(base_url=base_url, cred=cred))
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -33,8 +33,8 @@ async def process_record(client: SkyvernClient, one_record: dict[str, Any]) -> d
one_record.update( one_record.update(
{ {
"status": str(workflow_run_response.status), "status": str(workflow_run_response.status),
"summary": workflow_run_response.observer_cruise.summary, "summary": workflow_run_response.observer_task.summary,
"output": workflow_run_response.observer_cruise.output, "output": workflow_run_response.observer_task.output,
} }
) )
if workflow_run_response.status != WorkflowRunStatus.completed: if workflow_run_response.status != WorkflowRunStatus.completed:

View file

@ -66,23 +66,23 @@ class Agent:
api_key=org_auth_token.token if org_auth_token else None, api_key=org_auth_token.token if org_auth_token else None,
) )
async def _run_observer_task(self, organization: Organization, observer_task: ObserverTask) -> None: async def _run_task_v2(self, organization: Organization, task_v2: ObserverTask) -> None:
# mark observer cruise as queued # mark observer cruise as queued
await app.DATABASE.update_observer_cruise( await app.DATABASE.update_task_v2(
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
status=ObserverTaskStatus.queued, status=ObserverTaskStatus.queued,
organization_id=organization.organization_id, organization_id=organization.organization_id,
) )
assert observer_task.workflow_run_id assert task_v2.workflow_run_id
await app.DATABASE.update_workflow_run( await app.DATABASE.update_workflow_run(
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=task_v2.workflow_run_id,
status=WorkflowRunStatus.queued, status=WorkflowRunStatus.queued,
) )
await task_v2_service.run_observer_task( await task_v2_service.run_observer_task(
organization=organization, organization=organization,
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
) )
async def create_task( async def create_task(
@ -170,12 +170,12 @@ class Agent:
if not observer_task.workflow_run_id: if not observer_task.workflow_run_id:
raise Exception("Observer cruise missing workflow run id") raise Exception("Observer cruise missing workflow run id")
asyncio.create_task(self._run_observer_task(organization, observer_task)) asyncio.create_task(self._run_task_v2(organization, observer_task))
return observer_task return observer_task
async def get_observer_task_v_2(self, task_id: str) -> ObserverTask | None: async def get_observer_task_v_2(self, task_id: str) -> ObserverTask | None:
organization = await self._get_organization() organization = await self._get_organization()
return await app.DATABASE.get_observer_cruise(task_id, organization.organization_id) return await app.DATABASE.get_task_v2(task_id, organization.organization_id)
async def run_observer_task_v_2( async def run_observer_task_v_2(
self, task_request: ObserverTaskRequest, timeout_seconds: int = 600 self, task_request: ObserverTaskRequest, timeout_seconds: int = 600

View file

@ -3115,7 +3115,7 @@ class AgentClient:
publish_workflow=publish_workflow, publish_workflow=publish_workflow,
request_options=request_options, request_options=request_options,
) )
observer_cruise_id = observer_task.get("task_id") task_id = observer_task.get("task_id")
start_time = time.time() start_time = time.time()
while True: while True:
@ -3123,7 +3123,7 @@ class AgentClient:
raise TimeoutError(f"Task timed out after {timeout_seconds} seconds") raise TimeoutError(f"Task timed out after {timeout_seconds} seconds")
task = self.get_observer_task_v_2( task = self.get_observer_task_v_2(
str(observer_cruise_id), api_key=api_key, authorization=authorization, request_options=request_options str(task_id), api_key=api_key, authorization=authorization, request_options=request_options
) )
if str(task.get("status")) in ["timed_out", "failed", "terminated", "completed", "canceled"]: if str(task.get("status")) in ["timed_out", "failed", "terminated", "completed", "canceled"]:
return task return task
@ -7358,11 +7358,11 @@ class AsyncAgentClient:
publish_workflow=publish_workflow, publish_workflow=publish_workflow,
request_options=request_options, request_options=request_options,
) )
observer_cruise_id = observer_task.get("task_id") task_id = observer_task.get("task_id")
async with asyncio.timeout(timeout_seconds): async with asyncio.timeout(timeout_seconds):
while True: while True:
task = await self.get_observer_task_v_2( task = await self.get_observer_task_v_2(
str(observer_cruise_id), str(task_id),
api_key=api_key, api_key=api_key,
authorization=authorization, authorization=authorization,
request_options=request_options, request_options=request_options,

View file

@ -31,13 +31,13 @@ class FailedToSendWebhook(SkyvernException):
task_id: str | None = None, task_id: str | None = None,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
workflow_id: str | None = None, workflow_id: str | None = None,
observer_cruise_id: str | None = None, task_v2_id: str | None = None,
): ):
workflow_run_str = f"workflow_run_id={workflow_run_id}" if workflow_run_id else "" workflow_run_str = f"workflow_run_id={workflow_run_id}" if workflow_run_id else ""
workflow_str = f"workflow_id={workflow_id}" if workflow_id else "" workflow_str = f"workflow_id={workflow_id}" if workflow_id else ""
task_str = f"task_id={task_id}" if task_id else "" task_str = f"task_id={task_id}" if task_id else ""
observer_cruise_str = f"observer_cruise_id={observer_cruise_id}" if observer_cruise_id else "" task_v2_str = f"task_v2_id={task_v2_id}" if task_v2_id else ""
super().__init__(f"Failed to send webhook. {workflow_run_str} {workflow_str} {task_str} {observer_cruise_str}") super().__init__(f"Failed to send webhook. {workflow_run_str} {workflow_str} {task_str} {task_v2_str}")
class ProxyLocationNotSupportedError(SkyvernException): class ProxyLocationNotSupportedError(SkyvernException):
@ -632,9 +632,9 @@ class UrlGenerationFailure(SkyvernHTTPException):
super().__init__("Failed to generate the url for the prompt") super().__init__("Failed to generate the url for the prompt")
class ObserverCruiseNotFound(SkyvernHTTPException): class TaskV2NotFound(SkyvernHTTPException):
def __init__(self, observer_cruise_id: str) -> None: def __init__(self, task_v2_id: str) -> None:
super().__init__(f"Observer task {observer_cruise_id} not found") super().__init__(f"Task v2 {task_v2_id} not found")
class NoTOTPVerificationCodeFound(SkyvernHTTPException): class NoTOTPVerificationCodeFound(SkyvernHTTPException):

View file

@ -63,7 +63,7 @@ class LLMAPIHandlerFactory:
prompt: str, prompt: str,
prompt_name: str, prompt_name: str,
step: Step | None = None, step: Step | None = None,
observer_cruise: ObserverTask | None = None, task_v2: ObserverTask | None = None,
observer_thought: ObserverThought | None = None, observer_thought: ObserverThought | None = None,
ai_suggestion: AISuggestion | None = None, ai_suggestion: AISuggestion | None = None,
screenshots: list[bytes] | None = None, screenshots: list[bytes] | None = None,
@ -92,7 +92,7 @@ class LLMAPIHandlerFactory:
data=json.dumps(context.hashed_href_map, indent=2).encode("utf-8"), data=json.dumps(context.hashed_href_map, indent=2).encode("utf-8"),
artifact_type=ArtifactType.HASHED_HREF_MAP, artifact_type=ArtifactType.HASHED_HREF_MAP,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )
@ -102,7 +102,7 @@ class LLMAPIHandlerFactory:
artifact_type=ArtifactType.LLM_PROMPT, artifact_type=ArtifactType.LLM_PROMPT,
screenshots=screenshots, screenshots=screenshots,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
) )
messages = await llm_messages_builder(prompt, screenshots, llm_config.add_assistant_prefix) messages = await llm_messages_builder(prompt, screenshots, llm_config.add_assistant_prefix)
@ -117,7 +117,7 @@ class LLMAPIHandlerFactory:
).encode("utf-8"), ).encode("utf-8"),
artifact_type=ArtifactType.LLM_REQUEST, artifact_type=ArtifactType.LLM_REQUEST,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )
@ -144,7 +144,7 @@ class LLMAPIHandlerFactory:
data=response.model_dump_json(indent=2).encode("utf-8"), data=response.model_dump_json(indent=2).encode("utf-8"),
artifact_type=ArtifactType.LLM_RESPONSE, artifact_type=ArtifactType.LLM_RESPONSE,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )
@ -184,7 +184,7 @@ class LLMAPIHandlerFactory:
data=json.dumps(parsed_response, indent=2).encode("utf-8"), data=json.dumps(parsed_response, indent=2).encode("utf-8"),
artifact_type=ArtifactType.LLM_RESPONSE_PARSED, artifact_type=ArtifactType.LLM_RESPONSE_PARSED,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )
@ -197,7 +197,7 @@ class LLMAPIHandlerFactory:
data=json.dumps(parsed_response, indent=2).encode("utf-8"), data=json.dumps(parsed_response, indent=2).encode("utf-8"),
artifact_type=ArtifactType.LLM_RESPONSE_RENDERED, artifact_type=ArtifactType.LLM_RESPONSE_RENDERED,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )
@ -234,7 +234,7 @@ class LLMAPIHandlerFactory:
prompt: str, prompt: str,
prompt_name: str, prompt_name: str,
step: Step | None = None, step: Step | None = None,
observer_cruise: ObserverTask | None = None, task_v2: ObserverTask | None = None,
observer_thought: ObserverThought | None = None, observer_thought: ObserverThought | None = None,
ai_suggestion: AISuggestion | None = None, ai_suggestion: AISuggestion | None = None,
screenshots: list[bytes] | None = None, screenshots: list[bytes] | None = None,
@ -255,7 +255,7 @@ class LLMAPIHandlerFactory:
data=json.dumps(context.hashed_href_map, indent=2).encode("utf-8"), data=json.dumps(context.hashed_href_map, indent=2).encode("utf-8"),
artifact_type=ArtifactType.HASHED_HREF_MAP, artifact_type=ArtifactType.HASHED_HREF_MAP,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )
@ -265,7 +265,7 @@ class LLMAPIHandlerFactory:
artifact_type=ArtifactType.LLM_PROMPT, artifact_type=ArtifactType.LLM_PROMPT,
screenshots=screenshots, screenshots=screenshots,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )
@ -285,7 +285,7 @@ class LLMAPIHandlerFactory:
).encode("utf-8"), ).encode("utf-8"),
artifact_type=ArtifactType.LLM_REQUEST, artifact_type=ArtifactType.LLM_REQUEST,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )
@ -319,7 +319,7 @@ class LLMAPIHandlerFactory:
data=response.model_dump_json(indent=2).encode("utf-8"), data=response.model_dump_json(indent=2).encode("utf-8"),
artifact_type=ArtifactType.LLM_RESPONSE, artifact_type=ArtifactType.LLM_RESPONSE,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )
@ -354,7 +354,7 @@ class LLMAPIHandlerFactory:
data=json.dumps(parsed_response, indent=2).encode("utf-8"), data=json.dumps(parsed_response, indent=2).encode("utf-8"),
artifact_type=ArtifactType.LLM_RESPONSE_PARSED, artifact_type=ArtifactType.LLM_RESPONSE_PARSED,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )
@ -367,7 +367,7 @@ class LLMAPIHandlerFactory:
data=json.dumps(parsed_response, indent=2).encode("utf-8"), data=json.dumps(parsed_response, indent=2).encode("utf-8"),
artifact_type=ArtifactType.LLM_RESPONSE_RENDERED, artifact_type=ArtifactType.LLM_RESPONSE_RENDERED,
step=step, step=step,
observer_cruise=observer_cruise, task_v2=task_v2,
observer_thought=observer_thought, observer_thought=observer_thought,
ai_suggestion=ai_suggestion, ai_suggestion=ai_suggestion,
) )

View file

@ -85,7 +85,7 @@ class LLMAPIHandler(Protocol):
prompt: str, prompt: str,
prompt_name: str, prompt_name: str,
step: Step | None = None, step: Step | None = None,
observer_cruise: ObserverTask | None = None, task_v2: ObserverTask | None = None,
observer_thought: ObserverThought | None = None, observer_thought: ObserverThought | None = None,
ai_suggestion: AISuggestion | None = None, ai_suggestion: AISuggestion | None = None,
screenshots: list[bytes] | None = None, screenshots: list[bytes] | None = None,

View file

@ -30,7 +30,7 @@ class ArtifactManager:
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
workflow_run_block_id: str | None = None, workflow_run_block_id: str | None = None,
observer_thought_id: str | None = None, observer_thought_id: str | None = None,
observer_cruise_id: str | None = None, task_v2_id: str | None = None,
ai_suggestion_id: str | None = None, ai_suggestion_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
data: bytes | None = None, data: bytes | None = None,
@ -49,7 +49,7 @@ class ArtifactManager:
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id, workflow_run_block_id=workflow_run_block_id,
observer_thought_id=observer_thought_id, observer_thought_id=observer_thought_id,
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
organization_id=organization_id, organization_id=organization_id,
ai_suggestion_id=ai_suggestion_id, ai_suggestion_id=ai_suggestion_id,
) )
@ -129,28 +129,28 @@ class ArtifactManager:
artifact_type=artifact_type, artifact_type=artifact_type,
uri=uri, uri=uri,
observer_thought_id=observer_thought.observer_thought_id, observer_thought_id=observer_thought.observer_thought_id,
observer_cruise_id=observer_thought.observer_cruise_id, task_v2_id=observer_thought.observer_cruise_id,
organization_id=observer_thought.organization_id, organization_id=observer_thought.organization_id,
data=data, data=data,
path=path, path=path,
) )
async def create_observer_cruise_artifact( async def create_task_v2_artifact(
self, self,
observer_cruise: ObserverTask, task_v2: ObserverTask,
artifact_type: ArtifactType, artifact_type: ArtifactType,
data: bytes | None = None, data: bytes | None = None,
path: str | None = None, path: str | None = None,
) -> str: ) -> str:
artifact_id = generate_artifact_id() artifact_id = generate_artifact_id()
uri = app.STORAGE.build_observer_cruise_uri(artifact_id, observer_cruise, artifact_type) uri = app.STORAGE.build_task_v2_uri(artifact_id, task_v2, artifact_type)
return await self._create_artifact( return await self._create_artifact(
aio_task_primary_key=observer_cruise.observer_cruise_id, aio_task_primary_key=task_v2.observer_cruise_id,
artifact_id=artifact_id, artifact_id=artifact_id,
artifact_type=artifact_type, artifact_type=artifact_type,
uri=uri, uri=uri,
observer_cruise_id=observer_cruise.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
organization_id=observer_cruise.organization_id, organization_id=task_v2.organization_id,
data=data, data=data,
path=path, path=path,
) )
@ -203,7 +203,7 @@ class ArtifactManager:
screenshots: list[bytes] | None = None, screenshots: list[bytes] | None = None,
step: Step | None = None, step: Step | None = None,
observer_thought: ObserverThought | None = None, observer_thought: ObserverThought | None = None,
observer_cruise: ObserverTask | None = None, task_v2: ObserverTask | None = None,
ai_suggestion: AISuggestion | None = None, ai_suggestion: AISuggestion | None = None,
) -> None: ) -> None:
if step: if step:
@ -218,15 +218,15 @@ class ArtifactManager:
artifact_type=ArtifactType.SCREENSHOT_LLM, artifact_type=ArtifactType.SCREENSHOT_LLM,
data=screenshot, data=screenshot,
) )
elif observer_cruise: elif task_v2:
await self.create_observer_cruise_artifact( await self.create_task_v2_artifact(
observer_cruise=observer_cruise, task_v2=task_v2,
artifact_type=artifact_type, artifact_type=artifact_type,
data=data, data=data,
) )
for screenshot in screenshots or []: for screenshot in screenshots or []:
await self.create_observer_cruise_artifact( await self.create_task_v2_artifact(
observer_cruise=observer_cruise, task_v2=task_v2,
artifact_type=ArtifactType.SCREENSHOT_LLM, artifact_type=ArtifactType.SCREENSHOT_LLM,
data=screenshot, data=screenshot,
) )

View file

@ -55,9 +55,7 @@ class BaseStorage(ABC):
pass pass
@abstractmethod @abstractmethod
def build_observer_cruise_uri( def build_task_v2_uri(self, artifact_id: str, task_v2: ObserverTask, artifact_type: ArtifactType) -> str:
self, artifact_id: str, observer_cruise: ObserverTask, artifact_type: ArtifactType
) -> str:
pass pass
@abstractmethod @abstractmethod

View file

@ -46,11 +46,9 @@ class LocalStorage(BaseStorage):
file_ext = FILE_EXTENTSION_MAP[artifact_type] file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"file://{self.artifact_path}/{settings.ENV}/observers/{observer_thought.observer_cruise_id}/{observer_thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" return f"file://{self.artifact_path}/{settings.ENV}/observers/{observer_thought.observer_cruise_id}/{observer_thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
def build_observer_cruise_uri( def build_task_v2_uri(self, artifact_id: str, task_v2: ObserverTask, artifact_type: ArtifactType) -> str:
self, artifact_id: str, observer_cruise: ObserverTask, artifact_type: ArtifactType
) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type] file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"file://{self.artifact_path}/{settings.ENV}/observers/{observer_cruise.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" return f"file://{self.artifact_path}/{settings.ENV}/observers/{task_v2.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
def build_workflow_run_block_uri( def build_workflow_run_block_uri(
self, artifact_id: str, workflow_run_block: WorkflowRunBlock, artifact_type: ArtifactType self, artifact_id: str, workflow_run_block: WorkflowRunBlock, artifact_type: ArtifactType

View file

@ -46,11 +46,9 @@ class S3Storage(BaseStorage):
file_ext = FILE_EXTENTSION_MAP[artifact_type] file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"s3://{self.bucket}/{settings.ENV}/observers/{observer_thought.observer_cruise_id}/{observer_thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" return f"s3://{self.bucket}/{settings.ENV}/observers/{observer_thought.observer_cruise_id}/{observer_thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
def build_observer_cruise_uri( def build_task_v2_uri(self, artifact_id: str, task_v2: ObserverTask, artifact_type: ArtifactType) -> str:
self, artifact_id: str, observer_cruise: ObserverTask, artifact_type: ArtifactType
) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type] file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"s3://{self.bucket}/{settings.ENV}/observers/{observer_cruise.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" return f"s3://{self.bucket}/{settings.ENV}/observers/{task_v2.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
def build_workflow_run_block_uri( def build_workflow_run_block_uri(
self, artifact_id: str, workflow_run_block: WorkflowRunBlock, artifact_type: ArtifactType self, artifact_id: str, workflow_run_block: WorkflowRunBlock, artifact_type: ArtifactType

View file

@ -12,7 +12,7 @@ class SkyvernContext:
task_id: str | None = None task_id: str | None = None
workflow_id: str | None = None workflow_id: str | None = None
workflow_run_id: str | None = None workflow_run_id: str | None = None
observer_cruise_id: str | None = None task_v2_id: str | None = None
max_steps_override: int | None = None max_steps_override: int | None = None
tz_info: ZoneInfo | None = None tz_info: ZoneInfo | None = None
totp_codes: dict[str, str | None] = field(default_factory=dict) totp_codes: dict[str, str | None] = field(default_factory=dict)
@ -22,7 +22,7 @@ class SkyvernContext:
frame_index_map: dict[Frame, int] = field(default_factory=dict) frame_index_map: dict[Frame, int] = field(default_factory=dict)
def __repr__(self) -> str: def __repr__(self) -> str:
return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, max_steps_override={self.max_steps_override})" return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, task_v2_id={self.task_v2_id}, max_steps_override={self.max_steps_override})"
def __str__(self) -> str: def __str__(self) -> str:
return self.__repr__() return self.__repr__()

View file

@ -210,7 +210,7 @@ class AgentDB:
task_id: str | None = None, task_id: str | None = None,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
workflow_run_block_id: str | None = None, workflow_run_block_id: str | None = None,
observer_cruise_id: str | None = None, task_v2_id: str | None = None,
observer_thought_id: str | None = None, observer_thought_id: str | None = None,
ai_suggestion_id: str | None = None, ai_suggestion_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
@ -225,7 +225,7 @@ class AgentDB:
step_id=step_id, step_id=step_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id, workflow_run_block_id=workflow_run_block_id,
observer_cruise_id=observer_cruise_id, observer_cruise_id=task_v2_id,
observer_thought_id=observer_thought_id, observer_thought_id=observer_thought_id,
ai_suggestion_id=ai_suggestion_id, ai_suggestion_id=ai_suggestion_id,
organization_id=organization_id, organization_id=organization_id,
@ -807,9 +807,9 @@ class AgentDB:
return convert_to_organization_auth_token(auth_token) return convert_to_organization_auth_token(auth_token)
async def get_artifacts_for_observer_cruise( async def get_artifacts_for_task_v2(
self, self,
observer_cruise_id: str, task_v2_id: str,
organization_id: str | None = None, organization_id: str | None = None,
artifact_types: list[ArtifactType] | None = None, artifact_types: list[ArtifactType] | None = None,
) -> list[Artifact]: ) -> list[Artifact]:
@ -817,7 +817,7 @@ class AgentDB:
async with self.Session() as session: async with self.Session() as session:
query = ( query = (
select(ArtifactModel) select(ArtifactModel)
.filter_by(observer_cruise_id=observer_cruise_id) .filter_by(observer_cruise_id=task_v2_id)
.filter_by(organization_id=organization_id) .filter_by(organization_id=organization_id)
) )
if artifact_types: if artifact_types:
@ -894,7 +894,7 @@ class AgentDB:
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
workflow_run_block_id: str | None = None, workflow_run_block_id: str | None = None,
observer_thought_id: str | None = None, observer_thought_id: str | None = None,
observer_cruise_id: str | None = None, task_v2_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
) -> list[Artifact]: ) -> list[Artifact]:
try: try:
@ -913,8 +913,8 @@ class AgentDB:
query = query.filter_by(workflow_run_block_id=workflow_run_block_id) query = query.filter_by(workflow_run_block_id=workflow_run_block_id)
if observer_thought_id is not None: if observer_thought_id is not None:
query = query.filter_by(observer_thought_id=observer_thought_id) query = query.filter_by(observer_thought_id=observer_thought_id)
if observer_cruise_id is not None: if task_v2_id is not None:
query = query.filter_by(observer_cruise_id=observer_cruise_id) query = query.filter_by(observer_cruise_id=task_v2_id)
if organization_id is not None: if organization_id is not None:
query = query.filter_by(organization_id=organization_id) query = query.filter_by(organization_id=organization_id)
@ -938,7 +938,7 @@ class AgentDB:
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
workflow_run_block_id: str | None = None, workflow_run_block_id: str | None = None,
observer_thought_id: str | None = None, observer_thought_id: str | None = None,
observer_cruise_id: str | None = None, task_v2_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
) -> Artifact | None: ) -> Artifact | None:
artifacts = await self.get_artifacts_by_entity_id( artifacts = await self.get_artifacts_by_entity_id(
@ -948,7 +948,7 @@ class AgentDB:
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id, workflow_run_block_id=workflow_run_block_id,
observer_thought_id=observer_thought_id, observer_thought_id=observer_thought_id,
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
organization_id=organization_id, organization_id=organization_id,
) )
return artifacts[0] if artifacts else None return artifacts[0] if artifacts else None
@ -1915,13 +1915,11 @@ class AgentDB:
await session.execute(stmt) await session.execute(stmt)
await session.commit() await session.commit()
async def delete_observer_cruise_artifacts( async def delete_task_v2_artifacts(self, task_v2_id: str, organization_id: str | None = None) -> None:
self, observer_cruise_id: str, organization_id: str | None = None
) -> None:
async with self.Session() as session: async with self.Session() as session:
stmt = delete(ArtifactModel).where( stmt = delete(ArtifactModel).where(
and_( and_(
ArtifactModel.observer_cruise_id == observer_cruise_id, ArtifactModel.observer_cruise_id == task_v2_id,
ArtifactModel.organization_id == organization_id, ArtifactModel.organization_id == organization_id,
) )
) )
@ -2130,47 +2128,43 @@ class AgentDB:
await session.execute(stmt) await session.execute(stmt)
await session.commit() await session.commit()
async def get_observer_cruise( async def get_task_v2(self, task_v2_id: str, organization_id: str | None = None) -> ObserverTask | None:
self, observer_cruise_id: str, organization_id: str | None = None
) -> ObserverTask | None:
async with self.Session() as session: async with self.Session() as session:
if observer_cruise := ( if task_v2 := (
await session.scalars( await session.scalars(
select(ObserverCruiseModel) select(ObserverCruiseModel)
.filter_by(observer_cruise_id=observer_cruise_id) .filter_by(observer_cruise_id=task_v2_id)
.filter_by(organization_id=organization_id) .filter_by(organization_id=organization_id)
) )
).first(): ).first():
return ObserverTask.model_validate(observer_cruise) return ObserverTask.model_validate(task_v2)
return None return None
async def delete_observer_thoughts_for_cruise( async def delete_observer_thoughts_for_cruise(self, task_v2_id: str, organization_id: str | None = None) -> None:
self, observer_cruise_id: str, organization_id: str | None = None
) -> None:
async with self.Session() as session: async with self.Session() as session:
stmt = delete(ObserverThoughtModel).where( stmt = delete(ObserverThoughtModel).where(
and_( and_(
ObserverThoughtModel.observer_cruise_id == observer_cruise_id, ObserverThoughtModel.observer_cruise_id == task_v2_id,
ObserverThoughtModel.organization_id == organization_id, ObserverThoughtModel.organization_id == organization_id,
) )
) )
await session.execute(stmt) await session.execute(stmt)
await session.commit() await session.commit()
async def get_observer_cruise_by_workflow_run_id( async def get_task_v2_by_workflow_run_id(
self, self,
workflow_run_id: str, workflow_run_id: str,
organization_id: str | None = None, organization_id: str | None = None,
) -> ObserverTask | None: ) -> ObserverTask | None:
async with self.Session() as session: async with self.Session() as session:
if observer_cruise := ( if task_v2 := (
await session.scalars( await session.scalars(
select(ObserverCruiseModel) select(ObserverCruiseModel)
.filter_by(organization_id=organization_id) .filter_by(organization_id=organization_id)
.filter_by(workflow_run_id=workflow_run_id) .filter_by(workflow_run_id=workflow_run_id)
) )
).first(): ).first():
return ObserverTask.model_validate(observer_cruise) return ObserverTask.model_validate(task_v2)
return None return None
async def get_observer_thought( async def get_observer_thought(
@ -2189,14 +2183,14 @@ class AgentDB:
async def get_observer_thoughts( async def get_observer_thoughts(
self, self,
observer_cruise_id: str, task_v2_id: str,
observer_thought_types: list[ObserverThoughtType] | None = None, observer_thought_types: list[ObserverThoughtType] | None = None,
organization_id: str | None = None, organization_id: str | None = None,
) -> list[ObserverThought]: ) -> list[ObserverThought]:
async with self.Session() as session: async with self.Session() as session:
query = ( query = (
select(ObserverThoughtModel) select(ObserverThoughtModel)
.filter_by(observer_cruise_id=observer_cruise_id) .filter_by(observer_cruise_id=task_v2_id)
.filter_by(organization_id=organization_id) .filter_by(organization_id=organization_id)
.order_by(ObserverThoughtModel.created_at) .order_by(ObserverThoughtModel.created_at)
) )
@ -2205,7 +2199,7 @@ class AgentDB:
observer_thoughts = (await session.scalars(query)).all() observer_thoughts = (await session.scalars(query)).all()
return [ObserverThought.model_validate(thought) for thought in observer_thoughts] return [ObserverThought.model_validate(thought) for thought in observer_thoughts]
async def create_observer_cruise( async def create_task_v2(
self, self,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
workflow_id: str | None = None, workflow_id: str | None = None,
@ -2219,7 +2213,7 @@ class AgentDB:
webhook_callback_url: str | None = None, webhook_callback_url: str | None = None,
) -> ObserverTask: ) -> ObserverTask:
async with self.Session() as session: async with self.Session() as session:
new_observer_cruise = ObserverCruiseModel( new_task_v2 = ObserverCruiseModel(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_id=workflow_id, workflow_id=workflow_id,
workflow_permanent_id=workflow_permanent_id, workflow_permanent_id=workflow_permanent_id,
@ -2231,14 +2225,14 @@ class AgentDB:
webhook_callback_url=webhook_callback_url, webhook_callback_url=webhook_callback_url,
organization_id=organization_id, organization_id=organization_id,
) )
session.add(new_observer_cruise) session.add(new_task_v2)
await session.commit() await session.commit()
await session.refresh(new_observer_cruise) await session.refresh(new_task_v2)
return ObserverTask.model_validate(new_observer_cruise) return ObserverTask.model_validate(new_task_v2)
async def create_observer_thought( async def create_observer_thought(
self, self,
observer_cruise_id: str, task_v2_id: str,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
workflow_id: str | None = None, workflow_id: str | None = None,
workflow_permanent_id: str | None = None, workflow_permanent_id: str | None = None,
@ -2257,7 +2251,7 @@ class AgentDB:
) -> ObserverThought: ) -> ObserverThought:
async with self.Session() as session: async with self.Session() as session:
new_observer_thought = ObserverThoughtModel( new_observer_thought = ObserverThoughtModel(
observer_cruise_id=observer_cruise_id, observer_cruise_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_id=workflow_id, workflow_id=workflow_id,
workflow_permanent_id=workflow_permanent_id, workflow_permanent_id=workflow_permanent_id,
@ -2331,9 +2325,9 @@ class AgentDB:
return ObserverThought.model_validate(observer_thought) return ObserverThought.model_validate(observer_thought)
raise NotFoundError(f"ObserverThought {observer_thought_id}") raise NotFoundError(f"ObserverThought {observer_thought_id}")
async def update_observer_cruise( async def update_task_v2(
self, self,
observer_cruise_id: str, task_v2_id: str,
status: ObserverTaskStatus | None = None, status: ObserverTaskStatus | None = None,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
workflow_id: str | None = None, workflow_id: str | None = None,
@ -2345,34 +2339,34 @@ class AgentDB:
organization_id: str | None = None, organization_id: str | None = None,
) -> ObserverTask: ) -> ObserverTask:
async with self.Session() as session: async with self.Session() as session:
observer_cruise = ( task_v2 = (
await session.scalars( await session.scalars(
select(ObserverCruiseModel) select(ObserverCruiseModel)
.filter_by(observer_cruise_id=observer_cruise_id) .filter_by(observer_cruise_id=task_v2_id)
.filter_by(organization_id=organization_id) .filter_by(organization_id=organization_id)
) )
).first() ).first()
if observer_cruise: if task_v2:
if status: if status:
observer_cruise.status = status task_v2.status = status
if workflow_run_id: if workflow_run_id:
observer_cruise.workflow_run_id = workflow_run_id task_v2.workflow_run_id = workflow_run_id
if workflow_id: if workflow_id:
observer_cruise.workflow_id = workflow_id task_v2.workflow_id = workflow_id
if workflow_permanent_id: if workflow_permanent_id:
observer_cruise.workflow_permanent_id = workflow_permanent_id task_v2.workflow_permanent_id = workflow_permanent_id
if url: if url:
observer_cruise.url = url task_v2.url = url
if prompt: if prompt:
observer_cruise.prompt = prompt task_v2.prompt = prompt
if summary: if summary:
observer_cruise.summary = summary task_v2.summary = summary
if output: if output:
observer_cruise.output = output task_v2.output = output
await session.commit() await session.commit()
await session.refresh(observer_cruise) await session.refresh(task_v2)
return ObserverTask.model_validate(observer_cruise) return ObserverTask.model_validate(task_v2)
raise NotFoundError(f"ObserverTask {observer_cruise_id} not found") raise NotFoundError(f"TaskV2 {task_v2_id} not found")
async def create_workflow_run_block( async def create_workflow_run_block(
self, self,

View file

@ -37,7 +37,7 @@ BITWARDEN_SENSITIVE_INFORMATION_PARAMETER_PREFIX = "bsi"
CREDENTIAL_PARAMETER_PREFIX = "cp" CREDENTIAL_PARAMETER_PREFIX = "cp"
CREDENTIAL_PREFIX = "cred" CREDENTIAL_PREFIX = "cred"
ORGANIZATION_BITWARDEN_COLLECTION_PREFIX = "obc" ORGANIZATION_BITWARDEN_COLLECTION_PREFIX = "obc"
OBSERVER_CRUISE_ID = "oc" TASK_V2_ID = "oc"
OBSERVER_THOUGHT_ID = "ot" OBSERVER_THOUGHT_ID = "ot"
ORGANIZATION_AUTH_TOKEN_PREFIX = "oat" ORGANIZATION_AUTH_TOKEN_PREFIX = "oat"
ORG_PREFIX = "o" ORG_PREFIX = "o"
@ -156,9 +156,9 @@ def generate_action_id() -> str:
return f"{ACTION_PREFIX}_{int_id}" return f"{ACTION_PREFIX}_{int_id}"
def generate_observer_cruise_id() -> str: def generate_task_v2_id() -> str:
int_id = generate_id() int_id = generate_id()
return f"{OBSERVER_CRUISE_ID}_{int_id}" return f"{TASK_V2_ID}_{int_id}"
def generate_observer_thought_id() -> str: def generate_observer_thought_id() -> str:

View file

@ -29,7 +29,6 @@ from skyvern.forge.sdk.db.id import (
generate_bitwarden_sensitive_information_parameter_id, generate_bitwarden_sensitive_information_parameter_id,
generate_credential_id, generate_credential_id,
generate_credential_parameter_id, generate_credential_parameter_id,
generate_observer_cruise_id,
generate_observer_thought_id, generate_observer_thought_id,
generate_org_id, generate_org_id,
generate_organization_auth_token_id, generate_organization_auth_token_id,
@ -40,6 +39,7 @@ from skyvern.forge.sdk.db.id import (
generate_task_generation_id, generate_task_generation_id,
generate_task_id, generate_task_id,
generate_task_run_id, generate_task_run_id,
generate_task_v2_id,
generate_totp_code_id, generate_totp_code_id,
generate_workflow_id, generate_workflow_id,
generate_workflow_parameter_id, generate_workflow_parameter_id,
@ -569,7 +569,8 @@ class ObserverCruiseModel(Base):
__tablename__ = "observer_cruises" __tablename__ = "observer_cruises"
__table_args__ = (Index("oc_org_wfr_index", "organization_id", "workflow_run_id"),) __table_args__ = (Index("oc_org_wfr_index", "organization_id", "workflow_run_id"),)
observer_cruise_id = Column(String, primary_key=True, default=generate_observer_cruise_id) # observer_cruise_id is the task_id for task v2
observer_cruise_id = Column(String, primary_key=True, default=generate_task_v2_id)
status = Column(String, nullable=False, default="created") status = Column(String, nullable=False, default="created")
organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=True) organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=True)
workflow_run_id = Column(String, ForeignKey("workflow_runs.workflow_run_id"), nullable=True) workflow_run_id = Column(String, ForeignKey("workflow_runs.workflow_run_id"), nullable=True)

View file

@ -46,12 +46,12 @@ class AsyncExecutor(abc.ABC):
pass pass
@abc.abstractmethod @abc.abstractmethod
async def execute_cruise( async def execute_task_v2(
self, self,
request: Request | None, request: Request | None,
background_tasks: BackgroundTasks | None, background_tasks: BackgroundTasks | None,
organization_id: str, organization_id: str,
observer_cruise_id: str, task_v2_id: str,
max_iterations_override: int | str | None, max_iterations_override: int | str | None,
browser_session_id: str | None, browser_session_id: str | None,
**kwargs: dict, **kwargs: dict,
@ -138,39 +138,37 @@ class BackgroundTaskExecutor(AsyncExecutor):
browser_session_id=browser_session_id, browser_session_id=browser_session_id,
) )
async def execute_cruise( async def execute_task_v2(
self, self,
request: Request | None, request: Request | None,
background_tasks: BackgroundTasks | None, background_tasks: BackgroundTasks | None,
organization_id: str, organization_id: str,
observer_cruise_id: str, task_v2_id: str,
max_iterations_override: int | str | None, max_iterations_override: int | str | None,
browser_session_id: str | None, browser_session_id: str | None,
**kwargs: dict, **kwargs: dict,
) -> None: ) -> None:
LOG.info( LOG.info(
"Executing cruise using background task executor", "Executing cruise using background task executor",
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
) )
organization = await app.DATABASE.get_organization(organization_id) organization = await app.DATABASE.get_organization(organization_id)
if organization is None: if organization is None:
raise OrganizationNotFound(organization_id) raise OrganizationNotFound(organization_id)
observer_cruise = await app.DATABASE.get_observer_cruise( task_v2 = await app.DATABASE.get_task_v2(task_v2_id=task_v2_id, organization_id=organization_id)
observer_cruise_id=observer_cruise_id, organization_id=organization_id if not task_v2 or not task_v2.workflow_run_id:
)
if not observer_cruise or not observer_cruise.workflow_run_id:
raise ValueError("No observer cruise or no workflow run associated with observer cruise") raise ValueError("No observer cruise or no workflow run associated with observer cruise")
# mark observer cruise as queued # mark observer cruise as queued
await app.DATABASE.update_observer_cruise( await app.DATABASE.update_task_v2(
observer_cruise_id, task_v2_id=task_v2_id,
status=ObserverTaskStatus.queued, status=ObserverTaskStatus.queued,
organization_id=organization_id, organization_id=organization_id,
) )
await app.DATABASE.update_workflow_run( await app.DATABASE.update_workflow_run(
workflow_run_id=observer_cruise.workflow_run_id, workflow_run_id=task_v2.workflow_run_id,
status=WorkflowRunStatus.queued, status=WorkflowRunStatus.queued,
) )
@ -178,7 +176,7 @@ class BackgroundTaskExecutor(AsyncExecutor):
background_tasks.add_task( background_tasks.add_task(
task_v2_service.run_observer_task, task_v2_service.run_observer_task,
organization=organization, organization=organization,
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
max_iterations_override=max_iterations_override, max_iterations_override=max_iterations_override,
browser_session_id=browser_session_id, browser_session_id=browser_session_id,
) )

View file

@ -32,8 +32,8 @@ def add_kv_pairs_to_msg(logger: logging.Logger, method_name: str, event_dict: Ev
event_dict["workflow_id"] = context.workflow_id event_dict["workflow_id"] = context.workflow_id
if context.workflow_run_id: if context.workflow_run_id:
event_dict["workflow_run_id"] = context.workflow_run_id event_dict["workflow_run_id"] = context.workflow_run_id
if context.observer_cruise_id: if context.task_v2_id:
event_dict["observer_cruise_id"] = context.observer_cruise_id event_dict["task_v2_id"] = context.task_v2_id
# Add env to the log # Add env to the log
event_dict["env"] = settings.ENV event_dict["env"] = settings.ENV

View file

@ -799,12 +799,12 @@ async def get_workflow_run(
include_cost=True, include_cost=True,
) )
return_dict = workflow_run_status_response.model_dump() return_dict = workflow_run_status_response.model_dump()
observer_cruise = await app.DATABASE.get_observer_cruise_by_workflow_run_id( task_v2 = await app.DATABASE.get_task_v2_by_workflow_run_id(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
organization_id=current_org.organization_id, organization_id=current_org.organization_id,
) )
if observer_cruise: if task_v2:
return_dict["observer_task"] = observer_cruise.model_dump(by_alias=True) return_dict["observer_task"] = task_v2.model_dump(by_alias=True)
return return_dict return return_dict
@ -1251,11 +1251,11 @@ async def observer_task(
status_code=500, detail="Skyvern LLM failure to initialize observer cruise. Please try again later." status_code=500, detail="Skyvern LLM failure to initialize observer cruise. Please try again later."
) )
analytics.capture("skyvern-oss-agent-observer-cruise", data={"url": observer_task.url}) analytics.capture("skyvern-oss-agent-observer-cruise", data={"url": observer_task.url})
await AsyncExecutorFactory.get_executor().execute_cruise( await AsyncExecutorFactory.get_executor().execute_task_v2(
request=request, request=request,
background_tasks=background_tasks, background_tasks=background_tasks,
organization_id=organization.organization_id, organization_id=organization.organization_id,
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=observer_task.observer_cruise_id,
max_iterations_override=x_max_iterations_override, max_iterations_override=x_max_iterations_override,
browser_session_id=data.browser_session_id, browser_session_id=data.browser_session_id,
) )
@ -1268,10 +1268,10 @@ async def get_observer_task(
task_id: str, task_id: str,
organization: Organization = Depends(org_auth_service.get_current_org), organization: Organization = Depends(org_auth_service.get_current_org),
) -> dict[str, Any]: ) -> dict[str, Any]:
observer_task = await task_v2_service.get_observer_cruise(task_id, organization.organization_id) task_v2 = await task_v2_service.get_task_v2(task_id, organization.organization_id)
if not observer_task: if not task_v2:
raise HTTPException(status_code=404, detail=f"Observer task {task_id} not found") raise HTTPException(status_code=404, detail=f"Task v2 {task_id} not found")
return observer_task.model_dump(by_alias=True) return task_v2.model_dump(by_alias=True)
@base_router.get( @base_router.get(
@ -1374,7 +1374,7 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id:
""" """
# get observer task by workflow run id # get observer task by workflow run id
observer_task_obj = await app.DATABASE.get_observer_cruise_by_workflow_run_id( task_v2_obj = await app.DATABASE.get_task_v2_by_workflow_run_id(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
organization_id=organization_id, organization_id=organization_id,
) )
@ -1397,7 +1397,7 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id:
"Block workflow run id is not set for task_v2 block", "Block workflow run id is not set for task_v2 block",
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
organization_id=organization_id, organization_id=organization_id,
observer_cruise_id=observer_task_obj.observer_cruise_id if observer_task_obj else None, task_v2_id=task_v2_obj.observer_cruise_id if task_v2_obj else None,
) )
continue continue
# in the future if we want to nested taskv2 shows up as a nested block, we should not flatten the timeline # in the future if we want to nested taskv2 shows up as a nested block, we should not flatten the timeline
@ -1407,9 +1407,9 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id:
) )
final_workflow_run_block_timeline.extend(workflow_blocks) final_workflow_run_block_timeline.extend(workflow_blocks)
if observer_task_obj and observer_task_obj.observer_cruise_id: if task_v2_obj and task_v2_obj.observer_cruise_id:
observer_thought_timeline = await task_v2_service.get_observer_thought_timelines( observer_thought_timeline = await task_v2_service.get_observer_thought_timelines(
observer_cruise_id=observer_task_obj.observer_cruise_id, task_v2_id=task_v2_obj.observer_cruise_id,
organization_id=organization_id, organization_id=organization_id,
) )
final_workflow_run_block_timeline.extend(observer_thought_timeline) final_workflow_run_block_timeline.extend(observer_thought_timeline)

View file

@ -8,7 +8,7 @@ import httpx
import structlog import structlog
from sqlalchemy.exc import OperationalError from sqlalchemy.exc import OperationalError
from skyvern.exceptions import FailedToSendWebhook, ObserverCruiseNotFound, TaskTerminationError, UrlGenerationFailure from skyvern.exceptions import FailedToSendWebhook, TaskTerminationError, TaskV2NotFound, UrlGenerationFailure
from skyvern.forge import app from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.artifact.models import ArtifactType
@ -101,7 +101,7 @@ async def initialize_observer_task(
parent_workflow_run_id: str | None = None, parent_workflow_run_id: str | None = None,
create_task_run: bool = False, create_task_run: bool = False,
) -> ObserverTask: ) -> ObserverTask:
observer_task = await app.DATABASE.create_observer_cruise( task_v2 = await app.DATABASE.create_task_v2(
prompt=user_prompt, prompt=user_prompt,
organization_id=organization.organization_id, organization_id=organization.organization_id,
totp_verification_url=totp_verification_url, totp_verification_url=totp_verification_url,
@ -112,10 +112,10 @@ async def initialize_observer_task(
# set observer cruise id in context # set observer cruise id in context
context = skyvern_context.current() context = skyvern_context.current()
if context: if context:
context.observer_cruise_id = observer_task.observer_cruise_id context.task_v2_id = task_v2.observer_cruise_id
observer_thought = await app.DATABASE.create_observer_thought( observer_thought = await app.DATABASE.create_observer_thought(
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
organization_id=organization.organization_id, organization_id=organization.organization_id,
observer_thought_type=ObserverThoughtType.metadata, observer_thought_type=ObserverThoughtType.metadata,
observer_thought_scenario=ObserverThoughtScenario.generate_metadata, observer_thought_scenario=ObserverThoughtScenario.generate_metadata,
@ -164,8 +164,8 @@ async def initialize_observer_task(
LOG.error("Failed to setup cruise workflow run", exc_info=True) LOG.error("Failed to setup cruise workflow run", exc_info=True)
# fail the workflow run # fail the workflow run
await mark_observer_task_as_failed( await mark_observer_task_as_failed(
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=task_v2.workflow_run_id,
failure_reason="Skyvern failed to setup the workflow run", failure_reason="Skyvern failed to setup the workflow run",
organization_id=organization.organization_id, organization_id=organization.organization_id,
) )
@ -186,8 +186,8 @@ async def initialize_observer_task(
# update oserver cruise # update oserver cruise
try: try:
observer_task = await app.DATABASE.update_observer_cruise( task_v2 = await app.DATABASE.update_task_v2(
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=workflow_run.workflow_run_id, workflow_run_id=workflow_run.workflow_run_id,
workflow_id=new_workflow.workflow_id, workflow_id=new_workflow.workflow_id,
workflow_permanent_id=new_workflow.workflow_permanent_id, workflow_permanent_id=new_workflow.workflow_permanent_id,
@ -198,7 +198,7 @@ async def initialize_observer_task(
await app.DATABASE.create_task_run( await app.DATABASE.create_task_run(
task_run_type=TaskRunType.task_v2, task_run_type=TaskRunType.task_v2,
organization_id=organization.organization_id, organization_id=organization.organization_id,
run_id=observer_task.observer_cruise_id, run_id=task_v2.observer_cruise_id,
title=new_workflow.title, title=new_workflow.title,
url=url, url=url,
url_hash=generate_url_hash(url), url_hash=generate_url_hash(url),
@ -207,41 +207,41 @@ async def initialize_observer_task(
LOG.warning("Failed to update task 2.0", exc_info=True) LOG.warning("Failed to update task 2.0", exc_info=True)
# fail the workflow run # fail the workflow run
await mark_observer_task_as_failed( await mark_observer_task_as_failed(
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=workflow_run.workflow_run_id, workflow_run_id=workflow_run.workflow_run_id,
failure_reason="Skyvern failed to update the task 2.0 after initializing the workflow run", failure_reason="Skyvern failed to update the task 2.0 after initializing the workflow run",
organization_id=organization.organization_id, organization_id=organization.organization_id,
) )
raise raise
return observer_task return task_v2
async def run_observer_task( async def run_observer_task(
organization: Organization, organization: Organization,
observer_cruise_id: str, task_v2_id: str,
request_id: str | None = None, request_id: str | None = None,
max_iterations_override: str | int | None = None, max_iterations_override: str | int | None = None,
browser_session_id: str | None = None, browser_session_id: str | None = None,
) -> ObserverTask: ) -> ObserverTask:
organization_id = organization.organization_id organization_id = organization.organization_id
try: try:
observer_task = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) observer_task = await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id)
except Exception: except Exception:
LOG.error( LOG.error(
"Failed to get observer task", "Failed to get observer task",
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
organization_id=organization_id, organization_id=organization_id,
exc_info=True, exc_info=True,
) )
return await mark_observer_task_as_failed( return await mark_observer_task_as_failed(
observer_cruise_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
failure_reason="Failed to get task v2", failure_reason="Failed to get task v2",
) )
if not observer_task: if not observer_task:
LOG.error("Task v2 not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id) LOG.error("Task v2 not found", task_v2_id=task_v2_id, organization_id=organization_id)
raise ObserverCruiseNotFound(observer_cruise_id=observer_cruise_id) raise TaskV2NotFound(task_v2_id=task_v2_id)
workflow, workflow_run = None, None workflow, workflow_run = None, None
try: try:
@ -254,17 +254,17 @@ async def run_observer_task(
) )
except TaskTerminationError as e: except TaskTerminationError as e:
observer_task = await mark_observer_task_as_terminated( observer_task = await mark_observer_task_as_terminated(
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=observer_task.workflow_run_id,
organization_id=organization_id, organization_id=organization_id,
failure_reason=e.message, failure_reason=e.message,
) )
LOG.info("Task v2 is terminated", observer_cruise_id=observer_cruise_id, failure_reason=e.message) LOG.info("Task v2 is terminated", task_v2_id=task_v2_id, failure_reason=e.message)
return observer_task return observer_task
except OperationalError: except OperationalError:
LOG.error("Database error when running observer cruise", exc_info=True) LOG.error("Database error when running observer cruise", exc_info=True)
observer_task = await mark_observer_task_as_failed( observer_task = await mark_observer_task_as_failed(
observer_cruise_id, task_v2_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=observer_task.workflow_run_id,
failure_reason="Database error when running task 2.0", failure_reason="Database error when running task 2.0",
organization_id=organization_id, organization_id=organization_id,
@ -273,7 +273,7 @@ async def run_observer_task(
LOG.error("Failed to run observer cruise", exc_info=True) LOG.error("Failed to run observer cruise", exc_info=True)
failure_reason = f"Failed to run task 2.0: {str(e)}" failure_reason = f"Failed to run task 2.0: {str(e)}"
observer_task = await mark_observer_task_as_failed( observer_task = await mark_observer_task_as_failed(
observer_cruise_id, task_v2_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=observer_task.workflow_run_id,
failure_reason=failure_reason, failure_reason=failure_reason,
organization_id=organization_id, organization_id=organization_id,
@ -302,26 +302,26 @@ async def run_observer_task_helper(
browser_session_id: str | None = None, browser_session_id: str | None = None,
) -> tuple[Workflow, WorkflowRun, ObserverTask] | tuple[None, None, ObserverTask]: ) -> tuple[Workflow, WorkflowRun, ObserverTask] | tuple[None, None, ObserverTask]:
organization_id = organization.organization_id organization_id = organization.organization_id
observer_cruise_id = observer_task.observer_cruise_id task_v2_id = observer_task.observer_cruise_id
if observer_task.status != ObserverTaskStatus.queued: if observer_task.status != ObserverTaskStatus.queued:
LOG.error( LOG.error(
"Observer cruise is not queued. Duplicate observer cruise", "Task v2 is not queued. Duplicate task v2",
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
status=observer_task.status, status=observer_task.status,
organization_id=organization_id, organization_id=organization_id,
) )
return None, None, observer_task return None, None, observer_task
if not observer_task.url or not observer_task.prompt: if not observer_task.url or not observer_task.prompt:
LOG.error( LOG.error(
"Observer cruise url or prompt not found", "Task v2 url or prompt not found",
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
organization_id=organization_id, organization_id=organization_id,
) )
return None, None, observer_task return None, None, observer_task
if not observer_task.workflow_run_id: if not observer_task.workflow_run_id:
LOG.error( LOG.error(
"Workflow run id not found in observer cruise", "Workflow run id not found in task v2",
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
organization_id=organization_id, organization_id=organization_id,
) )
return None, None, observer_task return None, None, observer_task
@ -364,12 +364,12 @@ async def run_observer_task_helper(
workflow_id=workflow_id, workflow_id=workflow_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
request_id=request_id, request_id=request_id,
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
) )
) )
observer_task = await app.DATABASE.update_observer_cruise( observer_task = await app.DATABASE.update_task_v2(
observer_cruise_id=observer_cruise_id, organization_id=organization_id, status=ObserverTaskStatus.running task_v2_id=task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.running
) )
await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id)
await _set_up_workflow_context(workflow_id, workflow_run_id, organization) await _set_up_workflow_context(workflow_id, workflow_run_id, organization)
@ -385,7 +385,7 @@ async def run_observer_task_helper(
# validate the task execution # validate the task execution
await app.AGENT_FUNCTION.validate_task_execution( await app.AGENT_FUNCTION.validate_task_execution(
organization_id=organization_id, organization_id=organization_id,
task_id=observer_cruise_id, task_id=task_v2_id,
task_version="v2", task_version="v2",
) )
@ -399,10 +399,10 @@ async def run_observer_task_helper(
LOG.info( LOG.info(
"Task v2 is canceled. Stopping task v2", "Task v2 is canceled. Stopping task v2",
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
observer_task_id=observer_cruise_id, task_v2_id=task_v2_id,
) )
await mark_observer_task_as_canceled( await mark_observer_task_as_canceled(
observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
organization_id=organization_id, organization_id=organization_id,
) )
@ -457,7 +457,7 @@ async def run_observer_task_helper(
local_datetime=datetime.now(context.tz_info).isoformat(), local_datetime=datetime.now(context.tz_info).isoformat(),
) )
observer_thought = await app.DATABASE.create_observer_thought( observer_thought = await app.DATABASE.create_observer_thought(
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
organization_id=organization_id, organization_id=organization_id,
workflow_run_id=workflow_run.workflow_run_id, workflow_run_id=workflow_run.workflow_run_id,
workflow_id=workflow.workflow_id, workflow_id=workflow.workflow_id,
@ -516,7 +516,7 @@ async def run_observer_task_helper(
if not task_type: if not task_type:
LOG.error("No task type found in observer response", observer_response=observer_response) LOG.error("No task type found in observer response", observer_response=observer_response)
await mark_observer_task_as_failed( await mark_observer_task_as_failed(
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
failure_reason="Skyvern failed to generate a task. Please try again later.", failure_reason="Skyvern failed to generate a task. Please try again later.",
) )
@ -524,7 +524,7 @@ async def run_observer_task_helper(
if task_type == "extract": if task_type == "extract":
block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task(
observer_cruise=observer_task, task_v2=observer_task,
workflow_id=workflow_id, workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id, workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
@ -550,7 +550,7 @@ async def run_observer_task_helper(
elif task_type == "loop": elif task_type == "loop":
try: try:
block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task(
observer_cruise=observer_task, task_v2=observer_task,
workflow_id=workflow_id, workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id, workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
@ -568,7 +568,7 @@ async def run_observer_task_helper(
except Exception: except Exception:
LOG.exception("Failed to generate loop task") LOG.exception("Failed to generate loop task")
await mark_observer_task_as_failed( await mark_observer_task_as_failed(
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
failure_reason="Failed to generate the loop.", failure_reason="Failed to generate the loop.",
) )
@ -576,7 +576,7 @@ async def run_observer_task_helper(
else: else:
LOG.info("Unsupported task type", task_type=task_type) LOG.info("Unsupported task type", task_type=task_type)
await mark_observer_task_as_failed( await mark_observer_task_as_failed(
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
failure_reason=f"Unsupported task block type gets generated: {task_type}", failure_reason=f"Unsupported task block type gets generated: {task_type}",
) )
@ -594,7 +594,7 @@ async def run_observer_task_helper(
extracted_data = _get_extracted_data_from_block_result( extracted_data = _get_extracted_data_from_block_result(
block_result, block_result,
task_type, task_type,
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
) )
if extracted_data is not None: if extracted_data is not None:
@ -626,7 +626,7 @@ async def run_observer_task_helper(
# execute the extraction task # execute the extraction task
workflow_run = await handle_block_result( workflow_run = await handle_block_result(
observer_cruise_id, task_v2_id,
block, block,
block_result, block_result,
workflow, workflow,
@ -666,7 +666,7 @@ async def run_observer_task_helper(
local_datetime=datetime.now(context.tz_info).isoformat(), local_datetime=datetime.now(context.tz_info).isoformat(),
) )
observer_thought = await app.DATABASE.create_observer_thought( observer_thought = await app.DATABASE.create_observer_thought(
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
organization_id=organization_id, organization_id=organization_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_id=workflow_id, workflow_id=workflow_id,
@ -716,7 +716,7 @@ async def run_observer_task_helper(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
) )
observer_task = await mark_observer_task_as_failed( observer_task = await mark_observer_task_as_failed(
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
# TODO: add a better failure reason with LLM # TODO: add a better failure reason with LLM
failure_reason="Max iterations reached", failure_reason="Max iterations reached",
@ -727,7 +727,7 @@ async def run_observer_task_helper(
async def handle_block_result( async def handle_block_result(
observer_cruise_id: str, task_v2_id: str,
block: BlockTypeVar, block: BlockTypeVar,
block_result: BlockResult, block_result: BlockResult,
workflow: Workflow, workflow: Workflow,
@ -746,7 +746,7 @@ async def handle_block_result(
block_label=block.label, block_label=block.label,
) )
await mark_observer_task_as_canceled( await mark_observer_task_as_canceled(
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
organization_id=workflow_run.organization_id, organization_id=workflow_run.organization_id,
) )
@ -815,7 +815,7 @@ async def _set_up_workflow_context(workflow_id: str, workflow_run_id: str, organ
async def _generate_loop_task( async def _generate_loop_task(
observer_cruise: ObserverTask, task_v2: ObserverTask,
workflow_id: str, workflow_id: str,
workflow_permanent_id: str, workflow_permanent_id: str,
workflow_run_id: str, workflow_run_id: str,
@ -831,8 +831,8 @@ async def _generate_loop_task(
) )
data_extraction_thought = f"Going to generate a list of values to go through based on the plan: {plan}." data_extraction_thought = f"Going to generate a list of values to go through based on the plan: {plan}."
observer_thought = await app.DATABASE.create_observer_thought( observer_thought = await app.DATABASE.create_observer_thought(
observer_cruise_id=observer_cruise.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
organization_id=observer_cruise.organization_id, organization_id=task_v2.organization_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_id=workflow_id, workflow_id=workflow_id,
workflow_permanent_id=workflow_permanent_id, workflow_permanent_id=workflow_permanent_id,
@ -870,7 +870,7 @@ async def _generate_loop_task(
# execute the extraction block # execute the extraction block
extraction_block_result = await extraction_block_for_loop.execute_safe( extraction_block_result = await extraction_block_for_loop.execute_safe(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
organization_id=observer_cruise.organization_id, organization_id=task_v2.organization_id,
) )
LOG.info("Extraction block result", extraction_block_result=extraction_block_result) LOG.info("Extraction block result", extraction_block_result=extraction_block_result)
if extraction_block_result.success is False: if extraction_block_result.success is False:
@ -901,7 +901,7 @@ async def _generate_loop_task(
# update the observer thought # update the observer thought
await app.DATABASE.update_observer_thought( await app.DATABASE.update_observer_thought(
observer_thought_id=observer_thought.observer_thought_id, observer_thought_id=observer_thought.observer_thought_id,
organization_id=observer_cruise.organization_id, organization_id=task_v2.organization_id,
output=output_value_obj, output=output_value_obj,
) )
@ -960,8 +960,8 @@ async def _generate_loop_task(
loop_values=loop_values, loop_values=loop_values,
) )
observer_thought_task_in_loop = await app.DATABASE.create_observer_thought( observer_thought_task_in_loop = await app.DATABASE.create_observer_thought(
observer_cruise_id=observer_cruise.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
organization_id=observer_cruise.organization_id, organization_id=task_v2.organization_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_id=workflow_id, workflow_id=workflow_id,
workflow_permanent_id=workflow_permanent_id, workflow_permanent_id=workflow_permanent_id,
@ -981,7 +981,7 @@ async def _generate_loop_task(
thought = task_in_loop_metadata_response.get("thoughts") thought = task_in_loop_metadata_response.get("thoughts")
await app.DATABASE.update_observer_thought( await app.DATABASE.update_observer_thought(
observer_thought_id=observer_thought_task_in_loop.observer_thought_id, observer_thought_id=observer_thought_task_in_loop.observer_thought_id,
organization_id=observer_cruise.organization_id, organization_id=task_v2.organization_id,
thought=thought, thought=thought,
output=task_in_loop_metadata_response, output=task_in_loop_metadata_response,
) )
@ -1046,7 +1046,7 @@ async def _generate_loop_task(
async def _generate_extraction_task( async def _generate_extraction_task(
observer_cruise: ObserverTask, task_v2: ObserverTask,
workflow_id: str, workflow_id: str,
workflow_permanent_id: str, workflow_permanent_id: str,
workflow_run_id: str, workflow_run_id: str,
@ -1067,7 +1067,7 @@ async def _generate_extraction_task(
) )
generate_extraction_task_response = await app.LLM_API_HANDLER( generate_extraction_task_response = await app.LLM_API_HANDLER(
generate_extraction_task_prompt, generate_extraction_task_prompt,
observer_cruise=observer_cruise, task_v2=task_v2,
prompt_name="task_v2_generate_extraction_task", prompt_name="task_v2_generate_extraction_task",
) )
LOG.info("Data extraction response", data_extraction_response=generate_extraction_task_response) LOG.info("Data extraction response", data_extraction_response=generate_extraction_task_response)
@ -1174,11 +1174,11 @@ def _generate_random_string(length: int = 5) -> str:
async def get_observer_thought_timelines( async def get_observer_thought_timelines(
observer_cruise_id: str, task_v2_id: str,
organization_id: str | None = None, organization_id: str | None = None,
) -> list[WorkflowRunTimeline]: ) -> list[WorkflowRunTimeline]:
observer_thoughts = await app.DATABASE.get_observer_thoughts( observer_thoughts = await app.DATABASE.get_observer_thoughts(
observer_cruise_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
observer_thought_types=[ observer_thought_types=[
ObserverThoughtType.plan, ObserverThoughtType.plan,
@ -1196,18 +1196,18 @@ async def get_observer_thought_timelines(
] ]
async def get_observer_cruise(observer_cruise_id: str, organization_id: str | None = None) -> ObserverTask | None: async def get_task_v2(task_v2_id: str, organization_id: str | None = None) -> ObserverTask | None:
return await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) return await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id)
async def mark_observer_task_as_failed( async def mark_observer_task_as_failed(
observer_cruise_id: str, task_v2_id: str,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
failure_reason: str | None = None, failure_reason: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
) -> ObserverTask: ) -> ObserverTask:
observer_task = await app.DATABASE.update_observer_cruise( observer_task = await app.DATABASE.update_task_v2(
observer_cruise_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
status=ObserverTaskStatus.failed, status=ObserverTaskStatus.failed,
) )
@ -1220,14 +1220,14 @@ async def mark_observer_task_as_failed(
async def mark_observer_task_as_completed( async def mark_observer_task_as_completed(
observer_cruise_id: str, task_v2_id: str,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
summary: str | None = None, summary: str | None = None,
output: dict[str, Any] | None = None, output: dict[str, Any] | None = None,
) -> ObserverTask: ) -> ObserverTask:
observer_task = await app.DATABASE.update_observer_cruise( observer_task = await app.DATABASE.update_task_v2(
observer_cruise_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
status=ObserverTaskStatus.completed, status=ObserverTaskStatus.completed,
summary=summary, summary=summary,
@ -1240,7 +1240,7 @@ async def mark_observer_task_as_completed(
duration_seconds = (datetime.now(UTC) - observer_task.created_at.replace(tzinfo=UTC)).total_seconds() duration_seconds = (datetime.now(UTC) - observer_task.created_at.replace(tzinfo=UTC)).total_seconds()
LOG.info( LOG.info(
"Observer task duration metrics", "Observer task duration metrics",
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
duration_seconds=duration_seconds, duration_seconds=duration_seconds,
observer_task_status=ObserverTaskStatus.completed, observer_task_status=ObserverTaskStatus.completed,
@ -1252,12 +1252,12 @@ async def mark_observer_task_as_completed(
async def mark_observer_task_as_canceled( async def mark_observer_task_as_canceled(
observer_cruise_id: str, task_v2_id: str,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
) -> ObserverTask: ) -> ObserverTask:
observer_task = await app.DATABASE.update_observer_cruise( observer_task = await app.DATABASE.update_task_v2(
observer_cruise_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
status=ObserverTaskStatus.canceled, status=ObserverTaskStatus.canceled,
) )
@ -1268,13 +1268,13 @@ async def mark_observer_task_as_canceled(
async def mark_observer_task_as_terminated( async def mark_observer_task_as_terminated(
observer_cruise_id: str, task_v2_id: str,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
failure_reason: str | None = None, failure_reason: str | None = None,
) -> ObserverTask: ) -> ObserverTask:
observer_task = await app.DATABASE.update_observer_cruise( observer_task = await app.DATABASE.update_task_v2(
observer_cruise_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
status=ObserverTaskStatus.terminated, status=ObserverTaskStatus.terminated,
) )
@ -1287,7 +1287,7 @@ async def mark_observer_task_as_terminated(
def _get_extracted_data_from_block_result( def _get_extracted_data_from_block_result(
block_result: BlockResult, block_result: BlockResult,
task_type: str, task_type: str,
observer_cruise_id: str | None = None, task_v2_id: str | None = None,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
) -> Any | None: ) -> Any | None:
"""Extract data from block result based on task type. """Extract data from block result based on task type.
@ -1295,7 +1295,7 @@ def _get_extracted_data_from_block_result(
Args: Args:
block_result: The result from block execution block_result: The result from block execution
task_type: Type of task ("extract" or "loop") task_type: Type of task ("extract" or "loop")
observer_cruise_id: Optional ID for logging task_v2_id: Optional ID for logging
workflow_run_id: Optional ID for logging workflow_run_id: Optional ID for logging
Returns: Returns:
@ -1320,7 +1320,7 @@ def _get_extracted_data_from_block_result(
LOG.warning( LOG.warning(
"Inner loop output is not a list", "Inner loop output is not a list",
inner_loop_output=inner_loop_output, inner_loop_output=inner_loop_output,
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_run_block_id=block_result.workflow_run_block_id, workflow_run_block_id=block_result.workflow_run_block_id,
) )
@ -1330,7 +1330,7 @@ def _get_extracted_data_from_block_result(
LOG.warning( LOG.warning(
"inner output is not a dict", "inner output is not a dict",
inner_output=inner_output, inner_output=inner_output,
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_run_block_id=block_result.workflow_run_block_id, workflow_run_block_id=block_result.workflow_run_block_id,
) )
@ -1340,7 +1340,7 @@ def _get_extracted_data_from_block_result(
LOG.warning( LOG.warning(
"output_value is not a dict", "output_value is not a dict",
output_value=output_value, output_value=output_value,
observer_cruise_id=observer_cruise_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_run_block_id=block_result.workflow_run_block_id, workflow_run_block_id=block_result.workflow_run_block_id,
) )
@ -1360,7 +1360,7 @@ async def _summarize_observer_task(
screenshots: list[bytes] | None = None, screenshots: list[bytes] | None = None,
) -> ObserverTask: ) -> ObserverTask:
observer_thought = await app.DATABASE.create_observer_thought( observer_thought = await app.DATABASE.create_observer_thought(
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=observer_task.observer_cruise_id,
organization_id=observer_task.organization_id, organization_id=observer_task.organization_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=observer_task.workflow_run_id,
workflow_id=observer_task.workflow_id, workflow_id=observer_task.workflow_id,
@ -1393,7 +1393,7 @@ async def _summarize_observer_task(
) )
return await mark_observer_task_as_completed( return await mark_observer_task_as_completed(
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=observer_task.observer_cruise_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=observer_task.workflow_run_id,
organization_id=observer_task.organization_id, organization_id=observer_task.organization_id,
summary=thought, summary=thought,
@ -1414,7 +1414,7 @@ async def send_observer_task_webhook(observer_task: ObserverTask) -> None:
if not api_key: if not api_key:
LOG.warning( LOG.warning(
"No valid API key found for the organization of observer cruise", "No valid API key found for the organization of observer cruise",
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=observer_task.observer_cruise_id,
) )
return return
# build the observer cruise response # build the observer cruise response
@ -1422,7 +1422,7 @@ async def send_observer_task_webhook(observer_task: ObserverTask) -> None:
headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token)
LOG.info( LOG.info(
"Sending observer cruise response to webhook callback url", "Sending observer cruise response to webhook callback url",
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=observer_task.observer_cruise_id,
webhook_callback_url=observer_task.webhook_callback_url, webhook_callback_url=observer_task.webhook_callback_url,
payload=payload, payload=payload,
headers=headers, headers=headers,
@ -1434,17 +1434,17 @@ async def send_observer_task_webhook(observer_task: ObserverTask) -> None:
if resp.status_code == 200: if resp.status_code == 200:
LOG.info( LOG.info(
"Observer cruise webhook sent successfully", "Observer cruise webhook sent successfully",
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=observer_task.observer_cruise_id,
resp_code=resp.status_code, resp_code=resp.status_code,
resp_text=resp.text, resp_text=resp.text,
) )
else: else:
LOG.info( LOG.info(
"Observer cruise webhook failed", "Observer cruise webhook failed",
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=observer_task.observer_cruise_id,
resp=resp, resp=resp,
resp_code=resp.status_code, resp_code=resp.status_code,
resp_text=resp.text, resp_text=resp.text,
) )
except Exception as e: except Exception as e:
raise FailedToSendWebhook(observer_cruise_id=observer_task.observer_cruise_id) from e raise FailedToSendWebhook(task_v2_id=observer_task.observer_cruise_id) from e

View file

@ -2157,7 +2157,7 @@ class TaskV2Block(Block):
parent_workflow_run_id=workflow_run_id, parent_workflow_run_id=workflow_run_id,
proxy_location=workflow_run.proxy_location, proxy_location=workflow_run.proxy_location,
) )
await app.DATABASE.update_observer_cruise( await app.DATABASE.update_task_v2(
observer_task.observer_cruise_id, status=ObserverTaskStatus.queued, organization_id=organization_id observer_task.observer_cruise_id, status=ObserverTaskStatus.queued, organization_id=organization_id
) )
if observer_task.workflow_run_id: if observer_task.workflow_run_id:
@ -2173,7 +2173,7 @@ class TaskV2Block(Block):
observer_task = await task_v2_service.run_observer_task( observer_task = await task_v2_service.run_observer_task(
organization=organization, organization=organization,
observer_cruise_id=observer_task.observer_cruise_id, task_v2_id=observer_task.observer_cruise_id,
request_id=None, request_id=None,
max_iterations_override=self.max_iterations, max_iterations_override=self.max_iterations,
browser_session_id=browser_session_id, browser_session_id=browser_session_id,