agent-zero/helpers/notification.py
Alessandro d1827e6c66
Some checks are pending
Build And Publish Docker Images / plan (push) Waiting to run
Build And Publish Docker Images / build (push) Blocked by required conditions
Refactor: use user locale for time displays
Add user-configurable timezone and 12/24-hour preferences, then wire them through settings, runtime snapshots, scheduler payloads, wait handling, notifications, backups, memory, plugin metadata, and frontend formatters.

Keep UTC as the boundary for absolute instants while serializing user-facing dates in the configured or browser-resolved timezone. Preserve scheduler wall-clock inputs in the selected timezone, propagate TZ into desktop/runtime process environments, and restart active desktop sessions when the runtime timezone changes.

Cover the risky paths with timezone regression tests for settings normalization, auto and fixed timezone resolution, scheduler round-trips, memory timestamp conversion, and desktop timezone sync.
2026-05-21 15:26:00 +02:00

247 lines
8.4 KiB
Python

from dataclasses import dataclass
import uuid
import threading
from datetime import datetime, timedelta
from enum import Enum
from helpers.localization import Localization
class NotificationType(Enum):
INFO = "info"
SUCCESS = "success"
WARNING = "warning"
ERROR = "error"
PROGRESS = "progress"
class NotificationPriority(Enum):
NORMAL = 10
HIGH = 20
@dataclass
class NotificationItem:
manager: "NotificationManager"
no: int
type: NotificationType
priority: NotificationPriority
title: str
message: str
detail: str # HTML content for expandable details
timestamp: datetime
display_time: int = 3 # Display duration in seconds, default 3 seconds
read: bool = False
id: str = ""
group: str = "" # Group identifier for grouping related notifications
def __post_init__(self):
if not self.id:
self.id = str(uuid.uuid4())
# Ensure type is always NotificationType
if isinstance(self.type, str):
self.type = NotificationType(self.type)
def mark_read(self):
self.read = True
self.manager.update_item(self.no, read=True)
def output(self):
return {
"no": self.no,
"id": self.id,
"type": self.type.value if isinstance(self.type, NotificationType) else self.type,
"priority": self.priority.value if isinstance(self.priority, NotificationPriority) else self.priority,
"title": self.title,
"message": self.message,
"detail": self.detail,
"timestamp": Localization.get().serialize_datetime(self.timestamp),
"display_time": self.display_time,
"read": self.read,
"group": self.group,
}
class NotificationManager:
def __init__(self, max_notifications: int = 100):
self._lock = threading.RLock()
self.guid: str = str(uuid.uuid4())
self.updates: list[int] = []
self.notifications: list[NotificationItem] = []
self.max_notifications = max_notifications
@staticmethod
def send_notification(
type: NotificationType,
priority: NotificationPriority,
message: str,
title: str = "",
detail: str = "",
display_time: int = 3,
group: str = "",
id: str = "",
) -> NotificationItem:
from agent import AgentContext
return AgentContext.get_notification_manager().add_notification(
type, priority, message, title, detail, display_time, group, id
)
def add_notification(
self,
type: NotificationType,
priority: NotificationPriority,
message: str,
title: str = "",
detail: str = "",
display_time: int = 3,
group: str = "",
id: str = "",
) -> NotificationItem:
with self._lock:
existing = None
if id:
existing = next((n for n in self.notifications if n.id == id), None)
if existing:
existing.type = NotificationType(type)
existing.priority = NotificationPriority(priority)
existing.title = title
existing.message = message
existing.detail = detail
existing.timestamp = Localization.get().now()
existing.display_time = display_time
existing.group = group
existing.read = False
self.updates.append(existing.no)
item = existing
else:
# Create notification item
item = NotificationItem(
manager=self,
no=len(self.notifications),
type=NotificationType(type),
priority=NotificationPriority(priority),
title=title,
message=message,
detail=detail,
timestamp=Localization.get().now(),
display_time=display_time,
id=id,
group=group,
)
self.notifications.append(item)
self.updates.append(item.no)
self._enforce_limit()
from helpers.state_monitor_integration import mark_dirty_all
mark_dirty_all(reason="notification.NotificationManager.add_notification")
return item
def _enforce_limit(self):
with self._lock:
if len(self.notifications) > self.max_notifications:
# Remove oldest notifications
to_remove = len(self.notifications) - self.max_notifications
self.notifications = self.notifications[to_remove:]
# Adjust notification numbers
for i, notification in enumerate(self.notifications):
notification.no = i
# Adjust updates list
self.updates = [no - to_remove for no in self.updates if no >= to_remove]
def get_recent_notifications(self, seconds: int = 30) -> list[NotificationItem]:
cutoff = Localization.get().now() - timedelta(seconds=seconds)
with self._lock:
return [n for n in self.notifications if n.timestamp >= cutoff]
def output(self, start: int | None = None, end: int | None = None) -> list[dict]:
with self._lock:
if start is None:
start = 0
if end is None:
end = len(self.updates)
updates = self.updates[start:end]
notifications = list(self.notifications)
out = []
seen = set()
for update in updates:
if update not in seen and update < len(notifications):
out.append(notifications[update].output())
seen.add(update)
return out
def output_all(self) -> list[dict]:
with self._lock:
notifications = list(self.notifications)
return [n.output() for n in notifications]
def mark_read_by_ids(self, notification_ids: list[str]) -> int:
ids = {nid for nid in notification_ids if isinstance(nid, str) and nid.strip()}
if not ids:
return 0
changed_nos: list[int] = []
with self._lock:
for notification in self.notifications:
if notification.id in ids and not notification.read:
notification.read = True
changed_nos.append(notification.no)
if changed_nos:
self.updates.extend(changed_nos)
if not changed_nos:
return 0
from helpers.state_monitor_integration import mark_dirty_all
mark_dirty_all(reason="notification.NotificationManager.mark_read_by_ids")
return len(changed_nos)
def update_item(self, no: int, **kwargs) -> None:
self._update_item(no, **kwargs)
def _update_item(self, no: int, **kwargs):
changed = False
with self._lock:
if no < len(self.notifications):
item = self.notifications[no]
for key, value in kwargs.items():
if hasattr(item, key):
setattr(item, key, value)
self.updates.append(no)
changed = True
if not changed:
return
from helpers.state_monitor_integration import mark_dirty_all
mark_dirty_all(reason="notification.NotificationManager._update_item")
def mark_all_read(self):
changed_nos: list[int] = []
with self._lock:
for notification in self.notifications:
if not notification.read:
notification.read = True
changed_nos.append(notification.no)
if changed_nos:
self.updates.extend(changed_nos)
if not changed_nos:
return
from helpers.state_monitor_integration import mark_dirty_all
mark_dirty_all(reason="notification.NotificationManager.mark_all_read")
def clear_all(self):
with self._lock:
self.notifications = []
self.updates = []
self.guid = str(uuid.uuid4())
from helpers.state_monitor_integration import mark_dirty_all
mark_dirty_all(reason="notification.NotificationManager.clear_all")
def get_notifications_by_type(self, type: NotificationType) -> list[NotificationItem]:
with self._lock:
return [n for n in self.notifications if n.type == type]