recursively collect extracted information from nested workflow outputs (#2638)
Some checks are pending
Run tests and pre-commit / Run tests and pre-commit hooks (push) Waiting to run
Run tests and pre-commit / Frontend Lint and Build (push) Waiting to run
Publish Fern Docs / run (push) Waiting to run
Build Skyvern SDK and publish to PyPI / build-sdk (push) Blocked by required conditions
Build Skyvern SDK and publish to PyPI / check-version-change (push) Waiting to run
Build Skyvern SDK and publish to PyPI / run-ci (push) Blocked by required conditions

This commit is contained in:
Prakash Maheshwaran 2025-06-12 05:04:26 -04:00 committed by GitHub
parent 20f0fe9321
commit 83a8cf424d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -99,6 +99,25 @@ LOG = structlog.get_logger()
class WorkflowService:
@staticmethod
def _collect_extracted_information(value: Any) -> list[Any]:
"""Recursively collect extracted_information values from nested outputs."""
results: list[Any] = []
if isinstance(value, dict):
if "extracted_information" in value and value["extracted_information"] is not None:
extracted = value["extracted_information"]
if isinstance(extracted, list):
results.extend(extracted)
else:
results.append(extracted)
else:
for v in value.values():
results.extend(WorkflowService._collect_extracted_information(v))
elif isinstance(value, list):
for item in value:
results.extend(WorkflowService._collect_extracted_information(item))
return results
async def setup_workflow_run(
self,
request_id: str | None,
@ -1109,14 +1128,10 @@ class WorkflowService:
EXTRACTED_INFORMATION_KEY = "extracted_information"
if output_parameter_tuples:
outputs = {output_parameter.key: output.value for output_parameter, output in output_parameter_tuples}
extracted_information = {
output_parameter.key: output.value[EXTRACTED_INFORMATION_KEY]
for output_parameter, output in output_parameter_tuples
if output.value is not None
and isinstance(output.value, dict)
and EXTRACTED_INFORMATION_KEY in output.value
and output.value[EXTRACTED_INFORMATION_KEY] is not None
}
extracted_information: list[Any] = []
for _, output in output_parameter_tuples:
if output.value is not None:
extracted_information.extend(WorkflowService._collect_extracted_information(output.value))
outputs[EXTRACTED_INFORMATION_KEY] = extracted_information
total_steps = None