From 6427f28a087d68324da13ee7b4be52fb75f89d6a Mon Sep 17 00:00:00 2001 From: rcourtman Date: Wed, 4 Feb 2026 12:34:40 +0000 Subject: [PATCH] Fix stale metrics store reference in reporting engine after monitor reload The reporting engine held a direct pointer to the metrics store, which becomes invalid after a monitor reload (settings change, node config save, etc.) closes and recreates the store. Use a dynamic getter closure that always resolves to the current monitor's active store. Also adds diagnostic logging when report queries return zero metrics, and integration tests covering the full metrics-to-report pipeline including reload scenarios. Fixes #1186 --- pkg/reporting/engine.go | 51 ++- pkg/reporting/engine_integration_test.go | 388 +++++++++++++++++++++++ pkg/server/server.go | 10 +- 3 files changed, 436 insertions(+), 13 deletions(-) create mode 100644 pkg/reporting/engine_integration_test.go diff --git a/pkg/reporting/engine.go b/pkg/reporting/engine.go index f4dffea37..c528a419e 100644 --- a/pkg/reporting/engine.go +++ b/pkg/reporting/engine.go @@ -11,28 +11,47 @@ import ( // ReportEngine implements the reporting.Engine interface with // full CSV and PDF generation capabilities. type ReportEngine struct { - metricsStore *metrics.Store - csvGen *CSVGenerator - pdfGen *PDFGenerator + metricsStore *metrics.Store + metricsStoreGetter func() *metrics.Store + csvGen *CSVGenerator + pdfGen *PDFGenerator } // EngineConfig holds configuration for the report engine. type EngineConfig struct { + // MetricsStore is a direct reference to the metrics store. + // Use MetricsStoreGetter instead when the store may be replaced + // (e.g., after monitor reloads). MetricsStore *metrics.Store + + // MetricsStoreGetter dynamically resolves the current metrics store. + // When set, this is used instead of MetricsStore, ensuring queries + // always target the active store even after monitor reloads. + MetricsStoreGetter func() *metrics.Store } // NewReportEngine creates a new reporting engine. func NewReportEngine(cfg EngineConfig) *ReportEngine { return &ReportEngine{ - metricsStore: cfg.MetricsStore, - csvGen: NewCSVGenerator(), - pdfGen: NewPDFGenerator(), + metricsStore: cfg.MetricsStore, + metricsStoreGetter: cfg.MetricsStoreGetter, + csvGen: NewCSVGenerator(), + pdfGen: NewPDFGenerator(), } } +// getMetricsStore returns the current metrics store, preferring the dynamic +// getter over the static reference. +func (e *ReportEngine) getMetricsStore() *metrics.Store { + if e.metricsStoreGetter != nil { + return e.metricsStoreGetter() + } + return e.metricsStore +} + // Generate creates a report in the specified format. func (e *ReportEngine) Generate(req MetricReportRequest) (data []byte, contentType string, err error) { - if e.metricsStore == nil { + if e.getMetricsStore() == nil { return nil, "", fmt.Errorf("metrics store not initialized") } @@ -140,12 +159,14 @@ func (e *ReportEngine) queryMetrics(req MetricReportRequest) (*ReportData, error data.Storage = req.Storage data.Disks = req.Disks + store := e.getMetricsStore() + var metricsMap map[string][]metrics.MetricPoint var err error if req.MetricType != "" { // Query specific metric - points, queryErr := e.metricsStore.Query(req.ResourceType, req.ResourceID, req.MetricType, req.Start, req.End, 0) + points, queryErr := store.Query(req.ResourceType, req.ResourceID, req.MetricType, req.Start, req.End, 0) if queryErr != nil { return nil, queryErr } @@ -154,12 +175,22 @@ func (e *ReportEngine) queryMetrics(req MetricReportRequest) (*ReportData, error } } else { // Query all metrics for the resource - metricsMap, err = e.metricsStore.QueryAll(req.ResourceType, req.ResourceID, req.Start, req.End, 0) + metricsMap, err = store.QueryAll(req.ResourceType, req.ResourceID, req.Start, req.End, 0) if err != nil { return nil, err } } + if len(metricsMap) == 0 { + log.Warn(). + Str("resourceType", req.ResourceType). + Str("resourceID", req.ResourceID). + Str("metricType", req.MetricType). + Time("start", req.Start). + Time("end", req.End). + Msg("Report query returned no metrics — verify resource ID matches stored metrics and time range contains data") + } + // Convert to report format and calculate statistics for metricType, points := range metricsMap { if len(points) == 0 { @@ -204,7 +235,7 @@ func (e *ReportEngine) queryMetrics(req MetricReportRequest) (*ReportData, error // GenerateMulti creates a multi-resource report in the specified format. func (e *ReportEngine) GenerateMulti(req MultiReportRequest) (data []byte, contentType string, err error) { - if e.metricsStore == nil { + if e.getMetricsStore() == nil { return nil, "", fmt.Errorf("metrics store not initialized") } diff --git a/pkg/reporting/engine_integration_test.go b/pkg/reporting/engine_integration_test.go new file mode 100644 index 000000000..f517fe3fb --- /dev/null +++ b/pkg/reporting/engine_integration_test.go @@ -0,0 +1,388 @@ +package reporting + +import ( + "path/filepath" + "strings" + "testing" + "time" + + "github.com/rcourtman/pulse-go-rewrite/pkg/metrics" +) + +// TestReportEngineWithMetricsStore verifies the full data pipeline from +// metrics store writes through report generation. This is a regression test +// for the zero-datapoints bug reported in GitHub issue #1186. +func TestReportEngineWithMetricsStore(t *testing.T) { + dir := t.TempDir() + store, err := metrics.NewStore(metrics.StoreConfig{ + DBPath: filepath.Join(dir, "metrics.db"), + WriteBufferSize: 10, + FlushInterval: 100 * time.Millisecond, + RetentionRaw: 24 * time.Hour, + RetentionMinute: 7 * 24 * time.Hour, + RetentionHourly: 30 * 24 * time.Hour, + RetentionDaily: 90 * 24 * time.Hour, + }) + if err != nil { + t.Fatalf("failed to create metrics store: %v", err) + } + defer store.Close() + + engine := NewReportEngine(EngineConfig{MetricsStore: store}) + + // Write metrics data for a node over the last hour + now := time.Now() + nodeID := "pve-prod-pve1" + for i := 0; i < 12; i++ { + ts := now.Add(time.Duration(-60+i*5) * time.Minute) + store.Write("node", nodeID, "cpu", float64(40+i*2), ts) + store.Write("node", nodeID, "memory", float64(55+i), ts) + } + + // Flush to ensure data is written to SQLite + store.Flush() + // Brief pause to let the background writer process + time.Sleep(200 * time.Millisecond) + + // Generate a report for the last 2 hours (uses raw tier) + req := MetricReportRequest{ + ResourceType: "node", + ResourceID: nodeID, + Start: now.Add(-2 * time.Hour), + End: now.Add(time.Minute), // slight buffer + Format: FormatCSV, + } + + data, contentType, err := engine.Generate(req) + if err != nil { + t.Fatalf("report generation failed: %v", err) + } + + if contentType != "text/csv" { + t.Errorf("expected text/csv, got %s", contentType) + } + + csv := string(data) + + // Verify report has data points (CSV uses display names) + if !strings.Contains(csv, "CPU") { + t.Error("report missing CPU metric data") + } + if !strings.Contains(csv, "Memory") { + t.Error("report missing memory metric data") + } + + // Verify the report header includes the correct resource ID + if !strings.Contains(csv, nodeID) { + t.Errorf("report missing resource ID %s", nodeID) + } + + // Verify data rows exist (not just headers) + lines := strings.Split(csv, "\n") + dataStarted := false + dataRows := 0 + for _, line := range lines { + if strings.HasPrefix(line, "# DATA") { + dataStarted = true + continue + } + if dataStarted && !strings.HasPrefix(line, "#") && !strings.HasPrefix(line, "Timestamp") && line != "" { + dataRows++ + } + } + + if dataRows == 0 { + t.Fatal("report has zero data rows — metrics store data did not reach report output") + } + t.Logf("Report generated with %d data rows", dataRows) +} + +// TestReportEngineWithMetricsStore_VM verifies reports work for VM resources +// with composite IDs (instance:node:vmid format). +func TestReportEngineWithMetricsStore_VM(t *testing.T) { + dir := t.TempDir() + store, err := metrics.NewStore(metrics.StoreConfig{ + DBPath: filepath.Join(dir, "metrics.db"), + WriteBufferSize: 10, + FlushInterval: 100 * time.Millisecond, + RetentionRaw: 24 * time.Hour, + RetentionMinute: 7 * 24 * time.Hour, + RetentionHourly: 30 * 24 * time.Hour, + RetentionDaily: 90 * 24 * time.Hour, + }) + if err != nil { + t.Fatalf("failed to create metrics store: %v", err) + } + defer store.Close() + + engine := NewReportEngine(EngineConfig{MetricsStore: store}) + + now := time.Now() + vmID := "pve-prod:pve1:100" + for i := 0; i < 6; i++ { + ts := now.Add(time.Duration(-30+i*5) * time.Minute) + store.Write("vm", vmID, "cpu", float64(30+i*5), ts) + store.Write("vm", vmID, "memory", float64(60+i*2), ts) + store.Write("vm", vmID, "disk", float64(40+i), ts) + } + + store.Flush() + time.Sleep(200 * time.Millisecond) + + req := MetricReportRequest{ + ResourceType: "vm", + ResourceID: vmID, + Start: now.Add(-1 * time.Hour), + End: now.Add(time.Minute), + Format: FormatPDF, + } + + data, contentType, err := engine.Generate(req) + if err != nil { + t.Fatalf("PDF report generation failed: %v", err) + } + + if contentType != "application/pdf" { + t.Errorf("expected application/pdf, got %s", contentType) + } + + if len(data) < 1000 { + t.Errorf("PDF seems too small (%d bytes), likely has no data", len(data)) + } +} + +// TestReportEngineStaleStoreAfterClose verifies that querying a closed +// metrics store returns an error rather than silently returning zero results. +// This documents the behavior that causes blank reports after a monitor reload. +func TestReportEngineStaleStoreAfterClose(t *testing.T) { + dir := t.TempDir() + store, err := metrics.NewStore(metrics.StoreConfig{ + DBPath: filepath.Join(dir, "metrics.db"), + WriteBufferSize: 10, + FlushInterval: 100 * time.Millisecond, + RetentionRaw: 24 * time.Hour, + RetentionMinute: 7 * 24 * time.Hour, + RetentionHourly: 30 * 24 * time.Hour, + RetentionDaily: 90 * 24 * time.Hour, + }) + if err != nil { + t.Fatalf("failed to create metrics store: %v", err) + } + + engine := NewReportEngine(EngineConfig{MetricsStore: store}) + + // Write some data + now := time.Now() + store.Write("node", "test-node", "cpu", 50.0, now) + store.Flush() + time.Sleep(200 * time.Millisecond) + + // Close the store (simulates what happens during monitor reload) + store.Close() + + // Attempt to generate a report with the stale store reference + req := MetricReportRequest{ + ResourceType: "node", + ResourceID: "test-node", + Start: now.Add(-1 * time.Hour), + End: now.Add(time.Minute), + Format: FormatCSV, + } + + _, _, err = engine.Generate(req) + // After a store is closed, the engine should return an error (not silent empty results) + if err == nil { + t.Log("Note: querying a closed store did not return an error — this means blank reports could occur silently after monitor reload") + } else { + t.Logf("Querying closed store returned error as expected: %v", err) + } +} + +// TestReportEngineReplacedStore verifies that replacing the global engine +// with a new one pointing to a fresh store produces reports with data. +func TestReportEngineReplacedStore(t *testing.T) { + dir1 := t.TempDir() + store1, err := metrics.NewStore(metrics.StoreConfig{ + DBPath: filepath.Join(dir1, "metrics.db"), + WriteBufferSize: 10, + FlushInterval: 100 * time.Millisecond, + RetentionRaw: 24 * time.Hour, + RetentionMinute: 7 * 24 * time.Hour, + RetentionHourly: 30 * 24 * time.Hour, + RetentionDaily: 90 * 24 * time.Hour, + }) + if err != nil { + t.Fatalf("failed to create store1: %v", err) + } + + engine1 := NewReportEngine(EngineConfig{MetricsStore: store1}) + SetEngine(engine1) + t.Cleanup(func() { SetEngine(nil) }) + + now := time.Now() + store1.Write("node", "node-1", "cpu", 50.0, now) + store1.Flush() + time.Sleep(200 * time.Millisecond) + + // Simulate reload: close old store, create new one (same DB file) + store1.Close() + + store2, err := metrics.NewStore(metrics.StoreConfig{ + DBPath: filepath.Join(dir1, "metrics.db"), // Same file + WriteBufferSize: 10, + FlushInterval: 100 * time.Millisecond, + RetentionRaw: 24 * time.Hour, + RetentionMinute: 7 * 24 * time.Hour, + RetentionHourly: 30 * 24 * time.Hour, + RetentionDaily: 90 * 24 * time.Hour, + }) + if err != nil { + t.Fatalf("failed to create store2: %v", err) + } + defer store2.Close() + + // Write additional data via new store + store2.Write("node", "node-1", "cpu", 55.0, now.Add(5*time.Minute)) + store2.Flush() + time.Sleep(200 * time.Millisecond) + + // Replace global engine with new store + engine2 := NewReportEngine(EngineConfig{MetricsStore: store2}) + SetEngine(engine2) + + req := MetricReportRequest{ + ResourceType: "node", + ResourceID: "node-1", + Start: now.Add(-1 * time.Hour), + End: now.Add(1 * time.Hour), + Format: FormatCSV, + } + + data, _, err := engine2.Generate(req) + if err != nil { + t.Fatalf("report generation failed after store replacement: %v", err) + } + + csv := string(data) + lines := strings.Split(csv, "\n") + dataStarted := false + dataRows := 0 + for _, line := range lines { + if strings.HasPrefix(line, "# DATA") { + dataStarted = true + continue + } + if dataStarted && !strings.HasPrefix(line, "#") && !strings.HasPrefix(line, "Timestamp") && line != "" { + dataRows++ + } + } + + if dataRows == 0 { + t.Fatal("report has zero data rows after store replacement — fix did not work") + } + t.Logf("After store replacement: report generated with %d data rows", dataRows) +} + +// TestReportEngineGetterSurvivesReload verifies that an engine configured with +// MetricsStoreGetter automatically picks up a new store after the old one is +// closed, without needing to recreate the engine. This is the key behavior +// that prevents stale store references after monitor reloads. +func TestReportEngineGetterSurvivesReload(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "metrics.db") + + newStore := func() *metrics.Store { + s, err := metrics.NewStore(metrics.StoreConfig{ + DBPath: dbPath, + WriteBufferSize: 10, + FlushInterval: 100 * time.Millisecond, + RetentionRaw: 24 * time.Hour, + RetentionMinute: 7 * 24 * time.Hour, + RetentionHourly: 30 * 24 * time.Hour, + RetentionDaily: 90 * 24 * time.Hour, + }) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + return s + } + + // Simulate what reloadableMonitor.GetMonitor().GetMetricsStore() does: + // returns whatever the current store is. + var currentStore *metrics.Store + currentStore = newStore() + + engine := NewReportEngine(EngineConfig{ + MetricsStoreGetter: func() *metrics.Store { + return currentStore + }, + }) + + now := time.Now() + req := MetricReportRequest{ + ResourceType: "node", + ResourceID: "node-1", + Start: now.Add(-1 * time.Hour), + End: now.Add(1 * time.Hour), + Format: FormatCSV, + } + + // Phase 1: Write data and generate a report — should work + currentStore.Write("node", "node-1", "cpu", 50.0, now) + currentStore.Flush() + time.Sleep(200 * time.Millisecond) + + data, _, err := engine.Generate(req) + if err != nil { + t.Fatalf("phase 1: report generation failed: %v", err) + } + rows1 := countCSVDataRows(string(data)) + if rows1 == 0 { + t.Fatal("phase 1: expected data rows, got zero") + } + t.Logf("Phase 1 (before reload): %d data rows", rows1) + + // Phase 2: Simulate reload — close old store, create new one + currentStore.Close() + currentStore = newStore() + + // Write new data through the new store + currentStore.Write("node", "node-1", "cpu", 60.0, now.Add(5*time.Minute)) + currentStore.Write("node", "node-1", "cpu", 70.0, now.Add(10*time.Minute)) + currentStore.Flush() + time.Sleep(200 * time.Millisecond) + + // Phase 3: Same engine instance, should use the new store via getter + data, _, err = engine.Generate(req) + if err != nil { + t.Fatalf("phase 3: report generation failed after reload: %v", err) + } + rows3 := countCSVDataRows(string(data)) + if rows3 == 0 { + t.Fatal("phase 3: report has zero data rows after reload — getter did not resolve new store") + } + t.Logf("Phase 3 (after reload): %d data rows", rows3) + + // The new store should see ALL data (original + new) since it opens the same SQLite file + if rows3 < rows1 { + t.Errorf("phase 3: expected at least %d rows (pre-reload count), got %d", rows1, rows3) + } + + currentStore.Close() +} + +func countCSVDataRows(csv string) int { + lines := strings.Split(csv, "\n") + dataStarted := false + count := 0 + for _, line := range lines { + if strings.HasPrefix(line, "# DATA") { + dataStarted = true + continue + } + if dataStarted && !strings.HasPrefix(line, "#") && !strings.HasPrefix(line, "Timestamp") && line != "" { + count++ + } + } + return count +} diff --git a/pkg/server/server.go b/pkg/server/server.go index aad50027e..88cb50b66 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -172,12 +172,16 @@ func Run(ctx context.Context, version string) error { }() } - // Initialize reporting engine if not already set by enterprise hooks - // This ensures Pro license holders get reporting even with the standard binary + // Initialize reporting engine if not already set by enterprise hooks. + // This ensures Pro license holders get reporting even with the standard binary. + // Uses a dynamic store getter so the engine always queries the current monitor's + // metrics store, even after monitor reloads (which close and recreate the store). if reporting.GetEngine() == nil { if store := reloadableMonitor.GetMonitor().GetMetricsStore(); store != nil { engine := reporting.NewReportEngine(reporting.EngineConfig{ - MetricsStore: store, + MetricsStoreGetter: func() *metrics.Store { + return reloadableMonitor.GetMonitor().GetMetricsStore() + }, }) reporting.SetEngine(engine) log.Info().Msg("Advanced Infrastructure Reporting (PDF/CSV) initialized")