fix(telemetry): aggregate tenant snapshots

rcourtman 2026-03-28 22:31:33 +00:00
parent 1deb828577
commit c19b06d1f4
5 changed files with 245 additions and 15 deletions

View file

@ -31,7 +31,8 @@ truth for live infrastructure data.
7. `internal/unifiedresources/monitor_adapter.go`
8. `internal/unifiedresources/views.go`
9. `internal/monitoring/connected_infrastructure.go`
10. `docker-entrypoint.sh`
10. `internal/monitoring/reload.go`
11. `docker-entrypoint.sh`
## Shared Boundaries
@ -87,6 +88,10 @@ The registry proof map now treats provider discovery and metrics history as
their own governed runtime surfaces instead of leaving them folded into a
generic monitoring catch-all. Changes to provider wiring, discovery helpers,
or metrics history retention must stay attached to those explicit proof routes.
Install-wide telemetry counts are also monitoring-owned now. Any telemetry or
reporting surface that reports installation totals must aggregate across the
provisioned tenant set through the reloadable multi-tenant monitor boundary,
not by reading `GetMonitor()`'s default-org compatibility shim, as sketched below.
Consumer packages already use `ReadState`, but the monitoring core still has
dual truth between unified resources and `StateSnapshot`. This is the main
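
A minimal sketch of the consumption pattern this boundary requires, based on the `AggregateInstallSnapshotCounts` helper added in this commit; the wrapper `installTotals` and the package it lives in are hypothetical and only illustrate the approved call path:

```go
package example // hypothetical package inside the pulse-go-rewrite module

import (
	"github.com/rcourtman/pulse-go-rewrite/internal/monitoring"
	"github.com/rs/zerolog/log"
)

// installTotals illustrates the approved path: install-wide totals are
// aggregated across every provisioned tenant via the reloadable monitor,
// rather than read from GetMonitor()'s default-org compatibility shim.
func installTotals(rm *monitoring.ReloadableMonitor) monitoring.InstallSnapshotCounts {
	counts := rm.AggregateInstallSnapshotCounts()
	log.Debug().
		Int("vms", counts.VMs).
		Int("containers", counts.Containers).
		Int("active_alerts", counts.ActiveAlerts).
		Msg("install-wide telemetry counts")
	return counts
}
```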

View file

@ -10,6 +10,8 @@ import (
"github.com/rcourtman/pulse-go-rewrite/internal/ai/memory"
"github.com/rcourtman/pulse-go-rewrite/internal/alerts"
"github.com/rcourtman/pulse-go-rewrite/internal/config"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
unifiedresources "github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources"
)
@ -206,3 +208,51 @@ func TestAlertLifecycleCanonicalChangesRemainWritable(t *testing.T) {
t.Fatalf("expected projected alert-fired event, got %#v", timeline.Events)
}
}
func TestTelemetrySnapshotAggregationUsesProvisionedTenantSet(t *testing.T) {
baseDir := t.TempDir()
persistence := config.NewMultiTenantPersistence(baseDir)
for _, orgID := range []string{"default", "org-a"} {
if _, err := persistence.GetPersistence(orgID); err != nil {
t.Fatalf("GetPersistence(%s): %v", orgID, err)
}
}
rm, err := NewReloadableMonitor(&config.Config{DataPath: baseDir}, persistence, nil)
if err != nil {
t.Fatalf("NewReloadableMonitor: %v", err)
}
mtm := rm.GetMultiTenantMonitor()
if mtm == nil {
t.Fatal("expected multi-tenant monitor")
}
mtm.monitors["default"] = testTelemetryMonitor(
nil,
[]models.VM{{ID: "vm-default", VMID: 101, Name: "vm-default", Instance: "pve-default"}},
nil,
nil,
nil,
nil,
nil,
1,
)
mtm.monitors["org-a"] = testTelemetryMonitor(
nil,
[]models.VM{{ID: "vm-a", VMID: 201, Name: "vm-a", Instance: "pve-a"}},
nil,
nil,
nil,
nil,
nil,
2,
)
counts := rm.AggregateInstallSnapshotCounts()
if counts.VMs != 2 {
t.Fatalf("VMs = %d, want 2 across provisioned tenants", counts.VMs)
}
if counts.ActiveAlerts != 3 {
t.Fatalf("ActiveAlerts = %d, want 3 across provisioned tenants", counts.ActiveAlerts)
}
}

View file

@ -3,6 +3,7 @@ package monitoring
import (
"context"
"fmt"
"strings"
"sync"
"time"
@ -11,6 +12,19 @@ import (
"github.com/rs/zerolog/log"
)
// InstallSnapshotCounts holds install-wide resource and alert counts aggregated
// across tenant monitors.
type InstallSnapshotCounts struct {
PVENodes int
PBSInstances int
PMGInstances int
VMs int
Containers int
DockerHosts int
KubernetesClusters int
ActiveAlerts int
}
// ReloadableMonitor wraps a Monitor with reload capability
type ReloadableMonitor struct {
mu sync.RWMutex
@ -176,6 +190,76 @@ func (rm *ReloadableMonitor) ReadSnapshot(orgID string) interface{} {
return monitor.ReadSnapshot()
}
// AggregateInstallSnapshotCounts returns install-wide resource and alert counts
// across all provisioned organizations.
func (rm *ReloadableMonitor) AggregateInstallSnapshotCounts() InstallSnapshotCounts {
rm.mu.RLock()
mtMonitor := rm.mtMonitor
persistence := rm.persistence
rm.mu.RUnlock()
if mtMonitor == nil {
return InstallSnapshotCounts{}
}
orgIDs := []string{"default"}
if persistence != nil {
orgs, err := persistence.ListOrganizations()
if err != nil {
log.Warn().Err(err).Msg("Telemetry snapshot falling back to default organization after tenant listing failed")
} else {
seen := make(map[string]struct{}, len(orgs))
orgIDs = orgIDs[:0]
for _, org := range orgs {
if org == nil {
continue
}
orgID := strings.TrimSpace(org.ID)
if orgID == "" {
continue
}
if _, exists := seen[orgID]; exists {
continue
}
seen[orgID] = struct{}{}
orgIDs = append(orgIDs, orgID)
}
if len(orgIDs) == 0 {
orgIDs = []string{"default"}
}
}
}
var counts InstallSnapshotCounts
for _, orgID := range orgIDs {
monitor, err := mtMonitor.GetMonitor(orgID)
if err != nil || monitor == nil {
log.Debug().Err(err).Str("org_id", orgID).Msg("Telemetry snapshot could not load tenant monitor")
continue
}
accumulateInstallSnapshotCounts(&counts, monitor)
}
return counts
}
func accumulateInstallSnapshotCounts(counts *InstallSnapshotCounts, monitor *Monitor) {
if counts == nil || monitor == nil {
return
}
readState := monitor.GetUnifiedReadStateOrSnapshot()
if readState != nil {
counts.PVENodes += len(readState.Nodes())
counts.PBSInstances += len(readState.PBSInstances())
counts.PMGInstances += len(readState.PMGInstances())
counts.VMs += len(readState.VMs())
counts.Containers += len(readState.Containers())
counts.DockerHosts += len(readState.DockerHosts())
counts.KubernetesClusters += len(readState.K8sClusters())
}
counts.ActiveAlerts += len(monitor.ActiveAlertsSnapshot())
}
// Stop stops the monitor
func (rm *ReloadableMonitor) Stop() {
rm.mu.Lock()

View file

@ -6,6 +6,7 @@ import (
"time"
"github.com/rcourtman/pulse-go-rewrite/internal/config"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -76,3 +77,96 @@ func TestReloadableMonitor_Lifecycle_Coverage(t *testing.T) {
// Test Stop
rm.Stop()
}
func TestReloadableMonitorAggregateInstallSnapshotCountsIncludesProvisionedTenants(t *testing.T) {
baseDir := t.TempDir()
cfg := &config.Config{DataPath: baseDir}
persistence := config.NewMultiTenantPersistence(baseDir)
for _, orgID := range []string{"default", "org-a", "org-b"} {
_, err := persistence.GetPersistence(orgID)
require.NoError(t, err)
}
rm, err := NewReloadableMonitor(cfg, persistence, nil)
require.NoError(t, err)
mtm := rm.GetMultiTenantMonitor()
require.NotNil(t, mtm)
mtm.monitors["default"] = testTelemetryMonitor(
[]models.Node{{ID: "node-default", Name: "node-default", Instance: "pve-default"}},
[]models.VM{{ID: "vm-default", VMID: 101, Name: "vm-default", Instance: "pve-default"}},
[]models.Container{{ID: "ct-default", VMID: 201, Name: "ct-default", Instance: "pve-default"}},
[]models.PBSInstance{{Name: "pbs-default", Host: "pbs-default.local"}},
[]models.PMGInstance{{Name: "pmg-default", Host: "pmg-default.local"}},
[]models.DockerHost{{ID: "docker-default", Hostname: "docker-default"}},
[]models.KubernetesCluster{{ID: "k8s-default", Name: "k8s-default"}},
1,
)
mtm.monitors["org-a"] = testTelemetryMonitor(
[]models.Node{{ID: "node-a", Name: "node-a", Instance: "pve-a"}},
[]models.VM{
{ID: "vm-a1", VMID: 102, Name: "vm-a1", Instance: "pve-a"},
{ID: "vm-a2", VMID: 103, Name: "vm-a2", Instance: "pve-a"},
},
nil,
[]models.PBSInstance{{Name: "pbs-a", Host: "pbs-a.local"}},
nil,
nil,
nil,
2,
)
mtm.monitors["org-b"] = testTelemetryMonitor(
nil,
nil,
[]models.Container{{ID: "ct-b1", VMID: 202, Name: "ct-b1", Instance: "pve-b"}},
nil,
[]models.PMGInstance{{Name: "pmg-b", Host: "pmg-b.local"}},
[]models.DockerHost{{ID: "docker-b", Hostname: "docker-b"}},
[]models.KubernetesCluster{{ID: "k8s-b", Name: "k8s-b"}},
3,
)
counts := rm.AggregateInstallSnapshotCounts()
assert.Equal(t, 2, counts.PVENodes)
assert.Equal(t, 2, counts.PBSInstances)
assert.Equal(t, 2, counts.PMGInstances)
assert.Equal(t, 3, counts.VMs)
assert.Equal(t, 2, counts.Containers)
assert.Equal(t, 2, counts.DockerHosts)
assert.Equal(t, 2, counts.KubernetesClusters)
assert.Equal(t, 6, counts.ActiveAlerts)
}
func testTelemetryMonitor(
nodes []models.Node,
vms []models.VM,
containers []models.Container,
pbsInstances []models.PBSInstance,
pmgInstances []models.PMGInstance,
dockerHosts []models.DockerHost,
k8sClusters []models.KubernetesCluster,
activeAlerts int,
) *Monitor {
state := models.NewState()
state.UpdateNodes(nodes)
state.UpdateVMs(vms)
state.UpdateContainers(containers)
state.UpdatePBSInstances(pbsInstances)
state.UpdatePMGInstances(pmgInstances)
for _, host := range dockerHosts {
state.UpsertDockerHost(host)
}
for _, cluster := range k8sClusters {
state.UpsertKubernetesCluster(cluster)
}
alerts := make([]models.Alert, 0, activeAlerts)
for i := 0; i < activeAlerts; i++ {
alerts = append(alerts, models.Alert{ID: time.Now().Add(time.Duration(i) * time.Second).Format(time.RFC3339Nano)})
}
state.UpdateActiveAlerts(alerts)
return &Monitor{state: state}
}

View file

@ -451,20 +451,17 @@ func Run(ctx context.Context, version string) error {
LicenseTier: "free",
}
// Resource counts from monitor state.
if mon := reloadableMonitor.GetMonitor(); mon != nil {
readState := mon.GetUnifiedReadStateOrSnapshot()
if readState != nil {
snap.PVENodes = len(readState.Nodes())
snap.PBSInstances = len(readState.PBSInstances())
snap.PMGInstances = len(readState.PMGInstances())
snap.VMs = len(readState.VMs())
snap.Containers = len(readState.Containers())
snap.DockerHosts = len(readState.DockerHosts())
snap.KubernetesClusters = len(readState.K8sClusters())
}
snap.ActiveAlerts = len(mon.ActiveAlertsSnapshot())
}
// Resource counts come from the tenant-aware monitor aggregate, not the
// default-org compatibility shim.
counts := reloadableMonitor.AggregateInstallSnapshotCounts()
snap.PVENodes = counts.PVENodes
snap.PBSInstances = counts.PBSInstances
snap.PMGInstances = counts.PMGInstances
snap.VMs = counts.VMs
snap.Containers = counts.Containers
snap.DockerHosts = counts.DockerHosts
snap.KubernetesClusters = counts.KubernetesClusters
snap.ActiveAlerts = counts.ActiveAlerts
// Feature flags from persisted config (using pre-created persistence).
if aiCfg, err := telemetryPersistence.LoadAIConfig(); err == nil && aiCfg != nil {