better responsive updates on SSE/WebUI

This commit is contained in:
ChrispyBacon-dev 2025-09-22 18:05:01 +02:00
parent 63dd0f141e
commit d5ec8e1108
8 changed files with 290 additions and 82 deletions

View file

@ -18,6 +18,7 @@ import logging
import queue
import sys
import os
import json
from flask import Flask
from flask_wtf.csrf import CSRFProtect
@ -66,6 +67,17 @@ queue_handler.setLevel(logging.INFO)
root_logger.addHandler(queue_handler)
def publish_state_event(event_type, data=None):
message = json.dumps({
"type": event_type,
"data": data or {}
})
try:
state_update_queue.put_nowait(message)
except queue.Full:
logging.warning("State event queue full. Dropping event: %s", event_type)
docker_client = None
try:
docker_client = docker.from_env(timeout=10)

View file

@ -20,11 +20,10 @@ import time
import requests
import copy
import re
import queue
from docker.errors import NotFound, APIError
from flask import current_app
from app import config, docker_client, cloudflared_agent_state, tunnel_state, state_update_queue
from app import config, docker_client, cloudflared_agent_state, tunnel_state, publish_state_event
from app.core.state_manager import managed_rules, state_lock, save_state
from app.core.tunnel_manager import update_cloudflare_config
@ -353,10 +352,7 @@ def process_container_start(container_obj):
if state_changed_locally_for_this_container:
logging.debug(f"DOCKER_HANDLER_PRE_SAVE: For container {container_name_val}.")
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
else:
logging.info(f"DOCKER_HANDLER: No local state changes for {container_name_val}. Skipping save_state().")
@ -444,10 +440,7 @@ def schedule_container_stop(container_id_val):
if state_changed_after_stop_processing:
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
def docker_event_listener(stop_event_param):
if not docker_client:

View file

@ -19,9 +19,7 @@ import logging
import time
import threading
from datetime import datetime, timedelta, timezone
import queue
from app import config, docker_client, tunnel_state, state_update_queue
from app import config, docker_client, tunnel_state, publish_state_event
from flask import current_app
from app.core.state_manager import managed_rules, state_lock, save_state, get_agent, update_agent
@ -280,10 +278,7 @@ def reconcile_agent_report(agent_id, reported_containers):
except Exception:
logging.exception(f"[Reconcile-Agent] Could not update agent record for last_auto_restore_at: {agent_id}")
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
if restored_any:
try:
@ -484,10 +479,7 @@ def _run_reconciliation_logic():
if state_changed_locally:
current_app.reconciliation_info["status"] = "Saving reconciled state..."
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
if time.time() - reconciliation_start_time > max_total_time - 15:
logging.warning("[Reconcile] Timeout before Tunnel/DNS operations.")
@ -656,10 +648,7 @@ def cleanup_expired_rules(stop_event_param):
if deleted_count > 0:
logging.info(f"Removed {deleted_count} expired rules from local state.")
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
except Exception as e_cleanup:
logging.error(f"Error in cleanup task loop: {e_cleanup}", exc_info=True)

View file

@ -20,6 +20,7 @@ import logging
import os
import threading
from datetime import datetime, timezone
from typing import Dict, Any, List
from app import config
from app.core import agent_key_store
from app.core.utils import get_rule_key
@ -316,3 +317,40 @@ def get_agent_rules(agent_id):
key: rule for key, rule in managed_rules.items()
if rule.get("agent_id") == agent_id and rule.get("status") == "active"
}
def _serialize_datetime(value):
if isinstance(value, datetime):
value_utc = value.astimezone(timezone.utc) if value.tzinfo else value.replace(tzinfo=timezone.utc)
return value_utc.isoformat().replace('+00:00', 'Z')
return None
def serialize_managed_rule(rule_key: str, rule: Dict[str, Any]) -> Dict[str, Any]:
if not rule:
return {"id": rule_key}
return {
"id": rule_key,
"hostname": rule.get("hostname"),
"path": rule.get("path"),
"service": rule.get("service"),
"status": rule.get("status"),
"delete_at": _serialize_datetime(rule.get("delete_at")),
"zone_id": rule.get("zone_id"),
"zone_name": rule.get("zone_name"),
"source": rule.get("source"),
"container_id": rule.get("container_id"),
"tunnel_id": rule.get("tunnel_id"),
"tunnel_name": rule.get("tunnel_name"),
"access_policy_type": rule.get("access_policy_type"),
"access_policy_ui_override": rule.get("access_policy_ui_override", False)
}
def get_services_snapshot() -> List[Dict[str, Any]]:
with state_lock:
return [
serialize_managed_rule(rule_key, rule.copy())
for rule_key, rule in managed_rules.items()
]

View file

@ -8,6 +8,8 @@ let manualTunnelTomSelect = null;
let cachedTunnels = null;
let cachedZones = null;
let manualZoneDetectionTimeout = null;
let servicesSnapshotPromise = null;
let servicesSnapshotQueued = false;
function getMasterApiKey() {
const meta = document.querySelector('meta[name="dockflare-api-key"]');
@ -300,6 +302,178 @@ function fixResourcesAndBase() {
};
}
function fetchServicesSnapshot() {
const url = `${document.baseURI}api/v2/services?t=${Date.now()}`;
return fetch(url, { headers: buildApiHeaders() })
.then(response => {
if (!response.ok) {
throw new Error(`Snapshot request failed: ${response.status}`);
}
return response.json();
})
.then(payload => Array.isArray(payload.services) ? payload.services : []);
}
function updateRowFromService(row, service) {
if (!row || !service) return;
row.dataset.ruleStatus = service.status || '';
row.dataset.ruleSource = service.source || '';
const statusCell = row.querySelector('[data-role="status-cell"]');
const statusBadge = statusCell ? statusCell.querySelector('.status-badge') : null;
if (statusBadge) {
if (service.source === 'manual') {
statusBadge.textContent = 'Manual';
statusBadge.className = 'badge badge-info badge-sm status-badge';
} else {
const normalizedStatus = (service.status || 'unknown').replace(/_/g, ' ');
statusBadge.textContent = normalizedStatus;
let badgeClass = 'badge-success';
if (service.status && service.status.includes('pending')) {
badgeClass = 'badge-warning';
} else if (service.status && service.status.includes('error')) {
badgeClass = 'badge-error';
}
statusBadge.className = `badge ${badgeClass} badge-sm status-badge`;
}
}
const expiresCell = row.querySelector('[data-role="expires-cell"]');
if (expiresCell) {
if (service.status === 'pending_deletion' && service.delete_at) {
let container = expiresCell.querySelector('[data-delete-at]');
if (!container) {
expiresCell.innerHTML = '';
container = document.createElement('div');
expiresCell.appendChild(container);
}
container.setAttribute('data-delete-at', service.delete_at);
let absoluteSpan = container.querySelector('.absolute-time-display');
if (!absoluteSpan) {
absoluteSpan = document.createElement('span');
absoluteSpan.className = 'absolute-time-display';
container.appendChild(absoluteSpan);
}
let countdownSpan = container.querySelector('.countdown-timer');
if (!countdownSpan) {
countdownSpan = document.createElement('span');
countdownSpan.className = 'countdown-timer block text-xs opacity-80';
container.appendChild(countdownSpan);
}
} else {
expiresCell.innerHTML = '<span class="text-xs opacity-60">N/A</span>';
}
}
}
function removeServiceRow(ruleId) {
if (!ruleId) return false;
let removed = false;
document.querySelectorAll('tr[data-rule-key]').forEach(row => {
if (row.dataset.ruleKey === ruleId) {
row.remove();
removed = true;
}
});
return removed;
}
function applyServicesSnapshot(services) {
const servicesById = new Map();
services.forEach(service => {
if (service && service.id) {
servicesById.set(service.id, service);
}
});
const rows = Array.from(document.querySelectorAll('tr[data-rule-key]'));
rows.forEach(row => {
const key = row.dataset.ruleKey;
if (!servicesById.has(key)) {
row.remove();
return;
}
const service = servicesById.get(key);
updateRowFromService(row, service);
servicesById.delete(key);
});
if (servicesById.size > 0) {
window.location.reload();
return;
}
updateCountdowns();
}
function scheduleServicesSnapshotRefresh() {
if (!document.querySelector('tr[data-rule-key]')) {
return;
}
if (servicesSnapshotPromise) {
servicesSnapshotQueued = true;
return;
}
servicesSnapshotPromise = fetchServicesSnapshot()
.then(applyServicesSnapshot)
.catch(error => {
console.warn('Failed to refresh services snapshot:', error);
})
.finally(() => {
servicesSnapshotPromise = null;
if (servicesSnapshotQueued) {
servicesSnapshotQueued = false;
scheduleServicesSnapshotRefresh();
}
});
}
function findRowByRuleKey(ruleId) {
if (!ruleId) return null;
const rows = document.querySelectorAll('tr[data-rule-key]');
for (const row of rows) {
if (row.dataset.ruleKey === ruleId) {
return row;
}
}
return null;
}
function handleStructuredStateEvent(message) {
const eventType = message.type;
const data = message.data || {};
const ruleId = data.id;
switch (eventType) {
case 'snapshot_refresh':
scheduleServicesSnapshotRefresh();
break;
case 'service_deleted':
if (!removeServiceRow(ruleId)) {
scheduleServicesSnapshotRefresh();
}
break;
case 'service_pending_deletion':
case 'service_updated':
const targetRow = findRowByRuleKey(ruleId);
if (targetRow && data) {
updateRowFromService(targetRow, data);
updateCountdowns();
} else {
scheduleServicesSnapshotRefresh();
}
break;
case 'service_created':
default:
scheduleServicesSnapshotRefresh();
break;
}
}
function connectStateUpdateSource() {
if (!window.EventSource) {
console.error("Browser doesn't support Server-Sent Events. State auto-refresh disabled.");
@ -310,9 +484,29 @@ function connectStateUpdateSource() {
const eventSource = new EventSource(streamUrl);
eventSource.onmessage = function(event) {
if (event.data === "update") {
console.log("State update received, reloading page.");
window.location.reload(true);
if (!event.data) {
return;
}
if (event.data === 'update') {
scheduleServicesSnapshotRefresh();
return;
}
if (event.data.trim().length === 0) {
return;
}
try {
const message = JSON.parse(event.data);
if (message && message.type) {
handleStructuredStateEvent(message);
} else {
scheduleServicesSnapshotRefresh();
}
} catch (error) {
console.warn('Failed to parse state stream payload:', error);
scheduleServicesSnapshotRefresh();
}
};
@ -1165,11 +1359,14 @@ document.addEventListener('DOMContentLoaded', function() {
}
if (document.getElementById('reconciliation-status')) {
updateReconciliationStatus();
setInterval(updateReconciliationStatus, 2000);
connectStateUpdateSource();
}
connectStateUpdateSource();
scheduleServicesSnapshotRefresh();
startServerPing();
// Universal Cleanup

View file

@ -87,13 +87,13 @@
</thead>
<tbody>
{% for hostname, details in rules.items()|sort %}
<tr>
<td class="p-3 whitespace-nowrap">
<tr data-rule-key="{{ hostname }}" data-rule-status="{{ details.status }}" data-rule-source="{{ details.source }}">
<td class="p-3 whitespace-nowrap" data-role="status-cell">
{% if details.source == 'manual' %}
<span class="badge badge-info badge-sm">Manual</span>
<span class="badge badge-info badge-sm status-badge">Manual</span>
{% else %}
{% set status_badge_color = 'badge-warning' if 'pending' in details.status else 'badge-success' %}
<span class="badge {{ status_badge_color }} badge-sm">
<span class="badge {{ status_badge_color }} badge-sm status-badge">
{{ details.status.replace('_',' ') }}
</span>
{% endif %}
@ -205,14 +205,14 @@
{% endif %}
</div>
</td>
<td class="p-3 whitespace-nowrap text-sm opacity-70">
<td class="p-3 whitespace-nowrap text-sm opacity-70" data-role="expires-cell">
{% if details.status=='pending_deletion' and details.delete_at %}
<div data-delete-at="{{ details.delete_at.isoformat() }}">
<span class="absolute-time-display"></span>
<span class="absolute-time-display"></span>
<span class="countdown-timer block text-xs opacity-80"></span>
</div>
{% else %}
N/A
{% else %}
<span class="text-xs opacity-60">N/A</span>
{% endif %}
</td>
<td class="p-3 whitespace-nowrap text-sm" style="min-width: 120px;">

View file

@ -19,16 +19,16 @@ import copy
import logging
import time
import json
import queue
from datetime import datetime, timezone, timedelta
import secrets
import uuid
from flask import Blueprint, jsonify, request, current_app, url_for
from app import config, docker_client, tunnel_state, cloudflared_agent_state, state_update_queue
from app import config, docker_client, tunnel_state, cloudflared_agent_state, publish_state_event
from app.core.state_manager import (
managed_rules, state_lock, save_state,
add_agent, get_agent, update_agent, list_agents, remove_agent, add_agent_key, revoke_agent_key, find_agent_id_by_key, list_agent_keys, get_agent_key_info
add_agent, get_agent, update_agent, list_agents, remove_agent, add_agent_key, revoke_agent_key, find_agent_id_by_key, list_agent_keys, get_agent_key_info,
get_services_snapshot
)
from app.core.tunnel_manager import (
start_cloudflared_container,
@ -136,6 +136,12 @@ def _ensure_agent_api_key(agent_id, agent_record, token):
def get_effective_tunnel_id():
return tunnel_state.get("id") if not config.USE_EXTERNAL_CLOUDFLARED else config.EXTERNAL_TUNNEL_ID
@api_v2_bp.route('/services', methods=['GET'])
def list_services():
snapshot = get_services_snapshot()
return jsonify({"services": snapshot})
@api_v2_bp.route('/overview', methods=['GET'])
def get_overview_data():
rules_for_api = {}
@ -497,10 +503,7 @@ def create_manual_rule_api():
save_state()
state_changed = True
if state_changed:
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue full while broadcasting manual rule change")
publish_state_event('snapshot_refresh')
try:
create_cloudflare_dns_record(zone_id, hostname, tunnel_id)
except Exception as dns_error:
@ -730,6 +733,7 @@ def process_agent_container_start(payload, agent_id):
if state_changed_locally:
save_state()
publish_state_event('snapshot_refresh')
if needs_tunnel_config_update:
logging.info(f"AGENT_PROCESS: DNS and tunnel config update needed for agent {agent_id}.")
@ -803,6 +807,7 @@ def process_agent_container_stop(payload, agent_id):
rule["delete_at"] = datetime.now(timezone.utc) + grace_delta
logging.info(f"AGENT_PROCESS_STOP: Rule for {rule_key} scheduled for deletion (grace period: {grace_period}s)")
save_state()
publish_state_event('snapshot_refresh')
logging.info(f"AGENT_PROCESS_STOP: Scheduled {len(rule_keys_affected)} rules for deletion from agent {agent_id}")
else:
logging.info(f"AGENT_PROCESS_STOP: No active agent-managed rules found for container {container_id[:12]} from agent {agent_id}")
@ -1068,6 +1073,7 @@ def agents_post_events(agent_id):
if rules_marked:
logging.info(f"AGENTS_EVENTS: Marked {rules_marked} agent-managed rules for agent {agent_id} as pending_deletion due to missing containers in status_report.")
save_state()
publish_state_event('snapshot_refresh')
except Exception as e:
logging.error(f"AGENTS_EVENTS: Error while marking missing agent rules pending_deletion for {agent_id}: {e}", exc_info=True)
except Exception as e:
@ -1349,10 +1355,7 @@ def delete_manual_rule(rule_key):
config_update_success = update_cloudflare_config(tunnel_id_for_delete)
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue full while handling manual rule delete")
publish_state_event('snapshot_refresh')
if config_update_success:
message = f"Manual rule {rule_key} deleted."

View file

@ -33,7 +33,7 @@ from flask import (
from flask_login import current_user, login_required, login_user
from app.core.user import User
from app import config, docker_client, tunnel_state, cloudflared_agent_state, log_queue, state_update_queue
from app import config, docker_client, tunnel_state, cloudflared_agent_state, log_queue, state_update_queue, publish_state_event
from app.core.cache import CACHE_ENABLED
from app.core.state_manager import managed_rules, access_groups, state_lock, save_state, load_state
from app.core.tunnel_manager import (
@ -699,10 +699,7 @@ def revert_access_policy_to_labels(hostname):
state_changed_for_revert = True
if state_changed_for_revert:
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
if app_id_to_delete_if_any:
if delete_cloudflare_access_application(app_id_to_delete_if_any):
@ -996,10 +993,7 @@ def force_delete_rule_route(hostname):
del managed_rules[hostname]
rule_removed_from_state = True
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
if rule_removed_from_state and not config.USE_EXTERNAL_CLOUDFLARED:
if update_cloudflare_config():
pass
@ -1265,10 +1259,7 @@ def ui_add_manual_rule_route():
"tunnel_name": target_tunnel_name
}
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
if update_cloudflare_config(target_tunnel_id):
create_cloudflare_dns_record(target_zone_id, full_hostname, target_tunnel_id)
@ -1452,10 +1443,7 @@ def ui_edit_manual_rule_route():
del managed_rules[rule_key]
managed_rules[new_key] = rule_entry
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
effective_tunnel_id = tunnel_state.get("id") if not config.USE_EXTERNAL_CLOUDFLARED else config.EXTERNAL_TUNNEL_ID
@ -1495,10 +1483,7 @@ def ui_delete_manual_rule_route(rule_key_from_url):
del managed_rules[rule_key_from_url]
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately after delete.")
publish_state_event('snapshot_refresh')
dns_deleted_ok = True
if hostname_for_dns and zone_id_for_delete and rule_tunnel_id:
@ -1596,10 +1581,7 @@ def create_access_group():
}
access_groups[group_id] = new_group
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
flash(f"Success: Access Group '{display_name}' created.", "success")
return redirect(url_for('web.access_policies_page'))
@ -1632,10 +1614,7 @@ def edit_access_group(group_id):
}
access_groups[group_id] = updated_group
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
flash(f"Success: Access Group '{display_name}' updated. Triggering reconciliation.", "success")
reconcile_state_threaded()
@ -1661,10 +1640,7 @@ def delete_access_group(group_id):
display_name = access_groups[group_id]['display_name']
del access_groups[group_id]
save_state()
try:
state_update_queue.put_nowait('update')
except queue.Full:
logging.warning("State update queue is full. UI may not refresh immediately.")
publish_state_event('snapshot_refresh')
flash(f"Success: Access Group '{display_name}' has been deleted.", "success")
return redirect(url_for('web.access_policies_page'))