mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-23 04:17:34 +00:00
Add user-configurable timezone and 12/24-hour preferences, then wire them through settings, runtime snapshots, scheduler payloads, wait handling, notifications, backups, memory, plugin metadata, and frontend formatters. Keep UTC as the boundary for absolute instants while serializing user-facing dates in the configured or browser-resolved timezone. Preserve scheduler wall-clock inputs in the selected timezone, propagate TZ into desktop/runtime process environments, and restart active desktop sessions when the runtime timezone changes. Cover the risky paths with timezone regression tests for settings normalization, auto and fixed timezone resolution, scheduler round-trips, memory timestamp conversion, and desktop timezone sync.
247 lines
8.4 KiB
Python
247 lines
8.4 KiB
Python
from dataclasses import dataclass
|
|
import uuid
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
|
|
from helpers.localization import Localization
|
|
|
|
|
|
class NotificationType(Enum):
|
|
INFO = "info"
|
|
SUCCESS = "success"
|
|
WARNING = "warning"
|
|
ERROR = "error"
|
|
PROGRESS = "progress"
|
|
|
|
|
|
class NotificationPriority(Enum):
|
|
NORMAL = 10
|
|
HIGH = 20
|
|
|
|
|
|
@dataclass
|
|
class NotificationItem:
|
|
manager: "NotificationManager"
|
|
no: int
|
|
type: NotificationType
|
|
priority: NotificationPriority
|
|
title: str
|
|
message: str
|
|
detail: str # HTML content for expandable details
|
|
timestamp: datetime
|
|
display_time: int = 3 # Display duration in seconds, default 3 seconds
|
|
read: bool = False
|
|
id: str = ""
|
|
group: str = "" # Group identifier for grouping related notifications
|
|
|
|
def __post_init__(self):
|
|
if not self.id:
|
|
self.id = str(uuid.uuid4())
|
|
# Ensure type is always NotificationType
|
|
if isinstance(self.type, str):
|
|
self.type = NotificationType(self.type)
|
|
|
|
def mark_read(self):
|
|
self.read = True
|
|
self.manager.update_item(self.no, read=True)
|
|
|
|
def output(self):
|
|
return {
|
|
"no": self.no,
|
|
"id": self.id,
|
|
"type": self.type.value if isinstance(self.type, NotificationType) else self.type,
|
|
"priority": self.priority.value if isinstance(self.priority, NotificationPriority) else self.priority,
|
|
"title": self.title,
|
|
"message": self.message,
|
|
"detail": self.detail,
|
|
"timestamp": Localization.get().serialize_datetime(self.timestamp),
|
|
"display_time": self.display_time,
|
|
"read": self.read,
|
|
"group": self.group,
|
|
}
|
|
|
|
|
|
class NotificationManager:
|
|
def __init__(self, max_notifications: int = 100):
|
|
self._lock = threading.RLock()
|
|
self.guid: str = str(uuid.uuid4())
|
|
self.updates: list[int] = []
|
|
self.notifications: list[NotificationItem] = []
|
|
self.max_notifications = max_notifications
|
|
|
|
@staticmethod
|
|
def send_notification(
|
|
type: NotificationType,
|
|
priority: NotificationPriority,
|
|
message: str,
|
|
title: str = "",
|
|
detail: str = "",
|
|
display_time: int = 3,
|
|
group: str = "",
|
|
id: str = "",
|
|
) -> NotificationItem:
|
|
from agent import AgentContext
|
|
return AgentContext.get_notification_manager().add_notification(
|
|
type, priority, message, title, detail, display_time, group, id
|
|
)
|
|
|
|
def add_notification(
|
|
self,
|
|
type: NotificationType,
|
|
priority: NotificationPriority,
|
|
message: str,
|
|
title: str = "",
|
|
detail: str = "",
|
|
display_time: int = 3,
|
|
group: str = "",
|
|
id: str = "",
|
|
) -> NotificationItem:
|
|
with self._lock:
|
|
existing = None
|
|
if id:
|
|
existing = next((n for n in self.notifications if n.id == id), None)
|
|
|
|
if existing:
|
|
existing.type = NotificationType(type)
|
|
existing.priority = NotificationPriority(priority)
|
|
existing.title = title
|
|
existing.message = message
|
|
existing.detail = detail
|
|
existing.timestamp = Localization.get().now()
|
|
existing.display_time = display_time
|
|
existing.group = group
|
|
existing.read = False
|
|
self.updates.append(existing.no)
|
|
item = existing
|
|
else:
|
|
# Create notification item
|
|
item = NotificationItem(
|
|
manager=self,
|
|
no=len(self.notifications),
|
|
type=NotificationType(type),
|
|
priority=NotificationPriority(priority),
|
|
title=title,
|
|
message=message,
|
|
detail=detail,
|
|
timestamp=Localization.get().now(),
|
|
display_time=display_time,
|
|
id=id,
|
|
group=group,
|
|
)
|
|
|
|
self.notifications.append(item)
|
|
self.updates.append(item.no)
|
|
self._enforce_limit()
|
|
|
|
from helpers.state_monitor_integration import mark_dirty_all
|
|
mark_dirty_all(reason="notification.NotificationManager.add_notification")
|
|
return item
|
|
|
|
def _enforce_limit(self):
|
|
with self._lock:
|
|
if len(self.notifications) > self.max_notifications:
|
|
# Remove oldest notifications
|
|
to_remove = len(self.notifications) - self.max_notifications
|
|
self.notifications = self.notifications[to_remove:]
|
|
# Adjust notification numbers
|
|
for i, notification in enumerate(self.notifications):
|
|
notification.no = i
|
|
# Adjust updates list
|
|
self.updates = [no - to_remove for no in self.updates if no >= to_remove]
|
|
|
|
def get_recent_notifications(self, seconds: int = 30) -> list[NotificationItem]:
|
|
cutoff = Localization.get().now() - timedelta(seconds=seconds)
|
|
with self._lock:
|
|
return [n for n in self.notifications if n.timestamp >= cutoff]
|
|
|
|
def output(self, start: int | None = None, end: int | None = None) -> list[dict]:
|
|
with self._lock:
|
|
if start is None:
|
|
start = 0
|
|
if end is None:
|
|
end = len(self.updates)
|
|
updates = self.updates[start:end]
|
|
notifications = list(self.notifications)
|
|
|
|
out = []
|
|
seen = set()
|
|
for update in updates:
|
|
if update not in seen and update < len(notifications):
|
|
out.append(notifications[update].output())
|
|
seen.add(update)
|
|
return out
|
|
|
|
def output_all(self) -> list[dict]:
|
|
with self._lock:
|
|
notifications = list(self.notifications)
|
|
return [n.output() for n in notifications]
|
|
|
|
def mark_read_by_ids(self, notification_ids: list[str]) -> int:
|
|
ids = {nid for nid in notification_ids if isinstance(nid, str) and nid.strip()}
|
|
if not ids:
|
|
return 0
|
|
|
|
changed_nos: list[int] = []
|
|
with self._lock:
|
|
for notification in self.notifications:
|
|
if notification.id in ids and not notification.read:
|
|
notification.read = True
|
|
changed_nos.append(notification.no)
|
|
if changed_nos:
|
|
self.updates.extend(changed_nos)
|
|
|
|
if not changed_nos:
|
|
return 0
|
|
|
|
from helpers.state_monitor_integration import mark_dirty_all
|
|
mark_dirty_all(reason="notification.NotificationManager.mark_read_by_ids")
|
|
return len(changed_nos)
|
|
|
|
def update_item(self, no: int, **kwargs) -> None:
|
|
self._update_item(no, **kwargs)
|
|
|
|
def _update_item(self, no: int, **kwargs):
|
|
changed = False
|
|
with self._lock:
|
|
if no < len(self.notifications):
|
|
item = self.notifications[no]
|
|
for key, value in kwargs.items():
|
|
if hasattr(item, key):
|
|
setattr(item, key, value)
|
|
self.updates.append(no)
|
|
changed = True
|
|
|
|
if not changed:
|
|
return
|
|
|
|
from helpers.state_monitor_integration import mark_dirty_all
|
|
mark_dirty_all(reason="notification.NotificationManager._update_item")
|
|
|
|
def mark_all_read(self):
|
|
changed_nos: list[int] = []
|
|
with self._lock:
|
|
for notification in self.notifications:
|
|
if not notification.read:
|
|
notification.read = True
|
|
changed_nos.append(notification.no)
|
|
if changed_nos:
|
|
self.updates.extend(changed_nos)
|
|
|
|
if not changed_nos:
|
|
return
|
|
|
|
from helpers.state_monitor_integration import mark_dirty_all
|
|
mark_dirty_all(reason="notification.NotificationManager.mark_all_read")
|
|
|
|
def clear_all(self):
|
|
with self._lock:
|
|
self.notifications = []
|
|
self.updates = []
|
|
self.guid = str(uuid.uuid4())
|
|
from helpers.state_monitor_integration import mark_dirty_all
|
|
mark_dirty_all(reason="notification.NotificationManager.clear_all")
|
|
|
|
def get_notifications_by_type(self, type: NotificationType) -> list[NotificationItem]:
|
|
with self._lock:
|
|
return [n for n in self.notifications if n.type == type]
|