mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-04-28 03:20:11 +00:00
Fix stale metrics store reference in reporting engine after monitor reload
The reporting engine held a direct pointer to the metrics store, which becomes invalid after a monitor reload (settings change, node config save, etc.) closes and recreates the store. Use a dynamic getter closure that always resolves to the current monitor's active store. Also adds diagnostic logging when report queries return zero metrics, and integration tests covering the full metrics-to-report pipeline including reload scenarios. Fixes #1186
This commit is contained in:
parent
e23a2a793b
commit
6427f28a08
3 changed files with 436 additions and 13 deletions
|
|
@ -11,28 +11,47 @@ import (
|
|||
// ReportEngine implements the reporting.Engine interface with
|
||||
// full CSV and PDF generation capabilities.
|
||||
type ReportEngine struct {
|
||||
metricsStore *metrics.Store
|
||||
csvGen *CSVGenerator
|
||||
pdfGen *PDFGenerator
|
||||
metricsStore *metrics.Store
|
||||
metricsStoreGetter func() *metrics.Store
|
||||
csvGen *CSVGenerator
|
||||
pdfGen *PDFGenerator
|
||||
}
|
||||
|
||||
// EngineConfig holds configuration for the report engine.
|
||||
type EngineConfig struct {
|
||||
// MetricsStore is a direct reference to the metrics store.
|
||||
// Use MetricsStoreGetter instead when the store may be replaced
|
||||
// (e.g., after monitor reloads).
|
||||
MetricsStore *metrics.Store
|
||||
|
||||
// MetricsStoreGetter dynamically resolves the current metrics store.
|
||||
// When set, this is used instead of MetricsStore, ensuring queries
|
||||
// always target the active store even after monitor reloads.
|
||||
MetricsStoreGetter func() *metrics.Store
|
||||
}
|
||||
|
||||
// NewReportEngine creates a new reporting engine.
|
||||
func NewReportEngine(cfg EngineConfig) *ReportEngine {
|
||||
return &ReportEngine{
|
||||
metricsStore: cfg.MetricsStore,
|
||||
csvGen: NewCSVGenerator(),
|
||||
pdfGen: NewPDFGenerator(),
|
||||
metricsStore: cfg.MetricsStore,
|
||||
metricsStoreGetter: cfg.MetricsStoreGetter,
|
||||
csvGen: NewCSVGenerator(),
|
||||
pdfGen: NewPDFGenerator(),
|
||||
}
|
||||
}
|
||||
|
||||
// getMetricsStore returns the current metrics store, preferring the dynamic
|
||||
// getter over the static reference.
|
||||
func (e *ReportEngine) getMetricsStore() *metrics.Store {
|
||||
if e.metricsStoreGetter != nil {
|
||||
return e.metricsStoreGetter()
|
||||
}
|
||||
return e.metricsStore
|
||||
}
|
||||
|
||||
// Generate creates a report in the specified format.
|
||||
func (e *ReportEngine) Generate(req MetricReportRequest) (data []byte, contentType string, err error) {
|
||||
if e.metricsStore == nil {
|
||||
if e.getMetricsStore() == nil {
|
||||
return nil, "", fmt.Errorf("metrics store not initialized")
|
||||
}
|
||||
|
||||
|
|
@ -140,12 +159,14 @@ func (e *ReportEngine) queryMetrics(req MetricReportRequest) (*ReportData, error
|
|||
data.Storage = req.Storage
|
||||
data.Disks = req.Disks
|
||||
|
||||
store := e.getMetricsStore()
|
||||
|
||||
var metricsMap map[string][]metrics.MetricPoint
|
||||
var err error
|
||||
|
||||
if req.MetricType != "" {
|
||||
// Query specific metric
|
||||
points, queryErr := e.metricsStore.Query(req.ResourceType, req.ResourceID, req.MetricType, req.Start, req.End, 0)
|
||||
points, queryErr := store.Query(req.ResourceType, req.ResourceID, req.MetricType, req.Start, req.End, 0)
|
||||
if queryErr != nil {
|
||||
return nil, queryErr
|
||||
}
|
||||
|
|
@ -154,12 +175,22 @@ func (e *ReportEngine) queryMetrics(req MetricReportRequest) (*ReportData, error
|
|||
}
|
||||
} else {
|
||||
// Query all metrics for the resource
|
||||
metricsMap, err = e.metricsStore.QueryAll(req.ResourceType, req.ResourceID, req.Start, req.End, 0)
|
||||
metricsMap, err = store.QueryAll(req.ResourceType, req.ResourceID, req.Start, req.End, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if len(metricsMap) == 0 {
|
||||
log.Warn().
|
||||
Str("resourceType", req.ResourceType).
|
||||
Str("resourceID", req.ResourceID).
|
||||
Str("metricType", req.MetricType).
|
||||
Time("start", req.Start).
|
||||
Time("end", req.End).
|
||||
Msg("Report query returned no metrics — verify resource ID matches stored metrics and time range contains data")
|
||||
}
|
||||
|
||||
// Convert to report format and calculate statistics
|
||||
for metricType, points := range metricsMap {
|
||||
if len(points) == 0 {
|
||||
|
|
@ -204,7 +235,7 @@ func (e *ReportEngine) queryMetrics(req MetricReportRequest) (*ReportData, error
|
|||
|
||||
// GenerateMulti creates a multi-resource report in the specified format.
|
||||
func (e *ReportEngine) GenerateMulti(req MultiReportRequest) (data []byte, contentType string, err error) {
|
||||
if e.metricsStore == nil {
|
||||
if e.getMetricsStore() == nil {
|
||||
return nil, "", fmt.Errorf("metrics store not initialized")
|
||||
}
|
||||
|
||||
|
|
|
|||
388
pkg/reporting/engine_integration_test.go
Normal file
388
pkg/reporting/engine_integration_test.go
Normal file
|
|
@ -0,0 +1,388 @@
|
|||
package reporting
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rcourtman/pulse-go-rewrite/pkg/metrics"
|
||||
)
|
||||
|
||||
// TestReportEngineWithMetricsStore verifies the full data pipeline from
|
||||
// metrics store writes through report generation. This is a regression test
|
||||
// for the zero-datapoints bug reported in GitHub issue #1186.
|
||||
func TestReportEngineWithMetricsStore(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := metrics.NewStore(metrics.StoreConfig{
|
||||
DBPath: filepath.Join(dir, "metrics.db"),
|
||||
WriteBufferSize: 10,
|
||||
FlushInterval: 100 * time.Millisecond,
|
||||
RetentionRaw: 24 * time.Hour,
|
||||
RetentionMinute: 7 * 24 * time.Hour,
|
||||
RetentionHourly: 30 * 24 * time.Hour,
|
||||
RetentionDaily: 90 * 24 * time.Hour,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create metrics store: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
engine := NewReportEngine(EngineConfig{MetricsStore: store})
|
||||
|
||||
// Write metrics data for a node over the last hour
|
||||
now := time.Now()
|
||||
nodeID := "pve-prod-pve1"
|
||||
for i := 0; i < 12; i++ {
|
||||
ts := now.Add(time.Duration(-60+i*5) * time.Minute)
|
||||
store.Write("node", nodeID, "cpu", float64(40+i*2), ts)
|
||||
store.Write("node", nodeID, "memory", float64(55+i), ts)
|
||||
}
|
||||
|
||||
// Flush to ensure data is written to SQLite
|
||||
store.Flush()
|
||||
// Brief pause to let the background writer process
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Generate a report for the last 2 hours (uses raw tier)
|
||||
req := MetricReportRequest{
|
||||
ResourceType: "node",
|
||||
ResourceID: nodeID,
|
||||
Start: now.Add(-2 * time.Hour),
|
||||
End: now.Add(time.Minute), // slight buffer
|
||||
Format: FormatCSV,
|
||||
}
|
||||
|
||||
data, contentType, err := engine.Generate(req)
|
||||
if err != nil {
|
||||
t.Fatalf("report generation failed: %v", err)
|
||||
}
|
||||
|
||||
if contentType != "text/csv" {
|
||||
t.Errorf("expected text/csv, got %s", contentType)
|
||||
}
|
||||
|
||||
csv := string(data)
|
||||
|
||||
// Verify report has data points (CSV uses display names)
|
||||
if !strings.Contains(csv, "CPU") {
|
||||
t.Error("report missing CPU metric data")
|
||||
}
|
||||
if !strings.Contains(csv, "Memory") {
|
||||
t.Error("report missing memory metric data")
|
||||
}
|
||||
|
||||
// Verify the report header includes the correct resource ID
|
||||
if !strings.Contains(csv, nodeID) {
|
||||
t.Errorf("report missing resource ID %s", nodeID)
|
||||
}
|
||||
|
||||
// Verify data rows exist (not just headers)
|
||||
lines := strings.Split(csv, "\n")
|
||||
dataStarted := false
|
||||
dataRows := 0
|
||||
for _, line := range lines {
|
||||
if strings.HasPrefix(line, "# DATA") {
|
||||
dataStarted = true
|
||||
continue
|
||||
}
|
||||
if dataStarted && !strings.HasPrefix(line, "#") && !strings.HasPrefix(line, "Timestamp") && line != "" {
|
||||
dataRows++
|
||||
}
|
||||
}
|
||||
|
||||
if dataRows == 0 {
|
||||
t.Fatal("report has zero data rows — metrics store data did not reach report output")
|
||||
}
|
||||
t.Logf("Report generated with %d data rows", dataRows)
|
||||
}
|
||||
|
||||
// TestReportEngineWithMetricsStore_VM verifies reports work for VM resources
|
||||
// with composite IDs (instance:node:vmid format).
|
||||
func TestReportEngineWithMetricsStore_VM(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := metrics.NewStore(metrics.StoreConfig{
|
||||
DBPath: filepath.Join(dir, "metrics.db"),
|
||||
WriteBufferSize: 10,
|
||||
FlushInterval: 100 * time.Millisecond,
|
||||
RetentionRaw: 24 * time.Hour,
|
||||
RetentionMinute: 7 * 24 * time.Hour,
|
||||
RetentionHourly: 30 * 24 * time.Hour,
|
||||
RetentionDaily: 90 * 24 * time.Hour,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create metrics store: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
engine := NewReportEngine(EngineConfig{MetricsStore: store})
|
||||
|
||||
now := time.Now()
|
||||
vmID := "pve-prod:pve1:100"
|
||||
for i := 0; i < 6; i++ {
|
||||
ts := now.Add(time.Duration(-30+i*5) * time.Minute)
|
||||
store.Write("vm", vmID, "cpu", float64(30+i*5), ts)
|
||||
store.Write("vm", vmID, "memory", float64(60+i*2), ts)
|
||||
store.Write("vm", vmID, "disk", float64(40+i), ts)
|
||||
}
|
||||
|
||||
store.Flush()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
req := MetricReportRequest{
|
||||
ResourceType: "vm",
|
||||
ResourceID: vmID,
|
||||
Start: now.Add(-1 * time.Hour),
|
||||
End: now.Add(time.Minute),
|
||||
Format: FormatPDF,
|
||||
}
|
||||
|
||||
data, contentType, err := engine.Generate(req)
|
||||
if err != nil {
|
||||
t.Fatalf("PDF report generation failed: %v", err)
|
||||
}
|
||||
|
||||
if contentType != "application/pdf" {
|
||||
t.Errorf("expected application/pdf, got %s", contentType)
|
||||
}
|
||||
|
||||
if len(data) < 1000 {
|
||||
t.Errorf("PDF seems too small (%d bytes), likely has no data", len(data))
|
||||
}
|
||||
}
|
||||
|
||||
// TestReportEngineStaleStoreAfterClose verifies that querying a closed
|
||||
// metrics store returns an error rather than silently returning zero results.
|
||||
// This documents the behavior that causes blank reports after a monitor reload.
|
||||
func TestReportEngineStaleStoreAfterClose(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := metrics.NewStore(metrics.StoreConfig{
|
||||
DBPath: filepath.Join(dir, "metrics.db"),
|
||||
WriteBufferSize: 10,
|
||||
FlushInterval: 100 * time.Millisecond,
|
||||
RetentionRaw: 24 * time.Hour,
|
||||
RetentionMinute: 7 * 24 * time.Hour,
|
||||
RetentionHourly: 30 * 24 * time.Hour,
|
||||
RetentionDaily: 90 * 24 * time.Hour,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create metrics store: %v", err)
|
||||
}
|
||||
|
||||
engine := NewReportEngine(EngineConfig{MetricsStore: store})
|
||||
|
||||
// Write some data
|
||||
now := time.Now()
|
||||
store.Write("node", "test-node", "cpu", 50.0, now)
|
||||
store.Flush()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Close the store (simulates what happens during monitor reload)
|
||||
store.Close()
|
||||
|
||||
// Attempt to generate a report with the stale store reference
|
||||
req := MetricReportRequest{
|
||||
ResourceType: "node",
|
||||
ResourceID: "test-node",
|
||||
Start: now.Add(-1 * time.Hour),
|
||||
End: now.Add(time.Minute),
|
||||
Format: FormatCSV,
|
||||
}
|
||||
|
||||
_, _, err = engine.Generate(req)
|
||||
// After a store is closed, the engine should return an error (not silent empty results)
|
||||
if err == nil {
|
||||
t.Log("Note: querying a closed store did not return an error — this means blank reports could occur silently after monitor reload")
|
||||
} else {
|
||||
t.Logf("Querying closed store returned error as expected: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReportEngineReplacedStore verifies that replacing the global engine
|
||||
// with a new one pointing to a fresh store produces reports with data.
|
||||
func TestReportEngineReplacedStore(t *testing.T) {
|
||||
dir1 := t.TempDir()
|
||||
store1, err := metrics.NewStore(metrics.StoreConfig{
|
||||
DBPath: filepath.Join(dir1, "metrics.db"),
|
||||
WriteBufferSize: 10,
|
||||
FlushInterval: 100 * time.Millisecond,
|
||||
RetentionRaw: 24 * time.Hour,
|
||||
RetentionMinute: 7 * 24 * time.Hour,
|
||||
RetentionHourly: 30 * 24 * time.Hour,
|
||||
RetentionDaily: 90 * 24 * time.Hour,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create store1: %v", err)
|
||||
}
|
||||
|
||||
engine1 := NewReportEngine(EngineConfig{MetricsStore: store1})
|
||||
SetEngine(engine1)
|
||||
t.Cleanup(func() { SetEngine(nil) })
|
||||
|
||||
now := time.Now()
|
||||
store1.Write("node", "node-1", "cpu", 50.0, now)
|
||||
store1.Flush()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Simulate reload: close old store, create new one (same DB file)
|
||||
store1.Close()
|
||||
|
||||
store2, err := metrics.NewStore(metrics.StoreConfig{
|
||||
DBPath: filepath.Join(dir1, "metrics.db"), // Same file
|
||||
WriteBufferSize: 10,
|
||||
FlushInterval: 100 * time.Millisecond,
|
||||
RetentionRaw: 24 * time.Hour,
|
||||
RetentionMinute: 7 * 24 * time.Hour,
|
||||
RetentionHourly: 30 * 24 * time.Hour,
|
||||
RetentionDaily: 90 * 24 * time.Hour,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create store2: %v", err)
|
||||
}
|
||||
defer store2.Close()
|
||||
|
||||
// Write additional data via new store
|
||||
store2.Write("node", "node-1", "cpu", 55.0, now.Add(5*time.Minute))
|
||||
store2.Flush()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Replace global engine with new store
|
||||
engine2 := NewReportEngine(EngineConfig{MetricsStore: store2})
|
||||
SetEngine(engine2)
|
||||
|
||||
req := MetricReportRequest{
|
||||
ResourceType: "node",
|
||||
ResourceID: "node-1",
|
||||
Start: now.Add(-1 * time.Hour),
|
||||
End: now.Add(1 * time.Hour),
|
||||
Format: FormatCSV,
|
||||
}
|
||||
|
||||
data, _, err := engine2.Generate(req)
|
||||
if err != nil {
|
||||
t.Fatalf("report generation failed after store replacement: %v", err)
|
||||
}
|
||||
|
||||
csv := string(data)
|
||||
lines := strings.Split(csv, "\n")
|
||||
dataStarted := false
|
||||
dataRows := 0
|
||||
for _, line := range lines {
|
||||
if strings.HasPrefix(line, "# DATA") {
|
||||
dataStarted = true
|
||||
continue
|
||||
}
|
||||
if dataStarted && !strings.HasPrefix(line, "#") && !strings.HasPrefix(line, "Timestamp") && line != "" {
|
||||
dataRows++
|
||||
}
|
||||
}
|
||||
|
||||
if dataRows == 0 {
|
||||
t.Fatal("report has zero data rows after store replacement — fix did not work")
|
||||
}
|
||||
t.Logf("After store replacement: report generated with %d data rows", dataRows)
|
||||
}
|
||||
|
||||
// TestReportEngineGetterSurvivesReload verifies that an engine configured with
|
||||
// MetricsStoreGetter automatically picks up a new store after the old one is
|
||||
// closed, without needing to recreate the engine. This is the key behavior
|
||||
// that prevents stale store references after monitor reloads.
|
||||
func TestReportEngineGetterSurvivesReload(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
dbPath := filepath.Join(dir, "metrics.db")
|
||||
|
||||
newStore := func() *metrics.Store {
|
||||
s, err := metrics.NewStore(metrics.StoreConfig{
|
||||
DBPath: dbPath,
|
||||
WriteBufferSize: 10,
|
||||
FlushInterval: 100 * time.Millisecond,
|
||||
RetentionRaw: 24 * time.Hour,
|
||||
RetentionMinute: 7 * 24 * time.Hour,
|
||||
RetentionHourly: 30 * 24 * time.Hour,
|
||||
RetentionDaily: 90 * 24 * time.Hour,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create store: %v", err)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Simulate what reloadableMonitor.GetMonitor().GetMetricsStore() does:
|
||||
// returns whatever the current store is.
|
||||
var currentStore *metrics.Store
|
||||
currentStore = newStore()
|
||||
|
||||
engine := NewReportEngine(EngineConfig{
|
||||
MetricsStoreGetter: func() *metrics.Store {
|
||||
return currentStore
|
||||
},
|
||||
})
|
||||
|
||||
now := time.Now()
|
||||
req := MetricReportRequest{
|
||||
ResourceType: "node",
|
||||
ResourceID: "node-1",
|
||||
Start: now.Add(-1 * time.Hour),
|
||||
End: now.Add(1 * time.Hour),
|
||||
Format: FormatCSV,
|
||||
}
|
||||
|
||||
// Phase 1: Write data and generate a report — should work
|
||||
currentStore.Write("node", "node-1", "cpu", 50.0, now)
|
||||
currentStore.Flush()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
data, _, err := engine.Generate(req)
|
||||
if err != nil {
|
||||
t.Fatalf("phase 1: report generation failed: %v", err)
|
||||
}
|
||||
rows1 := countCSVDataRows(string(data))
|
||||
if rows1 == 0 {
|
||||
t.Fatal("phase 1: expected data rows, got zero")
|
||||
}
|
||||
t.Logf("Phase 1 (before reload): %d data rows", rows1)
|
||||
|
||||
// Phase 2: Simulate reload — close old store, create new one
|
||||
currentStore.Close()
|
||||
currentStore = newStore()
|
||||
|
||||
// Write new data through the new store
|
||||
currentStore.Write("node", "node-1", "cpu", 60.0, now.Add(5*time.Minute))
|
||||
currentStore.Write("node", "node-1", "cpu", 70.0, now.Add(10*time.Minute))
|
||||
currentStore.Flush()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Phase 3: Same engine instance, should use the new store via getter
|
||||
data, _, err = engine.Generate(req)
|
||||
if err != nil {
|
||||
t.Fatalf("phase 3: report generation failed after reload: %v", err)
|
||||
}
|
||||
rows3 := countCSVDataRows(string(data))
|
||||
if rows3 == 0 {
|
||||
t.Fatal("phase 3: report has zero data rows after reload — getter did not resolve new store")
|
||||
}
|
||||
t.Logf("Phase 3 (after reload): %d data rows", rows3)
|
||||
|
||||
// The new store should see ALL data (original + new) since it opens the same SQLite file
|
||||
if rows3 < rows1 {
|
||||
t.Errorf("phase 3: expected at least %d rows (pre-reload count), got %d", rows1, rows3)
|
||||
}
|
||||
|
||||
currentStore.Close()
|
||||
}
|
||||
|
||||
// countCSVDataRows returns the number of real data rows in a generated CSV
// report: lines after the "# DATA" marker that are not comments ("#" prefix),
// not the "Timestamp" header, and not blank.
func countCSVDataRows(csv string) int {
	rows := 0
	inData := false
	for _, ln := range strings.Split(csv, "\n") {
		switch {
		case strings.HasPrefix(ln, "# DATA"):
			// Everything after this marker is the data section.
			inData = true
		case !inData, ln == "", strings.HasPrefix(ln, "#"), strings.HasPrefix(ln, "Timestamp"):
			// Preamble, blanks, comments, and the column header don't count.
		default:
			rows++
		}
	}
	return rows
}
|
||||
|
|
@ -172,12 +172,16 @@ func Run(ctx context.Context, version string) error {
|
|||
}()
|
||||
}
|
||||
|
||||
// Initialize reporting engine if not already set by enterprise hooks
|
||||
// This ensures Pro license holders get reporting even with the standard binary
|
||||
// Initialize reporting engine if not already set by enterprise hooks.
|
||||
// This ensures Pro license holders get reporting even with the standard binary.
|
||||
// Uses a dynamic store getter so the engine always queries the current monitor's
|
||||
// metrics store, even after monitor reloads (which close and recreate the store).
|
||||
if reporting.GetEngine() == nil {
|
||||
if store := reloadableMonitor.GetMonitor().GetMetricsStore(); store != nil {
|
||||
engine := reporting.NewReportEngine(reporting.EngineConfig{
|
||||
MetricsStore: store,
|
||||
MetricsStoreGetter: func() *metrics.Store {
|
||||
return reloadableMonitor.GetMonitor().GetMetricsStore()
|
||||
},
|
||||
})
|
||||
reporting.SetEngine(engine)
|
||||
log.Info().Msg("Advanced Infrastructure Reporting (PDF/CSV) initialized")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue