feat: add metrics store point limiting and mock improvements

- Add point limiting to metrics queries
- Improve mock metrics history for testing
- Add monitor enhancements
This commit is contained in:
rcourtman 2026-01-22 22:29:56 +00:00
parent c2f43c995e
commit 8963d69764
4 changed files with 219 additions and 12 deletions

View file

@ -222,7 +222,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.
queueMetric("node", node.ID, "disk", node.Disk.Usage, now)
}
recordGuest := func(metricID, storeType, storeID string, cpuPercent, memPercent, diskPercent float64) {
recordGuest := func(metricID, storeType, storeID string, cpuPercent, memPercent, diskPercent, diskRead, diskWrite, netIn, netOut float64, includeIO bool) {
if metricID == "" || storeID == "" {
return
}
@ -231,6 +231,16 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.
cpuSeries := generateSeededSeries(cpuPercent, numPoints, hashSeed(storeType, storeID, "cpu"), 0, 100)
memSeries := generateSeededSeries(memPercent, numPoints, hashSeed(storeType, storeID, "memory"), 0, 100)
diskSeries := generateSeededSeries(diskPercent, numPoints, hashSeed(storeType, storeID, "disk"), 0, 100)
var diskReadSeries, diskWriteSeries, netInSeries, netOutSeries []float64
if includeIO {
ioMax := func(value float64) float64 {
return math.Max(value*1.8, 1)
}
diskReadSeries = generateSeededSeries(diskRead, numPoints, hashSeed(storeType, storeID, "diskread"), 0, ioMax(diskRead))
diskWriteSeries = generateSeededSeries(diskWrite, numPoints, hashSeed(storeType, storeID, "diskwrite"), 0, ioMax(diskWrite))
netInSeries = generateSeededSeries(netIn, numPoints, hashSeed(storeType, storeID, "netin"), 0, ioMax(netIn))
netOutSeries = generateSeededSeries(netOut, numPoints, hashSeed(storeType, storeID, "netout"), 0, ioMax(netOut))
}
startTime := now.Add(-seedDuration)
for i := 0; i < numPoints; i++ {
@ -241,6 +251,16 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.
queueMetric(storeType, storeID, "cpu", cpuSeries[i], ts)
queueMetric(storeType, storeID, "memory", memSeries[i], ts)
queueMetric(storeType, storeID, "disk", diskSeries[i], ts)
if includeIO {
mh.AddGuestMetric(metricID, "diskread", diskReadSeries[i], ts)
mh.AddGuestMetric(metricID, "diskwrite", diskWriteSeries[i], ts)
mh.AddGuestMetric(metricID, "netin", netInSeries[i], ts)
mh.AddGuestMetric(metricID, "netout", netOutSeries[i], ts)
queueMetric(storeType, storeID, "diskread", diskReadSeries[i], ts)
queueMetric(storeType, storeID, "diskwrite", diskWriteSeries[i], ts)
queueMetric(storeType, storeID, "netin", netInSeries[i], ts)
queueMetric(storeType, storeID, "netout", netOutSeries[i], ts)
}
}
// Ensure the latest point lands at "now" for full-range charts.
@ -250,6 +270,16 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.
queueMetric(storeType, storeID, "cpu", cpuPercent, now)
queueMetric(storeType, storeID, "memory", memPercent, now)
queueMetric(storeType, storeID, "disk", diskPercent, now)
if includeIO {
mh.AddGuestMetric(metricID, "diskread", diskRead, now)
mh.AddGuestMetric(metricID, "diskwrite", diskWrite, now)
mh.AddGuestMetric(metricID, "netin", netIn, now)
mh.AddGuestMetric(metricID, "netout", netOut, now)
queueMetric(storeType, storeID, "diskread", diskRead, now)
queueMetric(storeType, storeID, "diskwrite", diskWrite, now)
queueMetric(storeType, storeID, "netin", netIn, now)
queueMetric(storeType, storeID, "netout", netOut, now)
}
}
for _, node := range state.Nodes {
@ -261,7 +291,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.
if vm.Status != "running" {
continue
}
recordGuest(vm.ID, "vm", vm.ID, vm.CPU*100, vm.Memory.Usage, vm.Disk.Usage)
recordGuest(vm.ID, "vm", vm.ID, vm.CPU*100, vm.Memory.Usage, vm.Disk.Usage, float64(vm.DiskRead), float64(vm.DiskWrite), float64(vm.NetworkIn), float64(vm.NetworkOut), true)
time.Sleep(200 * time.Millisecond)
}
@ -269,7 +299,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.
if ct.Status != "running" {
continue
}
recordGuest(ct.ID, "container", ct.ID, ct.CPU*100, ct.Memory.Usage, ct.Disk.Usage)
recordGuest(ct.ID, "container", ct.ID, ct.CPU*100, ct.Memory.Usage, ct.Disk.Usage, float64(ct.DiskRead), float64(ct.DiskWrite), float64(ct.NetworkIn), float64(ct.NetworkOut), true)
time.Sleep(200 * time.Millisecond)
}
@ -310,7 +340,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.
diskPercent = float64(usedTotal) / float64(totalTotal) * 100
}
recordGuest("dockerHost:"+host.ID, "dockerHost", host.ID, host.CPUUsage, host.Memory.Usage, diskPercent)
recordGuest("dockerHost:"+host.ID, "dockerHost", host.ID, host.CPUUsage, host.Memory.Usage, diskPercent, 0, 0, 0, 0, false)
for _, container := range host.Containers {
if container.ID == "" || container.State != "running" {
@ -322,7 +352,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.
containerDisk = float64(container.WritableLayerBytes) / float64(container.RootFilesystemBytes) * 100
containerDisk = clampFloat(containerDisk, 0, 100)
}
recordGuest("docker:"+container.ID, "docker", container.ID, container.CPUPercent, container.MemoryPercent, containerDisk)
recordGuest("docker:"+container.ID, "docker", container.ID, container.CPUPercent, container.MemoryPercent, containerDisk, 0, 0, 0, 0, false)
}
}
@ -367,6 +397,10 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, ms *metrics.Store, stat
ms.Write("vm", vm.ID, "cpu", vm.CPU*100, ts)
ms.Write("vm", vm.ID, "memory", vm.Memory.Usage, ts)
ms.Write("vm", vm.ID, "disk", vm.Disk.Usage, ts)
ms.Write("vm", vm.ID, "diskread", float64(vm.DiskRead), ts)
ms.Write("vm", vm.ID, "diskwrite", float64(vm.DiskWrite), ts)
ms.Write("vm", vm.ID, "netin", float64(vm.NetworkIn), ts)
ms.Write("vm", vm.ID, "netout", float64(vm.NetworkOut), ts)
}
}
@ -386,6 +420,10 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, ms *metrics.Store, stat
ms.Write("container", ct.ID, "cpu", ct.CPU*100, ts)
ms.Write("container", ct.ID, "memory", ct.Memory.Usage, ts)
ms.Write("container", ct.ID, "disk", ct.Disk.Usage, ts)
ms.Write("container", ct.ID, "diskread", float64(ct.DiskRead), ts)
ms.Write("container", ct.ID, "diskwrite", float64(ct.DiskWrite), ts)
ms.Write("container", ct.ID, "netin", float64(ct.NetworkIn), ts)
ms.Write("container", ct.ID, "netout", float64(ct.NetworkOut), ts)
}
}

View file

@ -7196,6 +7196,18 @@ func (m *Monitor) pollVMsAndContainersEfficient(ctx context.Context, instanceNam
if vm.Disk.Usage >= 0 {
m.metricsHistory.AddGuestMetric(vm.ID, "disk", vm.Disk.Usage, now)
}
if vm.DiskRead >= 0 {
m.metricsHistory.AddGuestMetric(vm.ID, "diskread", float64(vm.DiskRead), now)
}
if vm.DiskWrite >= 0 {
m.metricsHistory.AddGuestMetric(vm.ID, "diskwrite", float64(vm.DiskWrite), now)
}
if vm.NetworkIn >= 0 {
m.metricsHistory.AddGuestMetric(vm.ID, "netin", float64(vm.NetworkIn), now)
}
if vm.NetworkOut >= 0 {
m.metricsHistory.AddGuestMetric(vm.ID, "netout", float64(vm.NetworkOut), now)
}
// Also write to persistent store
if m.metricsStore != nil {
m.metricsStore.Write("vm", vm.ID, "cpu", vm.CPU*100, now)
@ -7203,6 +7215,18 @@ func (m *Monitor) pollVMsAndContainersEfficient(ctx context.Context, instanceNam
if vm.Disk.Usage >= 0 {
m.metricsStore.Write("vm", vm.ID, "disk", vm.Disk.Usage, now)
}
if vm.DiskRead >= 0 {
m.metricsStore.Write("vm", vm.ID, "diskread", float64(vm.DiskRead), now)
}
if vm.DiskWrite >= 0 {
m.metricsStore.Write("vm", vm.ID, "diskwrite", float64(vm.DiskWrite), now)
}
if vm.NetworkIn >= 0 {
m.metricsStore.Write("vm", vm.ID, "netin", float64(vm.NetworkIn), now)
}
if vm.NetworkOut >= 0 {
m.metricsStore.Write("vm", vm.ID, "netout", float64(vm.NetworkOut), now)
}
}
}
}
@ -7213,6 +7237,18 @@ func (m *Monitor) pollVMsAndContainersEfficient(ctx context.Context, instanceNam
if ct.Disk.Usage >= 0 {
m.metricsHistory.AddGuestMetric(ct.ID, "disk", ct.Disk.Usage, now)
}
if ct.DiskRead >= 0 {
m.metricsHistory.AddGuestMetric(ct.ID, "diskread", float64(ct.DiskRead), now)
}
if ct.DiskWrite >= 0 {
m.metricsHistory.AddGuestMetric(ct.ID, "diskwrite", float64(ct.DiskWrite), now)
}
if ct.NetworkIn >= 0 {
m.metricsHistory.AddGuestMetric(ct.ID, "netin", float64(ct.NetworkIn), now)
}
if ct.NetworkOut >= 0 {
m.metricsHistory.AddGuestMetric(ct.ID, "netout", float64(ct.NetworkOut), now)
}
// Also write to persistent store
if m.metricsStore != nil {
m.metricsStore.Write("container", ct.ID, "cpu", ct.CPU*100, now)
@ -7220,6 +7256,18 @@ func (m *Monitor) pollVMsAndContainersEfficient(ctx context.Context, instanceNam
if ct.Disk.Usage >= 0 {
m.metricsStore.Write("container", ct.ID, "disk", ct.Disk.Usage, now)
}
if ct.DiskRead >= 0 {
m.metricsStore.Write("container", ct.ID, "diskread", float64(ct.DiskRead), now)
}
if ct.DiskWrite >= 0 {
m.metricsStore.Write("container", ct.ID, "diskwrite", float64(ct.DiskWrite), now)
}
if ct.NetworkIn >= 0 {
m.metricsStore.Write("container", ct.ID, "netin", float64(ct.NetworkIn), now)
}
if ct.NetworkOut >= 0 {
m.metricsStore.Write("container", ct.ID, "netout", float64(ct.NetworkOut), now)
}
}
}
}

View file

@ -305,9 +305,33 @@ func (s *Store) writeBatch(metrics []bufferedMetric) {
// Query retrieves metrics for a resource within a time range, with optional downsampling.
//
// The candidate tiers are obtained from tierFallbacks for the requested span and are
// tried in order. The first tier that yields at least one point is returned; if every
// tier is empty, the (empty) result of the last tier is returned. Any error from
// queryWithTier aborts the lookup immediately. stepSecs is forwarded unchanged to
// queryWithTier.
func (s *Store) Query(resourceType, resourceID, metricType string, start, end time.Time, stepSecs int64) ([]MetricPoint, error) {
	// Select the appropriate tier (plus its fallbacks) based on the time range.
	tiers := s.tierFallbacks(end.Sub(start))
	if len(tiers) == 0 {
		return []MetricPoint{}, nil
	}

	for i, tier := range tiers {
		points, err := s.queryWithTier(resourceType, resourceID, metricType, start, end, stepSecs, tier)
		if err != nil {
			return nil, err
		}
		// Stop at the first tier with data, or at the last tier regardless of outcome.
		if len(points) > 0 || i == len(tiers)-1 {
			return points, nil
		}

		// i < len(tiers)-1 is guaranteed here, so tiers[i+1] is in range.
		log.Debug().
			Str("resourceType", resourceType).
			Str("resourceId", resourceID).
			Str("metric", metricType).
			Str("fromTier", string(tier)).
			Str("toTier", string(tiers[i+1])).
			Msg("Metrics query empty; falling back to more detailed tier")
	}

	// The loop always returns on its final iteration; this return only satisfies the compiler.
	return []MetricPoint{}, nil
}
func (s *Store) queryWithTier(resourceType, resourceID, metricType string, start, end time.Time, stepSecs int64, tier Tier) ([]MetricPoint, error) {
var rows *sql.Rows
var err error
@ -323,7 +347,7 @@ func (s *Store) Query(resourceType, resourceID, metricType string, start, end ti
if stepSecs > 1 {
sqlQuery = `
SELECT
(timestamp / ?) * ? as bucket_ts,
(timestamp / ?) * ? + (? / 2) as bucket_ts,
AVG(value),
MIN(COALESCE(min_value, value)),
MAX(COALESCE(max_value, value))
@ -334,7 +358,7 @@ func (s *Store) Query(resourceType, resourceID, metricType string, start, end ti
ORDER BY bucket_ts ASC
`
queryParams = []interface{}{
stepSecs, stepSecs,
stepSecs, stepSecs, stepSecs,
resourceType, resourceID, metricType, string(tier), start.Unix(), end.Unix(),
}
}
@ -371,8 +395,57 @@ func (s *Store) Query(resourceType, resourceID, metricType string, start, end ti
// QueryAll retrieves all metric types for a resource within a time range, with optional downsampling.
//
// Every tier returned by tierFallbacks for the requested span is queried, in order.
// For each metric name, the points from the first tier that has a non-empty series
// for that metric are kept; later tiers only fill in metrics that are still missing.
// Any error from queryAllWithTier aborts the lookup. stepSecs is forwarded unchanged
// to queryAllWithTier.
func (s *Store) QueryAll(resourceType, resourceID string, start, end time.Time, stepSecs int64) (map[string][]MetricPoint, error) {
	tiers := s.tierFallbacks(end.Sub(start))
	if len(tiers) == 0 {
		return map[string][]MetricPoint{}, nil
	}

	result := make(map[string][]MetricPoint)

	for i, tier := range tiers {
		tierResult, err := s.queryAllWithTier(resourceType, resourceID, start, end, stepSecs, tier)
		if err != nil {
			return nil, err
		}

		// Merge in any metrics missing from higher tier results. Ranging over an
		// empty tierResult is a no-op, so the empty case needs no special handling.
		added := 0
		for metric, points := range tierResult {
			if len(points) == 0 {
				continue
			}
			if existing, ok := result[metric]; !ok || len(existing) == 0 {
				result[metric] = points
				added++
			}
		}

		// Log a fallback only while nothing has been collected at all and another
		// tier remains. Once some metrics exist, lower tiers are still consulted
		// (they may hold metrics the higher tiers lack) but silently.
		if added == 0 && len(result) == 0 && i < len(tiers)-1 {
			log.Debug().
				Str("resourceType", resourceType).
				Str("resourceId", resourceID).
				Str("fromTier", string(tier)).
				Str("toTier", string(tiers[i+1])).
				Msg("Metrics query empty; falling back to more detailed tier")
		}
	}

	return result, nil
}
func (s *Store) queryAllWithTier(resourceType, resourceID string, start, end time.Time, stepSecs int64, tier Tier) (map[string][]MetricPoint, error) {
var rows *sql.Rows
var err error
@ -439,13 +512,13 @@ func (s *Store) QueryAll(resourceType, resourceID string, start, end time.Time,
// selectTier chooses the appropriate data tier based on time range
// Note: Tier selection uses fixed thresholds to ensure queries use tiers with complete data:
// - Raw: up to 2 hours (high-resolution real-time data)
// - Minute: up to 6 hours (recent detailed data)
// - Minute: up to 24 hours (recent detailed data)
// - Hourly: up to 7 days (medium-term with mock/seeded data coverage)
// - Daily: beyond 7 days (long-term historical data)
func (s *Store) selectTier(duration time.Duration) Tier {
const (
rawThreshold = 2 * time.Hour
minuteThreshold = 6 * time.Hour // 24h queries use hourly tier which has complete historical data
minuteThreshold = 24 * time.Hour
hourlyThreshold = 7 * 24 * time.Hour
)
@ -461,6 +534,21 @@ func (s *Store) selectTier(duration time.Duration) Tier {
}
}
// tierFallbacks returns the ordered list of tiers to try for a query spanning
// duration: the tier chosen by selectTier first, followed by each subsequent
// tier in the chain Daily -> Hourly -> Minute -> Raw. If selectTier yields a
// tier outside that chain, only TierRaw is returned.
func (s *Store) tierFallbacks(duration time.Duration) []Tier {
	// Ordered from the first-choice tier for the longest spans down to raw.
	chain := []Tier{TierDaily, TierHourly, TierMinute, TierRaw}

	preferred := s.selectTier(duration)
	for idx, candidate := range chain {
		if candidate == preferred {
			// The suffix starting at the preferred tier is exactly the fallback order.
			return chain[idx:]
		}
	}
	return []Tier{TierRaw}
}
// backgroundWorker runs periodic tasks
func (s *Store) backgroundWorker() {
defer close(s.doneCh)

View file

@ -100,6 +100,39 @@ func TestStoreSelectTierAndStats(t *testing.T) {
}
}
// TestStoreQueryFallbacksToRaw writes a single point into the raw tier only and
// checks that both Query and QueryAll return it when asked for a window that
// starts 24 hours before the point and ends one second after it.
func TestStoreQueryFallbacksToRaw(t *testing.T) {
	tmpDir := t.TempDir()
	cfg := DefaultConfig(tmpDir)
	cfg.DBPath = filepath.Join(tmpDir, "metrics-test.db")
	// NOTE(review): presumably a long interval keeps periodic flushing out of the
	// way so that only the explicit Flush below persists data — confirm.
	cfg.FlushInterval = time.Hour

	store, err := NewStore(cfg)
	if err != nil {
		t.Fatalf("NewStore returned error: %v", err)
	}
	defer store.Close()

	ts := time.Now()
	from := ts.Add(-24 * time.Hour)
	to := ts.Add(time.Second)

	store.WriteWithTier("vm", "vm-101", "cpu", 42.0, ts, TierRaw)
	store.Flush()

	// Single-metric lookup.
	points, err := store.Query("vm", "vm-101", "cpu", from, to, 0)
	if err != nil {
		t.Fatalf("Query returned error: %v", err)
	}
	if got := len(points); got != 1 {
		t.Fatalf("expected 1 point, got %d", got)
	}

	// All-metrics lookup over the same window.
	all, err := store.QueryAll("vm", "vm-101", from, to, 0)
	if err != nil {
		t.Fatalf("QueryAll returned error: %v", err)
	}
	if cpuPoints := all["cpu"]; len(cpuPoints) != 1 {
		t.Fatalf("expected QueryAll to return 1 cpu point, got %+v", all)
	}
}
func TestStoreRollupTier(t *testing.T) {
dir := t.TempDir()
cfg := DefaultConfig(dir)