mirror of
https://github.com/Skyvern-AI/skyvern.git
synced 2026-04-28 03:30:10 +00:00
feat: NAT egress proxy for webhook delivery via static IPs (#5392)
This commit is contained in:
parent
e49a84daa9
commit
49e2bbd7b1
5 changed files with 62 additions and 27 deletions
|
|
@ -13,7 +13,6 @@ from datetime import UTC, datetime
|
|||
from pathlib import Path
|
||||
from typing import Any, Tuple, cast
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
from openai.types.responses.response import Response as OpenAIResponse
|
||||
from opentelemetry import trace as otel_trace
|
||||
|
|
@ -3612,13 +3611,14 @@ class ForgeAgent:
|
|||
headers=signed_data.headers,
|
||||
)
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.post(
|
||||
task.webhook_callback_url,
|
||||
data=signed_data.signed_payload,
|
||||
headers=signed_data.headers,
|
||||
timeout=httpx.Timeout(30.0),
|
||||
)
|
||||
resp = await app.AGENT_FUNCTION.deliver_webhook(
|
||||
url=task.webhook_callback_url,
|
||||
payload=signed_data.signed_payload,
|
||||
headers=signed_data.headers,
|
||||
timeout_seconds=30.0,
|
||||
organization_id=task.organization_id,
|
||||
run_id=task.task_id,
|
||||
)
|
||||
if resp.status_code >= 200 and resp.status_code < 300:
|
||||
LOG.info(
|
||||
"Webhook sent successfully",
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import hashlib
|
|||
from datetime import timedelta
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
from playwright.async_api import Frame, Page
|
||||
|
||||
|
|
@ -618,6 +619,28 @@ class AgentFunction:
|
|||
async def post_action_execution(self, action: Action) -> None:
|
||||
pass
|
||||
|
||||
async def deliver_webhook(
|
||||
self,
|
||||
url: str,
|
||||
payload: str,
|
||||
headers: dict[str, str],
|
||||
timeout_seconds: float = 30.0,
|
||||
organization_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
) -> httpx.Response:
|
||||
"""Deliver a webhook POST request to *url*.
|
||||
|
||||
Returns the upstream ``httpx.Response``. Cloud override routes NAT-org
|
||||
traffic through the egress proxy so it egresses from a static IP.
|
||||
"""
|
||||
async with httpx.AsyncClient() as client:
|
||||
return await client.post(
|
||||
url,
|
||||
content=payload,
|
||||
headers=headers,
|
||||
timeout=httpx.Timeout(timeout_seconds),
|
||||
)
|
||||
|
||||
def get_copilot_security_rules(self) -> str:
|
||||
"""Return security guardrails for the workflow copilot system prompt.
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ from datetime import UTC, datetime, timedelta
|
|||
from hashlib import sha256
|
||||
from typing import Any, Literal, cast
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
|
||||
|
||||
|
|
@ -4576,13 +4575,14 @@ class WorkflowService:
|
|||
headers=signed_data.headers,
|
||||
)
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.post(
|
||||
url=workflow_run.webhook_callback_url,
|
||||
data=signed_data.signed_payload,
|
||||
headers=signed_data.headers,
|
||||
timeout=httpx.Timeout(30.0),
|
||||
)
|
||||
resp = await app.AGENT_FUNCTION.deliver_webhook(
|
||||
url=workflow_run.webhook_callback_url,
|
||||
payload=signed_data.signed_payload,
|
||||
headers=signed_data.headers,
|
||||
timeout_seconds=30.0,
|
||||
organization_id=workflow_run.organization_id,
|
||||
run_id=workflow_run.workflow_run_id,
|
||||
)
|
||||
if resp.status_code >= 200 and resp.status_code < 300:
|
||||
LOG.info(
|
||||
"Webhook sent successfully",
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ import string
|
|||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
from opentelemetry import trace as otel_trace
|
||||
from sqlalchemy.exc import OperationalError
|
||||
|
|
@ -2067,13 +2066,14 @@ async def send_task_v2_webhook(task_v2: TaskV2) -> None:
|
|||
payload_length=len(payload),
|
||||
header_keys=sorted(headers.keys()),
|
||||
)
|
||||
timeout = httpx.Timeout(30.0)
|
||||
async with httpx.AsyncClient(timeout=timeout) as client:
|
||||
resp = await client.post(
|
||||
task_v2.webhook_callback_url,
|
||||
data=payload,
|
||||
headers=headers,
|
||||
)
|
||||
resp = await app.AGENT_FUNCTION.deliver_webhook(
|
||||
url=task_v2.webhook_callback_url,
|
||||
payload=payload,
|
||||
headers=headers,
|
||||
timeout_seconds=30.0,
|
||||
organization_id=task_v2.organization_id,
|
||||
run_id=task_v2.observer_cruise_id,
|
||||
)
|
||||
if resp.status_code >= 200 and resp.status_code < 300:
|
||||
LOG.info(
|
||||
"Task v2 webhook sent successfully",
|
||||
|
|
|
|||
|
|
@ -250,6 +250,8 @@ async def replay_run_webhook(
|
|||
url=validated_url,
|
||||
payload=signed_data.signed_payload,
|
||||
headers=signed_data.headers,
|
||||
organization_id=organization_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
return RunWebhookReplayResponse(
|
||||
|
|
@ -465,7 +467,11 @@ async def _get_api_key(organization_id: str) -> str:
|
|||
|
||||
|
||||
async def _deliver_webhook(
|
||||
url: str, payload: str, headers: dict[str, str]
|
||||
url: str,
|
||||
payload: str,
|
||||
headers: dict[str, str],
|
||||
organization_id: str | None = None,
|
||||
run_id: str | None = None,
|
||||
) -> tuple[int | None, int, str | None, str | None]:
|
||||
start = perf_counter()
|
||||
status_code: int | None = None
|
||||
|
|
@ -473,8 +479,14 @@ async def _deliver_webhook(
|
|||
error: str | None = None
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(url, content=payload, headers=headers, timeout=httpx.Timeout(60.0))
|
||||
response = await app.AGENT_FUNCTION.deliver_webhook(
|
||||
url=url,
|
||||
payload=payload,
|
||||
headers=headers,
|
||||
timeout_seconds=60.0,
|
||||
organization_id=organization_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
status_code = response.status_code
|
||||
body_text = response.text or ""
|
||||
if len(body_text) > RESPONSE_BODY_TRUNCATION_LIMIT:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue