diff --git a/internal/monitoring/mock_metrics_history.go b/internal/monitoring/mock_metrics_history.go index 95ea65740..7adbdcfc1 100644 --- a/internal/monitoring/mock_metrics_history.go +++ b/internal/monitoring/mock_metrics_history.go @@ -222,7 +222,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models. queueMetric("node", node.ID, "disk", node.Disk.Usage, now) } - recordGuest := func(metricID, storeType, storeID string, cpuPercent, memPercent, diskPercent float64) { + recordGuest := func(metricID, storeType, storeID string, cpuPercent, memPercent, diskPercent, diskRead, diskWrite, netIn, netOut float64, includeIO bool) { if metricID == "" || storeID == "" { return } @@ -231,6 +231,16 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models. cpuSeries := generateSeededSeries(cpuPercent, numPoints, hashSeed(storeType, storeID, "cpu"), 0, 100) memSeries := generateSeededSeries(memPercent, numPoints, hashSeed(storeType, storeID, "memory"), 0, 100) diskSeries := generateSeededSeries(diskPercent, numPoints, hashSeed(storeType, storeID, "disk"), 0, 100) + var diskReadSeries, diskWriteSeries, netInSeries, netOutSeries []float64 + if includeIO { + ioMax := func(value float64) float64 { + return math.Max(value*1.8, 1) + } + diskReadSeries = generateSeededSeries(diskRead, numPoints, hashSeed(storeType, storeID, "diskread"), 0, ioMax(diskRead)) + diskWriteSeries = generateSeededSeries(diskWrite, numPoints, hashSeed(storeType, storeID, "diskwrite"), 0, ioMax(diskWrite)) + netInSeries = generateSeededSeries(netIn, numPoints, hashSeed(storeType, storeID, "netin"), 0, ioMax(netIn)) + netOutSeries = generateSeededSeries(netOut, numPoints, hashSeed(storeType, storeID, "netout"), 0, ioMax(netOut)) + } startTime := now.Add(-seedDuration) for i := 0; i < numPoints; i++ { @@ -241,6 +251,16 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models. queueMetric(storeType, storeID, "cpu", cpuSeries[i], ts) queueMetric(storeType, storeID, "memory", memSeries[i], ts) queueMetric(storeType, storeID, "disk", diskSeries[i], ts) + if includeIO { + mh.AddGuestMetric(metricID, "diskread", diskReadSeries[i], ts) + mh.AddGuestMetric(metricID, "diskwrite", diskWriteSeries[i], ts) + mh.AddGuestMetric(metricID, "netin", netInSeries[i], ts) + mh.AddGuestMetric(metricID, "netout", netOutSeries[i], ts) + queueMetric(storeType, storeID, "diskread", diskReadSeries[i], ts) + queueMetric(storeType, storeID, "diskwrite", diskWriteSeries[i], ts) + queueMetric(storeType, storeID, "netin", netInSeries[i], ts) + queueMetric(storeType, storeID, "netout", netOutSeries[i], ts) + } } // Ensure the latest point lands at "now" for full-range charts. @@ -250,6 +270,16 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models. queueMetric(storeType, storeID, "cpu", cpuPercent, now) queueMetric(storeType, storeID, "memory", memPercent, now) queueMetric(storeType, storeID, "disk", diskPercent, now) + if includeIO { + mh.AddGuestMetric(metricID, "diskread", diskRead, now) + mh.AddGuestMetric(metricID, "diskwrite", diskWrite, now) + mh.AddGuestMetric(metricID, "netin", netIn, now) + mh.AddGuestMetric(metricID, "netout", netOut, now) + queueMetric(storeType, storeID, "diskread", diskRead, now) + queueMetric(storeType, storeID, "diskwrite", diskWrite, now) + queueMetric(storeType, storeID, "netin", netIn, now) + queueMetric(storeType, storeID, "netout", netOut, now) + } } for _, node := range state.Nodes { @@ -261,7 +291,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models. if vm.Status != "running" { continue } - recordGuest(vm.ID, "vm", vm.ID, vm.CPU*100, vm.Memory.Usage, vm.Disk.Usage) + recordGuest(vm.ID, "vm", vm.ID, vm.CPU*100, vm.Memory.Usage, vm.Disk.Usage, float64(vm.DiskRead), float64(vm.DiskWrite), float64(vm.NetworkIn), float64(vm.NetworkOut), true) time.Sleep(200 * time.Millisecond) } @@ -269,7 +299,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models. if ct.Status != "running" { continue } - recordGuest(ct.ID, "container", ct.ID, ct.CPU*100, ct.Memory.Usage, ct.Disk.Usage) + recordGuest(ct.ID, "container", ct.ID, ct.CPU*100, ct.Memory.Usage, ct.Disk.Usage, float64(ct.DiskRead), float64(ct.DiskWrite), float64(ct.NetworkIn), float64(ct.NetworkOut), true) time.Sleep(200 * time.Millisecond) } @@ -310,7 +340,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models. diskPercent = float64(usedTotal) / float64(totalTotal) * 100 } - recordGuest("dockerHost:"+host.ID, "dockerHost", host.ID, host.CPUUsage, host.Memory.Usage, diskPercent) + recordGuest("dockerHost:"+host.ID, "dockerHost", host.ID, host.CPUUsage, host.Memory.Usage, diskPercent, 0, 0, 0, 0, false) for _, container := range host.Containers { if container.ID == "" || container.State != "running" { @@ -322,7 +352,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models. containerDisk = float64(container.WritableLayerBytes) / float64(container.RootFilesystemBytes) * 100 containerDisk = clampFloat(containerDisk, 0, 100) } - recordGuest("docker:"+container.ID, "docker", container.ID, container.CPUPercent, container.MemoryPercent, containerDisk) + recordGuest("docker:"+container.ID, "docker", container.ID, container.CPUPercent, container.MemoryPercent, containerDisk, 0, 0, 0, 0, false) } } @@ -367,6 +397,10 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, ms *metrics.Store, stat ms.Write("vm", vm.ID, "cpu", vm.CPU*100, ts) ms.Write("vm", vm.ID, "memory", vm.Memory.Usage, ts) ms.Write("vm", vm.ID, "disk", vm.Disk.Usage, ts) + ms.Write("vm", vm.ID, "diskread", float64(vm.DiskRead), ts) + ms.Write("vm", vm.ID, "diskwrite", float64(vm.DiskWrite), ts) + ms.Write("vm", vm.ID, "netin", float64(vm.NetworkIn), ts) + ms.Write("vm", vm.ID, "netout", float64(vm.NetworkOut), ts) } } @@ -386,6 +420,10 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, ms *metrics.Store, stat ms.Write("container", ct.ID, "cpu", ct.CPU*100, ts) ms.Write("container", ct.ID, "memory", ct.Memory.Usage, ts) ms.Write("container", ct.ID, "disk", ct.Disk.Usage, ts) + ms.Write("container", ct.ID, "diskread", float64(ct.DiskRead), ts) + ms.Write("container", ct.ID, "diskwrite", float64(ct.DiskWrite), ts) + ms.Write("container", ct.ID, "netin", float64(ct.NetworkIn), ts) + ms.Write("container", ct.ID, "netout", float64(ct.NetworkOut), ts) } } diff --git a/internal/monitoring/monitor.go b/internal/monitoring/monitor.go index b5a08ae13..c169fa94c 100644 --- a/internal/monitoring/monitor.go +++ b/internal/monitoring/monitor.go @@ -7196,6 +7196,18 @@ func (m *Monitor) pollVMsAndContainersEfficient(ctx context.Context, instanceNam if vm.Disk.Usage >= 0 { m.metricsHistory.AddGuestMetric(vm.ID, "disk", vm.Disk.Usage, now) } + if vm.DiskRead >= 0 { + m.metricsHistory.AddGuestMetric(vm.ID, "diskread", float64(vm.DiskRead), now) + } + if vm.DiskWrite >= 0 { + m.metricsHistory.AddGuestMetric(vm.ID, "diskwrite", float64(vm.DiskWrite), now) + } + if vm.NetworkIn >= 0 { + m.metricsHistory.AddGuestMetric(vm.ID, "netin", float64(vm.NetworkIn), now) + } + if vm.NetworkOut >= 0 { + m.metricsHistory.AddGuestMetric(vm.ID, "netout", float64(vm.NetworkOut), now) + } // Also write to persistent store if m.metricsStore != nil { m.metricsStore.Write("vm", vm.ID, "cpu", vm.CPU*100, now) @@ -7203,6 +7215,18 @@ func (m *Monitor) pollVMsAndContainersEfficient(ctx context.Context, instanceNam if vm.Disk.Usage >= 0 { m.metricsStore.Write("vm", vm.ID, "disk", vm.Disk.Usage, now) } + if vm.DiskRead >= 0 { + m.metricsStore.Write("vm", vm.ID, "diskread", float64(vm.DiskRead), now) + } + if vm.DiskWrite >= 0 { + m.metricsStore.Write("vm", vm.ID, "diskwrite", float64(vm.DiskWrite), now) + } + if vm.NetworkIn >= 0 { + m.metricsStore.Write("vm", vm.ID, "netin", float64(vm.NetworkIn), now) + } + if vm.NetworkOut >= 0 { + m.metricsStore.Write("vm", vm.ID, "netout", float64(vm.NetworkOut), now) + } } } } @@ -7213,6 +7237,18 @@ func (m *Monitor) pollVMsAndContainersEfficient(ctx context.Context, instanceNam if ct.Disk.Usage >= 0 { m.metricsHistory.AddGuestMetric(ct.ID, "disk", ct.Disk.Usage, now) } + if ct.DiskRead >= 0 { + m.metricsHistory.AddGuestMetric(ct.ID, "diskread", float64(ct.DiskRead), now) + } + if ct.DiskWrite >= 0 { + m.metricsHistory.AddGuestMetric(ct.ID, "diskwrite", float64(ct.DiskWrite), now) + } + if ct.NetworkIn >= 0 { + m.metricsHistory.AddGuestMetric(ct.ID, "netin", float64(ct.NetworkIn), now) + } + if ct.NetworkOut >= 0 { + m.metricsHistory.AddGuestMetric(ct.ID, "netout", float64(ct.NetworkOut), now) + } // Also write to persistent store if m.metricsStore != nil { m.metricsStore.Write("container", ct.ID, "cpu", ct.CPU*100, now) @@ -7220,6 +7256,18 @@ func (m *Monitor) pollVMsAndContainersEfficient(ctx context.Context, instanceNam if ct.Disk.Usage >= 0 { m.metricsStore.Write("container", ct.ID, "disk", ct.Disk.Usage, now) } + if ct.DiskRead >= 0 { + m.metricsStore.Write("container", ct.ID, "diskread", float64(ct.DiskRead), now) + } + if ct.DiskWrite >= 0 { + m.metricsStore.Write("container", ct.ID, "diskwrite", float64(ct.DiskWrite), now) + } + if ct.NetworkIn >= 0 { + m.metricsStore.Write("container", ct.ID, "netin", float64(ct.NetworkIn), now) + } + if ct.NetworkOut >= 0 { + m.metricsStore.Write("container", ct.ID, "netout", float64(ct.NetworkOut), now) + } } } } diff --git a/pkg/metrics/store.go b/pkg/metrics/store.go index 443efb134..532242743 100644 --- a/pkg/metrics/store.go +++ b/pkg/metrics/store.go @@ -305,9 +305,33 @@ func (s *Store) writeBatch(metrics []bufferedMetric) { // Query retrieves metrics for a resource within a time range, with optional downsampling func (s *Store) Query(resourceType, resourceID, metricType string, start, end time.Time, stepSecs int64) ([]MetricPoint, error) { - // Select appropriate tier based on time range - tier := s.selectTier(end.Sub(start)) + tiers := s.tierFallbacks(end.Sub(start)) + if len(tiers) == 0 { + return []MetricPoint{}, nil + } + for i, tier := range tiers { + points, err := s.queryWithTier(resourceType, resourceID, metricType, start, end, stepSecs, tier) + if err != nil { + return nil, err + } + if len(points) > 0 || i == len(tiers)-1 { + return points, nil + } + + log.Debug(). + Str("resourceType", resourceType). + Str("resourceId", resourceID). + Str("metric", metricType). + Str("fromTier", string(tier)). + Str("toTier", string(tiers[i+1])). + Msg("Metrics query empty; falling back to more detailed tier") + } + + return []MetricPoint{}, nil +} + +func (s *Store) queryWithTier(resourceType, resourceID, metricType string, start, end time.Time, stepSecs int64, tier Tier) ([]MetricPoint, error) { var rows *sql.Rows var err error @@ -323,7 +347,7 @@ func (s *Store) Query(resourceType, resourceID, metricType string, start, end ti if stepSecs > 1 { sqlQuery = ` SELECT - (timestamp / ?) * ? as bucket_ts, + (timestamp / ?) * ? + (? / 2) as bucket_ts, AVG(value), MIN(COALESCE(min_value, value)), MAX(COALESCE(max_value, value)) @@ -334,7 +358,7 @@ func (s *Store) Query(resourceType, resourceID, metricType string, start, end ti ORDER BY bucket_ts ASC ` queryParams = []interface{}{ - stepSecs, stepSecs, + stepSecs, stepSecs, stepSecs, resourceType, resourceID, metricType, string(tier), start.Unix(), end.Unix(), } } @@ -371,8 +395,57 @@ func (s *Store) Query(resourceType, resourceID, metricType string, start, end ti // QueryAll retrieves all metric types for a resource within a time range, with optional downsampling func (s *Store) QueryAll(resourceType, resourceID string, start, end time.Time, stepSecs int64) (map[string][]MetricPoint, error) { - tier := s.selectTier(end.Sub(start)) + tiers := s.tierFallbacks(end.Sub(start)) + if len(tiers) == 0 { + return map[string][]MetricPoint{}, nil + } + result := make(map[string][]MetricPoint) + for i, tier := range tiers { + tierResult, err := s.queryAllWithTier(resourceType, resourceID, start, end, stepSecs, tier) + if err != nil { + return nil, err + } + if len(tierResult) == 0 { + if i < len(tiers)-1 && len(result) == 0 { + log.Debug(). + Str("resourceType", resourceType). + Str("resourceId", resourceID). + Str("fromTier", string(tier)). + Str("toTier", string(tiers[i+1])). + Msg("Metrics query empty; falling back to more detailed tier") + } + continue + } + + // Merge in any metrics missing from higher tier results. + added := 0 + for metric, points := range tierResult { + if len(points) == 0 { + continue + } + if existing, ok := result[metric]; !ok || len(existing) == 0 { + result[metric] = points + added++ + } + } + + // If we already have some metrics and this tier didn't add anything new, + // keep going in case lower tiers have newly introduced metrics. + if added == 0 && i < len(tiers)-1 && len(result) == 0 { + log.Debug(). + Str("resourceType", resourceType). + Str("resourceId", resourceID). + Str("fromTier", string(tier)). + Str("toTier", string(tiers[i+1])). + Msg("Metrics query empty; falling back to more detailed tier") + } + } + + return result, nil +} + +func (s *Store) queryAllWithTier(resourceType, resourceID string, start, end time.Time, stepSecs int64, tier Tier) (map[string][]MetricPoint, error) { var rows *sql.Rows var err error @@ -439,13 +512,13 @@ func (s *Store) QueryAll(resourceType, resourceID string, start, end time.Time, // selectTier chooses the appropriate data tier based on time range // Note: Tier selection uses fixed thresholds to ensure queries use tiers with complete data: // - Raw: up to 2 hours (high-resolution real-time data) -// - Minute: up to 6 hours (recent detailed data) +// - Minute: up to 24 hours (recent detailed data) // - Hourly: up to 7 days (medium-term with mock/seeded data coverage) // - Daily: beyond 7 days (long-term historical data) func (s *Store) selectTier(duration time.Duration) Tier { const ( rawThreshold = 2 * time.Hour - minuteThreshold = 6 * time.Hour // 24h queries use hourly tier which has complete historical data + minuteThreshold = 24 * time.Hour hourlyThreshold = 7 * 24 * time.Hour ) @@ -461,6 +534,21 @@ func (s *Store) selectTier(duration time.Duration) Tier { } } +func (s *Store) tierFallbacks(duration time.Duration) []Tier { + switch s.selectTier(duration) { + case TierRaw: + return []Tier{TierRaw} + case TierMinute: + return []Tier{TierMinute, TierRaw} + case TierHourly: + return []Tier{TierHourly, TierMinute, TierRaw} + case TierDaily: + return []Tier{TierDaily, TierHourly, TierMinute, TierRaw} + default: + return []Tier{TierRaw} + } +} + // backgroundWorker runs periodic tasks func (s *Store) backgroundWorker() { defer close(s.doneCh) diff --git a/pkg/metrics/store_test.go b/pkg/metrics/store_test.go index 60fb63195..08b97639a 100644 --- a/pkg/metrics/store_test.go +++ b/pkg/metrics/store_test.go @@ -100,6 +100,39 @@ func TestStoreSelectTierAndStats(t *testing.T) { } } +func TestStoreQueryFallbacksToRaw(t *testing.T) { + dir := t.TempDir() + cfg := DefaultConfig(dir) + cfg.DBPath = filepath.Join(dir, "metrics-test.db") + cfg.FlushInterval = time.Hour + + store, err := NewStore(cfg) + if err != nil { + t.Fatalf("NewStore returned error: %v", err) + } + defer store.Close() + + ts := time.Now() + store.WriteWithTier("vm", "vm-101", "cpu", 42.0, ts, TierRaw) + store.Flush() + + points, err := store.Query("vm", "vm-101", "cpu", ts.Add(-24*time.Hour), ts.Add(time.Second), 0) + if err != nil { + t.Fatalf("Query returned error: %v", err) + } + if len(points) != 1 { + t.Fatalf("expected 1 point, got %d", len(points)) + } + + all, err := store.QueryAll("vm", "vm-101", ts.Add(-24*time.Hour), ts.Add(time.Second), 0) + if err != nil { + t.Fatalf("QueryAll returned error: %v", err) + } + if len(all["cpu"]) != 1 { + t.Fatalf("expected QueryAll to return 1 cpu point, got %+v", all) + } +} + func TestStoreRollupTier(t *testing.T) { dir := t.TempDir() cfg := DefaultConfig(dir)