feat: schedule and webhook triggers (#823)

Co-authored-by: Douglas <douglas.ym.lai@gmail.com>
Co-authored-by: a7m-1st <ahmed.jimi.awelkair500@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Tong Chen <web_chentong@163.com>
Ahmed Awelkair A 2026-03-02 12:38:02 +00:00 committed by GitHub
parent c8f6f7e63c
commit 4fb2e5db9a
GPG key ID: B5690EEEBB952194
200 changed files with 24538 additions and 2126 deletions


@@ -0,0 +1,54 @@
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
"""
Trigger Service Package
Contains services for managing triggers including:
- TriggerService: Main service for trigger operations
- TriggerScheduleService: Service for scheduled trigger operations
- App Handlers: Handlers for different trigger types (Slack, Webhook, Schedule)
"""
from app.service.trigger.trigger_service import TriggerService, get_trigger_service
from app.service.trigger.trigger_schedule_service import TriggerScheduleService
from app.service.trigger.app_handler_service import (
BaseAppHandler,
SlackAppHandler,
DefaultWebhookHandler,
ScheduleAppHandler,
AppHandlerResult,
get_app_handler,
get_schedule_handler,
register_app_handler,
get_supported_trigger_types,
)
__all__ = [
# Services
"TriggerService",
"get_trigger_service",
"TriggerScheduleService",
# Handlers
"BaseAppHandler",
"SlackAppHandler",
"DefaultWebhookHandler",
"ScheduleAppHandler",
"AppHandlerResult",
# Handler functions
"get_app_handler",
"get_schedule_handler",
"register_app_handler",
"get_supported_trigger_types",
]
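
A quick sketch of how the package exports are intended to be consumed (hedged: the TriggerType import path is taken from the modules added in this commit; real call sites may differ):

from app.service.trigger import get_app_handler, get_trigger_service
from app.type.trigger_types import TriggerType

# Resolve the handler registered for a trigger type (returns None if unsupported).
handler = get_app_handler(TriggerType.slack_trigger)

# Build a TriggerService; with no session argument it opens a fresh one.
service = get_trigger_service()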


@@ -0,0 +1,446 @@
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
"""
Trigger App Handler Service
Modular service for handling app-specific webhook authentication,
filtering, and payload normalization based on trigger_type.
"""
import re
from typing import Optional
from dataclasses import dataclass
from fastapi import Request
from sqlmodel import Session, select, and_
import logging
from app.model.trigger.trigger import Trigger
from app.model.config.config import Config
from app.model.trigger.app_configs import SlackTriggerConfig, WebhookTriggerConfig, ScheduleTriggerConfig
from app.type.trigger_types import TriggerType, ExecutionType, TriggerStatus
from app.type.config_group import ConfigGroup

logger = logging.getLogger(__name__)

@dataclass
class AppHandlerResult:
"""Result from app handler operations."""
success: bool
data: Optional[dict] = None
reason: Optional[str] = None
class BaseAppHandler:
"""Base class for app-specific handlers."""
trigger_type: TriggerType
execution_type: ExecutionType = ExecutionType.webhook
config_group: Optional[str] = None
async def get_credentials(self, session: Session, user_id: str) -> dict:
"""Get user credentials from config table."""
if not self.config_group:
return {}
configs = session.exec(
select(Config).where(
and_(
Config.user_id == int(user_id),
Config.config_group == self.config_group
)
)
).all()
return {config.config_name: config.config_value for config in configs}
async def authenticate(
self,
request: Request,
body: bytes,
trigger: Trigger,
session: Session
) -> AppHandlerResult:
"""
Authenticate the incoming webhook request.
Returns an AppHandlerResult; its data may carry a provider challenge response.
"""
return AppHandlerResult(success=True)
async def filter_event(
self,
payload: dict,
trigger: Trigger
) -> AppHandlerResult:
"""
Filter events based on trigger configuration.
Returns an AppHandlerResult indicating whether the event should be processed and why.
"""
return AppHandlerResult(success=True, reason="ok")
def normalize_payload(
self,
payload: dict,
trigger: Trigger,
request_meta: Optional[dict] = None
) -> dict:
"""Normalize the payload for execution input."""
return payload
class SlackAppHandler(BaseAppHandler):
"""Handler for Slack triggers."""
trigger_type = TriggerType.slack_trigger
execution_type = ExecutionType.slack
config_group = ConfigGroup.SLACK.value
async def authenticate(
self,
request: Request,
body: bytes,
trigger: Trigger,
session: Session
) -> AppHandlerResult:
"""Handle Slack authentication and URL verification."""
from camel.auth.slack_auth import SlackAuth
credentials = await self.get_credentials(session, trigger.user_id)
slack_auth = SlackAuth(
signing_secret=credentials.get("SLACK_SIGNING_SECRET"),
bot_token=credentials.get("SLACK_BOT_TOKEN"),
api_token=credentials.get("SLACK_API_TOKEN"),
)
# Check for URL verification challenge
challenge_response = slack_auth.get_verification_response(request, body)
if challenge_response:
# Return the challenge response (already in correct format: {"challenge": "..."})
logger.info(f"Slack URL verification - challenge_response: {challenge_response}")
return AppHandlerResult(success=True, data=challenge_response)
# Verify webhook signature
if not slack_auth.verify_webhook_request(request, body):
logger.warning("Invalid Slack webhook signature", extra={
"trigger_id": trigger.id
})
return AppHandlerResult(success=False, reason="invalid_signature")
return AppHandlerResult(success=True)
async def filter_event(
self,
payload: dict,
trigger: Trigger
) -> AppHandlerResult:
"""Filter Slack events based on trigger config."""
# Prefer 'config' field
config_data = trigger.config or {}
config = SlackTriggerConfig(**config_data)
event = payload.get("event", {})
event_type = event.get("type", "")
# Check event type
if not config.should_trigger(event_type):
return AppHandlerResult(success=False, reason="event_type_not_configured")
# Check channel filter (if channel_id is set, only trigger for that channel)
if config.channel_id:
if event.get("channel") != config.channel_id:
return AppHandlerResult(success=False, reason="channel_not_matched")
# Check bot message filter
if config.ignore_bot_messages:
if event.get("bot_id") or event.get("subtype") == "bot_message":
return AppHandlerResult(success=False, reason="bot_message_ignored")
# Check user filter
if config.ignore_users and event.get("user") in config.ignore_users:
return AppHandlerResult(success=False, reason="user_filtered")
# Check message filter regex
if config.message_filter and event.get("text"):
if not re.search(config.message_filter, event.get("text", ""), re.IGNORECASE):
return AppHandlerResult(success=False, reason="message_filter_not_matched")
return AppHandlerResult(success=True, reason="ok")
def normalize_payload(
self,
payload: dict,
trigger: Trigger,
request_meta: Optional[dict] = None
) -> dict:
"""Normalize Slack event payload."""
logger.info("Normalizing payload", extra={"payload": payload})
# Prefer 'config' field
config_data = trigger.config or {}
config = SlackTriggerConfig(**config_data)
event = payload.get("event", {})
normalized = {
"event_type": event.get("type"),
"event_ts": event.get("event_ts"),
"team_id": payload.get("team_id"),
"user_id": event.get("user"),
"channel_id": event.get("channel"),
"text": event.get("text"),
"message_ts": event.get("ts"),
"thread_ts": event.get("thread_ts"),
"reaction": event.get("reaction"),
"files": event.get("files"),
"event_id": payload.get("event_id") or payload.get("id")
}
# if config.include_raw_payload:
# normalized["raw_payload"] = payload
return normalized
class DefaultWebhookHandler(BaseAppHandler):
"""Default handler for generic webhooks with config-based filtering."""
trigger_type = TriggerType.webhook
execution_type = ExecutionType.webhook
async def filter_event(
self,
payload: dict,
trigger: Trigger,
headers: dict = None,
body_raw: str = None
) -> AppHandlerResult:
"""Filter webhook events based on trigger config."""
config_data = trigger.config or {}
config = WebhookTriggerConfig(**config_data)
# Get text content for message_filter (check body for text field or stringify)
text = None
if isinstance(payload, dict):
text = payload.get("text") or payload.get("message") or payload.get("content")
if text is None and body_raw:
text = body_raw
# Use the config's should_trigger method
should_trigger, reason = config.should_trigger(
body=body_raw or "",
headers=headers or {},
text=text
)
if not should_trigger:
return AppHandlerResult(success=False, reason=reason)
return AppHandlerResult(success=True, reason="ok")
def normalize_payload(
self,
payload: dict,
trigger: Trigger,
request_meta: Optional[dict] = None
) -> dict:
"""Normalize generic webhook payload with full request metadata."""
config_data = trigger.config or {}
config = WebhookTriggerConfig(**config_data)
result = {"body": payload}
if request_meta:
# Include headers if configured
if config.include_headers and "headers" in request_meta:
result["headers"] = request_meta["headers"]
# Include query params if configured
if config.include_query_params and "query_params" in request_meta:
result["query_params"] = request_meta["query_params"]
# Include request metadata if configured
if config.include_request_metadata:
if "method" in request_meta:
result["method"] = request_meta["method"]
if "url" in request_meta:
result["url"] = request_meta["url"]
if "client_ip" in request_meta:
result["client_ip"] = request_meta["client_ip"]
return result
class ScheduleAppHandler(BaseAppHandler):
"""
Handler for scheduled triggers.
Manages schedule-specific logic including:
- Expiration checking (expirationDate for recurring schedules)
- Date validation for one-time executions (date field)
"""
trigger_type = TriggerType.schedule
execution_type = ExecutionType.scheduled
async def filter_event(
self,
payload: dict,
trigger: Trigger
) -> AppHandlerResult:
"""
Filter scheduled events based on trigger config.
Checks:
- If one-time (date set) and date has passed
- If recurring with expirationDate and it has passed
"""
config_data = trigger.config or {}
try:
config = ScheduleTriggerConfig(**config_data)
except Exception as e:
logger.warning(
"Invalid schedule config",
extra={"trigger_id": trigger.id, "error": str(e)}
)
# Allow execution if config is missing/invalid (backwards compatibility)
return AppHandlerResult(success=True, reason="ok")
# Check if schedule should execute
should_execute, reason = config.should_execute()
if not should_execute:
return AppHandlerResult(success=False, reason=reason)
return AppHandlerResult(success=True, reason="ok")
def normalize_payload(
self,
payload: dict,
trigger: Trigger,
request_meta: Optional[dict] = None
) -> dict:
"""Normalize scheduled trigger payload."""
config_data = trigger.config or {}
normalized = {
"scheduled_at": payload.get("scheduled_at"),
"trigger_id": trigger.id,
"trigger_name": trigger.name,
"is_single_execution": trigger.is_single_execution,
}
# Include config details if present
if config_data:
if config_data.get("date"):
normalized["date"] = config_data.get("date")
if config_data.get("expirationDate"):
normalized["expirationDate"] = config_data.get("expirationDate")
return normalized
def check_and_handle_expiration(
self,
trigger: Trigger,
session: Session
) -> bool:
"""
Check if a schedule has expired and handle accordingly.
Args:
trigger: The trigger to check
session: Database session for updates
Returns:
True if trigger is expired and was deactivated, False otherwise
"""
config_data = trigger.config or {}
try:
config = ScheduleTriggerConfig(**config_data)
except Exception as e:
logger.warning(
"Invalid schedule config during expiration check",
extra={"trigger_id": trigger.id, "error": str(e)}
)
return False
if config.is_expired():
# Deactivate the trigger
trigger.status = TriggerStatus.completed
session.add(trigger)
session.commit()
logger.info(
"Schedule trigger expired and deactivated",
extra={
"trigger_id": trigger.id,
"trigger_name": trigger.name,
"expiration_date": config.expirationDate or config.date
}
)
return True
return False
def validate_schedule_for_execution(
self,
trigger: Trigger
) -> tuple[bool, str]:
"""
Validate that a scheduled trigger is valid for execution.
Args:
trigger: The trigger to validate
Returns:
Tuple of (is_valid, reason)
"""
config_data = trigger.config or {}
try:
config = ScheduleTriggerConfig(**config_data)
except Exception as e:
return False, f"invalid_config: {str(e)}"
# Check expiration
if config.is_expired():
return False, "schedule_expired"
return True, "ok"
# Registry of handlers by trigger_type
_HANDLERS: dict[TriggerType, BaseAppHandler] = {
TriggerType.slack_trigger: SlackAppHandler(),
TriggerType.webhook: DefaultWebhookHandler(),
TriggerType.schedule: ScheduleAppHandler(),
}
def get_app_handler(trigger_type: TriggerType) -> Optional[BaseAppHandler]:
"""Get the handler for a trigger type."""
return _HANDLERS.get(trigger_type)
def register_app_handler(trigger_type: TriggerType, handler: BaseAppHandler):
"""Register a new app handler."""
_HANDLERS[trigger_type] = handler
def get_supported_trigger_types() -> list[TriggerType]:
"""Get list of trigger types with webhook support."""
return list(_HANDLERS.keys())
def get_schedule_handler() -> ScheduleAppHandler:
"""Get the schedule handler instance."""
return _HANDLERS.get(TriggerType.schedule)
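
A hedged sketch of how the registry can be extended: subclass DefaultWebhookHandler, override filter_event, and swap it in with register_app_handler. The GitHubWebhookHandler class and its header check are purely illustrative and not part of this commit:

class GitHubWebhookHandler(DefaultWebhookHandler):
    """Hypothetical handler that only accepts GitHub-style webhook deliveries."""

    async def filter_event(self, payload, trigger, headers=None, body_raw=None):
        # Illustrative gate: require an X-GitHub-Event header before applying
        # the config-driven filtering of the default handler.
        if not (headers or {}).get("x-github-event"):
            return AppHandlerResult(success=False, reason="missing_event_header")
        return await super().filter_event(payload, trigger, headers=headers, body_raw=body_raw)

# Replaces the stock webhook handler for TriggerType.webhook.
register_app_handler(TriggerType.webhook, GitHubWebhookHandler())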


@@ -0,0 +1,428 @@
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
from datetime import datetime, timedelta, timezone
from typing import List, Tuple, Optional
import logging
from croniter import croniter
from uuid import uuid4
import asyncio
from sqlmodel import select
from app.model.trigger.trigger import Trigger
from app.model.trigger.trigger_execution import TriggerExecution
from app.type.trigger_types import TriggerStatus, ExecutionType, ExecutionStatus, TriggerType
from app.component.trigger_utils import check_rate_limits, MAX_DISPATCH_PER_TICK
from app.model.trigger.app_configs import ScheduleTriggerConfig

logger = logging.getLogger(__name__)

class TriggerScheduleService:
"""Service for managing scheduled trigger operations.
This service mainly delegates schedule business logic
from the main trigger_service.py.
Handles tasks from the Celery beat scheduler.
Mainly handles:
- Polling for due schedules
- Dispatching scheduled triggers
- Calculating next run times based on cron expressions
"""
def __init__(self, session):
"""
Initialize the schedule service with a database session.
Args:
session: SQLModel session for database operations
"""
self.session = session
def fetch_due_schedules(self, limit: Optional[int] = 100) -> List[Trigger]:
"""
Fetch triggers that are due for execution.
Args:
limit: Maximum number of triggers to fetch
Returns:
List of triggers that need to be executed
"""
now = datetime.now(timezone.utc)
try:
statement = (
select(Trigger)
.where(Trigger.trigger_type == TriggerType.schedule)
.where(Trigger.status == TriggerStatus.active)
.where(Trigger.next_run_at <= now)
.order_by(Trigger.next_run_at)
.limit(limit)
)
results = self.session.exec(statement).all()
logger.debug(
"Fetched due schedules",
extra={
"count": len(results),
"current_time": now.isoformat()
}
)
return list(results)
except Exception as e:
logger.error(
"Failed to fetch due schedules",
extra={"error": str(e)},
exc_info=True
)
return []
def calculate_next_run_at(
self,
trigger: Trigger,
base_time: Optional[datetime] = None
) -> datetime:
"""
Calculate the next run time for a trigger based on its cron expression.
Args:
trigger: The trigger to calculate next run time for
base_time: Base time to calculate from (defaults to now)
Returns:
The next scheduled run time
Raises:
ValueError: If trigger has no cron expression or invalid expression
"""
if not trigger.custom_cron_expression:
raise ValueError(f"Trigger {trigger.id} has no cron expression")
if base_time is None:
base_time = datetime.now(timezone.utc)
try:
cron = croniter(trigger.custom_cron_expression, base_time)
next_run = cron.get_next(datetime)
return next_run
except Exception as e:
logger.error(
"Failed to calculate next run time",
extra={
"trigger_id": trigger.id,
"cron_expression": trigger.custom_cron_expression,
"error": str(e)
}
)
raise
def dispatch_trigger(self, trigger: Trigger) -> bool:
"""
Dispatch a trigger for execution.
Args:
trigger: The trigger to dispatch
Returns:
True if dispatched successfully, False otherwise
"""
try:
# Check schedule expiration before dispatching
if not self._check_schedule_valid(trigger):
logger.info(
"Schedule trigger expired, skipping dispatch",
extra={"trigger_id": trigger.id, "trigger_name": trigger.name}
)
return False
# Create execution record
execution_id = str(uuid4())
execution = TriggerExecution(
trigger_id=trigger.id,
execution_id=execution_id,
execution_type=ExecutionType.scheduled,
status=ExecutionStatus.pending,
input_data={"scheduled_at": datetime.now(timezone.utc).isoformat()},
started_at=datetime.now(timezone.utc)
)
self.session.add(execution)
# Update trigger statistics
trigger.last_executed_at = datetime.now(timezone.utc)
trigger.last_execution_status = "pending"
# Calculate and set next run time
try:
trigger.next_run_at = self.calculate_next_run_at(trigger, datetime.now(timezone.utc))
except Exception as e:
logger.error(
"Failed to calculate next run time, trigger will be skipped",
extra={"trigger_id": trigger.id, "error": str(e)}
)
# Set next_run_at far in the future to prevent immediate re-execution
trigger.next_run_at = datetime.now(timezone.utc) + timedelta(days=365)
# If single execution, deactivate the trigger
if trigger.is_single_execution:
trigger.status = TriggerStatus.inactive
logger.info(
"Trigger deactivated after single execution",
extra={"trigger_id": trigger.id}
)
self.session.add(trigger)
self.session.commit()
# TODO: Queue the actual task execution
# This would integrate with a task queue (e.g., Celery) to execute the trigger's action
# For now, the event is sent to the client for execution
logger.info(
"Trigger dispatched successfully",
extra={
"trigger_id": trigger.id,
"trigger_name": trigger.name,
"execution_id": execution_id,
"next_run_at": trigger.next_run_at.isoformat() if trigger.next_run_at else None
}
)
# Notify WebSocket subscribers via Redis pub/sub
try:
from app.component.redis_utils import get_redis_manager
redis_manager = get_redis_manager()
redis_manager.publish_execution_event({
"type": "execution_created",
"execution_id": execution_id,
"trigger_id": trigger.id,
"trigger_type": "schedule",
"status": "pending",
"input_data": execution.input_data,
"task_prompt": trigger.task_prompt,
"execution_type": "schedule",
"user_id": str(trigger.user_id),
"project_id": str(trigger.project_id)
})
logger.debug("WebSocket notification sent", extra={
"execution_id": execution_id,
"trigger_id": trigger.id
})
except Exception as e:
# Don't fail the trigger dispatch if notification fails
logger.warning("Failed to send WebSocket notification", extra={
"trigger_id": trigger.id,
"execution_id": execution_id,
"error": str(e)
})
return True
except Exception as e:
logger.error(
"Failed to dispatch trigger",
extra={
"trigger_id": trigger.id,
"error": str(e)
},
exc_info=True
)
self.session.rollback()
return False
def process_schedules(self, due_schedules: List[Trigger]) -> Tuple[int, int]:
"""
Process due schedules, checking rate limits and dispatching.
Args:
due_schedules: List of triggers that are due for execution
Returns:
Tuple of (dispatched_count, rate_limited_count)
"""
dispatched_count = 0
rate_limited_count = 0
for trigger in due_schedules:
# Check rate limits
if not check_rate_limits(self.session, trigger):
rate_limited_count += 1
# Still update next_run_at even if rate limited, so we don't keep checking
try:
trigger.next_run_at = self.calculate_next_run_at(trigger, datetime.now(timezone.utc))
self.session.add(trigger)
self.session.commit()
except Exception as e:
logger.error(
"Failed to update next_run_at for rate limited trigger",
extra={"trigger_id": trigger.id, "error": str(e)}
)
continue
# Dispatch the trigger
if self.dispatch_trigger(trigger):
dispatched_count += 1
return dispatched_count, rate_limited_count
def poll_and_execute_due_triggers(
self,
max_dispatch_per_tick: Optional[int] = None
) -> Tuple[int, int]:
"""
Poll for due triggers and execute them in batches.
Args:
max_dispatch_per_tick: Maximum number of triggers to dispatch in this tick
(defaults to MAX_DISPATCH_PER_TICK)
Returns:
Tuple of (total_dispatched, total_rate_limited)
"""
max_dispatch = max_dispatch_per_tick or MAX_DISPATCH_PER_TICK
total_dispatched = 0
total_rate_limited = 0
# Process in batches until we've handled all due schedules or hit the limit
while True:
due_schedules = self.fetch_due_schedules()
if not due_schedules:
break
dispatched_count, rate_limited_count = self.process_schedules(due_schedules)
total_dispatched += dispatched_count
total_rate_limited += rate_limited_count
logger.debug(
"Batch processed",
extra={
"dispatched": dispatched_count,
"rate_limited": rate_limited_count
}
)
# Check if we've hit the per-tick limit (if enabled)
if max_dispatch > 0 and total_dispatched >= max_dispatch:
logger.warning(
"Circuit breaker activated: reached dispatch limit, will continue next tick",
extra={"limit": max_dispatch}
)
break
if total_dispatched > 0 or total_rate_limited > 0:
logger.info(
"Trigger schedule poll completed",
extra={
"total_dispatched": total_dispatched,
"total_rate_limited": total_rate_limited
}
)
return total_dispatched, total_rate_limited
def _check_schedule_valid(self, trigger: Trigger) -> bool:
"""
Check if a scheduled trigger is valid for execution.
Validates:
- For one-time (date set): Checks if the scheduled date has passed
- For recurring (expirationDate set): Checks if expirationDate has passed
If expired, the trigger will be marked as completed.
Args:
trigger: The trigger to check
Returns:
True if trigger is valid for execution, False if expired
"""
config_data = trigger.config or {}
# If no config or empty config, allow execution (no expiration)
if not config_data:
return True
try:
config = ScheduleTriggerConfig(**config_data)
except Exception as e:
logger.warning(
"Invalid schedule config",
extra={"trigger_id": trigger.id, "error": str(e)}
)
return False
# Check if schedule has expired
if config.is_expired():
# Mark trigger as completed
trigger.status = TriggerStatus.completed
self.session.add(trigger)
self.session.commit()
logger.info(
"Schedule trigger expired and marked as completed",
extra={
"trigger_id": trigger.id,
"trigger_name": trigger.name,
"expiration_info": config.expirationDate or config.date
}
)
return False
return True
def update_trigger_next_run(self, trigger: Trigger) -> None:
"""
Update a trigger's next_run_at based on its cron expression.
Args:
trigger: The trigger to update
"""
try:
# Check if schedule is expired before updating next run
if not self._check_schedule_valid(trigger):
logger.info(
"Trigger expired, not updating next_run_at",
extra={"trigger_id": trigger.id}
)
return
trigger.next_run_at = self.calculate_next_run_at(trigger)
self.session.add(trigger)
self.session.commit()
logger.info(
"Trigger next_run_at updated",
extra={
"trigger_id": trigger.id,
"next_run_at": trigger.next_run_at.isoformat()
}
)
except Exception as e:
logger.error(
"Failed to update trigger next_run_at",
extra={
"trigger_id": trigger.id,
"error": str(e)
}
)
self.session.rollback()
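
How this service is expected to be driven from a periodic worker (hedged sketch: the Celery task name and wiring below are hypothetical; only session_make and TriggerScheduleService come from this commit):

from celery import shared_task

from app.component.database import session_make
from app.service.trigger.trigger_schedule_service import TriggerScheduleService

@shared_task(name="triggers.poll_schedules")  # hypothetical task name
def poll_schedules():
    # Open a short-lived session, poll for due schedules, and dispatch them.
    session = session_make()
    try:
        dispatched, rate_limited = TriggerScheduleService(session).poll_and_execute_due_triggers()
        return {"dispatched": dispatched, "rate_limited": rate_limited}
    finally:
        session.close()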


@@ -0,0 +1,391 @@
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
from datetime import datetime, timedelta, timezone
from typing import Optional, List, Dict, Any
from sqlmodel import select, and_, or_
from uuid import uuid4
import logging
from app.model.trigger.trigger import Trigger
from app.model.trigger.trigger_execution import TriggerExecution
from app.type.trigger_types import TriggerType, TriggerStatus, ExecutionType, ExecutionStatus
from app.component.database import session_make
from app.service.trigger.trigger_schedule_service import TriggerScheduleService
from app.component.trigger_utils import SCHEDULED_FETCH_BATCH_SIZE, check_rate_limits
from app.model.trigger.app_configs import ScheduleTriggerConfig, WebhookTriggerConfig
from app.model.trigger.app_configs.base_config import BaseTriggerConfig

logger = logging.getLogger(__name__)

class TriggerService:
"""Service for managing trigger operations and scheduling."""
def __init__(self, session=None):
self.session = session or session_make()
self.schedule_service = TriggerScheduleService(self.session)
def create_execution(
self,
trigger: Trigger,
execution_type: ExecutionType,
input_data: Optional[Dict[str, Any]] = None
) -> TriggerExecution:
"""Create a new trigger execution."""
execution_id = str(uuid4())
execution = TriggerExecution(
trigger_id=trigger.id,
execution_id=execution_id,
execution_type=execution_type,
status=ExecutionStatus.pending,
input_data=input_data or {},
started_at=datetime.now(timezone.utc)
)
self.session.add(execution)
self.session.commit()
self.session.refresh(execution)
# Update trigger statistics
trigger.last_executed_at = datetime.now(timezone.utc)
trigger.last_execution_status = "pending"
self.session.add(trigger)
self.session.commit()
logger.info("Execution created", extra={
"trigger_id": trigger.id,
"execution_id": execution_id,
"execution_type": execution_type.value
})
return execution
def update_execution_status(
self,
execution: TriggerExecution,
status: ExecutionStatus,
output_data: Optional[Dict[str, Any]] = None,
error_message: Optional[str] = None,
tokens_used: Optional[int] = None,
tools_executed: Optional[Dict[str, Any]] = None
) -> TriggerExecution:
"""Update execution status and metadata."""
execution.status = status
# Set completed_at and duration for terminal statuses
if status in [ExecutionStatus.completed, ExecutionStatus.failed, ExecutionStatus.cancelled, ExecutionStatus.missed]:
execution.completed_at = datetime.now(timezone.utc)
if execution.started_at:
# Ensure started_at is timezone-aware for subtraction
started_at = execution.started_at
if started_at.tzinfo is None:
started_at = started_at.replace(tzinfo=timezone.utc)
execution.duration_seconds = (execution.completed_at - started_at).total_seconds()
if output_data:
execution.output_data = output_data
if error_message:
execution.error_message = error_message
if tokens_used:
execution.tokens_used = tokens_used
if tools_executed:
execution.tools_executed = tools_executed
self.session.add(execution)
self.session.commit()
# Update trigger status and handle auto-disable logic
trigger = self.session.get(Trigger, execution.trigger_id)
if trigger:
if status == ExecutionStatus.failed:
trigger.last_execution_status = "failed"
trigger.consecutive_failures += 1
# Check for auto-disable based on max_failure_count in config
self._check_auto_disable(trigger)
elif status == ExecutionStatus.completed:
trigger.last_execution_status = "completed"
# Reset consecutive failures on success
trigger.consecutive_failures = 0
elif status == ExecutionStatus.cancelled:
trigger.last_execution_status = "cancelled"
elif status == ExecutionStatus.missed:
trigger.last_execution_status = "missed"
self.session.add(trigger)
self.session.commit()
logger.info("Execution status updated", extra={
"execution_id": execution.execution_id,
"status": status.name,
"duration": execution.duration_seconds
})
return execution
def _check_auto_disable(self, trigger: Trigger) -> bool:
"""
Check if trigger should be auto-disabled based on consecutive failures.
Args:
trigger: The trigger to check
Returns:
True if trigger was auto-disabled, False otherwise
"""
if not trigger.config:
return False
try:
# Get the appropriate config class based on trigger type
config: BaseTriggerConfig
if trigger.trigger_type == TriggerType.schedule:
config = ScheduleTriggerConfig(**trigger.config)
elif trigger.trigger_type == TriggerType.webhook:
config = WebhookTriggerConfig(**trigger.config)
else:
# For other trigger types, use base config
config = BaseTriggerConfig(**trigger.config)
# Check if auto-disable should happen
if config.should_auto_disable(trigger.consecutive_failures):
trigger.status = TriggerStatus.inactive
trigger.auto_disabled_at = datetime.now(timezone.utc)
logger.warning(
"Trigger auto-disabled due to max failures",
extra={
"trigger_id": trigger.id,
"trigger_name": trigger.name,
"consecutive_failures": trigger.consecutive_failures,
"max_failure_count": config.max_failure_count
}
)
return True
except Exception as e:
logger.error(
"Failed to check auto-disable for trigger",
extra={
"trigger_id": trigger.id,
"error": str(e)
}
)
return False
def get_pending_executions(self) -> List[TriggerExecution]:
"""Get all pending executions that need to be processed."""
executions = self.session.exec(
select(TriggerExecution).where(
TriggerExecution.status == ExecutionStatus.pending
).order_by(TriggerExecution.created_at)
).all()
return list(executions)
def get_failed_executions_for_retry(self) -> List[TriggerExecution]:
"""Get failed executions that can be retried."""
executions = self.session.exec(
select(TriggerExecution).where(
and_(
TriggerExecution.status == ExecutionStatus.failed,
TriggerExecution.attempts < TriggerExecution.max_retries
)
).order_by(TriggerExecution.created_at)
).all()
return list(executions)
def get_due_scheduled_triggers(self, limit: Optional[int] = None) -> List[Trigger]:
"""
Fetch scheduled triggers that are due for execution.
Args:
limit: Maximum number of triggers to fetch (defaults to SCHEDULED_FETCH_BATCH_SIZE)
Returns:
List of triggers that are due for execution
"""
current_time = datetime.now(timezone.utc)
limit = limit or SCHEDULED_FETCH_BATCH_SIZE
# Query triggers that:
# 1. Are scheduled type
# 2. Are active
# 3. Have a cron expression
# 4. next_run_at is null (never run) or next_run_at <= now
triggers = self.session.exec(
select(Trigger)
.where(
and_(
Trigger.trigger_type == TriggerType.schedule,
Trigger.status == TriggerStatus.active,
Trigger.custom_cron_expression.is_not(None),
or_(
Trigger.next_run_at.is_(None),
Trigger.next_run_at <= current_time
)
)
)
.limit(limit)
).all()
return list(triggers)
def execute_scheduled_triggers(self) -> int:
"""
Execute all due scheduled triggers.
Uses TriggerScheduleService for the actual execution logic.
"""
due_triggers = self.get_due_scheduled_triggers()
if not due_triggers:
return 0
dispatched_count, rate_limited_count = self.schedule_service.process_schedules(due_triggers)
logger.info(
"Scheduled triggers execution completed",
extra={
"dispatched": dispatched_count,
"rate_limited": rate_limited_count
}
)
return dispatched_count
def process_slack_trigger(
self,
trigger: Trigger,
slack_data: Dict[str, Any]
) -> Optional[TriggerExecution]:
"""Process a Slack trigger event."""
if trigger.trigger_type != TriggerType.slack_trigger:
raise ValueError("Trigger is not a Slack trigger")
if trigger.status != TriggerStatus.active:
logger.warning("Slack trigger is not active", extra={
"trigger_id": trigger.id
})
return None
if not check_rate_limits(self.session, trigger):
logger.warning("Slack trigger execution skipped due to rate limits", extra={
"trigger_id": trigger.id
})
return None
try:
execution = self.create_execution(
trigger=trigger,
execution_type=ExecutionType.slack,
input_data=slack_data
)
# TODO: Queue the actual task execution
logger.info("Slack trigger executed", extra={
"trigger_id": trigger.id,
"execution_id": execution.execution_id
})
return execution
except Exception as e:
logger.error("Slack trigger execution failed", extra={
"trigger_id": trigger.id,
"error": str(e)
}, exc_info=True)
return None
def cleanup_old_executions(self, days_to_keep: int = 30) -> int:
"""Clean up old execution records."""
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_to_keep)
old_executions = self.session.exec(
select(TriggerExecution).where(
and_(
TriggerExecution.created_at < cutoff_date,
TriggerExecution.status.in_([
ExecutionStatus.completed,
ExecutionStatus.failed,
ExecutionStatus.cancelled
])
)
)
).all()
count = len(old_executions)
for execution in old_executions:
self.session.delete(execution)
self.session.commit()
logger.info("Old executions cleaned up", extra={
"count": count,
"days_to_keep": days_to_keep
})
return count
def get_trigger_statistics(self, trigger_id: int) -> Dict[str, Any]:
"""Get statistics for a specific trigger."""
trigger = self.session.get(Trigger, trigger_id)
if not trigger:
raise ValueError("Trigger not found")
# Get execution counts by status
executions = self.session.exec(
select(TriggerExecution).where(
TriggerExecution.trigger_id == trigger_id
)
).all()
stats = {
"trigger_id": trigger_id,
"name": trigger.name,
"trigger_type": trigger.trigger_type.value,
"status": trigger.status.name,
"total_executions": len(executions),
"successful_executions": len([e for e in executions if e.status == ExecutionStatus.completed]),
"failed_executions": len([e for e in executions if e.status == ExecutionStatus.failed]),
"pending_executions": len([e for e in executions if e.status == ExecutionStatus.pending]),
"cancelled_executions": len([e for e in executions if e.status == ExecutionStatus.cancelled]),
"last_executed_at": trigger.last_executed_at.isoformat() if trigger.last_executed_at else None,
"created_at": trigger.created_at.isoformat() if trigger.created_at else None
}
# Calculate average execution time for completed executions
completed_executions = [e for e in executions if e.status == ExecutionStatus.completed and e.duration_seconds]
if completed_executions:
avg_duration = sum(e.duration_seconds for e in completed_executions) / len(completed_executions)
stats["average_execution_time_seconds"] = round(avg_duration, 2)
# Calculate total tokens used
total_tokens = sum(e.tokens_used for e in executions if e.tokens_used)
if total_tokens:
stats["total_tokens_used"] = total_tokens
return stats
def get_trigger_service(session=None) -> TriggerService:
"""Factory function to create a TriggerService instance with a fresh session."""
return TriggerService(session)
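
A brief usage sketch of the execution lifecycle exposed by TriggerService (the values and the pre-existing execution object are illustrative):

from app.service.trigger.trigger_service import get_trigger_service
from app.type.trigger_types import ExecutionStatus

service = get_trigger_service()

# Mark a previously created TriggerExecution as completed and record its output.
service.update_execution_status(
    execution,                      # a TriggerExecution created earlier via create_execution()
    status=ExecutionStatus.completed,
    output_data={"summary": "done"},
    tokens_used=1234,
)

# Aggregate per-trigger statistics: counts, average duration, total tokens used.
stats = service.get_trigger_statistics(trigger_id=execution.trigger_id)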