From ee0e89871d916aa1eba0a5dbdd60f48dde285258 Mon Sep 17 00:00:00 2001 From: rcourtman Date: Wed, 4 Feb 2026 19:49:52 +0000 Subject: [PATCH] fix: reduce metrics memory 86x by reverting buffer and adding LTTB downsampling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The in-memory metrics buffer was changed from 1000 to 86400 points per metric to support 30-day sparklines, but this pre-allocated ~18 MB per guest (7 slices × 86400 × 32 bytes). With 50 guests that's 920 MB — explaining why users needed to double their LXC memory after upgrading to 5.1.0. - Revert in-memory buffer to 1000 points / 24h retention - Remove eager slice pre-allocation (use append growth instead) - Add LTTB (Largest Triangle Three Buckets) downsampling algorithm - Chart endpoints now use a two-tier strategy: in-memory for ranges ≤ 2h, SQLite persistent store + LTTB for longer ranges - Reduce frontend ring buffer from 86400 to 2000 points Related to #1190 --- docs/monitoring/METRICS_DATA_FLOW.md | 17 +-- frontend-modern/src/stores/metricsHistory.ts | 6 +- internal/api/router.go | 30 ++--- internal/monitoring/lttb.go | 86 +++++++++++++ internal/monitoring/lttb_test.go | 126 +++++++++++++++++++ internal/monitoring/metrics_history.go | 23 +--- internal/monitoring/monitor.go | 82 +++++++++++- 7 files changed, 319 insertions(+), 51 deletions(-) create mode 100644 internal/monitoring/lttb.go create mode 100644 internal/monitoring/lttb_test.go diff --git a/docs/monitoring/METRICS_DATA_FLOW.md b/docs/monitoring/METRICS_DATA_FLOW.md index 5a0e1f54b..3d06705cf 100644 --- a/docs/monitoring/METRICS_DATA_FLOW.md +++ b/docs/monitoring/METRICS_DATA_FLOW.md @@ -15,14 +15,9 @@ 2. `/api/metrics-store/history` (`handleMetricsHistory`) queries `metrics.Store` (`Query/QueryAll`) with tiered downsampling and license gating. 3. `GuestDrawer` History charts call `ChartsAPI.getMetricsHistory()` for CPU/memory/disk and ranges `24h/7d/30d/90d`. -## Audit notes / inconsistencies -- In-memory retention is `NewMetricsHistory(1000, 24h)` (`monitor.go`). At 30s samples, 1000 points is ~8.3h, so sparklines now cap at 8h to avoid over-promising. -- Sparkline UI ranges (`15m/1h/4h/8h`) are a subset of `TimeRange` support (`5m/15m/30m/1h/4h/8h/12h/7d`) and differ from History tab ranges (`24h/7d/30d/90d`). -- Sparkline ring buffer keeps 7d locally, but server seeding is effectively ~8h at 30s sampling (1000-point cap); longer spans require staying in sparklines mode without reload. -- Docker resource keys differ: in-memory uses `docker:` (via `handleCharts`), persistent store uses `resourceType=dockerContainer`. Mapping is handled client-side when building metric keys; keep consistent when adding resource types. The history API accepts `docker` as an alias for short-range fallback, but persistent data uses `dockerContainer`. - -## DB-backed `/api/charts` assessment -- Feasible approach: add a `source=metrics-store` param to `/api/charts`, enumerate resources from state, then query `metrics.Store` per resource. -- Cost: `N resources x M metric types` → `N*M` queries + SQLite I/O (single-writer). For large fleets this is likely heavier than the current in-memory path. -- Optimization needed for viability: add a bulk store query keyed by resource type/time range (grouped by `resource_id`, `metric_type`) or cache pre-aggregated slices. -- Recommendation: keep `/api/charts` in-memory for table-wide sparklines; use the metrics-store path for per-resource charts or small, explicit batches. +## Architecture notes +- In-memory retention is `NewMetricsHistory(1000, 24h)` (`monitor.go`). At 10s polling, 1000 points covers ~2.8h of data. +- `/api/charts` uses a two-tier strategy: ranges ≤ 2h are served from the in-memory buffer; longer ranges (4h, 8h, 24h, 7d, 30d) fall back to the SQLite persistent store with LTTB downsampling to ~500 points per metric. +- Frontend sparkline ring buffer keeps up to 8h locally (`metricsHistory.ts`). +- Docker resource keys differ: in-memory uses `docker:`, persistent store uses `resourceType=dockerContainer`. The `GetGuestMetricsForChart` method maps between these automatically. +- History charts in the guest drawer use `/api/metrics-store/history` (SQLite) for ranges `24h/7d/30d/90d`. diff --git a/frontend-modern/src/stores/metricsHistory.ts b/frontend-modern/src/stores/metricsHistory.ts index 92011f728..4df862ded 100644 --- a/frontend-modern/src/stores/metricsHistory.ts +++ b/frontend-modern/src/stores/metricsHistory.ts @@ -23,11 +23,11 @@ interface RingBuffer { } // Configuration -const MAX_AGE_MS = 30 * 24 * 60 * 60 * 1000; // 30 days (to support all time ranges) +const MAX_AGE_MS = 30 * 24 * 60 * 60 * 1000; // 30 days (age filter for all sparkline ranges) const SAMPLE_INTERVAL_MS = 30 * 1000; // 30 seconds -const MAX_POINTS = Math.ceil(MAX_AGE_MS / SAMPLE_INTERVAL_MS); // ~86400 points +const MAX_POINTS = 2000; // Ring buffer capacity: holds seed data + ~12h of live updates const STORAGE_KEY = 'pulse_metrics_history'; -const STORAGE_VERSION = 3; // Bumped version due to increased buffer size +const STORAGE_VERSION = 4; // Bumped: reduced buffer from 86400 to 2000 points /** * Convert TimeRange string to milliseconds diff --git a/internal/api/router.go b/internal/api/router.go index 9d11f041b..5dfc8873f 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -5086,8 +5086,8 @@ func (r *Router) handleCharts(w http.ResponseWriter, req *http.Request) { chartData[vm.ID] = make(VMChartData) } - // Get historical metrics - metrics := monitor.GetGuestMetrics(vm.ID, duration) + // Get historical metrics (falls back to SQLite + LTTB for long ranges) + metrics := monitor.GetGuestMetricsForChart(vm.ID, "vm", vm.ID, duration) // Convert metric points to API format for metricType, points := range metrics { @@ -5136,8 +5136,8 @@ func (r *Router) handleCharts(w http.ResponseWriter, req *http.Request) { chartData[ct.ID] = make(VMChartData) } - // Get historical metrics - metrics := monitor.GetGuestMetrics(ct.ID, duration) + // Get historical metrics (falls back to SQLite + LTTB for long ranges) + metrics := monitor.GetGuestMetricsForChart(ct.ID, "container", ct.ID, duration) // Convert metric points to API format for metricType, points := range metrics { @@ -5187,8 +5187,8 @@ func (r *Router) handleCharts(w http.ResponseWriter, req *http.Request) { storageData[storage.ID] = make(StorageChartData) } - // Get historical metrics - metrics := monitor.GetStorageMetrics(storage.ID, duration) + // Get historical metrics (falls back to SQLite + LTTB for long ranges) + metrics := monitor.GetStorageMetricsForChart(storage.ID, duration) // Convert usage metrics to chart format if usagePoints, ok := metrics["usage"]; ok && len(usagePoints) > 0 { @@ -5222,9 +5222,9 @@ func (r *Router) handleCharts(w http.ResponseWriter, req *http.Request) { nodeData[node.ID] = make(NodeChartData) } - // Get historical metrics for each type + // Get historical metrics for each type (falls back to SQLite + LTTB for long ranges) for _, metricType := range []string{"cpu", "memory", "disk"} { - points := monitor.GetNodeMetrics(node.ID, metricType, duration) + points := monitor.GetNodeMetricsForChart(node.ID, metricType, duration) nodeData[node.ID][metricType] = make([]MetricPoint, len(points)) for i, point := range points { ts := point.Timestamp.Unix() * 1000 @@ -5276,9 +5276,9 @@ func (r *Router) handleCharts(w http.ResponseWriter, req *http.Request) { dockerData[container.ID] = make(VMChartData) } - // Get historical metrics using the docker: prefix key + // Get historical metrics using the docker: prefix key (falls back to SQLite + LTTB for long ranges) metricKey := fmt.Sprintf("docker:%s", container.ID) - metrics := monitor.GetGuestMetrics(metricKey, duration) + metrics := monitor.GetGuestMetricsForChart(metricKey, "dockerContainer", container.ID, duration) // Convert metric points to API format for metricType, points := range metrics { @@ -5329,9 +5329,9 @@ func (r *Router) handleCharts(w http.ResponseWriter, req *http.Request) { dockerHostData[host.ID] = make(VMChartData) } - // Get historical metrics using the dockerHost: prefix key + // Get historical metrics using the dockerHost: prefix key (falls back to SQLite + LTTB for long ranges) metricKey := fmt.Sprintf("dockerHost:%s", host.ID) - metrics := monitor.GetGuestMetrics(metricKey, duration) + metrics := monitor.GetGuestMetricsForChart(metricKey, "dockerHost", host.ID, duration) // Convert metric points to API format for metricType, points := range metrics { @@ -5378,9 +5378,9 @@ func (r *Router) handleCharts(w http.ResponseWriter, req *http.Request) { hostData[host.ID] = make(VMChartData) } - // Get historical metrics using the host: prefix key + // Get historical metrics using the host: prefix key (falls back to SQLite + LTTB for long ranges) metricKey := fmt.Sprintf("host:%s", host.ID) - metrics := monitor.GetGuestMetrics(metricKey, duration) + metrics := monitor.GetGuestMetricsForChart(metricKey, "host", host.ID, duration) // Convert metric points to API format for metricType, points := range metrics { @@ -5477,7 +5477,7 @@ func (r *Router) handleStorageCharts(w http.ResponseWriter, req *http.Request) { storageData := make(StorageChartsResponse) for _, storage := range state.Storage { - metrics := monitor.GetStorageMetrics(storage.ID, duration) + metrics := monitor.GetStorageMetricsForChart(storage.ID, duration) storageData[storage.ID] = StorageMetrics{ Usage: metrics["usage"], diff --git a/internal/monitoring/lttb.go b/internal/monitoring/lttb.go new file mode 100644 index 000000000..f06cfc5f7 --- /dev/null +++ b/internal/monitoring/lttb.go @@ -0,0 +1,86 @@ +package monitoring + +import ( + "math" +) + +// lttb performs Largest Triangle Three Buckets downsampling on a slice of +// MetricPoints. It reduces data to targetPoints while preserving the visual +// shape of the data — peaks, valleys and trends are retained. +// +// If len(data) <= targetPoints or targetPoints < 3, data is returned as-is. +func lttb(data []MetricPoint, targetPoints int) []MetricPoint { + n := len(data) + if targetPoints >= n || targetPoints < 3 { + return data + } + + result := make([]MetricPoint, 0, targetPoints) + + // Always keep the first point. + result = append(result, data[0]) + + bucketSize := float64(n-2) / float64(targetPoints-2) + prevSelected := 0 + + for i := 0; i < targetPoints-2; i++ { + // Current bucket range. + bucketStart := int(math.Floor(float64(i)*bucketSize)) + 1 + bucketEnd := int(math.Floor(float64(i+1)*bucketSize)) + 1 + if bucketEnd > n-1 { + bucketEnd = n - 1 + } + + // Next bucket range — used to compute the "third point" average. + nextStart := bucketEnd + nextEnd := int(math.Floor(float64(i+2)*bucketSize)) + 1 + if nextEnd > n-1 { + nextEnd = n - 1 + } + if nextStart >= nextEnd { + nextEnd = nextStart + 1 + if nextEnd > n { + nextEnd = n + } + } + + // Average of next bucket (the "C" vertex of the triangle). + avgTs := float64(0) + avgVal := float64(0) + nextCount := nextEnd - nextStart + for j := nextStart; j < nextEnd; j++ { + avgTs += float64(data[j].Timestamp.UnixMilli()) + avgVal += data[j].Value + } + avgTs /= float64(nextCount) + avgVal /= float64(nextCount) + + // Previously selected point (the "A" vertex). + aTs := float64(data[prevSelected].Timestamp.UnixMilli()) + aVal := data[prevSelected].Value + + // Find the point in the current bucket that maximises the triangle area. + maxArea := float64(-1) + bestIdx := bucketStart + + for j := bucketStart; j < bucketEnd; j++ { + bTs := float64(data[j].Timestamp.UnixMilli()) + bVal := data[j].Value + + // Twice the triangle area (sign doesn't matter, we compare magnitudes). + area := math.Abs((aTs-avgTs)*(bVal-aVal) - (aTs-bTs)*(avgVal-aVal)) + if area > maxArea { + maxArea = area + bestIdx = j + } + } + + result = append(result, data[bestIdx]) + prevSelected = bestIdx + } + + // Always keep the last point. + result = append(result, data[n-1]) + + return result +} diff --git a/internal/monitoring/lttb_test.go b/internal/monitoring/lttb_test.go new file mode 100644 index 000000000..49a01b886 --- /dev/null +++ b/internal/monitoring/lttb_test.go @@ -0,0 +1,126 @@ +package monitoring + +import ( + "math" + "testing" + "time" +) + +func TestLTTB_PassthroughSmallData(t *testing.T) { + // Data smaller than target should be returned unchanged. + data := makeLinear(5, time.Now(), time.Second) + result := lttb(data, 10) + if len(result) != 5 { + t.Fatalf("expected 5 points, got %d", len(result)) + } +} + +func TestLTTB_PassthroughTargetLessThan3(t *testing.T) { + data := makeLinear(100, time.Now(), time.Second) + result := lttb(data, 2) + if len(result) != 100 { + t.Fatalf("expected passthrough for target<3, got %d", len(result)) + } +} + +func TestLTTB_ExactTarget(t *testing.T) { + data := makeLinear(50, time.Now(), time.Second) + result := lttb(data, 50) + if len(result) != 50 { + t.Fatalf("expected 50 points, got %d", len(result)) + } +} + +func TestLTTB_KeepsFirstAndLast(t *testing.T) { + data := makeLinear(100, time.Now(), time.Second) + result := lttb(data, 10) + if result[0] != data[0] { + t.Fatal("first point not preserved") + } + if result[len(result)-1] != data[len(data)-1] { + t.Fatal("last point not preserved") + } +} + +func TestLTTB_OutputLength(t *testing.T) { + data := makeLinear(1000, time.Now(), time.Second) + for _, target := range []int{3, 10, 50, 100, 200, 500} { + result := lttb(data, target) + if len(result) != target { + t.Errorf("target %d: got %d points", target, len(result)) + } + } +} + +func TestLTTB_PreservesPeak(t *testing.T) { + // Create data with a clear spike — LTTB should keep the peak. + start := time.Now() + data := make([]MetricPoint, 200) + for i := range data { + data[i] = MetricPoint{ + Value: 0, + Timestamp: start.Add(time.Duration(i) * time.Second), + } + } + // Insert a spike at position 100. + data[100].Value = 100 + + result := lttb(data, 20) + + // The spike should be preserved. + maxVal := float64(0) + for _, p := range result { + if p.Value > maxVal { + maxVal = p.Value + } + } + if maxVal != 100 { + t.Errorf("peak not preserved: max value in result = %f", maxVal) + } +} + +func TestLTTB_PreservesValley(t *testing.T) { + start := time.Now() + data := make([]MetricPoint, 200) + for i := range data { + data[i] = MetricPoint{ + Value: 50, + Timestamp: start.Add(time.Duration(i) * time.Second), + } + } + data[100].Value = 0 + + result := lttb(data, 20) + + minVal := math.MaxFloat64 + for _, p := range result { + if p.Value < minVal { + minVal = p.Value + } + } + if minVal != 0 { + t.Errorf("valley not preserved: min value in result = %f", minVal) + } +} + +func TestLTTB_MonotonicTimestamps(t *testing.T) { + data := makeLinear(500, time.Now(), time.Second) + result := lttb(data, 50) + for i := 1; i < len(result); i++ { + if !result[i].Timestamp.After(result[i-1].Timestamp) { + t.Fatalf("timestamps not monotonic at index %d", i) + } + } +} + +// makeLinear creates n linearly increasing MetricPoints. +func makeLinear(n int, start time.Time, interval time.Duration) []MetricPoint { + data := make([]MetricPoint, n) + for i := range data { + data[i] = MetricPoint{ + Value: float64(i), + Timestamp: start.Add(time.Duration(i) * interval), + } + } + return data +} diff --git a/internal/monitoring/metrics_history.go b/internal/monitoring/metrics_history.go index f2c4302ad..b488b5df3 100644 --- a/internal/monitoring/metrics_history.go +++ b/internal/monitoring/metrics_history.go @@ -68,15 +68,7 @@ func (mh *MetricsHistory) AddGuestMetric(guestID string, metricType string, valu // Initialize guest metrics if not exists if _, exists := mh.guestMetrics[guestID]; !exists { - mh.guestMetrics[guestID] = &GuestMetrics{ - CPU: make([]MetricPoint, 0, mh.maxDataPoints), - Memory: make([]MetricPoint, 0, mh.maxDataPoints), - Disk: make([]MetricPoint, 0, mh.maxDataPoints), - DiskRead: make([]MetricPoint, 0, mh.maxDataPoints), - DiskWrite: make([]MetricPoint, 0, mh.maxDataPoints), - NetworkIn: make([]MetricPoint, 0, mh.maxDataPoints), - NetworkOut: make([]MetricPoint, 0, mh.maxDataPoints), - } + mh.guestMetrics[guestID] = &GuestMetrics{} } metrics := mh.guestMetrics[guestID] @@ -108,11 +100,7 @@ func (mh *MetricsHistory) AddNodeMetric(nodeID string, metricType string, value // Initialize node metrics if not exists if _, exists := mh.nodeMetrics[nodeID]; !exists { - mh.nodeMetrics[nodeID] = &GuestMetrics{ - CPU: make([]MetricPoint, 0, mh.maxDataPoints), - Memory: make([]MetricPoint, 0, mh.maxDataPoints), - Disk: make([]MetricPoint, 0, mh.maxDataPoints), - } + mh.nodeMetrics[nodeID] = &GuestMetrics{} } metrics := mh.nodeMetrics[nodeID] @@ -277,12 +265,7 @@ func (mh *MetricsHistory) AddStorageMetric(storageID string, metricType string, // Initialize storage metrics if not exists if _, exists := mh.storageMetrics[storageID]; !exists { - mh.storageMetrics[storageID] = &StorageMetrics{ - Usage: make([]MetricPoint, 0, mh.maxDataPoints), - Used: make([]MetricPoint, 0, mh.maxDataPoints), - Total: make([]MetricPoint, 0, mh.maxDataPoints), - Avail: make([]MetricPoint, 0, mh.maxDataPoints), - } + mh.storageMetrics[storageID] = &StorageMetrics{} } metrics := mh.storageMetrics[storageID] diff --git a/internal/monitoring/monitor.go b/internal/monitoring/monitor.go index acaeab0e7..0292bd5c0 100644 --- a/internal/monitoring/monitor.go +++ b/internal/monitoring/monitor.go @@ -3719,8 +3719,8 @@ func New(cfg *config.Config) (*Monitor, error) { hostMetadataStore: config.NewHostMetadataStore(cfg.DataPath, nil), startTime: time.Now(), rateTracker: NewRateTracker(), - metricsHistory: NewMetricsHistory(86400, 30*24*time.Hour), // Keep up to 86400 points (30 days @ 30s) - metricsStore: metricsStore, // Persistent SQLite storage + metricsHistory: NewMetricsHistory(1000, 24*time.Hour), // Keep up to 1000 points (~8h @ 30s) + metricsStore: metricsStore, // Persistent SQLite storage alertManager: alerts.NewManagerWithDataDir(cfg.DataPath), incidentStore: incidentStore, notificationMgr: notifications.NewNotificationManagerWithDataDir(cfg.PublicURL, cfg.DataPath), @@ -8721,21 +8721,99 @@ func (m *Monitor) DisableTemperatureMonitoring() { log.Info().Msg("Temperature monitoring disabled") } +// inMemoryChartThreshold is the max duration reliably covered by the in-memory +// metrics buffer. With 1000 data-points at a 10 s polling interval the buffer +// holds ~2.8 h of data; 2 h is a safe conservative cut-off. +const inMemoryChartThreshold = 2 * time.Hour + +// chartDownsampleTarget is the number of points returned per metric when +// falling back to the persistent store. 500 is more than enough for any +// sparkline or thumbnail chart. +const chartDownsampleTarget = 500 + // GetGuestMetrics returns historical metrics for a guest func (m *Monitor) GetGuestMetrics(guestID string, duration time.Duration) map[string][]MetricPoint { return m.metricsHistory.GetAllGuestMetrics(guestID, duration) } +// GetGuestMetricsForChart returns guest metrics optimised for chart display. +// Short ranges are served from the in-memory ring buffer; longer ranges fall +// back to the persistent SQLite store with LTTB downsampling. +// +// inMemoryKey is the key used in the in-memory buffer (e.g. "docker:abc123"). +// sqlResourceType/sqlResourceID are the type/id used in the SQLite store +// (e.g. "dockerContainer"/"abc123"). +func (m *Monitor) GetGuestMetricsForChart(inMemoryKey, sqlResourceType, sqlResourceID string, duration time.Duration) map[string][]MetricPoint { + if duration <= inMemoryChartThreshold || m.metricsStore == nil { + return m.metricsHistory.GetAllGuestMetrics(inMemoryKey, duration) + } + end := time.Now() + start := end.Add(-duration) + sqlResult, err := m.metricsStore.QueryAll(sqlResourceType, sqlResourceID, start, end, 0) + if err != nil || len(sqlResult) == 0 { + return m.metricsHistory.GetAllGuestMetrics(inMemoryKey, duration) + } + return convertAndDownsample(sqlResult, chartDownsampleTarget) +} + // GetNodeMetrics returns historical metrics for a node func (m *Monitor) GetNodeMetrics(nodeID string, metricType string, duration time.Duration) []MetricPoint { return m.metricsHistory.GetNodeMetrics(nodeID, metricType, duration) } +// GetNodeMetricsForChart returns node metrics for a single metric type, +// falling back to SQLite + LTTB for longer ranges. +func (m *Monitor) GetNodeMetricsForChart(nodeID, metricType string, duration time.Duration) []MetricPoint { + if duration <= inMemoryChartThreshold || m.metricsStore == nil { + return m.metricsHistory.GetNodeMetrics(nodeID, metricType, duration) + } + end := time.Now() + start := end.Add(-duration) + sqlPoints, err := m.metricsStore.Query("node", nodeID, metricType, start, end, 0) + if err != nil || len(sqlPoints) == 0 { + return m.metricsHistory.GetNodeMetrics(nodeID, metricType, duration) + } + converted := make([]MetricPoint, len(sqlPoints)) + for i, p := range sqlPoints { + converted[i] = MetricPoint{Value: p.Value, Timestamp: p.Timestamp} + } + return lttb(converted, chartDownsampleTarget) +} + // GetStorageMetrics returns historical metrics for storage func (m *Monitor) GetStorageMetrics(storageID string, duration time.Duration) map[string][]MetricPoint { return m.metricsHistory.GetAllStorageMetrics(storageID, duration) } +// GetStorageMetricsForChart returns storage metrics, falling back to SQLite + +// LTTB for longer ranges. +func (m *Monitor) GetStorageMetricsForChart(storageID string, duration time.Duration) map[string][]MetricPoint { + if duration <= inMemoryChartThreshold || m.metricsStore == nil { + return m.metricsHistory.GetAllStorageMetrics(storageID, duration) + } + end := time.Now() + start := end.Add(-duration) + sqlResult, err := m.metricsStore.QueryAll("storage", storageID, start, end, 0) + if err != nil || len(sqlResult) == 0 { + return m.metricsHistory.GetAllStorageMetrics(storageID, duration) + } + return convertAndDownsample(sqlResult, chartDownsampleTarget) +} + +// convertAndDownsample converts pkg/metrics.MetricPoint slices to +// internal/types.MetricPoint slices and applies LTTB downsampling. +func convertAndDownsample(sqlResult map[string][]metrics.MetricPoint, target int) map[string][]MetricPoint { + result := make(map[string][]MetricPoint, len(sqlResult)) + for metric, points := range sqlResult { + converted := make([]MetricPoint, len(points)) + for i, p := range points { + converted[i] = MetricPoint{Value: p.Value, Timestamp: p.Timestamp} + } + result[metric] = lttb(converted, target) + } + return result +} + // GetAlertManager returns the alert manager func (m *Monitor) GetAlertManager() *alerts.Manager { return m.alertManager