perf: reduce polling allocations and guest metadata load

This commit is contained in:
rcourtman 2025-10-25 13:12:47 +00:00
parent 138d8facd2
commit d643dcf0bc
22 changed files with 1980 additions and 392 deletions

View file

@ -3,7 +3,10 @@
The Pulse host agent extends monitoring to standalone servers that do not expose
Proxmox or Docker APIs. With it you can surface uptime, OS metadata, CPU load,
memory/disk utilisation, and connection health for any Linux, macOS, or Windows
machine alongside the rest of your infrastructure.
machine alongside the rest of your infrastructure. Beginning with the upcoming
release the installer now handshakes with Pulse in real time so you can confirm
registration directly from the UI and receive host-agent alerts alongside your
existing Docker/Proxmox notifications.
## Prerequisites
@ -13,6 +16,11 @@ machine alongside the rest of your infrastructure.
> The agent only initiates outbound connections; no inbound firewall rules are required.
If your Pulse instance does not require API tokens (e.g. during an on-premises
lab install) you can still generate commands without embedding a credential.
Confirm the warning in **Settings → Agents → Host agents** and the script will
prompt for a token instead of hard-coding one.
## Quick Start
> Replace `<api-token>` with a Pulse API token limited to the `host-agent:report` scope. Tokens generated from **Settings → Agents → Host agents** already apply this scope.
@ -151,6 +159,17 @@ Run `pulse-host-agent --help` for the full list.
- **Settings → Agents → Host agents** lists every reporting host and provides ready-made install commands.
- The **Servers** tab surfaces host telemetry alongside Proxmox/Docker data in the main dashboard.
### Checking installation status
- Click **Check status** under **Settings → Agents → Host agents** and enter the host ID or hostname you just installed.
- Pulse hits `/api/agents/host/lookup`, highlights the matching row for 10 seconds, and refreshes the connection badge, last-seen timestamp, and agent version in-line.
- If the host has not checked in yet, the UI returns a friendly "Host has not registered" message so you can retry without re-running the script.
### Alerts and notifications
- Host agents now participate in the main alert engine. Offline detection, metric thresholds, and override scopes (global or per-host) live in **Settings → Alerts → Thresholds** beside your Docker and Proxmox rules.
- Alert notifications, webhooks, and quiet-hours behaviour reuse the existing pipelines—no extra setup is required once you enable host-agent monitoring.
## Updating
Since the agent is a single static binary, updates are as simple as replacing the file and restarting your launchd/systemd unit. The Settings pane always links to the latest release artefacts.

12
docs/RELEASE_NOTES.md Normal file
View file

@ -0,0 +1,12 @@
# Upcoming Release Highlights
## Improvements
- Host agent installs now report back to Pulse in real time. The **Settings → Agents → Host agents** view surfaces the lookup status, highlights the matching row, and refreshes the connection badge without reloading.
- Host agents participate in the alert system alongside Proxmox and Docker sources. Offline detection, thresholds, and overrides live in **Settings → Alerts** and flow through the existing notification channels.
## Documentation
- Updated the host agent guide with the new lookup workflow, token-free commands, and alert integration notes.
_(Replace this stub with per-version notes when tagging the release.)_

View file

@ -8,6 +8,7 @@ import type {
VM,
Container,
Node,
Host,
Alert,
Storage,
PBSInstance,
@ -27,7 +28,15 @@ import type {
} from '@/types/alerts';
import { ResourceTable, Resource, GroupHeaderMeta } from './ResourceTable';
import { useAlertsActivation } from '@/stores/alertsActivation';
type OverrideType = 'guest' | 'node' | 'storage' | 'pbs' | 'pmg' | 'dockerHost' | 'dockerContainer';
type OverrideType =
| 'guest'
| 'node'
| 'hostAgent'
| 'storage'
| 'pbs'
| 'pmg'
| 'dockerHost'
| 'dockerContainer';
type OfflineState = 'off' | 'warning' | 'critical';
@ -133,6 +142,7 @@ interface ThresholdsTableProps {
setRawOverridesConfig: (config: Record<string, RawOverrideConfig>) => void;
allGuests: () => (VM | Container)[];
nodes: Node[];
hosts: Host[];
storage: Storage[];
dockerHosts: DockerHost[];
pbsInstances?: PBSInstance[]; // PBS instances from state
@ -156,11 +166,17 @@ interface ThresholdsTableProps {
guestPoweredOffSeverity: () => 'warning' | 'critical';
setGuestPoweredOffSeverity: (value: 'warning' | 'critical') => void;
nodeDefaults: SimpleThresholds;
hostDefaults: SimpleThresholds;
setNodeDefaults: (
value:
| Record<string, number | undefined>
| ((prev: Record<string, number | undefined>) => Record<string, number | undefined>),
) => void;
setHostDefaults: (
value:
| Record<string, number | undefined>
| ((prev: Record<string, number | undefined>) => Record<string, number | undefined>),
) => void;
dockerDefaults: {
cpu: number;
memory: number;
@ -201,11 +217,13 @@ interface ThresholdsTableProps {
setStorageDefault: (value: number) => void;
resetGuestDefaults?: () => void;
resetNodeDefaults?: () => void;
resetHostDefaults?: () => void;
resetDockerDefaults?: () => void;
resetDockerIgnoredPrefixes?: () => void;
resetStorageDefault?: () => void;
factoryGuestDefaults?: Record<string, number | undefined>;
factoryNodeDefaults?: Record<string, number | undefined>;
factoryHostDefaults?: Record<string, number | undefined>;
factoryDockerDefaults?: Record<string, number | undefined>;
factoryStorageDefault?: number;
timeThresholds: () => { guest: number; node: number; storage: number; pbs: number };
@ -235,6 +253,8 @@ interface ThresholdsTableProps {
setDisableAllNodes: (value: boolean) => void;
disableAllGuests: () => boolean;
setDisableAllGuests: (value: boolean) => void;
disableAllHosts: () => boolean;
setDisableAllHosts: (value: boolean) => void;
disableAllStorage: () => boolean;
setDisableAllStorage: (value: boolean) => void;
disableAllPBS: () => boolean;
@ -250,6 +270,8 @@ interface ThresholdsTableProps {
setDisableAllNodesOffline: (value: boolean) => void;
disableAllGuestsOffline: () => boolean;
setDisableAllGuestsOffline: (value: boolean) => void;
disableAllHostsOffline: () => boolean;
setDisableAllHostsOffline: (value: boolean) => void;
disableAllPBSOffline: () => boolean;
setDisableAllPBSOffline: (value: boolean) => void;
disableAllPMGOffline: () => boolean;
@ -568,7 +590,79 @@ export function ThresholdsTable(props: ThresholdsTableProps) {
if (search) {
return nodes.filter((n) => n.name.toLowerCase().includes(search));
}
return nodes;
return nodes;
}, []);
// Merges live host-agent telemetry with saved alert overrides so the
// thresholds table can render one row per reporting host, plus extra rows
// for overrides whose host is no longer reporting.
const hostAgentsWithOverrides = createMemo<Resource[]>((prev = []) => {
  // While a row is being edited, freeze the memo so reactive updates do not
  // re-render the table away from under the user.
  if (editingId()) {
    return prev;
  }
  const search = searchTerm().toLowerCase();
  // Read overrides once per evaluation instead of calling the accessor twice.
  const overridesList = props.overrides() ?? [];
  // Index only hostAgent-type overrides. Ids are not guaranteed unique across
  // resource types, so without the type filter an override for another
  // resource sharing an id could be picked up (the stale-override pass below
  // already filters by type — keep both passes consistent).
  const overridesMap = new Map(
    overridesList.filter((o) => o.type === 'hostAgent').map((o) => [o.id, o]),
  );
  const seen = new Set<string>();
  // Widen defaults to a string-indexed record for the key-by-key comparison
  // below (avoids an untyped `as any` access).
  const hostDefaults = props.hostDefaults as Record<string, number | undefined>;
  const hosts = (props.hosts ?? []).map((host) => {
    const override = overridesMap.get(host.id);
    // An override counts as "custom thresholds" only when at least one value
    // is set AND differs from the global host defaults.
    const hasCustomThresholds = Boolean(
      override?.thresholds &&
        Object.entries(override.thresholds).some(
          ([key, value]) => value !== undefined && value !== hostDefaults[key],
        ),
    );
    const displayName = host.displayName?.trim() || host.hostname || host.id;
    // Hosts without an explicit status are shown online if they have ever
    // checked in, offline otherwise.
    const status = host.status || (host.lastSeen ? 'online' : 'offline');
    seen.add(host.id);
    return {
      id: host.id,
      name: displayName,
      displayName,
      rawName: host.hostname || host.id,
      type: 'hostAgent' as const,
      resourceType: 'Host Agent',
      node: host.hostname,
      instance: host.platform || host.osName || '',
      status,
      hasOverride:
        hasCustomThresholds ||
        Boolean(override?.disabled) ||
        Boolean(override?.disableConnectivity),
      disableConnectivity: override?.disableConnectivity || false,
      thresholds: override?.thresholds || {},
      defaults: props.hostDefaults,
    } satisfies Resource;
  });
  // Surface overrides for hosts that are not currently reporting so the user
  // can still inspect or remove them.
  overridesList
    .filter((override) => override.type === 'hostAgent' && !seen.has(override.id))
    .forEach((override) => {
      const name = override.name?.trim() || override.id;
      hosts.push({
        id: override.id,
        name,
        displayName: name,
        rawName: name,
        type: 'hostAgent',
        resourceType: 'Host Agent',
        status: 'unknown',
        hasOverride: true,
        disableConnectivity: override.disableConnectivity || false,
        thresholds: override.thresholds || {},
        defaults: props.hostDefaults,
      } satisfies Resource);
    });
  if (search) {
    return hosts.filter((host) => host.name.toLowerCase().includes(search));
  }
  return hosts;
}, []);
// Process Docker hosts with their overrides (primarily for connectivity toggles)
@ -1318,6 +1412,13 @@ export function ThresholdsTable(props: ThresholdsTableProps) {
overrides: countOverrides(dockerHostsWithOverrides()),
tab: 'docker' as const,
},
{
key: 'hostAgents' as const,
label: 'Host Agents',
total: props.hosts?.length ?? 0,
overrides: countOverrides(hostAgentsWithOverrides()),
tab: 'docker' as const,
},
{
key: 'storage' as const,
label: 'Storage',
@ -1396,6 +1497,7 @@ export function ThresholdsTable(props: ThresholdsTableProps) {
const allDockerContainers = dockerContainersFlat();
const allResources = [
...nodesWithOverrides(),
...hostAgentsWithOverrides(),
...dockerHostsWithOverrides(),
...allGuests,
...allDockerContainers,
@ -1753,8 +1855,9 @@ export function ThresholdsTable(props: ThresholdsTableProps) {
const nodes = nodesWithOverrides();
const pbsServers = pbsServersWithOverrides();
const guests = guestsFlat();
const hostAgents = hostAgentsWithOverrides();
const dockerHosts = dockerHostsWithOverrides();
const resource = [...nodes, ...pbsServers, ...guests, ...dockerHosts].find(
const resource = [...nodes, ...pbsServers, ...guests, ...hostAgents, ...dockerHosts].find(
(r) => r.id === resourceId,
);
if (
@ -1762,6 +1865,7 @@ export function ThresholdsTable(props: ThresholdsTableProps) {
(resource.type !== 'node' &&
resource.type !== 'pbs' &&
resource.type !== 'guest' &&
resource.type !== 'hostAgent' &&
resource.type !== 'dockerHost')
)
return;
@ -2507,6 +2611,42 @@ export function ThresholdsTable(props: ThresholdsTableProps) {
</Show>
<Show when={activeTab() === 'docker'}>
<Show when={hasSection('hostAgents')}>
<div ref={registerSection('hostAgents')} class="scroll-mt-24">
<ResourceTable
title="Host Agents"
resources={hostAgentsWithOverrides()}
columns={['CPU %', 'Memory %', 'Disk %']}
activeAlerts={props.activeAlerts}
emptyMessage="No host agents match the current filters."
onEdit={startEditing}
onSaveEdit={saveEdit}
onCancelEdit={cancelEdit}
onRemoveOverride={removeOverride}
onToggleDisabled={toggleDisabled}
onToggleNodeConnectivity={toggleNodeConnectivity}
showOfflineAlertsColumn={true}
editingId={editingId}
editingThresholds={editingThresholds}
setEditingThresholds={setEditingThresholds}
formatMetricValue={formatMetricValue}
hasActiveAlert={hasActiveAlert}
globalDefaults={props.hostDefaults}
setGlobalDefaults={props.setHostDefaults}
setHasUnsavedChanges={props.setHasUnsavedChanges}
globalDisableFlag={props.disableAllHosts}
onToggleGlobalDisable={() =>
props.setDisableAllHosts(!props.disableAllHosts())
}
globalDisableOfflineFlag={props.disableAllHostsOffline}
onToggleGlobalDisableOffline={() =>
props.setDisableAllHostsOffline(!props.disableAllHostsOffline())
}
factoryDefaults={props.factoryHostDefaults}
onResetDefaults={props.resetHostDefaults}
/>
</div>
</Show>
<div class="mb-6 rounded-lg border border-gray-200 bg-white p-5 shadow-sm dark:border-gray-700 dark:bg-gray-900">
<div class="flex flex-col gap-3 sm:flex-row sm:items-center sm:justify-between">
<div>

View file

@ -57,6 +57,7 @@ const baseProps = () => ({
setRawOverridesConfig: vi.fn(),
allGuests: () => [],
nodes: [],
hosts: [],
storage: [],
dockerHosts: [],
pbsInstances: [],
@ -71,17 +72,21 @@ const baseProps = () => ({
setGuestPoweredOffSeverity: vi.fn(),
nodeDefaults: {},
setNodeDefaults: vi.fn(),
hostDefaults: { cpu: 80, memory: 85, disk: 90 },
setHostDefaults: vi.fn(),
dockerDefaults: DEFAULT_DOCKER_DEFAULTS,
setDockerDefaults: vi.fn(),
storageDefault: () => 85,
setStorageDefault: vi.fn(),
resetGuestDefaults: vi.fn(),
resetNodeDefaults: vi.fn(),
resetHostDefaults: vi.fn(),
resetDockerDefaults: vi.fn(),
resetDockerIgnoredPrefixes: undefined as (() => void) | undefined,
resetStorageDefault: vi.fn(),
factoryGuestDefaults: {},
factoryNodeDefaults: {},
factoryHostDefaults: { cpu: 80, memory: 85, disk: 90 },
factoryDockerDefaults: {},
factoryStorageDefault: 85,
backupDefaults: () => ({ enabled: false, warningDays: 7, criticalDays: 14 }),
@ -113,6 +118,8 @@ const baseProps = () => ({
setDisableAllNodes: vi.fn(),
disableAllGuests: () => false,
setDisableAllGuests: vi.fn(),
disableAllHosts: () => false,
setDisableAllHosts: vi.fn(),
disableAllStorage: () => false,
setDisableAllStorage: vi.fn(),
disableAllPBS: () => false,
@ -127,6 +134,8 @@ const baseProps = () => ({
setDisableAllNodesOffline: vi.fn(),
disableAllGuestsOffline: () => false,
setDisableAllGuestsOffline: vi.fn(),
disableAllHostsOffline: () => false,
setDisableAllHostsOffline: vi.fn(),
disableAllPBSOffline: () => false,
setDisableAllPBSOffline: vi.fn(),
disableAllPMGOffline: () => false,

View file

@ -755,7 +755,7 @@ sudo systemctl daemon-reload`;
};
return (
<tr class={`${rowClass} ${highlightClass}`}>
<tr data-host-id={host.id} class={`${rowClass} ${highlightClass}`}>
<td class="py-3 px-4">
<div class="font-medium text-gray-900 dark:text-gray-100">
{host.displayName || host.hostname || host.id}

View file

@ -0,0 +1,206 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { render, fireEvent, screen, waitFor, cleanup } from '@solidjs/testing-library';
import { createStore } from 'solid-js/store';
import { HostAgents } from '../HostAgents';
import type { Host } from '@/types/api';
// Mutable handle returned by the mocked `useWebSocket` hook; each test
// assigns a fresh store (via setupComponent) before rendering.
let mockWsStore: {
state: { hosts: Host[]; connectionHealth?: Record<string, boolean> };
connected: () => boolean;
reconnecting: () => boolean;
activeAlerts: unknown[];
};
// Spies for the two API calls the component exercises in these tests.
const lookupMock = vi.fn();
const createTokenMock = vi.fn();
// Route the component's WebSocket context through the mutable store above.
vi.mock('@/App', () => ({
useWebSocket: () => mockWsStore,
}));
// Forward API-layer calls to the spies so assertions can inspect arguments.
vi.mock('@/api/monitoring', () => ({
MonitoringAPI: {
lookupHost: (...args: unknown[]) => lookupMock(...args),
},
}));
vi.mock('@/api/security', () => ({
SecurityAPI: {
createToken: (...args: unknown[]) => createTokenMock(...args),
},
}));
// Builds a fully-populated Host fixture; individual fields can be replaced
// per test through the optional `overrides` argument. `lastSeen` is taken
// from the clock at call time.
const createHost = (overrides?: Partial<Host>): Host => {
  const gib = 1024 * 1024 * 1024;
  const base: Host = {
    id: 'host-1',
    hostname: 'host-1.local',
    displayName: 'Host One',
    platform: 'linux',
    osName: 'Ubuntu',
    osVersion: '22.04',
    kernelVersion: '6.5.0',
    architecture: 'x86_64',
    cpuCount: 4,
    cpuUsage: 12.5,
    memory: {
      total: 8 * gib,
      used: 4 * gib,
      free: 4 * gib,
      usage: 50,
      balloon: 0,
      swapUsed: 0,
      swapTotal: 0,
    },
    loadAverage: [],
    disks: [],
    networkInterfaces: [],
    sensors: {
      temperatureCelsius: {},
      fanRpm: {},
      additional: {},
    },
    status: 'online',
    uptimeSeconds: 12345,
    intervalSeconds: 30,
    lastSeen: Date.now(),
    agentVersion: '1.2.3',
    tags: ['prod'],
  };
  return { ...base, ...overrides };
};
// Global fetch stub: answers every request with an auth-required,
// no-token-configured payload — presumably the security-status probe the
// component makes on mount; TODO confirm against HostAgents.
const stubFetchSuccess = vi.fn();
beforeEach(() => {
lookupMock.mockReset();
createTokenMock.mockReset();
stubFetchSuccess.mockImplementation(
async () =>
new Response(JSON.stringify({ requiresAuth: true, apiTokenConfigured: false }), {
status: 200,
headers: { 'Content-Type': 'application/json' },
}),
);
vi.stubGlobal('fetch', stubFetchSuccess);
});
afterEach(() => {
cleanup();
// Restore the real global fetch so other test files are unaffected.
vi.unstubAllGlobals();
});
describe('HostAgents lookup flow', () => {
// Renders HostAgents with the given hosts wired into the mocked
// WebSocket store (connected, not reconnecting, no alerts).
const setupComponent = (hosts: Host[]) => {
const [state] = createStore({
hosts,
connectionHealth: {},
});
mockWsStore = {
state,
connected: () => true,
reconnecting: () => false,
activeAlerts: [],
};
return render(() => <HostAgents />);
};
// NOTE(review): the "clears highlight after timeout" behavior named in this
// title is not asserted below — only the successful lookup path is verified.
it('highlights a host after a successful lookup and clears highlight after timeout', async () => {
const host = createHost();
setupComponent([host]);
// Token creation must succeed first so the lookup controls become enabled.
createTokenMock.mockResolvedValue({
token: 'token-123',
record: {
id: 'token-record',
name: 'Test Token',
prefix: 'abcdef',
suffix: '1234',
createdAt: new Date().toISOString(),
},
});
const generateButton = screen.getByRole('button', { name: 'Generate token' });
fireEvent.click(generateButton);
await waitFor(() => expect(createTokenMock).toHaveBeenCalled(), { interval: 0 });
await waitFor(() => expect(screen.getByRole('button', { name: 'Check status' })).toBeEnabled(), {
interval: 0,
});
// Successful lookup response echoing the rendered host's identity.
lookupMock.mockResolvedValue({
success: true,
host: {
id: host.id,
hostname: host.hostname,
displayName: host.displayName,
status: host.status,
connected: true,
lastSeen: host.lastSeen,
agentVersion: host.agentVersion,
},
});
const input = screen.getByPlaceholderText('Hostname or host ID') as HTMLInputElement;
fireEvent.input(input, { target: { value: host.id } });
const button = screen.getByRole('button', { name: 'Check status' });
fireEvent.click(button);
await waitFor(() => expect(lookupMock).toHaveBeenCalled(), { interval: 0 });
// The single query string is sent as both `id` and `hostname`.
const [lookupArgs] = lookupMock.mock.calls.at(-1) ?? [];
expect(lookupArgs).toEqual({ id: host.id, hostname: host.id });
await waitFor(
() =>
expect(
screen.getByText('Connected', { selector: 'span' }),
).toBeInTheDocument(),
{ interval: 0 },
);
const statusBadges = screen.getAllByText('online', { selector: 'span' });
expect(statusBadges.length).toBeGreaterThan(0);
expect(screen.getByText('Agent version 1.2.3')).toBeInTheDocument();
});
it('shows an error when lookup returns no host and does not highlight rows', async () => {
const host = createHost();
const { container } = setupComponent([host]);
createTokenMock.mockResolvedValue({
token: 'token-456',
record: {
id: 'token-record-2',
name: 'Test Token 2',
prefix: 'ghijkl',
suffix: '5678',
createdAt: new Date().toISOString(),
},
});
const generateButton = screen.getByRole('button', { name: 'Generate token' });
fireEvent.click(generateButton);
await waitFor(() => expect(createTokenMock).toHaveBeenCalled(), { interval: 0 });
await waitFor(() => expect(screen.getByRole('button', { name: 'Check status' })).toBeEnabled(), {
interval: 0,
});
// A null lookup result models a host that has not registered yet.
lookupMock.mockResolvedValue(null);
const query = 'missing-host';
const input = screen.getByPlaceholderText('Hostname or host ID') as HTMLInputElement;
fireEvent.input(input, { target: { value: query } });
const button = screen.getByRole('button', { name: 'Check status' });
fireEvent.click(button);
await waitFor(
() =>
expect(
screen.getByText(`No host has reported with "${query}" yet. Try again in a few seconds.`),
).toBeInTheDocument(),
{ interval: 0 },
);
// The existing host row must not receive the highlight ring class.
const row = container.querySelector(`tr[data-host-id="${host.id}"]`) as HTMLTableRowElement;
expect(row.classList.contains('ring-2')).toBe(false);
});
});

View file

@ -18,7 +18,7 @@ import { AlertsAPI } from '@/api/alerts';
import { NotificationsAPI, Webhook } from '@/api/notifications';
import type { EmailConfig, AppriseConfig } from '@/api/notifications';
import type { HysteresisThreshold } from '@/types/alerts';
import type { Alert, State, VM, Container, DockerHost, DockerContainer } from '@/types/api';
import type { Alert, State, VM, Container, DockerHost, DockerContainer, Host } from '@/types/api';
import { useNavigate, useLocation } from '@solidjs/router';
import { useAlertsActivation } from '@/stores/alertsActivation';
import LayoutDashboard from 'lucide-solid/icons/layout-dashboard';
@ -806,6 +806,16 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
});
}
if (config.hostDefaults) {
setHostDefaults({
cpu: getTriggerValue(config.hostDefaults.cpu) ?? 80,
memory: getTriggerValue(config.hostDefaults.memory) ?? 85,
disk: getTriggerValue(config.hostDefaults.disk) ?? 90,
});
} else {
setHostDefaults({ ...FACTORY_HOST_DEFAULTS });
}
if (config.dockerDefaults) {
setDockerDefaults({
cpu: getTriggerValue(config.dockerDefaults.cpu) ?? 80,
@ -913,6 +923,7 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
// Load global disable flags
setDisableAllNodes(config.disableAllNodes ?? false);
setDisableAllGuests(config.disableAllGuests ?? false);
setDisableAllHosts(config.disableAllHosts ?? false);
setDisableAllStorage(config.disableAllStorage ?? false);
setDisableAllPBS(config.disableAllPBS ?? false);
setDisableAllPMG(config.disableAllPMG ?? false);
@ -922,6 +933,7 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
// Load global disable offline alerts flags
setDisableAllNodesOffline(config.disableAllNodesOffline ?? false);
setDisableAllGuestsOffline(config.disableAllGuestsOffline ?? false);
setDisableAllHostsOffline(config.disableAllHostsOffline ?? false);
setDisableAllPBSOffline(config.disableAllPBSOffline ?? false);
setDisableAllPMGOffline(config.disableAllPMGOffline ?? false);
setDisableAllDockerHostsOffline(config.disableAllDockerHostsOffline ?? false);
@ -1151,6 +1163,12 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
temperature: 80,
};
const FACTORY_HOST_DEFAULTS = {
cpu: 80,
memory: 85,
disk: 90,
};
const FACTORY_DOCKER_DEFAULTS = {
cpu: 80,
memory: 85,
@ -1178,6 +1196,7 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
const [guestPoweredOffSeverity, setGuestPoweredOffSeverity] = createSignal<'warning' | 'critical'>('warning');
const [nodeDefaults, setNodeDefaults] = createSignal<Record<string, number | undefined>>({ ...FACTORY_NODE_DEFAULTS });
const [hostDefaults, setHostDefaults] = createSignal<Record<string, number | undefined>>({ ...FACTORY_HOST_DEFAULTS });
const [dockerDefaults, setDockerDefaults] = createSignal({ ...FACTORY_DOCKER_DEFAULTS });
const [dockerIgnoredPrefixes, setDockerIgnoredPrefixes] = createSignal<string[]>([]);
@ -1198,6 +1217,11 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
setHasUnsavedChanges(true);
};
const resetHostDefaults = () => {
setHostDefaults({ ...FACTORY_HOST_DEFAULTS });
setHasUnsavedChanges(true);
};
const resetDockerDefaults = () => {
setDockerDefaults({ ...FACTORY_DOCKER_DEFAULTS });
setHasUnsavedChanges(true);
@ -1255,6 +1279,7 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
// Global disable flags per resource type
const [disableAllNodes, setDisableAllNodes] = createSignal(false);
const [disableAllGuests, setDisableAllGuests] = createSignal(false);
const [disableAllHosts, setDisableAllHosts] = createSignal(false);
const [disableAllStorage, setDisableAllStorage] = createSignal(false);
const [disableAllPBS, setDisableAllPBS] = createSignal(false);
const [disableAllPMG, setDisableAllPMG] = createSignal(false);
@ -1264,6 +1289,7 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
// Global disable offline alerts flags
const [disableAllNodesOffline, setDisableAllNodesOffline] = createSignal(false);
const [disableAllGuestsOffline, setDisableAllGuestsOffline] = createSignal(false);
const [disableAllHostsOffline, setDisableAllHostsOffline] = createSignal(false);
const [disableAllPBSOffline, setDisableAllPBSOffline] = createSignal(false);
const [disableAllPMGOffline, setDisableAllPMGOffline] = createSignal(false);
const [disableAllDockerHostsOffline, setDisableAllDockerHostsOffline] = createSignal(false);
@ -1404,6 +1430,7 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
// Global disable flags per resource type
disableAllNodes: disableAllNodes(),
disableAllGuests: disableAllGuests(),
disableAllHosts: disableAllHosts(),
disableAllStorage: disableAllStorage(),
disableAllPBS: disableAllPBS(),
disableAllPMG: disableAllPMG(),
@ -1413,6 +1440,7 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
disableAllNodesOffline: disableAllNodesOffline(),
disableAllGuestsOffline: disableAllGuestsOffline(),
disableAllPBSOffline: disableAllPBSOffline(),
disableAllHostsOffline: disableAllHostsOffline(),
disableAllPMGOffline: disableAllPMGOffline(),
disableAllDockerHostsOffline: disableAllDockerHostsOffline(),
guestDefaults: {
@ -1432,6 +1460,11 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
disk: createHysteresisThreshold(nodeDefaults().disk),
temperature: createHysteresisThreshold(nodeDefaults().temperature),
},
hostDefaults: {
cpu: createHysteresisThreshold(hostDefaults().cpu),
memory: createHysteresisThreshold(hostDefaults().memory),
disk: createHysteresisThreshold(hostDefaults().disk),
},
dockerDefaults: {
cpu: createHysteresisThreshold(dockerDefaults().cpu),
memory: createHysteresisThreshold(dockerDefaults().memory),
@ -1677,6 +1710,7 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
setRawOverridesConfig={setRawOverridesConfig}
allGuests={allGuests}
state={state}
hosts={state.hosts || []}
guestDefaults={guestDefaults}
guestDisableConnectivity={guestDisableConnectivity}
setGuestDefaults={setGuestDefaults}
@ -1685,6 +1719,8 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
setGuestPoweredOffSeverity={setGuestPoweredOffSeverity}
nodeDefaults={nodeDefaults}
setNodeDefaults={setNodeDefaults}
hostDefaults={hostDefaults}
setHostDefaults={setHostDefaults}
dockerDefaults={dockerDefaults}
setDockerDefaults={setDockerDefaults}
dockerIgnoredPrefixes={dockerIgnoredPrefixes}
@ -1693,6 +1729,7 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
setStorageDefault={setStorageDefault}
resetGuestDefaults={resetGuestDefaults}
resetNodeDefaults={resetNodeDefaults}
resetHostDefaults={resetHostDefaults}
resetDockerDefaults={resetDockerDefaults}
resetDockerIgnoredPrefixes={resetDockerIgnoredPrefixes}
resetStorageDefault={resetStorageDefault}
@ -1700,6 +1737,7 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
resetBackupDefaults={resetBackupDefaults}
factoryGuestDefaults={FACTORY_GUEST_DEFAULTS}
factoryNodeDefaults={FACTORY_NODE_DEFAULTS}
factoryHostDefaults={FACTORY_HOST_DEFAULTS}
factoryDockerDefaults={FACTORY_DOCKER_DEFAULTS}
factoryStorageDefault={FACTORY_STORAGE_DEFAULT}
snapshotFactoryDefaults={FACTORY_SNAPSHOT_DEFAULTS}
@ -1721,6 +1759,8 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
setDisableAllNodes={setDisableAllNodes}
disableAllGuests={disableAllGuests}
setDisableAllGuests={setDisableAllGuests}
disableAllHosts={disableAllHosts}
setDisableAllHosts={setDisableAllHosts}
disableAllStorage={disableAllStorage}
setDisableAllStorage={setDisableAllStorage}
disableAllPBS={disableAllPBS}
@ -1735,6 +1775,8 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
setDisableAllNodesOffline={setDisableAllNodesOffline}
disableAllGuestsOffline={disableAllGuestsOffline}
setDisableAllGuestsOffline={setDisableAllGuestsOffline}
disableAllHostsOffline={disableAllHostsOffline}
setDisableAllHostsOffline={setDisableAllHostsOffline}
disableAllPBSOffline={disableAllPBSOffline}
setDisableAllPBSOffline={setDisableAllPBSOffline}
disableAllPMGOffline={disableAllPMGOffline}
@ -2192,8 +2234,10 @@ function OverviewTab(props: {
interface ThresholdsTabProps {
allGuests: () => (VM | Container)[];
state: State;
hosts: Host[];
guestDefaults: () => Record<string, number | undefined>;
nodeDefaults: () => Record<string, number | undefined>;
hostDefaults: () => Record<string, number | undefined>;
dockerDefaults: () => { cpu: number; memory: number; restartCount: number; restartWindow: number; memoryWarnPct: number; memoryCriticalPct: number };
dockerIgnoredPrefixes: () => string[];
storageDefault: () => number;
@ -2221,6 +2265,11 @@ interface ThresholdsTabProps {
| Record<string, number | undefined>
| ((prev: Record<string, number | undefined>) => Record<string, number | undefined>),
) => void;
setHostDefaults: (
value:
| Record<string, number | undefined>
| ((prev: Record<string, number | undefined>) => Record<string, number | undefined>),
) => void;
setDockerDefaults: (
value: { cpu: number; memory: number; restartCount: number; restartWindow: number; memoryWarnPct: number; memoryCriticalPct: number } | ((prev: { cpu: number; memory: number; restartCount: number; restartWindow: number; memoryWarnPct: number; memoryCriticalPct: number }) => { cpu: number; memory: number; restartCount: number; restartWindow: number; memoryWarnPct: number; memoryCriticalPct: number }),
) => void;
@ -2258,6 +2307,8 @@ interface ThresholdsTabProps {
setDisableAllNodes: (value: boolean) => void;
disableAllGuests: () => boolean;
setDisableAllGuests: (value: boolean) => void;
disableAllHosts: () => boolean;
setDisableAllHosts: (value: boolean) => void;
disableAllStorage: () => boolean;
setDisableAllStorage: (value: boolean) => void;
disableAllPBS: () => boolean;
@ -2273,6 +2324,8 @@ interface ThresholdsTabProps {
setDisableAllNodesOffline: (value: boolean) => void;
disableAllGuestsOffline: () => boolean;
setDisableAllGuestsOffline: (value: boolean) => void;
disableAllHostsOffline: () => boolean;
setDisableAllHostsOffline: (value: boolean) => void;
disableAllPBSOffline: () => boolean;
setDisableAllPBSOffline: (value: boolean) => void;
disableAllPMGOffline: () => boolean;
@ -2282,11 +2335,13 @@ interface ThresholdsTabProps {
// Reset functions and factory defaults
resetGuestDefaults?: () => void;
resetNodeDefaults?: () => void;
resetHostDefaults?: () => void;
resetDockerDefaults?: () => void;
resetDockerIgnoredPrefixes?: () => void;
resetStorageDefault?: () => void;
factoryGuestDefaults?: Record<string, number | undefined>;
factoryNodeDefaults?: Record<string, number | undefined>;
factoryHostDefaults?: Record<string, number | undefined>;
factoryDockerDefaults?: Record<string, number | undefined>;
factoryStorageDefault?: number;
}
@ -2300,6 +2355,7 @@ function ThresholdsTab(props: ThresholdsTabProps) {
setRawOverridesConfig={props.setRawOverridesConfig}
allGuests={props.allGuests}
nodes={props.state.nodes || []}
hosts={props.hosts}
storage={props.state.storage || []}
dockerHosts={props.state.dockerHosts || []}
pbsInstances={props.state.pbs || []}
@ -2317,7 +2373,9 @@ function ThresholdsTab(props: ThresholdsTabProps) {
guestPoweredOffSeverity={props.guestPoweredOffSeverity}
setGuestPoweredOffSeverity={props.setGuestPoweredOffSeverity}
nodeDefaults={props.nodeDefaults()}
hostDefaults={props.hostDefaults()}
setNodeDefaults={props.setNodeDefaults}
setHostDefaults={props.setHostDefaults}
dockerDefaults={props.dockerDefaults()}
setDockerDefaults={props.setDockerDefaults}
dockerIgnoredPrefixes={props.dockerIgnoredPrefixes}
@ -2342,6 +2400,8 @@ function ThresholdsTab(props: ThresholdsTabProps) {
setDisableAllNodes={props.setDisableAllNodes}
disableAllGuests={props.disableAllGuests}
setDisableAllGuests={props.setDisableAllGuests}
disableAllHosts={props.disableAllHosts}
setDisableAllHosts={props.setDisableAllHosts}
disableAllStorage={props.disableAllStorage}
setDisableAllStorage={props.setDisableAllStorage}
disableAllPBS={props.disableAllPBS}
@ -2356,6 +2416,8 @@ function ThresholdsTab(props: ThresholdsTabProps) {
setDisableAllNodesOffline={props.setDisableAllNodesOffline}
disableAllGuestsOffline={props.disableAllGuestsOffline}
setDisableAllGuestsOffline={props.setDisableAllGuestsOffline}
disableAllHostsOffline={props.disableAllHostsOffline}
setDisableAllHostsOffline={props.setDisableAllHostsOffline}
disableAllPBSOffline={props.disableAllPBSOffline}
setDisableAllPBSOffline={props.setDisableAllPBSOffline}
disableAllPMGOffline={props.disableAllPMGOffline}
@ -2364,11 +2426,13 @@ function ThresholdsTab(props: ThresholdsTabProps) {
setDisableAllDockerHostsOffline={props.setDisableAllDockerHostsOffline}
resetGuestDefaults={props.resetGuestDefaults}
resetNodeDefaults={props.resetNodeDefaults}
resetHostDefaults={props.resetHostDefaults}
resetDockerDefaults={props.resetDockerDefaults}
resetDockerIgnoredPrefixes={props.resetDockerIgnoredPrefixes}
resetStorageDefault={props.resetStorageDefault}
factoryGuestDefaults={props.factoryGuestDefaults}
factoryNodeDefaults={props.factoryNodeDefaults}
factoryHostDefaults={props.factoryHostDefaults}
factoryDockerDefaults={props.factoryDockerDefaults}
factoryStorageDefault={props.factoryStorageDefault}
/>

View file

@ -108,6 +108,7 @@ export interface AlertConfig {
activationTime?: string;
guestDefaults: AlertThresholds;
nodeDefaults: AlertThresholds;
hostDefaults?: AlertThresholds;
storageDefault: HysteresisThreshold;
dockerDefaults?: DockerThresholdConfig;
dockerIgnoredContainerPrefixes?: string[];
@ -194,12 +195,14 @@ export interface AlertConfig {
disableAllStorage?: boolean;
disableAllPBS?: boolean;
disableAllPMG?: boolean;
disableAllHosts?: boolean;
disableAllDockerHosts?: boolean;
disableAllDockerContainers?: boolean;
disableAllNodesOffline?: boolean;
disableAllGuestsOffline?: boolean;
disableAllPBSOffline?: boolean;
disableAllPMGOffline?: boolean;
disableAllHostsOffline?: boolean;
disableAllDockerHostsOffline?: boolean;
}

View file

@ -325,6 +325,7 @@ type AlertConfig struct {
ActivationTime *time.Time `json:"activationTime,omitempty"`
GuestDefaults ThresholdConfig `json:"guestDefaults"`
NodeDefaults ThresholdConfig `json:"nodeDefaults"`
HostDefaults ThresholdConfig `json:"hostDefaults"`
StorageDefault HysteresisThreshold `json:"storageDefault"`
DockerDefaults DockerThresholdConfig `json:"dockerDefaults"`
DockerIgnoredContainerPrefixes []string `json:"dockerIgnoredContainerPrefixes,omitempty"`
@ -337,6 +338,7 @@ type AlertConfig struct {
// Global disable flags per resource type
DisableAllNodes bool `json:"disableAllNodes"` // Disable all alerts for Proxmox nodes
DisableAllGuests bool `json:"disableAllGuests"` // Disable all alerts for VMs/containers
DisableAllHosts bool `json:"disableAllHosts"` // Disable all alerts for Pulse host agents
DisableAllStorage bool `json:"disableAllStorage"` // Disable all alerts for storage
DisableAllPBS bool `json:"disableAllPBS"` // Disable all alerts for PBS servers
DisableAllPMG bool `json:"disableAllPMG"` // Disable all alerts for PMG instances
@ -344,6 +346,7 @@ type AlertConfig struct {
DisableAllDockerContainers bool `json:"disableAllDockerContainers"` // Disable all alerts for Docker containers
DisableAllNodesOffline bool `json:"disableAllNodesOffline"` // Disable node offline/connectivity alerts globally
DisableAllGuestsOffline bool `json:"disableAllGuestsOffline"` // Disable guest powered-off alerts globally
DisableAllHostsOffline bool `json:"disableAllHostsOffline"` // Disable host agent offline alerts globally
DisableAllPBSOffline bool `json:"disableAllPBSOffline"` // Disable PBS offline alerts globally
DisableAllPMGOffline bool `json:"disableAllPMGOffline"` // Disable PMG offline alerts globally
DisableAllDockerHostsOffline bool `json:"disableAllDockerHostsOffline"` // Disable Docker host offline alerts globally
@ -488,6 +491,11 @@ func NewManager() *Manager {
Disk: &HysteresisThreshold{Trigger: 90, Clear: 85},
Temperature: &HysteresisThreshold{Trigger: 80, Clear: 75}, // Warning at 80°C, clear at 75°C
},
HostDefaults: ThresholdConfig{
CPU: &HysteresisThreshold{Trigger: 80, Clear: 75},
Memory: &HysteresisThreshold{Trigger: 85, Clear: 80},
Disk: &HysteresisThreshold{Trigger: 90, Clear: 85},
},
DockerDefaults: DockerThresholdConfig{
CPU: HysteresisThreshold{Trigger: 80, Clear: 75},
Memory: HysteresisThreshold{Trigger: 85, Clear: 80},
@ -886,6 +894,32 @@ func (m *Manager) UpdateConfig(config AlertConfig) {
}
}
// Ensure host agent defaults exist
if config.HostDefaults.CPU == nil || config.HostDefaults.CPU.Trigger <= 0 {
config.HostDefaults.CPU = &HysteresisThreshold{Trigger: 80, Clear: 75}
} else if config.HostDefaults.CPU.Clear <= 0 {
config.HostDefaults.CPU.Clear = config.HostDefaults.CPU.Trigger - 5
if config.HostDefaults.CPU.Clear <= 0 {
config.HostDefaults.CPU.Clear = 75
}
}
if config.HostDefaults.Memory == nil || config.HostDefaults.Memory.Trigger <= 0 {
config.HostDefaults.Memory = &HysteresisThreshold{Trigger: 85, Clear: 80}
} else if config.HostDefaults.Memory.Clear <= 0 {
config.HostDefaults.Memory.Clear = config.HostDefaults.Memory.Trigger - 5
if config.HostDefaults.Memory.Clear <= 0 {
config.HostDefaults.Memory.Clear = 80
}
}
if config.HostDefaults.Disk == nil || config.HostDefaults.Disk.Trigger <= 0 {
config.HostDefaults.Disk = &HysteresisThreshold{Trigger: 90, Clear: 85}
} else if config.HostDefaults.Disk.Clear <= 0 {
config.HostDefaults.Disk.Clear = config.HostDefaults.Disk.Trigger - 5
if config.HostDefaults.Disk.Clear <= 0 {
config.HostDefaults.Disk.Clear = 85
}
}
// Normalize any metric-level delay overrides
config.MetricTimeThresholds = normalizeMetricTimeThresholds(config.MetricTimeThresholds)
@ -1879,6 +1913,373 @@ func (m *Manager) CheckNode(node models.Node) {
}
}
// hostResourceID builds the canonical alert resource ID for a host agent.
// Blank or whitespace-only IDs map to the sentinel "host:unknown".
func hostResourceID(hostID string) string {
	id := strings.TrimSpace(hostID)
	if id == "" {
		id = "unknown"
	}
	return "host:" + id
}
// hostDisplayName picks the friendliest label available for a host:
// explicit display name first, then hostname, then the raw ID, and
// finally the generic fallback "Host".
func hostDisplayName(host models.Host) string {
	for _, candidate := range []string{host.DisplayName, host.Hostname} {
		if trimmed := strings.TrimSpace(candidate); trimmed != "" {
			return trimmed
		}
	}
	if host.ID != "" {
		return host.ID
	}
	return "Host"
}
// hostInstanceName labels the "instance" column for host alerts using the
// reported platform, falling back to the OS name, then a generic label.
func hostInstanceName(host models.Host) string {
	candidates := []string{host.Platform, host.OSName}
	for _, candidate := range candidates {
		if trimmed := strings.TrimSpace(candidate); trimmed != "" {
			return trimmed
		}
	}
	return "Host Agent"
}
// sanitizeHostComponent normalises an arbitrary label (mountpoint, device
// name, …) into a lowercase slug: runs of [a-z0-9] separated by single
// hyphens, with no leading or trailing hyphen. Inputs that reduce to
// nothing yield "unknown".
func sanitizeHostComponent(value string) string {
	normalized := strings.ToLower(strings.TrimSpace(value))
	if normalized == "" {
		return "unknown"
	}
	out := make([]rune, 0, len(normalized))
	pendingHyphen := false
	for _, r := range normalized {
		isAlnum := (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9')
		if !isAlnum {
			// Collapse every run of disallowed characters into at most
			// one separator, emitted only if more slug content follows.
			pendingHyphen = true
			continue
		}
		if pendingHyphen && len(out) > 0 {
			out = append(out, '-')
		}
		out = append(out, r)
		pendingHyphen = false
	}
	if len(out) == 0 {
		return "unknown"
	}
	return string(out)
}
// hostDiskResourceID derives the per-disk alert resource ID and human label
// for one of a host's filesystems. Label preference: mountpoint, then
// device, then the literal "disk".
func hostDiskResourceID(host models.Host, disk models.Disk) (string, string) {
	label := "disk"
	if mp := strings.TrimSpace(disk.Mountpoint); mp != "" {
		label = mp
	} else if dev := strings.TrimSpace(disk.Device); dev != "" {
		label = dev
	}
	id := hostResourceID(host.ID) + "/disk:" + sanitizeHostComponent(label)
	name := fmt.Sprintf("%s (%s)", hostDisplayName(host), label)
	return id, name
}
// CheckHost evaluates host agent telemetry for alerts.
//
// Flow:
//  1. Mark the host online (clears offline tracking/alerts).
//  2. Snapshot the relevant config fields under the read lock.
//  3. Bail out when alerting is off globally or all host alerts are disabled.
//  4. Apply any per-host override; a Disabled override clears all host alerts.
//  5. Check CPU, memory, and each reported disk against the thresholds,
//     clearing stale alerts for any metric whose threshold is unset.
func (m *Manager) CheckHost(host models.Host) {
	if host.ID == "" {
		return
	}

	// Fresh telemetry marks the host as online and clears offline tracking.
	m.HandleHostOnline(host)

	// Copy everything needed out of the config while holding the read lock,
	// so the metric checks below run lock-free.
	m.mu.RLock()
	alertsEnabled := m.config.Enabled
	disableAllHosts := m.config.DisableAllHosts
	thresholds := m.config.HostDefaults
	override, hasOverride := m.config.Overrides[host.ID]
	m.mu.RUnlock()

	if !alertsEnabled || disableAllHosts {
		return
	}

	if hasOverride {
		thresholds = m.applyThresholdOverride(thresholds, override)
		// A disabled override means "no alerts for this host": drop any
		// metric and per-disk alerts that may already be active.
		if thresholds.Disabled {
			m.clearHostMetricAlerts(host.ID)
			m.clearHostDiskAlerts(host.ID)
			return
		}
	}

	resourceID := hostResourceID(host.ID)
	resourceName := hostDisplayName(host)
	nodeName := strings.TrimSpace(host.Hostname)
	instanceName := hostInstanceName(host)

	// Metadata attached to every alert raised for this host; per-metric
	// checks clone it before adding metric-specific keys.
	baseMetadata := map[string]interface{}{
		"resourceType": "Host",
		"hostId":       host.ID,
		"hostname":     host.Hostname,
		"displayName":  host.DisplayName,
		"platform":     host.Platform,
		"osName":       host.OSName,
		"osVersion":    host.OSVersion,
		"agentVersion": host.AgentVersion,
		"architecture": host.Architecture,
	}
	if len(host.Tags) > 0 {
		// Defensive copy so later mutation of host.Tags cannot alias into
		// stored alert metadata.
		baseMetadata["tags"] = append([]string(nil), host.Tags...)
	}

	// CPU: checked only when a CPU threshold is configured; otherwise any
	// existing CPU alert for this host is cleared.
	if thresholds.CPU != nil {
		cpuMetadata := cloneMetadata(baseMetadata)
		cpuMetadata["metric"] = "cpu"
		cpuMetadata["cpuUsagePercent"] = host.CPUUsage
		if host.CPUCount > 0 {
			cpuMetadata["cpuCount"] = host.CPUCount
		}
		m.checkMetric(resourceID, resourceName, nodeName, instanceName, "Host", "cpu", host.CPUUsage, thresholds.CPU, &metricOptions{Metadata: cpuMetadata})
	} else {
		m.clearHostMetricAlerts(host.ID, "cpu")
	}

	// Memory: same pattern as CPU.
	if thresholds.Memory != nil {
		memMetadata := cloneMetadata(baseMetadata)
		memMetadata["metric"] = "memory"
		memMetadata["memoryUsagePercent"] = host.Memory.Usage
		if host.Memory.Total > 0 {
			memMetadata["memoryTotalBytes"] = host.Memory.Total
			memMetadata["memoryUsedBytes"] = host.Memory.Used
			memMetadata["memoryFreeBytes"] = host.Memory.Free
		}
		m.checkMetric(resourceID, resourceName, nodeName, instanceName, "Host", "memory", host.Memory.Usage, thresholds.Memory, &metricOptions{Metadata: memMetadata})
	} else {
		m.clearHostMetricAlerts(host.ID, "memory")
	}

	// Disks: each reported filesystem gets its own resource ID. seenDisks
	// records the IDs observed this cycle so cleanupHostDiskAlerts can drop
	// alerts for mounts the host no longer reports.
	seenDisks := make(map[string]struct{}, len(host.Disks))
	if thresholds.Disk != nil && thresholds.Disk.Trigger > 0 {
		for _, disk := range host.Disks {
			diskResourceID, diskName := hostDiskResourceID(host, disk)
			seenDisks[diskResourceID] = struct{}{}
			diskMetadata := cloneMetadata(baseMetadata)
			diskMetadata["metric"] = "disk"
			diskMetadata["mountpoint"] = disk.Mountpoint
			diskMetadata["device"] = disk.Device
			diskMetadata["diskType"] = disk.Type
			diskMetadata["diskUsagePercent"] = disk.Usage
			if disk.Total > 0 {
				diskMetadata["diskTotalBytes"] = disk.Total
				diskMetadata["diskUsedBytes"] = disk.Used
				diskMetadata["diskFreeBytes"] = disk.Free
			}
			m.checkMetric(diskResourceID, diskName, nodeName, instanceName, "Host Disk", "disk", disk.Usage, thresholds.Disk, &metricOptions{Metadata: diskMetadata})
		}
	} else {
		m.clearHostDiskAlerts(host.ID)
	}
	m.cleanupHostDiskAlerts(host, seenDisks)
}
// HandleHostOnline clears offline tracking and alerts for a host agent.
func (m *Manager) HandleHostOnline(host models.Host) {
	if host.ID == "" {
		return
	}
	offlineAlertID := fmt.Sprintf("host-offline-%s", host.ID)
	key := hostResourceID(host.ID)

	// Drop the confirmation counter and peek at the active alert under the
	// lock; the actual clear happens after the lock is released.
	m.mu.Lock()
	delete(m.offlineConfirmations, key)
	_, hadAlert := m.activeAlerts[offlineAlertID]
	m.mu.Unlock()

	if hadAlert {
		m.clearAlert(offlineAlertID)
	}
}
// HandleHostRemoved clears alerts and tracking when a host agent is deleted.
// It reuses the online path to drop offline confirmations and the offline
// alert, then purges any metric (cpu/memory) and per-disk alerts.
func (m *Manager) HandleHostRemoved(host models.Host) {
	if host.ID == "" {
		return
	}
	m.HandleHostOnline(host)
	m.clearHostMetricAlerts(host.ID)
	m.clearHostDiskAlerts(host.ID)
}
// HandleHostOffline raises an alert when a host agent stops reporting.
//
// The alert only fires after requiredConfirmations consecutive offline
// detections; HandleHostOnline resets the counter. An already-active alert
// just has its LastSeen refreshed. Both the global DisableAllHostsOffline
// flag and a per-host override (DisableConnectivity or Disabled) suppress
// the alert and reset tracking.
func (m *Manager) HandleHostOffline(host models.Host) {
	if host.ID == "" {
		return
	}

	// Read the global flags under a short read lock.
	m.mu.RLock()
	if !m.config.Enabled {
		m.mu.RUnlock()
		return
	}
	disableHostsOffline := m.config.DisableAllHostsOffline
	m.mu.RUnlock()

	alertID := fmt.Sprintf("host-offline-%s", host.ID)
	resourceKey := hostResourceID(host.ID)
	resourceName := hostDisplayName(host)
	nodeName := strings.TrimSpace(host.Hostname)
	instanceName := hostInstanceName(host)

	// Offline alerts globally disabled: drop tracking and any active alert.
	if disableHostsOffline {
		m.mu.Lock()
		delete(m.offlineConfirmations, resourceKey)
		m.mu.Unlock()
		m.clearAlert(alertID)
		return
	}

	// A per-host override can also suppress connectivity alerts.
	var disableConnectivity bool
	m.mu.RLock()
	if override, exists := m.config.Overrides[host.ID]; exists {
		disableConnectivity = override.DisableConnectivity || override.Disabled
	}
	m.mu.RUnlock()

	if disableConnectivity {
		m.clearAlert(alertID)
		m.mu.Lock()
		delete(m.offlineConfirmations, resourceKey)
		m.mu.Unlock()
		return
	}

	m.mu.Lock()
	// Alert already active: refresh its LastSeen and stop.
	if alert, exists := m.activeAlerts[alertID]; exists && alert != nil {
		alert.LastSeen = time.Now()
		m.activeAlerts[alertID] = alert
		m.mu.Unlock()
		return
	}

	// Count this detection; only alert once the host has been seen offline
	// requiredConfirmations times in a row.
	m.offlineConfirmations[resourceKey]++
	const requiredConfirmations = 3
	if confirmations := m.offlineConfirmations[resourceKey]; confirmations < requiredConfirmations {
		m.mu.Unlock()
		log.Debug().
			Str("host", resourceName).
			Str("hostID", host.ID).
			Int("confirmations", confirmations).
			Int("required", requiredConfirmations).
			Msg("Host agent appears offline, awaiting confirmation")
		return
	}

	alert := &Alert{
		ID:           alertID,
		Type:         "host-offline",
		Level:        AlertLevelCritical,
		ResourceID:   resourceKey,
		ResourceName: resourceName,
		Node:         nodeName,
		Instance:     instanceName,
		Message:      fmt.Sprintf("Host '%s' is offline", resourceName),
		Value:        0,
		Threshold:    0,
		StartTime:    time.Now(),
		LastSeen:     time.Now(),
		Metadata: map[string]interface{}{
			"resourceType": "Host",
			"hostId":       host.ID,
			"hostname":     host.Hostname,
			"displayName":  host.DisplayName,
			"platform":     host.Platform,
			"osName":       host.OSName,
			"osVersion":    host.OSVersion,
		},
	}
	// NOTE(review): preserveAlertState, historyManager.AddAlert, and
	// dispatchAlert all run while m.mu is held — confirm none of them
	// re-acquire m.mu (deadlock risk) or block on slow I/O under the lock.
	m.preserveAlertState(alertID, alert)
	m.activeAlerts[alertID] = alert
	m.recentAlerts[alertID] = alert
	m.historyManager.AddAlert(*alert)
	m.dispatchAlert(alert, false)
	m.mu.Unlock()

	log.Error().
		Str("host", resourceName).
		Str("hostID", host.ID).
		Str("hostname", host.Hostname).
		Msg("CRITICAL: Host agent is offline")
}
// clearHostMetricAlerts removes host-level metric alerts for the given host.
// With no explicit metrics it clears the full default set (cpu and memory).
func (m *Manager) clearHostMetricAlerts(hostID string, metrics ...string) {
	if hostID == "" {
		return
	}
	if len(metrics) == 0 {
		metrics = []string{"cpu", "memory"}
	}
	base := hostResourceID(hostID)
	for _, metric := range metrics {
		m.clearAlert(base + "-" + metric)
	}
}
// clearHostDiskAlerts removes every per-disk alert belonging to the host,
// identified by the "host:<id>/disk:" resource-ID prefix.
func (m *Manager) clearHostDiskAlerts(hostID string) {
	if hostID == "" {
		return
	}
	diskPrefix := hostResourceID(hostID) + "/disk:"

	m.mu.Lock()
	defer m.mu.Unlock()
	for id, alert := range m.activeAlerts {
		if alert != nil && strings.HasPrefix(alert.ResourceID, diskPrefix) {
			m.clearAlertNoLock(id)
		}
	}
}
// cleanupHostDiskAlerts drops disk alerts for mounts the host no longer
// reports; seen holds the disk resource IDs observed in the latest
// telemetry cycle.
func (m *Manager) cleanupHostDiskAlerts(host models.Host, seen map[string]struct{}) {
	if host.ID == "" {
		return
	}
	diskPrefix := hostResourceID(host.ID) + "/disk:"

	m.mu.Lock()
	defer m.mu.Unlock()
	for id, alert := range m.activeAlerts {
		if alert == nil || !strings.HasPrefix(alert.ResourceID, diskPrefix) {
			continue
		}
		if _, stillPresent := seen[alert.ResourceID]; !stillPresent {
			m.clearAlertNoLock(id)
		}
	}
}
// CheckPBS checks PBS instance metrics against thresholds
func (m *Manager) CheckPBS(pbs models.PBSInstance) {
m.mu.RLock()
@ -6728,7 +7129,7 @@ func (m *Manager) SaveActiveAlerts() error {
alerts = append(alerts, alert)
}
data, err := json.MarshalIndent(alerts, "", " ")
data, err := json.Marshal(alerts)
if err != nil {
return fmt.Errorf("failed to marshal active alerts: %w", err)
}

View file

@ -213,6 +213,167 @@ func TestHandleDockerHostRemovedClearsAlertsAndTracking(t *testing.T) {
}
}
// TestCheckHostGeneratesMetricAlerts feeds CheckHost telemetry above the
// default CPU/memory/disk thresholds and asserts one active alert per
// metric plus one per reported disk.
func TestCheckHostGeneratesMetricAlerts(t *testing.T) {
	m := NewManager()
	m.ClearActiveAlerts()

	// Zero the time thresholds — presumably so breaches alert immediately
	// rather than after a sustained-duration delay; confirm against
	// checkMetric's TimeThreshold handling.
	m.mu.Lock()
	m.config.TimeThreshold = 0
	m.config.TimeThresholds = map[string]int{}
	m.mu.Unlock()

	// All usage figures exceed the factory defaults (CPU 80, memory 85,
	// disk 90) configured in NewManager.
	host := models.Host{
		ID:          "host-1",
		DisplayName: "Test Host",
		Hostname:    "host-1.example",
		Platform:    "linux",
		OSName:      "ubuntu",
		CPUUsage:    95,
		CPUCount:    8,
		Memory: models.Memory{
			Usage: 92,
			Total: 16384,
			Used:  15000,
			Free:  1384,
		},
		Disks: []models.Disk{
			{
				Mountpoint: "/",
				Usage:      93,
				Total:      100,
				Used:       93,
				Free:       7,
			},
		},
		Status:          "online",
		IntervalSeconds: 30,
		LastSeen:        time.Now(),
		Tags:            []string{"prod"},
	}

	m.CheckHost(host)

	// Deferred unlock also runs on t.Fatalf (runtime.Goexit), so holding
	// the read lock across the assertions below is safe.
	m.mu.RLock()
	defer m.mu.RUnlock()

	cpuAlertID := fmt.Sprintf("%s-cpu", hostResourceID(host.ID))
	if _, exists := m.activeAlerts[cpuAlertID]; !exists {
		t.Fatalf("expected CPU alert %q to be active", cpuAlertID)
	}
	memAlertID := fmt.Sprintf("%s-memory", hostResourceID(host.ID))
	if _, exists := m.activeAlerts[memAlertID]; !exists {
		t.Fatalf("expected memory alert %q to be active", memAlertID)
	}
	diskResourceID, _ := hostDiskResourceID(host, host.Disks[0])
	diskAlertID := fmt.Sprintf("%s-disk", diskResourceID)
	if _, exists := m.activeAlerts[diskAlertID]; !exists {
		t.Fatalf("expected disk alert %q to be active", diskAlertID)
	}
}
// TestHandleHostOfflineRequiresConfirmations verifies that a host-offline
// alert fires only after three consecutive offline detections, and that
// HandleHostOnline clears both the alert and the confirmation counter.
//
// Fix over the previous version: the manager state is read inside a
// closure with a deferred RUnlock. The old code called t.Fatalf while
// m.mu.RLock() was held with no deferred unlock — Fatalf invokes
// runtime.Goexit, which would have left the RWMutex read-locked on every
// failure path.
func TestHandleHostOfflineRequiresConfirmations(t *testing.T) {
	m := NewManager()
	m.ClearActiveAlerts()

	host := models.Host{ID: "host-2", DisplayName: "Second Host", Hostname: "host-two"}
	alertID := fmt.Sprintf("host-offline-%s", host.ID)
	resourceKey := hostResourceID(host.ID)

	// snapshot reads the alert/confirmation state under the lock and
	// releases it before any assertion can abort the test.
	snapshot := func() (alertActive bool, confirmations int, tracked bool) {
		m.mu.RLock()
		defer m.mu.RUnlock()
		_, alertActive = m.activeAlerts[alertID]
		confirmations, tracked = m.offlineConfirmations[resourceKey]
		return
	}

	m.HandleHostOffline(host)
	if active, count, _ := snapshot(); active {
		t.Fatalf("expected no alert after first offline detection")
	} else if count != 1 {
		t.Fatalf("expected confirmation count to be 1, got %d", count)
	}

	m.HandleHostOffline(host)
	if active, count, _ := snapshot(); active {
		t.Fatalf("expected no alert after second offline detection")
	} else if count != 2 {
		t.Fatalf("expected confirmation count to be 2, got %d", count)
	}

	m.HandleHostOffline(host)
	if active, _, _ := snapshot(); !active {
		t.Fatalf("expected alert %q after third offline detection", alertID)
	}

	m.HandleHostOnline(host)
	active, _, tracked := snapshot()
	if active {
		t.Fatalf("expected offline alert %q to be cleared after host online", alertID)
	}
	if tracked {
		t.Fatalf("expected offline confirmations to be cleared when host online")
	}
}
// TestCheckHostDisabledOverrideClearsAlerts raises host alerts, then sets a
// Disabled per-host override and re-runs CheckHost, asserting every alert
// for the host is cleared.
func TestCheckHostDisabledOverrideClearsAlerts(t *testing.T) {
	m := NewManager()
	m.ClearActiveAlerts()

	// Zero the time thresholds — presumably so breaches alert immediately;
	// confirm against checkMetric's TimeThreshold handling.
	m.mu.Lock()
	m.config.TimeThreshold = 0
	m.config.TimeThresholds = map[string]int{}
	m.mu.Unlock()

	// Usage figures exceed the factory defaults so CheckHost raises alerts.
	host := models.Host{
		ID:          "host-3",
		DisplayName: "Override Host",
		Hostname:    "override.example",
		CPUUsage:    90,
		Memory: models.Memory{
			Usage: 91,
			Total: 16000,
			Used:  14560,
			Free:  1440,
		},
		Disks: []models.Disk{
			{Mountpoint: "/data", Usage: 92, Total: 200, Used: 184, Free: 16},
		},
		Status:          "online",
		IntervalSeconds: 30,
		LastSeen:        time.Now(),
	}

	m.CheckHost(host)

	// Sanity check: alerts exist before the override is applied. The lock is
	// explicitly released before Fatalf on the failure path.
	m.mu.RLock()
	if len(m.activeAlerts) == 0 {
		m.mu.RUnlock()
		t.Fatalf("expected active alerts prior to disabling host overrides")
	}
	m.mu.RUnlock()

	// Disable all alerting for this host via a per-host override.
	cfg := m.GetConfig()
	cfg.Overrides = map[string]ThresholdConfig{
		host.ID: {
			Disabled: true,
		},
	}
	m.UpdateConfig(cfg)

	// UpdateConfig may reset the time thresholds; zero them again so the
	// second CheckHost run behaves like the first.
	m.mu.Lock()
	m.config.TimeThreshold = 0
	m.config.TimeThresholds = map[string]int{}
	m.mu.Unlock()

	m.CheckHost(host)

	// Deferred unlock also runs on t.Fatalf, so this is safe to hold.
	m.mu.RLock()
	defer m.mu.RUnlock()
	if len(m.activeAlerts) != 0 {
		t.Fatalf("expected all host alerts to be cleared after disabling override, got %d", len(m.activeAlerts))
	}
}
func TestCheckSnapshotsForInstanceCreatesAndClearsAlerts(t *testing.T) {
m := NewManager()
m.ClearActiveAlerts()

View file

@ -174,7 +174,7 @@ func (hm *HistoryManager) saveHistory() error {
copy(snapshot, hm.history)
hm.mu.RUnlock()
data, err := json.MarshalIndent(snapshot, "", " ")
data, err := json.Marshal(snapshot)
if err != nil {
return fmt.Errorf("failed to marshal history: %w", err)

View file

@ -207,7 +207,7 @@ func (c *CSRFTokenStore) saveUnsafe() {
}
// Marshal tokens
jsonData, err := json.MarshalIndent(data, "", " ")
jsonData, err := json.Marshal(data)
if err != nil {
log.Error().Err(err).Msg("Failed to marshal CSRF tokens")
return

View file

@ -0,0 +1,136 @@
package api
import (
"encoding/json"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
"unsafe"
"github.com/rcourtman/pulse-go-rewrite/internal/config"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
"github.com/rcourtman/pulse-go-rewrite/internal/monitoring"
)
// TestHandleLookupByIDSuccess exercises the happy path of
// /api/agents/host/lookup: a host registered under the same API token is
// returned with success=true, its ID, connected=true, and the last-seen
// timestamp intact.
//
// Fix over the previous version: the models.Host literal is now
// gofmt-formatted (the DisplayName field was written `DisplayName:"Host
// Local"` with no space and no alignment).
func TestHandleLookupByIDSuccess(t *testing.T) {
	t.Parallel()

	hostID := "host-123"
	tokenID := "token-abc"
	lastSeen := time.Now().UTC()
	handler := newHostAgentHandlerForTests(t, models.Host{
		ID:          hostID,
		Hostname:    "host.local",
		DisplayName: "Host Local",
		Status:      "online",
		TokenID:     tokenID,
		LastSeen:    lastSeen,
	})

	// Authenticate with the same token ID the host was registered under.
	req := httptest.NewRequest(http.MethodGet, "/api/agents/host/lookup?id="+hostID, nil)
	attachAPITokenRecord(req, &config.APITokenRecord{ID: tokenID})
	rec := httptest.NewRecorder()

	handler.HandleLookup(rec, req)

	if rec.Code != http.StatusOK {
		t.Fatalf("expected status %d, got %d", http.StatusOK, rec.Code)
	}

	// Decode only the fields this test asserts on.
	var resp struct {
		Success bool `json:"success"`
		Host    struct {
			ID        string    `json:"id"`
			Hostname  string    `json:"hostname"`
			Status    string    `json:"status"`
			Connected bool      `json:"connected"`
			LastSeen  time.Time `json:"lastSeen"`
		} `json:"host"`
	}
	if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
		t.Fatalf("failed to decode response: %v", err)
	}
	if !resp.Success {
		t.Fatalf("expected success=true")
	}
	if resp.Host.ID != hostID {
		t.Fatalf("unexpected host id %q", resp.Host.ID)
	}
	if !resp.Host.Connected {
		t.Fatalf("expected connected host")
	}
	if !resp.Host.LastSeen.Equal(lastSeen) {
		t.Fatalf("expected lastSeen %v, got %v", lastSeen, resp.Host.LastSeen)
	}
}
// TestHandleLookupForbiddenOnTokenMismatch ensures a lookup authenticated
// with a different API token than the one the host was registered under is
// rejected with 403 Forbidden.
func TestHandleLookupForbiddenOnTokenMismatch(t *testing.T) {
	t.Parallel()

	const hostID = "host-456"
	handler := newHostAgentHandlerForTests(t, models.Host{
		ID:       hostID,
		Hostname: "mismatch.local",
		Status:   "online",
		TokenID:  "token-correct",
	})

	request := httptest.NewRequest(http.MethodGet, "/api/agents/host/lookup?id="+hostID, nil)
	attachAPITokenRecord(request, &config.APITokenRecord{ID: "token-wrong"})
	recorder := httptest.NewRecorder()

	handler.HandleLookup(recorder, request)

	if got := recorder.Code; got != http.StatusForbidden {
		t.Fatalf("expected status %d, got %d", http.StatusForbidden, got)
	}
}
// TestHandleLookupNotFound verifies that looking up an unknown host ID
// yields 404 Not Found.
func TestHandleLookupNotFound(t *testing.T) {
	t.Parallel()

	handler := newHostAgentHandlerForTests(t)
	request := httptest.NewRequest(http.MethodGet, "/api/agents/host/lookup?id=missing", nil)
	recorder := httptest.NewRecorder()

	handler.HandleLookup(recorder, request)

	if got := recorder.Code; got != http.StatusNotFound {
		t.Fatalf("expected status %d, got %d", http.StatusNotFound, got)
	}
}
// newHostAgentHandlerForTests builds a HostAgentHandlers whose monitor state
// is pre-populated with the given hosts, without going through the normal
// Monitor construction path.
//
// NOTE(review): the monitor's unexported `state` field is injected via
// reflection + unsafe (see setUnexportedField); this is fragile if the
// field is renamed or its type changes.
func newHostAgentHandlerForTests(t *testing.T, hosts ...models.Host) *HostAgentHandlers {
	t.Helper()
	monitor := &monitoring.Monitor{}
	state := models.NewState()
	for _, host := range hosts {
		state.UpsertHost(host)
	}
	setUnexportedField(t, monitor, "state", state)
	return &HostAgentHandlers{
		monitor: monitor,
	}
}
// setUnexportedField writes value into an unexported struct field by name,
// using reflect + unsafe to sidestep the exported-only restriction on
// reflect.Value.Set. target must be a pointer to an addressable struct;
// the test fails if the field does not exist.
func setUnexportedField(t *testing.T, target interface{}, field string, value interface{}) {
	t.Helper()
	structValue := reflect.ValueOf(target).Elem()
	fieldValue := structValue.FieldByName(field)
	if !fieldValue.IsValid() {
		t.Fatalf("field %q not found", field)
	}
	// Re-create the field at the same address without the read-only flag.
	writable := reflect.NewAt(fieldValue.Type(), unsafe.Pointer(fieldValue.UnsafeAddr())).Elem()
	writable.Set(reflect.ValueOf(value))
}

View file

@ -218,7 +218,7 @@ func (r *RecoveryTokenStore) saveUnsafe() {
}
// Marshal tokens
data, err := json.MarshalIndent(r.tokens, "", " ")
data, err := json.Marshal(r.tokens)
if err != nil {
log.Error().Err(err).Msg("Failed to marshal recovery tokens")
return

View file

@ -162,7 +162,7 @@ func (s *SessionStore) saveUnsafe() {
}
// Marshal sessions
data, err := json.MarshalIndent(s.sessions, "", " ")
data, err := json.Marshal(s.sessions)
if err != nil {
log.Error().Err(err).Msg("Failed to marshal sessions")
return

View file

@ -28,6 +28,13 @@ import (
"github.com/rs/zerolog/log"
)
const (
DefaultGuestMetadataMinRefresh = 2 * time.Minute
DefaultGuestMetadataRefreshJitter = 45 * time.Second
DefaultGuestMetadataRetryBackoff = 30 * time.Second
DefaultGuestMetadataMaxConcurrent = 4
)
// IsPasswordHashed checks if a string looks like a bcrypt hash
func IsPasswordHashed(password string) bool {
// Bcrypt hashes start with $2a$, $2b$, or $2y$ and are 60 characters long
@ -76,19 +83,23 @@ type Config struct {
// Monitoring settings
// Note: PVE polling is hardcoded to 10s since Proxmox cluster/resources endpoint only updates every 10s
PBSPollingInterval time.Duration `envconfig:"PBS_POLLING_INTERVAL"` // PBS polling interval (60s default)
PMGPollingInterval time.Duration `envconfig:"PMG_POLLING_INTERVAL"` // PMG polling interval (60s default)
ConcurrentPolling bool `envconfig:"CONCURRENT_POLLING" default:"true"`
ConnectionTimeout time.Duration `envconfig:"CONNECTION_TIMEOUT" default:"45s"` // Increased for slow storage operations
MetricsRetentionDays int `envconfig:"METRICS_RETENTION_DAYS" default:"7"`
BackupPollingCycles int `envconfig:"BACKUP_POLLING_CYCLES" default:"10"`
BackupPollingInterval time.Duration `envconfig:"BACKUP_POLLING_INTERVAL"`
EnableBackupPolling bool `envconfig:"ENABLE_BACKUP_POLLING" default:"true"`
WebhookBatchDelay time.Duration `envconfig:"WEBHOOK_BATCH_DELAY" default:"10s"`
AdaptivePollingEnabled bool `envconfig:"ADAPTIVE_POLLING_ENABLED" default:"false"`
AdaptivePollingBaseInterval time.Duration `envconfig:"ADAPTIVE_POLLING_BASE_INTERVAL" default:"10s"`
AdaptivePollingMinInterval time.Duration `envconfig:"ADAPTIVE_POLLING_MIN_INTERVAL" default:"5s"`
AdaptivePollingMaxInterval time.Duration `envconfig:"ADAPTIVE_POLLING_MAX_INTERVAL" default:"5m"`
PBSPollingInterval time.Duration `envconfig:"PBS_POLLING_INTERVAL"` // PBS polling interval (60s default)
PMGPollingInterval time.Duration `envconfig:"PMG_POLLING_INTERVAL"` // PMG polling interval (60s default)
ConcurrentPolling bool `envconfig:"CONCURRENT_POLLING" default:"true"`
ConnectionTimeout time.Duration `envconfig:"CONNECTION_TIMEOUT" default:"45s"` // Increased for slow storage operations
MetricsRetentionDays int `envconfig:"METRICS_RETENTION_DAYS" default:"7"`
BackupPollingCycles int `envconfig:"BACKUP_POLLING_CYCLES" default:"10"`
BackupPollingInterval time.Duration `envconfig:"BACKUP_POLLING_INTERVAL"`
EnableBackupPolling bool `envconfig:"ENABLE_BACKUP_POLLING" default:"true"`
WebhookBatchDelay time.Duration `envconfig:"WEBHOOK_BATCH_DELAY" default:"10s"`
AdaptivePollingEnabled bool `envconfig:"ADAPTIVE_POLLING_ENABLED" default:"false"`
AdaptivePollingBaseInterval time.Duration `envconfig:"ADAPTIVE_POLLING_BASE_INTERVAL" default:"10s"`
AdaptivePollingMinInterval time.Duration `envconfig:"ADAPTIVE_POLLING_MIN_INTERVAL" default:"5s"`
AdaptivePollingMaxInterval time.Duration `envconfig:"ADAPTIVE_POLLING_MAX_INTERVAL" default:"5m"`
GuestMetadataMinRefreshInterval time.Duration `envconfig:"GUEST_METADATA_MIN_REFRESH_INTERVAL" default:"2m" json:"guestMetadataMinRefreshInterval"`
GuestMetadataRefreshJitter time.Duration `envconfig:"GUEST_METADATA_REFRESH_JITTER" default:"45s" json:"guestMetadataRefreshJitter"`
GuestMetadataRetryBackoff time.Duration `envconfig:"GUEST_METADATA_RETRY_BACKOFF" default:"30s" json:"guestMetadataRetryBackoff"`
GuestMetadataMaxConcurrent int `envconfig:"GUEST_METADATA_MAX_CONCURRENT" default:"4" json:"guestMetadataMaxConcurrent"`
// Logging settings
LogLevel string `envconfig:"LOG_LEVEL" default:"info"`
@ -485,36 +496,40 @@ func Load() (*Config, error) {
// Initialize config with defaults
cfg := &Config{
BackendHost: "0.0.0.0",
BackendPort: 3000,
FrontendHost: "0.0.0.0",
FrontendPort: 7655,
ConfigPath: dataDir,
DataPath: dataDir,
ConcurrentPolling: true,
ConnectionTimeout: 60 * time.Second,
MetricsRetentionDays: 7,
BackupPollingCycles: 10,
BackupPollingInterval: 0,
EnableBackupPolling: true,
WebhookBatchDelay: 10 * time.Second,
AdaptivePollingEnabled: false,
AdaptivePollingBaseInterval: 10 * time.Second,
AdaptivePollingMinInterval: 5 * time.Second,
AdaptivePollingMaxInterval: 5 * time.Minute,
LogLevel: "info",
LogFormat: "auto",
LogMaxSize: 100,
LogMaxAge: 30,
LogCompress: true,
AllowedOrigins: "", // Empty means no CORS headers (same-origin only)
IframeEmbeddingAllow: "SAMEORIGIN",
PBSPollingInterval: 60 * time.Second, // Default PBS polling (slower)
PMGPollingInterval: 60 * time.Second, // Default PMG polling (aggregated stats)
DiscoveryEnabled: false,
DiscoverySubnet: "auto",
EnvOverrides: make(map[string]bool),
OIDC: NewOIDCConfig(),
BackendHost: "0.0.0.0",
BackendPort: 3000,
FrontendHost: "0.0.0.0",
FrontendPort: 7655,
ConfigPath: dataDir,
DataPath: dataDir,
ConcurrentPolling: true,
ConnectionTimeout: 60 * time.Second,
MetricsRetentionDays: 7,
BackupPollingCycles: 10,
BackupPollingInterval: 0,
EnableBackupPolling: true,
WebhookBatchDelay: 10 * time.Second,
AdaptivePollingEnabled: false,
AdaptivePollingBaseInterval: 10 * time.Second,
AdaptivePollingMinInterval: 5 * time.Second,
AdaptivePollingMaxInterval: 5 * time.Minute,
GuestMetadataMinRefreshInterval: DefaultGuestMetadataMinRefresh,
GuestMetadataRefreshJitter: DefaultGuestMetadataRefreshJitter,
GuestMetadataRetryBackoff: DefaultGuestMetadataRetryBackoff,
GuestMetadataMaxConcurrent: DefaultGuestMetadataMaxConcurrent,
LogLevel: "info",
LogFormat: "auto",
LogMaxSize: 100,
LogMaxAge: 30,
LogCompress: true,
AllowedOrigins: "", // Empty means no CORS headers (same-origin only)
IframeEmbeddingAllow: "SAMEORIGIN",
PBSPollingInterval: 60 * time.Second, // Default PBS polling (slower)
PMGPollingInterval: 60 * time.Second, // Default PMG polling (aggregated stats)
DiscoveryEnabled: false,
DiscoverySubnet: "auto",
EnvOverrides: make(map[string]bool),
OIDC: NewOIDCConfig(),
}
cfg.Discovery = DefaultDiscoveryConfig()
@ -725,6 +740,62 @@ func Load() (*Config, error) {
}
}
if minRefresh := strings.TrimSpace(os.Getenv("GUEST_METADATA_MIN_REFRESH_INTERVAL")); minRefresh != "" {
if dur, err := time.ParseDuration(minRefresh); err == nil {
if dur <= 0 {
log.Warn().Str("value", minRefresh).Msg("Ignoring non-positive GUEST_METADATA_MIN_REFRESH_INTERVAL from environment")
} else {
cfg.GuestMetadataMinRefreshInterval = dur
cfg.EnvOverrides["GUEST_METADATA_MIN_REFRESH_INTERVAL"] = true
log.Info().Dur("interval", dur).Msg("Guest metadata min refresh interval overridden by environment")
}
} else {
log.Warn().Str("value", minRefresh).Msg("Invalid GUEST_METADATA_MIN_REFRESH_INTERVAL value, expected duration string")
}
}
if jitter := strings.TrimSpace(os.Getenv("GUEST_METADATA_REFRESH_JITTER")); jitter != "" {
if dur, err := time.ParseDuration(jitter); err == nil {
if dur < 0 {
log.Warn().Str("value", jitter).Msg("Ignoring negative GUEST_METADATA_REFRESH_JITTER from environment")
} else {
cfg.GuestMetadataRefreshJitter = dur
cfg.EnvOverrides["GUEST_METADATA_REFRESH_JITTER"] = true
log.Info().Dur("jitter", dur).Msg("Guest metadata refresh jitter overridden by environment")
}
} else {
log.Warn().Str("value", jitter).Msg("Invalid GUEST_METADATA_REFRESH_JITTER value, expected duration string")
}
}
if backoff := strings.TrimSpace(os.Getenv("GUEST_METADATA_RETRY_BACKOFF")); backoff != "" {
if dur, err := time.ParseDuration(backoff); err == nil {
if dur <= 0 {
log.Warn().Str("value", backoff).Msg("Ignoring non-positive GUEST_METADATA_RETRY_BACKOFF from environment")
} else {
cfg.GuestMetadataRetryBackoff = dur
cfg.EnvOverrides["GUEST_METADATA_RETRY_BACKOFF"] = true
log.Info().Dur("backoff", dur).Msg("Guest metadata retry backoff overridden by environment")
}
} else {
log.Warn().Str("value", backoff).Msg("Invalid GUEST_METADATA_RETRY_BACKOFF value, expected duration string")
}
}
if concurrent := strings.TrimSpace(os.Getenv("GUEST_METADATA_MAX_CONCURRENT")); concurrent != "" {
if val, err := strconv.Atoi(concurrent); err == nil {
if val <= 0 {
log.Warn().Str("value", concurrent).Msg("Ignoring non-positive GUEST_METADATA_MAX_CONCURRENT from environment")
} else {
cfg.GuestMetadataMaxConcurrent = val
cfg.EnvOverrides["GUEST_METADATA_MAX_CONCURRENT"] = true
log.Info().Int("maxConcurrent", val).Msg("Guest metadata max concurrency overridden by environment")
}
} else {
log.Warn().Str("value", concurrent).Msg("Invalid GUEST_METADATA_MAX_CONCURRENT value, expected integer")
}
}
// Support both FRONTEND_PORT (preferred) and PORT (legacy) env vars
if frontendPort := os.Getenv("FRONTEND_PORT"); frontendPort != "" {
if p, err := strconv.Atoi(frontendPort); err == nil {

View file

@ -150,7 +150,7 @@ func (s *GuestMetadataStore) save() error {
log.Debug().Str("path", filePath).Msg("Saving guest metadata to disk")
data, err := json.MarshalIndent(s.metadata, "", " ")
data, err := json.Marshal(s.metadata)
if err != nil {
return fmt.Errorf("failed to marshal metadata: %w", err)
}

View file

@ -156,7 +156,7 @@ func (c *ConfigPersistence) SaveAPITokens(tokens []APITokenRecord) error {
sanitized[i] = record
}
data, err := json.MarshalIndent(sanitized, "", " ")
data, err := json.Marshal(sanitized)
if err != nil {
return err
}
@ -198,6 +198,32 @@ func (c *ConfigPersistence) SaveAlertConfig(config alerts.AlertConfig) error {
if config.HysteresisMargin <= 0 {
config.HysteresisMargin = 5.0
}
if config.HostDefaults.CPU == nil || config.HostDefaults.CPU.Trigger <= 0 {
config.HostDefaults.CPU = &alerts.HysteresisThreshold{Trigger: 80, Clear: 75}
} else if config.HostDefaults.CPU.Clear <= 0 {
config.HostDefaults.CPU.Clear = config.HostDefaults.CPU.Trigger - 5
if config.HostDefaults.CPU.Clear <= 0 {
config.HostDefaults.CPU.Clear = 75
}
}
if config.HostDefaults.Memory == nil || config.HostDefaults.Memory.Trigger <= 0 {
config.HostDefaults.Memory = &alerts.HysteresisThreshold{Trigger: 85, Clear: 80}
} else if config.HostDefaults.Memory.Clear <= 0 {
config.HostDefaults.Memory.Clear = config.HostDefaults.Memory.Trigger - 5
if config.HostDefaults.Memory.Clear <= 0 {
config.HostDefaults.Memory.Clear = 80
}
}
if config.HostDefaults.Disk == nil || config.HostDefaults.Disk.Trigger <= 0 {
config.HostDefaults.Disk = &alerts.HysteresisThreshold{Trigger: 90, Clear: 85}
} else if config.HostDefaults.Disk.Clear <= 0 {
config.HostDefaults.Disk.Clear = config.HostDefaults.Disk.Trigger - 5
if config.HostDefaults.Disk.Clear <= 0 {
config.HostDefaults.Disk.Clear = 85
}
}
config.MetricTimeThresholds = alerts.NormalizeMetricTimeThresholds(config.MetricTimeThresholds)
if config.TimeThreshold <= 0 {
config.TimeThreshold = 5
@ -252,7 +278,7 @@ func (c *ConfigPersistence) SaveAlertConfig(config alerts.AlertConfig) error {
}
config.DockerIgnoredContainerPrefixes = alerts.NormalizeDockerIgnoredPrefixes(config.DockerIgnoredContainerPrefixes)
data, err := json.MarshalIndent(config, "", " ")
data, err := json.Marshal(config)
if err != nil {
return err
}
@ -291,6 +317,11 @@ func (c *ConfigPersistence) LoadAlertConfig() (*alerts.AlertConfig, error) {
Disk: &alerts.HysteresisThreshold{Trigger: 90, Clear: 85},
Temperature: &alerts.HysteresisThreshold{Trigger: 80, Clear: 75},
},
HostDefaults: alerts.ThresholdConfig{
CPU: &alerts.HysteresisThreshold{Trigger: 80, Clear: 75},
Memory: &alerts.HysteresisThreshold{Trigger: 85, Clear: 80},
Disk: &alerts.HysteresisThreshold{Trigger: 90, Clear: 85},
},
StorageDefault: alerts.HysteresisThreshold{Trigger: 85, Clear: 80},
TimeThreshold: 5,
TimeThresholds: map[string]int{
@ -346,6 +377,30 @@ func (c *ConfigPersistence) LoadAlertConfig() (*alerts.AlertConfig, error) {
if config.NodeDefaults.Temperature == nil || config.NodeDefaults.Temperature.Trigger <= 0 {
config.NodeDefaults.Temperature = &alerts.HysteresisThreshold{Trigger: 80, Clear: 75}
}
if config.HostDefaults.CPU == nil || config.HostDefaults.CPU.Trigger <= 0 {
config.HostDefaults.CPU = &alerts.HysteresisThreshold{Trigger: 80, Clear: 75}
} else if config.HostDefaults.CPU.Clear <= 0 {
config.HostDefaults.CPU.Clear = config.HostDefaults.CPU.Trigger - 5
if config.HostDefaults.CPU.Clear <= 0 {
config.HostDefaults.CPU.Clear = 75
}
}
if config.HostDefaults.Memory == nil || config.HostDefaults.Memory.Trigger <= 0 {
config.HostDefaults.Memory = &alerts.HysteresisThreshold{Trigger: 85, Clear: 80}
} else if config.HostDefaults.Memory.Clear <= 0 {
config.HostDefaults.Memory.Clear = config.HostDefaults.Memory.Trigger - 5
if config.HostDefaults.Memory.Clear <= 0 {
config.HostDefaults.Memory.Clear = 80
}
}
if config.HostDefaults.Disk == nil || config.HostDefaults.Disk.Trigger <= 0 {
config.HostDefaults.Disk = &alerts.HysteresisThreshold{Trigger: 90, Clear: 85}
} else if config.HostDefaults.Disk.Clear <= 0 {
config.HostDefaults.Disk.Clear = config.HostDefaults.Disk.Trigger - 5
if config.HostDefaults.Disk.Clear <= 0 {
config.HostDefaults.Disk.Clear = 85
}
}
if config.TimeThreshold <= 0 {
config.TimeThreshold = 5
}
@ -425,7 +480,7 @@ func (c *ConfigPersistence) SaveEmailConfig(config notifications.EmailConfig) er
defer c.mu.Unlock()
// Marshal to JSON first
data, err := json.MarshalIndent(config, "", " ")
data, err := json.Marshal(config)
if err != nil {
return err
}
@ -502,7 +557,7 @@ func (c *ConfigPersistence) SaveAppriseConfig(config notifications.AppriseConfig
config = notifications.NormalizeAppriseConfig(config)
data, err := json.MarshalIndent(config, "", " ")
data, err := json.Marshal(config)
if err != nil {
return err
}
@ -578,7 +633,7 @@ func (c *ConfigPersistence) SaveWebhooks(webhooks []notifications.WebhookConfig)
c.mu.Lock()
defer c.mu.Unlock()
data, err := json.MarshalIndent(webhooks, "", " ")
data, err := json.Marshal(webhooks)
if err != nil {
return err
}
@ -811,7 +866,7 @@ func (c *ConfigPersistence) saveNodesConfig(pveInstances []PVEInstance, pbsInsta
PMGInstances: pmgInstances,
}
data, err := json.MarshalIndent(config, "", " ")
data, err := json.Marshal(config)
if err != nil {
return err
}
@ -1123,7 +1178,7 @@ func (c *ConfigPersistence) SaveSystemSettings(settings SystemSettings) error {
c.mu.Lock()
defer c.mu.Unlock()
data, err := json.MarshalIndent(settings, "", " ")
data, err := json.Marshal(settings)
if err != nil {
return err
}
@ -1159,7 +1214,7 @@ func (c *ConfigPersistence) SaveOIDCConfig(settings OIDCConfig) error {
// Do not persist runtime-only flags.
settings.EnvOverrides = nil
data, err := json.MarshalIndent(settings, "", " ")
data, err := json.Marshal(settings)
if err != nil {
return err
}

View file

@ -2,43 +2,53 @@ package monitoring
import (
stdErrors "errors"
"fmt"
"strings"
"sync"
"time"
internalerrors "github.com/rcourtman/pulse-go-rewrite/internal/errors"
"github.com/prometheus/client_golang/prometheus"
internalerrors "github.com/rcourtman/pulse-go-rewrite/internal/errors"
)
// PollMetrics manages Prometheus instrumentation for polling activity.
type PollMetrics struct {
pollDuration *prometheus.HistogramVec
pollResults *prometheus.CounterVec
pollErrors *prometheus.CounterVec
lastSuccess *prometheus.GaugeVec
staleness *prometheus.GaugeVec
queueDepth prometheus.Gauge
inflight *prometheus.GaugeVec
nodePollDuration *prometheus.HistogramVec
nodePollResults *prometheus.CounterVec
nodePollErrors *prometheus.CounterVec
nodeLastSuccess *prometheus.GaugeVec
nodeStaleness *prometheus.GaugeVec
schedulerQueueReady prometheus.Gauge
schedulerQueueDepthByType *prometheus.GaugeVec
schedulerQueueWait *prometheus.HistogramVec
schedulerDeadLetterDepth *prometheus.GaugeVec
schedulerBreakerState *prometheus.GaugeVec
pollDuration *prometheus.HistogramVec
pollResults *prometheus.CounterVec
pollErrors *prometheus.CounterVec
lastSuccess *prometheus.GaugeVec
staleness *prometheus.GaugeVec
queueDepth prometheus.Gauge
inflight *prometheus.GaugeVec
nodePollDuration *prometheus.HistogramVec
nodePollResults *prometheus.CounterVec
nodePollErrors *prometheus.CounterVec
nodeLastSuccess *prometheus.GaugeVec
nodeStaleness *prometheus.GaugeVec
schedulerQueueReady prometheus.Gauge
schedulerQueueDepthByType *prometheus.GaugeVec
schedulerQueueWait *prometheus.HistogramVec
schedulerDeadLetterDepth *prometheus.GaugeVec
schedulerBreakerState *prometheus.GaugeVec
schedulerBreakerFailureCount *prometheus.GaugeVec
schedulerBreakerRetrySeconds *prometheus.GaugeVec
mu sync.RWMutex
lastSuccessByKey map[string]time.Time
nodeLastSuccessByKey map[string]time.Time
lastQueueTypeKeys map[string]struct{}
lastDLQKeys map[string]struct{}
pending int
mu sync.RWMutex
lastSuccessByKey map[metricKey]time.Time
nodeLastSuccessByKey map[nodeMetricKey]time.Time
lastQueueTypeKeys map[string]struct{}
lastDLQKeys map[string]struct{}
pending int
}
// metricKey identifies a poll target by instance type and instance name.
// A struct key avoids the per-lookup string concatenation/allocation that a
// "type::instance" string key would require on every metrics update.
type metricKey struct {
	instanceType string
	instance     string
}

// nodeMetricKey extends metricKey with a node name, keying per-node
// last-success timestamps.
type nodeMetricKey struct {
	instanceType string
	instance     string
	node         string
}
var (
@ -227,10 +237,10 @@ func newPollMetrics() *PollMetrics {
},
[]string{"instance_type", "instance"},
),
lastSuccessByKey: make(map[string]time.Time),
nodeLastSuccessByKey: make(map[string]time.Time),
lastQueueTypeKeys: make(map[string]struct{}),
lastDLQKeys: make(map[string]struct{}),
lastSuccessByKey: make(map[metricKey]time.Time),
nodeLastSuccessByKey: make(map[nodeMetricKey]time.Time),
lastQueueTypeKeys: make(map[string]struct{}),
lastDLQKeys: make(map[string]struct{}),
}
prometheus.MustRegister(
@ -260,13 +270,13 @@ func newPollMetrics() *PollMetrics {
// NodePollResult captures timing and outcome for a specific node within a poll cycle.
type NodePollResult struct {
InstanceName string
InstanceType string
NodeName string
Success bool
Error error
StartTime time.Time
EndTime time.Time
InstanceName string
InstanceType string
NodeName string
Success bool
Error error
StartTime time.Time
EndTime time.Time
}
// RecordNodeResult records metrics for an individual node poll.
@ -275,173 +285,136 @@ func (pm *PollMetrics) RecordNodeResult(result NodePollResult) {
return
}
nodeLabel := strings.TrimSpace(result.NodeName)
if nodeLabel == "" {
nodeLabel = "unknown-node"
}
instType, inst := sanitizeInstanceLabels(result.InstanceType, result.InstanceName)
nodeLabel := normalizeNodeLabel(result.NodeName)
labels := prometheus.Labels{
"instance_type": result.InstanceType,
"instance": result.InstanceName,
"node": nodeLabel,
}
duration := result.EndTime.Sub(result.StartTime).Seconds()
if duration < 0 {
duration = 0
}
pm.nodePollDuration.WithLabelValues(instType, inst, nodeLabel).Observe(duration)
duration := result.EndTime.Sub(result.StartTime).Seconds()
if duration < 0 {
duration = 0
}
pm.nodePollDuration.With(labels).Observe(duration)
resultValue := "success"
if !result.Success {
resultValue = "error"
}
pm.nodePollResults.WithLabelValues(instType, inst, nodeLabel, resultValue).Inc()
resultValue := "success"
if !result.Success {
resultValue = "error"
}
pm.nodePollResults.With(prometheus.Labels{
"instance_type": result.InstanceType,
"instance": result.InstanceName,
"node": nodeLabel,
"result": resultValue,
}).Inc()
if result.Success {
pm.nodeLastSuccess.WithLabelValues(instType, inst, nodeLabel).Set(float64(result.EndTime.Unix()))
pm.storeNodeLastSuccess(instType, inst, nodeLabel, result.EndTime)
pm.updateNodeStaleness(instType, inst, nodeLabel, 0)
return
}
if result.Success {
pm.nodeLastSuccess.With(labels).Set(float64(result.EndTime.Unix()))
pm.storeNodeLastSuccess(result.InstanceType, result.InstanceName, nodeLabel, result.EndTime)
pm.updateNodeStaleness(result.InstanceType, result.InstanceName, nodeLabel, 0)
return
}
errType := pm.classifyError(result.Error)
pm.nodePollErrors.WithLabelValues(instType, inst, nodeLabel, errType).Inc()
errType := pm.classifyError(result.Error)
pm.nodePollErrors.With(prometheus.Labels{
"instance_type": result.InstanceType,
"instance": result.InstanceName,
"node": nodeLabel,
"error_type": errType,
}).Inc()
if last, ok := pm.lastNodeSuccessFor(result.InstanceType, result.InstanceName, nodeLabel); ok && !last.IsZero() {
staleness := result.EndTime.Sub(last).Seconds()
if staleness < 0 {
staleness = 0
}
pm.updateNodeStaleness(result.InstanceType, result.InstanceName, nodeLabel, staleness)
} else {
pm.updateNodeStaleness(result.InstanceType, result.InstanceName, nodeLabel, -1)
if last, ok := pm.lastNodeSuccessFor(instType, inst, nodeLabel); ok && !last.IsZero() {
staleness := result.EndTime.Sub(last).Seconds()
if staleness < 0 {
staleness = 0
}
pm.updateNodeStaleness(instType, inst, nodeLabel, staleness)
} else {
pm.updateNodeStaleness(instType, inst, nodeLabel, -1)
}
}
// RecordQueueWait observes the time a task spent waiting in the scheduler queue.
func (pm *PollMetrics) RecordQueueWait(instanceType string, wait time.Duration) {
if pm == nil {
return
}
if wait < 0 {
wait = 0
}
instanceType = strings.TrimSpace(instanceType)
if instanceType == "" {
instanceType = "unknown"
}
pm.schedulerQueueWait.WithLabelValues(instanceType).Observe(wait.Seconds())
if pm == nil {
return
}
if wait < 0 {
wait = 0
}
label := normalizeLabel(instanceType)
pm.schedulerQueueWait.WithLabelValues(label).Observe(wait.Seconds())
}
// UpdateQueueSnapshot updates scheduler queue depth metrics.
func (pm *PollMetrics) UpdateQueueSnapshot(snapshot QueueSnapshot) {
if pm == nil {
return
}
if pm == nil {
return
}
pm.schedulerQueueReady.Set(float64(snapshot.DueWithinSeconds))
pm.schedulerQueueReady.Set(float64(snapshot.DueWithinSeconds))
current := make(map[string]struct{}, len(snapshot.PerType))
for instanceType, depth := range snapshot.PerType {
key := strings.TrimSpace(instanceType)
if key == "" {
key = "unknown"
}
pm.schedulerQueueDepthByType.WithLabelValues(key).Set(float64(depth))
current[key] = struct{}{}
}
current := make(map[string]struct{}, len(snapshot.PerType))
for instanceType, depth := range snapshot.PerType {
key := normalizeLabel(instanceType)
pm.schedulerQueueDepthByType.WithLabelValues(key).Set(float64(depth))
current[key] = struct{}{}
}
pm.mu.Lock()
for key := range pm.lastQueueTypeKeys {
if _, ok := current[key]; !ok {
pm.schedulerQueueDepthByType.WithLabelValues(key).Set(0)
}
}
pm.lastQueueTypeKeys = current
pm.mu.Unlock()
pm.mu.Lock()
for key := range pm.lastQueueTypeKeys {
if _, ok := current[key]; !ok {
pm.schedulerQueueDepthByType.WithLabelValues(key).Set(0)
}
}
pm.lastQueueTypeKeys = current
pm.mu.Unlock()
}
// UpdateDeadLetterCounts refreshes dead-letter queue gauges based on the provided tasks.
func (pm *PollMetrics) UpdateDeadLetterCounts(tasks []DeadLetterTask) {
if pm == nil {
return
}
if pm == nil {
return
}
current := make(map[string]float64)
for _, task := range tasks {
instType := strings.TrimSpace(task.Type)
if instType == "" {
instType = "unknown"
}
inst := strings.TrimSpace(task.Instance)
if inst == "" {
inst = "unknown"
}
key := instType + "::" + inst
current[key] = current[key] + 1
}
current := make(map[string]float64)
for _, task := range tasks {
instType := normalizeLabel(task.Type)
inst := normalizeLabel(task.Instance)
key := instType + "::" + inst
current[key] = current[key] + 1
}
pm.mu.Lock()
prev := pm.lastDLQKeys
pm.lastDLQKeys = make(map[string]struct{}, len(current))
pm.mu.Unlock()
pm.mu.Lock()
prev := pm.lastDLQKeys
pm.lastDLQKeys = make(map[string]struct{}, len(current))
pm.mu.Unlock()
for key, count := range current {
instType, inst := splitInstanceKey(key)
pm.schedulerDeadLetterDepth.WithLabelValues(instType, inst).Set(count)
}
for key, count := range current {
instType, inst := splitInstanceKey(key)
pm.schedulerDeadLetterDepth.WithLabelValues(instType, inst).Set(count)
}
pm.mu.Lock()
for key := range current {
pm.lastDLQKeys[key] = struct{}{}
}
for key := range prev {
if _, ok := current[key]; !ok {
instType, inst := splitInstanceKey(key)
pm.schedulerDeadLetterDepth.WithLabelValues(instType, inst).Set(0)
}
}
pm.mu.Unlock()
pm.mu.Lock()
for key := range current {
pm.lastDLQKeys[key] = struct{}{}
}
for key := range prev {
if _, ok := current[key]; !ok {
instType, inst := splitInstanceKey(key)
pm.schedulerDeadLetterDepth.WithLabelValues(instType, inst).Set(0)
}
}
pm.mu.Unlock()
}
// SetBreakerState updates circuit breaker metrics for a specific instance.
func (pm *PollMetrics) SetBreakerState(instanceType, instance, state string, failures int, retryAt time.Time) {
if pm == nil {
return
}
if pm == nil {
return
}
instType := strings.TrimSpace(instanceType)
if instType == "" {
instType = "unknown"
}
inst := strings.TrimSpace(instance)
if inst == "" {
inst = "unknown"
}
instType, inst := sanitizeInstanceLabels(instanceType, instance)
value := breakerStateToValue(state)
pm.schedulerBreakerState.WithLabelValues(instType, inst).Set(value)
pm.schedulerBreakerFailureCount.WithLabelValues(instType, inst).Set(float64(failures))
retrySeconds := 0.0
if !retryAt.IsZero() {
retrySeconds = retryAt.Sub(time.Now()).Seconds()
if retrySeconds < 0 {
retrySeconds = 0
}
}
pm.schedulerBreakerRetrySeconds.WithLabelValues(instType, inst).Set(retrySeconds)
retrySeconds := 0.0
if !retryAt.IsZero() {
retrySeconds = retryAt.Sub(time.Now()).Seconds()
if retrySeconds < 0 {
retrySeconds = 0
}
}
pm.schedulerBreakerRetrySeconds.WithLabelValues(instType, inst).Set(retrySeconds)
}
// RecordResult records metrics for a polling result.
@ -450,47 +423,36 @@ func (pm *PollMetrics) RecordResult(result PollResult) {
return
}
labels := prometheus.Labels{
"instance_type": result.InstanceType,
"instance": result.InstanceName,
}
instType, inst := sanitizeInstanceLabels(result.InstanceType, result.InstanceName)
duration := result.EndTime.Sub(result.StartTime).Seconds()
if duration < 0 {
duration = 0
}
pm.pollDuration.With(labels).Observe(duration)
pm.pollDuration.WithLabelValues(instType, inst).Observe(duration)
resultValue := "success"
if !result.Success {
resultValue = "error"
}
pm.pollResults.With(prometheus.Labels{
"instance_type": result.InstanceType,
"instance": result.InstanceName,
"result": resultValue,
}).Inc()
pm.pollResults.WithLabelValues(instType, inst, resultValue).Inc()
if result.Success {
pm.lastSuccess.With(labels).Set(float64(result.EndTime.Unix()))
pm.storeLastSuccess(result.InstanceType, result.InstanceName, result.EndTime)
pm.updateStaleness(result.InstanceType, result.InstanceName, 0)
pm.lastSuccess.WithLabelValues(instType, inst).Set(float64(result.EndTime.Unix()))
pm.storeLastSuccess(instType, inst, result.EndTime)
pm.updateStaleness(instType, inst, 0)
} else {
errType := pm.classifyError(result.Error)
pm.pollErrors.With(prometheus.Labels{
"instance_type": result.InstanceType,
"instance": result.InstanceName,
"error_type": errType,
}).Inc()
pm.pollErrors.WithLabelValues(instType, inst, errType).Inc()
if last, ok := pm.lastSuccessFor(result.InstanceType, result.InstanceName); ok && !last.IsZero() {
if last, ok := pm.lastSuccessFor(instType, inst); ok && !last.IsZero() {
staleness := result.EndTime.Sub(last).Seconds()
if staleness < 0 {
staleness = 0
}
pm.updateStaleness(result.InstanceType, result.InstanceName, staleness)
pm.updateStaleness(instType, inst, staleness)
} else {
pm.updateStaleness(result.InstanceType, result.InstanceName, -1)
pm.updateStaleness(instType, inst, -1)
}
}
@ -555,75 +517,101 @@ func (pm *PollMetrics) decrementPending() {
}
func (pm *PollMetrics) storeLastSuccess(instanceType, instance string, ts time.Time) {
pm.mu.Lock()
pm.lastSuccessByKey[pm.key(instanceType, instance)] = ts
pm.mu.Unlock()
pm.mu.Lock()
pm.lastSuccessByKey[makeMetricKey(instanceType, instance)] = ts
pm.mu.Unlock()
}
func (pm *PollMetrics) lastSuccessFor(instanceType, instance string) (time.Time, bool) {
pm.mu.RLock()
ts, ok := pm.lastSuccessByKey[pm.key(instanceType, instance)]
ts, ok := pm.lastSuccessByKey[makeMetricKey(instanceType, instance)]
pm.mu.RUnlock()
return ts, ok
}
func (pm *PollMetrics) updateStaleness(instanceType, instance string, value float64) {
pm.staleness.WithLabelValues(instanceType, instance).Set(value)
}
func (pm *PollMetrics) key(instanceType, instance string) string {
return fmt.Sprintf("%s::%s", instanceType, instance)
instType, inst := sanitizeInstanceLabels(instanceType, instance)
pm.staleness.WithLabelValues(instType, inst).Set(value)
}
func (pm *PollMetrics) storeNodeLastSuccess(instanceType, instance, node string, ts time.Time) {
pm.mu.Lock()
pm.nodeLastSuccessByKey[pm.nodeKey(instanceType, instance, node)] = ts
pm.nodeLastSuccessByKey[makeNodeMetricKey(instanceType, instance, node)] = ts
pm.mu.Unlock()
}
func (pm *PollMetrics) lastNodeSuccessFor(instanceType, instance, node string) (time.Time, bool) {
pm.mu.RLock()
ts, ok := pm.nodeLastSuccessByKey[pm.nodeKey(instanceType, instance, node)]
ts, ok := pm.nodeLastSuccessByKey[makeNodeMetricKey(instanceType, instance, node)]
pm.mu.RUnlock()
return ts, ok
}
func (pm *PollMetrics) updateNodeStaleness(instanceType, instance, node string, value float64) {
pm.nodeStaleness.WithLabelValues(instanceType, instance, node).Set(value)
}
func (pm *PollMetrics) nodeKey(instanceType, instance, node string) string {
return fmt.Sprintf("%s::%s::%s", instanceType, instance, node)
instType, inst := sanitizeInstanceLabels(instanceType, instance)
nodeLabel := normalizeLabel(node)
pm.nodeStaleness.WithLabelValues(instType, inst, nodeLabel).Set(value)
}
func splitInstanceKey(key string) (string, string) {
parts := strings.SplitN(key, "::", 2)
if len(parts) == 2 {
if parts[0] == "" {
parts[0] = "unknown"
}
if parts[1] == "" {
parts[1] = "unknown"
}
return parts[0], parts[1]
}
if key == "" {
return "unknown", "unknown"
}
return "unknown", key
parts := strings.SplitN(key, "::", 2)
if len(parts) == 2 {
return normalizeLabel(parts[0]), normalizeLabel(parts[1])
}
if key == "" {
return "unknown", "unknown"
}
return "unknown", normalizeLabel(key)
}
func breakerStateToValue(state string) float64 {
switch strings.ToLower(state) {
case "closed":
return 0
case "half_open", "half-open":
return 1
case "open":
return 2
default:
return -1
}
switch strings.ToLower(state) {
case "closed":
return 0
case "half_open", "half-open":
return 1
case "open":
return 2
default:
return -1
}
}
// sanitizeInstanceLabels normalizes both labels (trimming whitespace and
// substituting "unknown" for empty values) so they are safe to use as
// Prometheus label values and as map keys.
func sanitizeInstanceLabels(instanceType, instance string) (string, string) {
	instType := normalizeLabel(instanceType)
	inst := normalizeLabel(instance)
	return instType, inst
}
// makeMetricKey builds the map key used to track instance last-success
// timestamps, normalizing labels so lookups match the emitted metric labels.
func makeMetricKey(instanceType, instance string) metricKey {
	instType, inst := sanitizeInstanceLabels(instanceType, instance)
	return metricKey{instanceType: instType, instance: inst}
}
// makeNodeMetricKey builds the map key used to track per-node last-success
// timestamps; all three labels are normalized the same way as the metric
// labels so cache lookups and gauge labels stay consistent.
func makeNodeMetricKey(instanceType, instance, node string) nodeMetricKey {
	instType, inst := sanitizeInstanceLabels(instanceType, instance)
	return nodeMetricKey{
		instanceType: instType,
		instance:     inst,
		node:         normalizeLabel(node),
	}
}
// normalizeLabel trims surrounding whitespace from value and falls back to
// "unknown" when nothing remains, guaranteeing a non-empty label value.
func normalizeLabel(value string) string {
	if trimmed := strings.TrimSpace(value); trimmed != "" {
		return trimmed
	}
	return "unknown"
}
// normalizeNodeLabel normalizes a node name like normalizeLabel but uses the
// sentinel "unknown-node" for missing values, so node metrics are
// distinguishable from instance-level "unknown" labels.
// NOTE(review): a node literally named "unknown" is also mapped to
// "unknown-node"; this mirrors the original behavior.
func normalizeNodeLabel(value string) string {
	if label := normalizeLabel(value); label != "unknown" {
		return label
	}
	return "unknown-node"
}
func (pm *PollMetrics) classifyError(err error) string {

View file

@ -421,60 +421,67 @@ func timePtr(t time.Time) *time.Time {
// Monitor handles all monitoring operations
type Monitor struct {
config *config.Config
state *models.State
pveClients map[string]PVEClientInterface
pbsClients map[string]*pbs.Client
pmgClients map[string]*pmg.Client
pollMetrics *PollMetrics
scheduler *AdaptiveScheduler
stalenessTracker *StalenessTracker
taskQueue *TaskQueue
circuitBreakers map[string]*circuitBreaker
deadLetterQueue *TaskQueue
failureCounts map[string]int
lastOutcome map[string]taskOutcome
backoffCfg backoffConfig
rng *rand.Rand
maxRetryAttempts int
tempCollector *TemperatureCollector // SSH-based temperature collector
mu sync.RWMutex
startTime time.Time
rateTracker *RateTracker
metricsHistory *MetricsHistory
alertManager *alerts.Manager
notificationMgr *notifications.NotificationManager
configPersist *config.ConfigPersistence
discoveryService *discovery.Service // Background discovery service
activePollCount int32 // Number of active polling operations
pollCounter int64 // Counter for polling cycles
authFailures map[string]int // Track consecutive auth failures per node
lastAuthAttempt map[string]time.Time // Track last auth attempt time
lastClusterCheck map[string]time.Time // Track last cluster check for standalone nodes
lastPhysicalDiskPoll map[string]time.Time // Track last physical disk poll time per instance
lastPVEBackupPoll map[string]time.Time // Track last PVE backup poll per instance
lastPBSBackupPoll map[string]time.Time // Track last PBS backup poll per instance
persistence *config.ConfigPersistence // Add persistence for saving updated configs
pbsBackupPollers map[string]bool // Track PBS backup polling goroutines per instance
runtimeCtx context.Context // Context used while monitor is running
wsHub *websocket.Hub // Hub used for broadcasting state
diagMu sync.RWMutex // Protects diagnostic snapshot maps
nodeSnapshots map[string]NodeMemorySnapshot
guestSnapshots map[string]GuestMemorySnapshot
rrdCacheMu sync.RWMutex // Protects RRD memavailable cache
nodeRRDMemCache map[string]rrdMemCacheEntry
removedDockerHosts map[string]time.Time // Track deliberately removed Docker hosts (ID -> removal time)
dockerCommands map[string]*dockerHostCommand
dockerCommandIndex map[string]string
guestMetadataMu sync.RWMutex
guestMetadataCache map[string]guestMetadataCacheEntry
executor PollExecutor
breakerBaseRetry time.Duration
breakerMaxDelay time.Duration
breakerHalfOpenWindow time.Duration
instanceInfoCache map[string]*instanceInfo
pollStatusMap map[string]*pollStatus
dlqInsightMap map[string]*dlqInsight
config *config.Config
state *models.State
pveClients map[string]PVEClientInterface
pbsClients map[string]*pbs.Client
pmgClients map[string]*pmg.Client
pollMetrics *PollMetrics
scheduler *AdaptiveScheduler
stalenessTracker *StalenessTracker
taskQueue *TaskQueue
circuitBreakers map[string]*circuitBreaker
deadLetterQueue *TaskQueue
failureCounts map[string]int
lastOutcome map[string]taskOutcome
backoffCfg backoffConfig
rng *rand.Rand
maxRetryAttempts int
tempCollector *TemperatureCollector // SSH-based temperature collector
mu sync.RWMutex
startTime time.Time
rateTracker *RateTracker
metricsHistory *MetricsHistory
alertManager *alerts.Manager
notificationMgr *notifications.NotificationManager
configPersist *config.ConfigPersistence
discoveryService *discovery.Service // Background discovery service
activePollCount int32 // Number of active polling operations
pollCounter int64 // Counter for polling cycles
authFailures map[string]int // Track consecutive auth failures per node
lastAuthAttempt map[string]time.Time // Track last auth attempt time
lastClusterCheck map[string]time.Time // Track last cluster check for standalone nodes
lastPhysicalDiskPoll map[string]time.Time // Track last physical disk poll time per instance
lastPVEBackupPoll map[string]time.Time // Track last PVE backup poll per instance
lastPBSBackupPoll map[string]time.Time // Track last PBS backup poll per instance
persistence *config.ConfigPersistence // Add persistence for saving updated configs
pbsBackupPollers map[string]bool // Track PBS backup polling goroutines per instance
runtimeCtx context.Context // Context used while monitor is running
wsHub *websocket.Hub // Hub used for broadcasting state
diagMu sync.RWMutex // Protects diagnostic snapshot maps
nodeSnapshots map[string]NodeMemorySnapshot
guestSnapshots map[string]GuestMemorySnapshot
rrdCacheMu sync.RWMutex // Protects RRD memavailable cache
nodeRRDMemCache map[string]rrdMemCacheEntry
removedDockerHosts map[string]time.Time // Track deliberately removed Docker hosts (ID -> removal time)
dockerCommands map[string]*dockerHostCommand
dockerCommandIndex map[string]string
guestMetadataMu sync.RWMutex
guestMetadataCache map[string]guestMetadataCacheEntry
guestMetadataLimiterMu sync.Mutex
guestMetadataLimiter map[string]time.Time
guestMetadataSlots chan struct{}
guestMetadataMinRefresh time.Duration
guestMetadataRefreshJitter time.Duration
guestMetadataRetryBackoff time.Duration
guestMetadataHoldDuration time.Duration
executor PollExecutor
breakerBaseRetry time.Duration
breakerMaxDelay time.Duration
breakerHalfOpenWindow time.Duration
instanceInfoCache map[string]*instanceInfo
pollStatusMap map[string]*pollStatus
dlqInsightMap map[string]*dlqInsight
}
type rrdMemCacheEntry struct {
@ -565,9 +572,13 @@ const (
dockerOfflineGraceMultiplier = 4
dockerMinimumHealthWindow = 30 * time.Second
dockerMaximumHealthWindow = 10 * time.Minute
hostOfflineGraceMultiplier = 4
hostMinimumHealthWindow = 30 * time.Second
hostMaximumHealthWindow = 10 * time.Minute
nodeRRDCacheTTL = 30 * time.Second
nodeRRDRequestTimeout = 2 * time.Second
guestMetadataCacheTTL = 5 * time.Minute
defaultGuestMetadataHold = 15 * time.Second
)
type guestMetadataCacheEntry struct {
@ -728,6 +739,10 @@ func (m *Monitor) RemoveHostAgent(hostID string) (models.Host, error) {
Bool("removed", removed).
Msg("Host agent removed from monitoring")
if m.alertManager != nil {
m.alertManager.HandleHostRemoved(host)
}
return host, nil
}
@ -1471,6 +1486,10 @@ func (m *Monitor) ApplyHostReport(report agentshost.Report, tokenRecord *config.
m.state.UpsertHost(host)
m.state.SetConnectionHealth(hostConnectionPrefix+host.ID, true)
if m.alertManager != nil {
m.alertManager.CheckHost(host)
}
return host, nil
}
@ -1530,6 +1549,43 @@ func (m *Monitor) evaluateDockerAgents(now time.Time) {
}
}
// evaluateHostAgents updates health for host agents based on last report time.
// For each known host it derives an offline grace window from the agent's
// reporting interval, marks the connection healthy/unhealthy, updates the
// host status in state, and notifies the alert manager of the transition.
func (m *Monitor) evaluateHostAgents(now time.Time) {
	hosts := m.state.GetHosts()
	for _, host := range hosts {
		// Fall back to the minimum window's equivalent interval when the
		// agent did not report one.
		interval := host.IntervalSeconds
		if interval <= 0 {
			interval = int(hostMinimumHealthWindow / time.Second)
		}
		// Grace window = interval * multiplier, clamped to
		// [hostMinimumHealthWindow, hostMaximumHealthWindow].
		window := time.Duration(interval) * time.Second * hostOfflineGraceMultiplier
		if window < hostMinimumHealthWindow {
			window = hostMinimumHealthWindow
		} else if window > hostMaximumHealthWindow {
			window = hostMaximumHealthWindow
		}
		// A host is healthy only if it has ever reported and the last
		// report is within the grace window.
		healthy := !host.LastSeen.IsZero() && now.Sub(host.LastSeen) <= window
		key := hostConnectionPrefix + host.ID
		m.state.SetConnectionHealth(key, healthy)
		// Mutate a copy so the alert manager sees the new status without
		// aliasing the slice element from GetHosts.
		hostCopy := host
		if healthy {
			hostCopy.Status = "online"
			m.state.SetHostStatus(host.ID, "online")
			if m.alertManager != nil {
				m.alertManager.HandleHostOnline(hostCopy)
			}
		} else {
			hostCopy.Status = "offline"
			m.state.SetHostStatus(host.ID, "offline")
			if m.alertManager != nil {
				m.alertManager.HandleHostOffline(hostCopy)
			}
		}
	}
}
// sortContent sorts comma-separated content values for consistent display
func sortContent(content string) string {
if content == "" {
@ -1540,6 +1596,76 @@ func sortContent(content string) string {
return strings.Join(parts, ",")
}
// tryReserveGuestMetadataFetch attempts to claim the exclusive right to fetch
// guest metadata for key. It returns false when the monitor is nil or a prior
// reservation for key has not yet expired; on success it records a hold until
// now + guestMetadataHoldDuration (or defaultGuestMetadataHold) so concurrent
// pollers do not issue duplicate agent queries.
func (m *Monitor) tryReserveGuestMetadataFetch(key string, now time.Time) bool {
	if m == nil {
		return false
	}
	m.guestMetadataLimiterMu.Lock()
	defer m.guestMetadataLimiterMu.Unlock()
	next, exists := m.guestMetadataLimiter[key]
	if exists && now.Before(next) {
		// A recent reservation is still active; caller should back off.
		return false
	}
	holdFor := m.guestMetadataHoldDuration
	if holdFor <= 0 {
		holdFor = defaultGuestMetadataHold
	}
	m.guestMetadataLimiter[key] = now.Add(holdFor)
	return true
}
// scheduleNextGuestMetadataFetch records when key may next be fetched: the
// configured minimum refresh interval (or the package default) plus optional
// random jitter so refreshes across many guests do not align.
func (m *Monitor) scheduleNextGuestMetadataFetch(key string, now time.Time) {
	if m == nil {
		return
	}
	interval := m.guestMetadataMinRefresh
	if interval <= 0 {
		interval = config.DefaultGuestMetadataMinRefresh
	}
	jitter := m.guestMetadataRefreshJitter
	if jitter > 0 && m.rng != nil {
		// Add up to `jitter` of random extra delay to spread out refreshes.
		interval += time.Duration(m.rng.Int63n(int64(jitter)))
	}
	m.guestMetadataLimiterMu.Lock()
	m.guestMetadataLimiter[key] = now.Add(interval)
	m.guestMetadataLimiterMu.Unlock()
}
// deferGuestMetadataRetry pushes the next allowed fetch time for key out by
// the configured retry backoff (or the package default), so a failed fetch is
// not retried immediately.
func (m *Monitor) deferGuestMetadataRetry(key string, now time.Time) {
	if m == nil {
		return
	}
	delay := m.guestMetadataRetryBackoff
	if delay <= 0 {
		delay = config.DefaultGuestMetadataRetryBackoff
	}
	m.guestMetadataLimiterMu.Lock()
	defer m.guestMetadataLimiterMu.Unlock()
	m.guestMetadataLimiter[key] = now.Add(delay)
}
// acquireGuestMetadataSlot blocks until a concurrency slot is available or
// ctx is cancelled. It returns true when a slot was acquired — or when the
// limiter is disabled (nil monitor or nil channel) — and false when the
// context was cancelled first. Callers that receive true from an enabled
// limiter must pair this with releaseGuestMetadataSlot.
func (m *Monitor) acquireGuestMetadataSlot(ctx context.Context) bool {
	if m == nil || m.guestMetadataSlots == nil {
		return true
	}
	select {
	case m.guestMetadataSlots <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}
// releaseGuestMetadataSlot returns a previously acquired concurrency slot.
// The non-blocking receive (default branch) makes a release without a
// matching acquire a harmless no-op instead of a deadlock.
func (m *Monitor) releaseGuestMetadataSlot() {
	if m == nil || m.guestMetadataSlots == nil {
		return
	}
	select {
	case <-m.guestMetadataSlots:
	default:
	}
}
func (m *Monitor) fetchGuestAgentMetadata(ctx context.Context, client PVEClientInterface, instanceName, nodeName, vmName string, vmid int, vmStatus *proxmox.VMStatus) ([]string, []models.GuestNetworkInterface, string, string, string) {
if vmStatus == nil || client == nil {
m.clearGuestMetadataCache(instanceName, nodeName, vmid)
@ -1562,6 +1688,19 @@ func (m *Monitor) fetchGuestAgentMetadata(ctx context.Context, client PVEClientI
return cloneStringSlice(cached.ipAddresses), cloneGuestNetworkInterfaces(cached.networkInterfaces), cached.osName, cached.osVersion, cached.agentVersion
}
needsFetch := !ok || now.Sub(cached.fetchedAt) >= guestMetadataCacheTTL
if !needsFetch {
return cloneStringSlice(cached.ipAddresses), cloneGuestNetworkInterfaces(cached.networkInterfaces), cached.osName, cached.osVersion, cached.agentVersion
}
reserved := m.tryReserveGuestMetadataFetch(key, now)
if !reserved && ok {
return cloneStringSlice(cached.ipAddresses), cloneGuestNetworkInterfaces(cached.networkInterfaces), cached.osName, cached.osVersion, cached.agentVersion
}
if !reserved && !ok {
reserved = true
}
// Start with cached values as fallback in case new calls fail
ipAddresses := cloneStringSlice(cached.ipAddresses)
networkIfaces := cloneGuestNetworkInterfaces(cached.networkInterfaces)
@ -1569,6 +1708,17 @@ func (m *Monitor) fetchGuestAgentMetadata(ctx context.Context, client PVEClientI
osVersion := cached.osVersion
agentVersion := cached.agentVersion
if reserved {
if !m.acquireGuestMetadataSlot(ctx) {
m.deferGuestMetadataRetry(key, time.Now())
return ipAddresses, networkIfaces, osName, osVersion, agentVersion
}
defer m.releaseGuestMetadataSlot()
defer func() {
m.scheduleNextGuestMetadataFetch(key, time.Now())
}()
}
ifaceCtx, cancelIface := context.WithTimeout(ctx, 5*time.Second)
interfaces, err := client.GetVMNetworkInterfaces(ifaceCtx, nodeName, vmid)
cancelIface()
@ -2584,49 +2734,72 @@ func New(cfg *config.Config) (*Monitor, error) {
}, stalenessTracker, nil, nil)
}
minRefresh := cfg.GuestMetadataMinRefreshInterval
if minRefresh <= 0 {
minRefresh = config.DefaultGuestMetadataMinRefresh
}
jitter := cfg.GuestMetadataRefreshJitter
if jitter < 0 {
jitter = 0
}
retryBackoff := cfg.GuestMetadataRetryBackoff
if retryBackoff <= 0 {
retryBackoff = config.DefaultGuestMetadataRetryBackoff
}
concurrency := cfg.GuestMetadataMaxConcurrent
if concurrency <= 0 {
concurrency = config.DefaultGuestMetadataMaxConcurrent
}
holdDuration := defaultGuestMetadataHold
m := &Monitor{
config: cfg,
state: models.NewState(),
pveClients: make(map[string]PVEClientInterface),
pbsClients: make(map[string]*pbs.Client),
pmgClients: make(map[string]*pmg.Client),
pollMetrics: getPollMetrics(),
scheduler: scheduler,
stalenessTracker: stalenessTracker,
taskQueue: taskQueue,
deadLetterQueue: deadLetterQueue,
circuitBreakers: breakers,
failureCounts: failureCounts,
lastOutcome: lastOutcome,
backoffCfg: backoff,
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
maxRetryAttempts: 5,
tempCollector: tempCollector,
startTime: time.Now(),
rateTracker: NewRateTracker(),
metricsHistory: NewMetricsHistory(1000, 24*time.Hour), // Keep up to 1000 points or 24 hours
alertManager: alerts.NewManager(),
notificationMgr: notifications.NewNotificationManager(cfg.PublicURL),
configPersist: config.NewConfigPersistence(cfg.DataPath),
discoveryService: nil, // Will be initialized in Start()
authFailures: make(map[string]int),
lastAuthAttempt: make(map[string]time.Time),
lastClusterCheck: make(map[string]time.Time),
lastPhysicalDiskPoll: make(map[string]time.Time),
lastPVEBackupPoll: make(map[string]time.Time),
lastPBSBackupPoll: make(map[string]time.Time),
persistence: config.NewConfigPersistence(cfg.DataPath),
pbsBackupPollers: make(map[string]bool),
nodeSnapshots: make(map[string]NodeMemorySnapshot),
guestSnapshots: make(map[string]GuestMemorySnapshot),
nodeRRDMemCache: make(map[string]rrdMemCacheEntry),
removedDockerHosts: make(map[string]time.Time),
dockerCommands: make(map[string]*dockerHostCommand),
dockerCommandIndex: make(map[string]string),
guestMetadataCache: make(map[string]guestMetadataCacheEntry),
instanceInfoCache: make(map[string]*instanceInfo),
pollStatusMap: make(map[string]*pollStatus),
dlqInsightMap: make(map[string]*dlqInsight),
config: cfg,
state: models.NewState(),
pveClients: make(map[string]PVEClientInterface),
pbsClients: make(map[string]*pbs.Client),
pmgClients: make(map[string]*pmg.Client),
pollMetrics: getPollMetrics(),
scheduler: scheduler,
stalenessTracker: stalenessTracker,
taskQueue: taskQueue,
deadLetterQueue: deadLetterQueue,
circuitBreakers: breakers,
failureCounts: failureCounts,
lastOutcome: lastOutcome,
backoffCfg: backoff,
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
maxRetryAttempts: 5,
tempCollector: tempCollector,
startTime: time.Now(),
rateTracker: NewRateTracker(),
metricsHistory: NewMetricsHistory(1000, 24*time.Hour), // Keep up to 1000 points or 24 hours
alertManager: alerts.NewManager(),
notificationMgr: notifications.NewNotificationManager(cfg.PublicURL),
configPersist: config.NewConfigPersistence(cfg.DataPath),
discoveryService: nil, // Will be initialized in Start()
authFailures: make(map[string]int),
lastAuthAttempt: make(map[string]time.Time),
lastClusterCheck: make(map[string]time.Time),
lastPhysicalDiskPoll: make(map[string]time.Time),
lastPVEBackupPoll: make(map[string]time.Time),
lastPBSBackupPoll: make(map[string]time.Time),
persistence: config.NewConfigPersistence(cfg.DataPath),
pbsBackupPollers: make(map[string]bool),
nodeSnapshots: make(map[string]NodeMemorySnapshot),
guestSnapshots: make(map[string]GuestMemorySnapshot),
nodeRRDMemCache: make(map[string]rrdMemCacheEntry),
removedDockerHosts: make(map[string]time.Time),
dockerCommands: make(map[string]*dockerHostCommand),
dockerCommandIndex: make(map[string]string),
guestMetadataCache: make(map[string]guestMetadataCacheEntry),
guestMetadataLimiter: make(map[string]time.Time),
guestMetadataMinRefresh: minRefresh,
guestMetadataRefreshJitter: jitter,
guestMetadataRetryBackoff: retryBackoff,
guestMetadataHoldDuration: holdDuration,
instanceInfoCache: make(map[string]*instanceInfo),
pollStatusMap: make(map[string]*pollStatus),
dlqInsightMap: make(map[string]*dlqInsight),
}
m.breakerBaseRetry = 5 * time.Second
@ -2672,6 +2845,10 @@ func New(cfg *config.Config) (*Monitor, error) {
log.Warn().Err(err).Msg("Failed to load email configuration")
}
if concurrency > 0 {
m.guestMetadataSlots = make(chan struct{}, concurrency)
}
if appriseConfig, err := m.configPersist.LoadAppriseConfig(); err == nil {
m.notificationMgr.SetAppriseConfig(*appriseConfig)
} else {
@ -3103,6 +3280,7 @@ func (m *Monitor) Start(ctx context.Context, wsHub *websocket.Hub) {
case <-pollTicker.C:
now := time.Now()
m.evaluateDockerAgents(now)
m.evaluateHostAgents(now)
m.cleanupRemovedDockerHosts(now)
if mock.IsMockEnabled() {
// In mock mode, keep synthetic alerts fresh

View file

@ -0,0 +1,112 @@
package monitoring
import (
"testing"
"time"
"github.com/rcourtman/pulse-go-rewrite/internal/alerts"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
)
// TestEvaluateHostAgentsTriggersOfflineAlert verifies that a host whose
// LastSeen timestamp is far past its reporting interval is marked offline,
// has its connection-health entry flipped to unhealthy, and keeps a
// "host-offline-<id>" alert active after repeated evaluations.
func TestEvaluateHostAgentsTriggersOfflineAlert(t *testing.T) {
	t.Helper()

	monitor := &Monitor{
		state:        models.NewState(),
		alertManager: alerts.NewManager(),
	}
	t.Cleanup(func() { monitor.alertManager.Stop() })

	hostID := "host-offline"
	monitor.state.UpsertHost(models.Host{
		ID:              hostID,
		Hostname:        "offline.local",
		DisplayName:     "Offline Host",
		Status:          "online",
		IntervalSeconds: 30,
		// Last report was 10 minutes ago — far beyond the 30s interval.
		LastSeen: time.Now().Add(-10 * time.Minute),
	})

	// Evaluate several times: offline detection may require consecutive
	// missed checks before the status flips.
	now := time.Now()
	for i := 0; i < 3; i++ {
		monitor.evaluateHostAgents(now.Add(time.Duration(i) * time.Second))
	}

	snapshot := monitor.state.GetSnapshot()
	statusUpdated := false
	for _, host := range snapshot.Hosts {
		if host.ID == hostID {
			statusUpdated = true
			if got := host.Status; got != "offline" {
				t.Fatalf("expected host status offline, got %q", got)
			}
		}
	}
	if !statusUpdated {
		t.Fatalf("host %q not found in state snapshot", hostID)
	}

	connKey := hostConnectionPrefix + hostID
	if healthy, ok := snapshot.ConnectionHealth[connKey]; !ok || healthy {
		t.Fatalf("expected connection health false, got %v (exists=%v)", healthy, ok)
	}

	// Named activeAlerts (not "alerts") so the local does not shadow the
	// imported alerts package used above.
	activeAlerts := monitor.alertManager.GetActiveAlerts()
	found := false
	for _, alert := range activeAlerts {
		if alert.ID == "host-offline-"+hostID {
			found = true
			break
		}
	}
	if !found {
		t.Fatalf("expected host offline alert to remain active")
	}
}
// TestEvaluateHostAgentsClearsAlertWhenHostReturns exercises the recovery
// path: a host previously flagged offline reports in again, and the
// evaluator must restore its connection-health entry and retire the
// offline alert.
func TestEvaluateHostAgentsClearsAlertWhenHostReturns(t *testing.T) {
	t.Helper()

	monitor := &Monitor{
		state:        models.NewState(),
		alertManager: alerts.NewManager(),
	}
	t.Cleanup(func() { monitor.alertManager.Stop() })

	const hostID = "host-recover"

	stale := models.Host{
		ID:              hostID,
		Hostname:        "recover.local",
		DisplayName:     "Recover Host",
		Status:          "online",
		IntervalSeconds: 30,
		LastSeen:        time.Now().Add(-10 * time.Minute),
	}
	monitor.state.UpsertHost(stale)

	// Drive the evaluator until the stale host is considered offline.
	for offset := 0; offset < 3; offset++ {
		monitor.evaluateHostAgents(time.Now().Add(time.Duration(offset) * time.Second))
	}

	// Simulate the agent checking in again with a fresh LastSeen.
	recovered := stale
	recovered.LastSeen = time.Now()
	monitor.state.UpsertHost(recovered)
	monitor.evaluateHostAgents(time.Now())

	snapshot := monitor.state.GetSnapshot()
	connKey := hostConnectionPrefix + hostID
	if healthy, ok := snapshot.ConnectionHealth[connKey]; !ok || !healthy {
		t.Fatalf("expected connection health true after recovery, got %v (exists=%v)", healthy, ok)
	}
	for _, alert := range monitor.alertManager.GetActiveAlerts() {
		if alert.ID == "host-offline-"+hostID {
			t.Fatalf("offline alert still active after recovery")
		}
	}
}

View file

@ -104,6 +104,39 @@ func (q *TaskQueue) Remove(instanceType InstanceType, instance string) {
// WaitNext blocks until a task is due or context is cancelled.
func (q *TaskQueue) WaitNext(ctx context.Context) (ScheduledTask, bool) {
var (
timer *time.Timer
resetTimer = func(d time.Duration) <-chan time.Time {
if d <= 0 {
d = time.Millisecond
}
if timer == nil {
timer = time.NewTimer(d)
return timer.C
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(d)
return timer.C
}
stopTimer = func() {
if timer == nil {
return
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
}
)
defer stopTimer()
for {
select {
case <-ctx.Done():
@ -114,10 +147,11 @@ func (q *TaskQueue) WaitNext(ctx context.Context) (ScheduledTask, bool) {
q.mu.Lock()
if len(q.heap) == 0 {
q.mu.Unlock()
timerCh := resetTimer(100 * time.Millisecond)
select {
case <-ctx.Done():
return ScheduledTask{}, false
case <-time.After(100 * time.Millisecond):
case <-timerCh:
continue
}
}
@ -136,12 +170,11 @@ func (q *TaskQueue) WaitNext(ctx context.Context) (ScheduledTask, bool) {
if delay > 250*time.Millisecond {
delay = 250 * time.Millisecond
}
timer := time.NewTimer(delay)
timerCh := resetTimer(delay)
select {
case <-ctx.Done():
timer.Stop()
return ScheduledTask{}, false
case <-timer.C:
case <-timerCh:
}
}
}