diff --git a/docs/release-control/v6/internal/subsystems/monitoring.md b/docs/release-control/v6/internal/subsystems/monitoring.md
index b04617aa3..33e4259c4 100644
--- a/docs/release-control/v6/internal/subsystems/monitoring.md
+++ b/docs/release-control/v6/internal/subsystems/monitoring.md
@@ -31,7 +31,8 @@ truth for live infrastructure data.
 7. `internal/unifiedresources/monitor_adapter.go`
 8. `internal/unifiedresources/views.go`
 9. `internal/monitoring/connected_infrastructure.go`
-10. `docker-entrypoint.sh`
+10. `internal/monitoring/reload.go`
+11. `docker-entrypoint.sh`
 
 ## Shared Boundaries
 
@@ -87,6 +88,10 @@ The registry proof map now treats provider discovery and metrics history as
 their own governed runtime surfaces instead of leaving them folded into a
 generic monitoring catch-all. Changes to provider wiring, discovery helpers, or
 metrics history retention must stay attached to those explicit proof routes.
+Install-wide telemetry counts are also monitoring-owned now. Any telemetry or
+reporting surface that claims installation totals must aggregate across the
+provisioned tenant set through the reloadable multi-tenant monitor boundary,
+not by reading `GetMonitor()`'s default-org compatibility shim.
 
 Consumer packages already use `ReadState`, but the monitoring core still has
 dual truth between unified resources and `StateSnapshot`. This is the main
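For illustration, a minimal sketch of the consumer pattern the documentation hunk above mandates. The `installTotals` type and `buildInstallTotals` helper here are hypothetical; `AggregateInstallSnapshotCounts` and its fields are the ones this change adds to `internal/monitoring/reload.go`:

```go
package telemetryreport // hypothetical consumer package

import "github.com/rcourtman/pulse-go-rewrite/internal/monitoring"

// installTotals is a stand-in reporting payload; the point is the sourcing:
// counts come from the tenant-aware aggregate, never from the default-org
// GetMonitor() compatibility shim.
type installTotals struct {
	VMs          int
	Containers   int
	ActiveAlerts int
}

func buildInstallTotals(rm *monitoring.ReloadableMonitor) installTotals {
	// Sums resource and alert counts across every provisioned tenant monitor.
	counts := rm.AggregateInstallSnapshotCounts()
	return installTotals{
		VMs:          counts.VMs,
		Containers:   counts.Containers,
		ActiveAlerts: counts.ActiveAlerts,
	}
}
```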
diff --git a/internal/monitoring/canonical_guardrails_test.go b/internal/monitoring/canonical_guardrails_test.go
index f9fb654bb..0f7a8ca1c 100644
--- a/internal/monitoring/canonical_guardrails_test.go
+++ b/internal/monitoring/canonical_guardrails_test.go
@@ -10,6 +10,8 @@ import (
 
 	"github.com/rcourtman/pulse-go-rewrite/internal/ai/memory"
 	"github.com/rcourtman/pulse-go-rewrite/internal/alerts"
+	"github.com/rcourtman/pulse-go-rewrite/internal/config"
+	"github.com/rcourtman/pulse-go-rewrite/internal/models"
 	unifiedresources "github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources"
 )
 
@@ -206,3 +208,51 @@ func TestAlertLifecycleCanonicalChangesRemainWritable(t *testing.T) {
 		t.Fatalf("expected projected alert-fired event, got %#v", timeline.Events)
 	}
 }
+
+func TestTelemetrySnapshotAggregationUsesProvisionedTenantSet(t *testing.T) {
+	baseDir := t.TempDir()
+	persistence := config.NewMultiTenantPersistence(baseDir)
+	for _, orgID := range []string{"default", "org-a"} {
+		if _, err := persistence.GetPersistence(orgID); err != nil {
+			t.Fatalf("GetPersistence(%s): %v", orgID, err)
+		}
+	}
+
+	rm, err := NewReloadableMonitor(&config.Config{DataPath: baseDir}, persistence, nil)
+	if err != nil {
+		t.Fatalf("NewReloadableMonitor: %v", err)
+	}
+
+	mtm := rm.GetMultiTenantMonitor()
+	if mtm == nil {
+		t.Fatal("expected multi-tenant monitor")
+	}
+	mtm.monitors["default"] = testTelemetryMonitor(
+		nil,
+		[]models.VM{{ID: "vm-default", VMID: 101, Name: "vm-default", Instance: "pve-default"}},
+		nil,
+		nil,
+		nil,
+		nil,
+		nil,
+		1,
+	)
+	mtm.monitors["org-a"] = testTelemetryMonitor(
+		nil,
+		[]models.VM{{ID: "vm-a", VMID: 201, Name: "vm-a", Instance: "pve-a"}},
+		nil,
+		nil,
+		nil,
+		nil,
+		nil,
+		2,
+	)
+
+	counts := rm.AggregateInstallSnapshotCounts()
+	if counts.VMs != 2 {
+		t.Fatalf("VMs = %d, want 2 across provisioned tenants", counts.VMs)
+	}
+	if counts.ActiveAlerts != 3 {
+		t.Fatalf("ActiveAlerts = %d, want 3 across provisioned tenants", counts.ActiveAlerts)
+	}
+}
diff --git a/internal/monitoring/reload.go b/internal/monitoring/reload.go
index 572967469..09d950746 100644
--- a/internal/monitoring/reload.go
+++ b/internal/monitoring/reload.go
@@ -3,6 +3,7 @@ package monitoring
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"time"
 
@@ -11,6 +12,19 @@ import (
 	"github.com/rs/zerolog/log"
 )
 
+// InstallSnapshotCounts holds install-wide resource and alert counts aggregated
+// across tenant monitors.
+type InstallSnapshotCounts struct {
+	PVENodes           int
+	PBSInstances       int
+	PMGInstances       int
+	VMs                int
+	Containers         int
+	DockerHosts        int
+	KubernetesClusters int
+	ActiveAlerts       int
+}
+
 // ReloadableMonitor wraps a Monitor with reload capability
 type ReloadableMonitor struct {
 	mu sync.RWMutex
@@ -176,6 +190,76 @@ func (rm *ReloadableMonitor) ReadSnapshot(orgID string) interface{} {
 	return monitor.ReadSnapshot()
 }
 
+// AggregateInstallSnapshotCounts returns install-wide resource and alert counts
+// across all provisioned organizations.
+func (rm *ReloadableMonitor) AggregateInstallSnapshotCounts() InstallSnapshotCounts {
+	rm.mu.RLock()
+	mtMonitor := rm.mtMonitor
+	persistence := rm.persistence
+	rm.mu.RUnlock()
+
+	if mtMonitor == nil {
+		return InstallSnapshotCounts{}
+	}
+
+	orgIDs := []string{"default"}
+	if persistence != nil {
+		orgs, err := persistence.ListOrganizations()
+		if err != nil {
+			log.Warn().Err(err).Msg("Telemetry snapshot falling back to default organization after tenant listing failed")
+		} else {
+			seen := make(map[string]struct{}, len(orgs))
+			orgIDs = orgIDs[:0]
+			for _, org := range orgs {
+				if org == nil {
+					continue
+				}
+				orgID := strings.TrimSpace(org.ID)
+				if orgID == "" {
+					continue
+				}
+				if _, exists := seen[orgID]; exists {
+					continue
+				}
+				seen[orgID] = struct{}{}
+				orgIDs = append(orgIDs, orgID)
+			}
+			if len(orgIDs) == 0 {
+				orgIDs = []string{"default"}
+			}
+		}
+	}
+
+	var counts InstallSnapshotCounts
+	for _, orgID := range orgIDs {
+		monitor, err := mtMonitor.GetMonitor(orgID)
+		if err != nil || monitor == nil {
+			log.Debug().Err(err).Str("org_id", orgID).Msg("Telemetry snapshot could not load tenant monitor")
+			continue
+		}
+		accumulateInstallSnapshotCounts(&counts, monitor)
+	}
+	return counts
+}
+
+func accumulateInstallSnapshotCounts(counts *InstallSnapshotCounts, monitor *Monitor) {
+	if counts == nil || monitor == nil {
+		return
+	}
+
+	readState := monitor.GetUnifiedReadStateOrSnapshot()
+	if readState != nil {
+		counts.PVENodes += len(readState.Nodes())
+		counts.PBSInstances += len(readState.PBSInstances())
+		counts.PMGInstances += len(readState.PMGInstances())
+		counts.VMs += len(readState.VMs())
+		counts.Containers += len(readState.Containers())
+		counts.DockerHosts += len(readState.DockerHosts())
+		counts.KubernetesClusters += len(readState.K8sClusters())
+	}
+	counts.ActiveAlerts += len(monitor.ActiveAlertsSnapshot())
+}
+
 // Stop stops the monitor
 func (rm *ReloadableMonitor) Stop() {
 	rm.mu.Lock()
diff --git a/internal/monitoring/reload_test.go b/internal/monitoring/reload_test.go
index 8412bf4e6..b8d4e0f2f 100644
--- a/internal/monitoring/reload_test.go
+++ b/internal/monitoring/reload_test.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/rcourtman/pulse-go-rewrite/internal/config"
+	"github.com/rcourtman/pulse-go-rewrite/internal/models"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -76,3 +77,96 @@ func TestReloadableMonitor_Lifecycle_Coverage(t *testing.T) {
 	// Test Stop
 	rm.Stop()
 }
+
+func TestReloadableMonitorAggregateInstallSnapshotCountsIncludesProvisionedTenants(t *testing.T) {
+	baseDir := t.TempDir()
+	cfg := &config.Config{DataPath: baseDir}
+	persistence := config.NewMultiTenantPersistence(baseDir)
+
+	for _, orgID := range []string{"default", "org-a", "org-b"} {
+		_, err := persistence.GetPersistence(orgID)
+		require.NoError(t, err)
+	}
+
+	rm, err := NewReloadableMonitor(cfg, persistence, nil)
+	require.NoError(t, err)
+
+	mtm := rm.GetMultiTenantMonitor()
+	require.NotNil(t, mtm)
+	mtm.monitors["default"] = testTelemetryMonitor(
+		[]models.Node{{ID: "node-default", Name: "node-default", Instance: "pve-default"}},
+		[]models.VM{{ID: "vm-default", VMID: 101, Name: "vm-default", Instance: "pve-default"}},
+		[]models.Container{{ID: "ct-default", VMID: 201, Name: "ct-default", Instance: "pve-default"}},
+		[]models.PBSInstance{{Name: "pbs-default", Host: "pbs-default.local"}},
+		[]models.PMGInstance{{Name: "pmg-default", Host: "pmg-default.local"}},
+		[]models.DockerHost{{ID: "docker-default", Hostname: "docker-default"}},
+		[]models.KubernetesCluster{{ID: "k8s-default", Name: "k8s-default"}},
+		1,
+	)
+	mtm.monitors["org-a"] = testTelemetryMonitor(
+		[]models.Node{{ID: "node-a", Name: "node-a", Instance: "pve-a"}},
+		[]models.VM{
+			{ID: "vm-a1", VMID: 102, Name: "vm-a1", Instance: "pve-a"},
+			{ID: "vm-a2", VMID: 103, Name: "vm-a2", Instance: "pve-a"},
+		},
+		nil,
+		[]models.PBSInstance{{Name: "pbs-a", Host: "pbs-a.local"}},
+		nil,
+		nil,
+		nil,
+		2,
+	)
+	mtm.monitors["org-b"] = testTelemetryMonitor(
+		nil,
+		nil,
+		[]models.Container{{ID: "ct-b1", VMID: 202, Name: "ct-b1", Instance: "pve-b"}},
+		nil,
+		[]models.PMGInstance{{Name: "pmg-b", Host: "pmg-b.local"}},
+		[]models.DockerHost{{ID: "docker-b", Hostname: "docker-b"}},
+		[]models.KubernetesCluster{{ID: "k8s-b", Name: "k8s-b"}},
+		3,
+	)
+
+	counts := rm.AggregateInstallSnapshotCounts()
+
+	assert.Equal(t, 2, counts.PVENodes)
+	assert.Equal(t, 2, counts.PBSInstances)
+	assert.Equal(t, 2, counts.PMGInstances)
+	assert.Equal(t, 3, counts.VMs)
+	assert.Equal(t, 2, counts.Containers)
+	assert.Equal(t, 2, counts.DockerHosts)
+	assert.Equal(t, 2, counts.KubernetesClusters)
+	assert.Equal(t, 6, counts.ActiveAlerts)
+}
+
+func testTelemetryMonitor(
+	nodes []models.Node,
+	vms []models.VM,
+	containers []models.Container,
+	pbsInstances []models.PBSInstance,
+	pmgInstances []models.PMGInstance,
+	dockerHosts []models.DockerHost,
+	k8sClusters []models.KubernetesCluster,
+	activeAlerts int,
+) *Monitor {
+	state := models.NewState()
+	state.UpdateNodes(nodes)
+	state.UpdateVMs(vms)
+	state.UpdateContainers(containers)
+	state.UpdatePBSInstances(pbsInstances)
+	state.UpdatePMGInstances(pmgInstances)
+	for _, host := range dockerHosts {
+		state.UpsertDockerHost(host)
+	}
+	for _, cluster := range k8sClusters {
+		state.UpsertKubernetesCluster(cluster)
+	}
+
+	alerts := make([]models.Alert, 0, activeAlerts)
+	for i := 0; i < activeAlerts; i++ {
+		alerts = append(alerts, models.Alert{ID: time.Now().Add(time.Duration(i) * time.Second).Format(time.RFC3339Nano)})
+	}
+	state.UpdateActiveAlerts(alerts)
+
+	return &Monitor{state: state}
+}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index e83214cad..1643ffe0e 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -451,20 +451,17 @@ func Run(ctx context.Context, version string) error {
 		LicenseTier: "free",
 	}
 
-	// Resource counts from monitor state.
-	if mon := reloadableMonitor.GetMonitor(); mon != nil {
-		readState := mon.GetUnifiedReadStateOrSnapshot()
-		if readState != nil {
-			snap.PVENodes = len(readState.Nodes())
-			snap.PBSInstances = len(readState.PBSInstances())
-			snap.PMGInstances = len(readState.PMGInstances())
-			snap.VMs = len(readState.VMs())
-			snap.Containers = len(readState.Containers())
-			snap.DockerHosts = len(readState.DockerHosts())
-			snap.KubernetesClusters = len(readState.K8sClusters())
-		}
-		snap.ActiveAlerts = len(mon.ActiveAlertsSnapshot())
-	}
+	// Resource counts come from the tenant-aware monitor aggregate, not the
+	// default-org compatibility shim.
+	counts := reloadableMonitor.AggregateInstallSnapshotCounts()
+	snap.PVENodes = counts.PVENodes
+	snap.PBSInstances = counts.PBSInstances
+	snap.PMGInstances = counts.PMGInstances
+	snap.VMs = counts.VMs
+	snap.Containers = counts.Containers
+	snap.DockerHosts = counts.DockerHosts
+	snap.KubernetesClusters = counts.KubernetesClusters
+	snap.ActiveAlerts = counts.ActiveAlerts
 
 	// Feature flags from persisted config (using pre-created persistence).
 	if aiCfg, err := telemetryPersistence.LoadAIConfig(); err == nil && aiCfg != nil {