fix: smooth I/O rates using sliding window like Prometheus rate()

Proxmox reports cumulative byte counters that update unevenly across
polling intervals, causing a steady 100 Mbps download to appear as
spikes up to 450 Mbps in sparkline charts. Replace per-interval rate
calculation with a 4-sample sliding window (30s at 10s polling) that
averages over the full span — the same approach Prometheus rate() uses.
This commit is contained in:
rcourtman 2026-02-04 19:04:17 +00:00
parent 7e55c4dc52
commit 64e57f0e0e
2 changed files with 338 additions and 484 deletions

View file

@ -10,10 +10,48 @@ import (
// Use IOMetrics from types package
type IOMetrics = types.IOMetrics
// rateWindowSize is the number of counter samples retained per guest.
// Rate is computed from the oldest to the newest sample, giving an average
// over (rateWindowSize-1) polling intervals. With a 10s poll interval and
// window size 4, this produces a 30-second sliding window — the same approach
// Prometheus rate() uses to smooth out per-interval counter jitter.
const rateWindowSize = 4

// counterRing is a fixed-size ring buffer of IOMetrics samples.
// The zero value is ready to use: count grows from 0 until the buffer is
// full, after which add overwrites the oldest entry in place.
type counterRing struct {
	entries [rateWindowSize]IOMetrics
	count   int // number of entries stored (up to rateWindowSize)
	head    int // next write position
}
// add stores m at the current write position, overwriting the oldest
// sample once the buffer has wrapped around.
func (r *counterRing) add(m IOMetrics) {
	r.entries[r.head] = m
	r.head++
	if r.head == rateWindowSize {
		r.head = 0
	}
	if r.count != rateWindowSize {
		r.count++
	}
}
// oldest returns the sample that has been in the ring the longest.
func (r *counterRing) oldest() IOMetrics {
	if r.count == rateWindowSize {
		// Once full, the next write slot is also the oldest entry.
		return r.entries[r.head]
	}
	// Not yet wrapped: slot 0 holds the first sample ever added.
	return r.entries[0]
}
// newest returns the most recently added sample.
func (r *counterRing) newest() IOMetrics {
	idx := r.head - 1
	if idx < 0 {
		// head is 0: the last write wrapped from the final slot.
		idx = rateWindowSize - 1
	}
	return r.entries[idx]
}
// newestTimestamp reports when the most recent sample was recorded.
func (r *counterRing) newestTimestamp() time.Time {
	latest := r.newest()
	return latest.Timestamp
}
// RateTracker tracks I/O metrics to calculate rates
type RateTracker struct {
mu sync.RWMutex
previous map[string]IOMetrics
history map[string]*counterRing
lastRates map[string]RateCache
}
@ -28,7 +66,7 @@ type RateCache struct {
// NewRateTracker creates a new rate tracker
func NewRateTracker() *RateTracker {
return &RateTracker{
previous: make(map[string]IOMetrics),
history: make(map[string]*counterRing),
lastRates: make(map[string]RateCache),
}
}
@ -39,14 +77,18 @@ func (rt *RateTracker) CalculateRates(guestID string, current IOMetrics) (diskRe
rt.mu.Lock()
defer rt.mu.Unlock()
prev, exists := rt.previous[guestID]
ring, exists := rt.history[guestID]
if !exists {
// No previous data, store it and return -1 to indicate no data available
rt.previous[guestID] = current
ring = &counterRing{}
ring.add(current)
rt.history[guestID] = ring
return -1, -1, -1, -1
}
prev := ring.newest()
// Check if the values have actually changed (detect stale data)
// If all cumulative values are the same, we're getting cached data from Proxmox
if current.DiskRead == prev.DiskRead &&
@ -61,11 +103,14 @@ func (rt *RateTracker) CalculateRates(guestID string, current IOMetrics) (diskRe
return 0, 0, 0, 0
}
// Data has changed, update our cache
rt.previous[guestID] = current
// Data has changed, add to ring buffer
ring.add(current)
// Calculate time difference in seconds
timeDiff := current.Timestamp.Sub(prev.Timestamp).Seconds()
// Calculate rate over the full window (oldest to current), like Prometheus rate().
// This naturally smooths out per-interval jitter from Proxmox's lumpy counter
// reporting by averaging over a wider time span.
oldest := ring.oldest()
timeDiff := current.Timestamp.Sub(oldest.Timestamp).Seconds()
if timeDiff <= 0 {
// Return last known rates if time hasn't advanced
if lastRate, hasRate := rt.lastRates[guestID]; hasRate {
@ -74,18 +119,18 @@ func (rt *RateTracker) CalculateRates(guestID string, current IOMetrics) (diskRe
return 0, 0, 0, 0
}
// Calculate rates (bytes per second)
if current.DiskRead >= prev.DiskRead {
diskReadRate = float64(current.DiskRead-prev.DiskRead) / timeDiff
// Calculate rates (bytes per second) over the window
if current.DiskRead >= oldest.DiskRead {
diskReadRate = float64(current.DiskRead-oldest.DiskRead) / timeDiff
}
if current.DiskWrite >= prev.DiskWrite {
diskWriteRate = float64(current.DiskWrite-prev.DiskWrite) / timeDiff
if current.DiskWrite >= oldest.DiskWrite {
diskWriteRate = float64(current.DiskWrite-oldest.DiskWrite) / timeDiff
}
if current.NetworkIn >= prev.NetworkIn {
netInRate = float64(current.NetworkIn-prev.NetworkIn) / timeDiff
if current.NetworkIn >= oldest.NetworkIn {
netInRate = float64(current.NetworkIn-oldest.NetworkIn) / timeDiff
}
if current.NetworkOut >= prev.NetworkOut {
netOutRate = float64(current.NetworkOut-prev.NetworkOut) / timeDiff
if current.NetworkOut >= oldest.NetworkOut {
netOutRate = float64(current.NetworkOut-oldest.NetworkOut) / timeDiff
}
// Cache the calculated rates
@ -103,7 +148,7 @@ func (rt *RateTracker) CalculateRates(guestID string, current IOMetrics) (diskRe
func (rt *RateTracker) Clear() {
rt.mu.Lock()
defer rt.mu.Unlock()
rt.previous = make(map[string]IOMetrics)
rt.history = make(map[string]*counterRing)
rt.lastRates = make(map[string]RateCache)
}
@ -113,9 +158,9 @@ func (rt *RateTracker) Cleanup(cutoff time.Time) (removed int) {
rt.mu.Lock()
defer rt.mu.Unlock()
for guestID, metrics := range rt.previous {
if metrics.Timestamp.Before(cutoff) {
delete(rt.previous, guestID)
for guestID, ring := range rt.history {
if ring.newestTimestamp().Before(cutoff) {
delete(rt.history, guestID)
delete(rt.lastRates, guestID)
removed++
}

View file

@ -7,254 +7,201 @@ import (
"github.com/rcourtman/pulse-go-rewrite/internal/types"
)
func TestCalculateRates(t *testing.T) {
tests := []struct {
name string
guestID string
previous map[string]types.IOMetrics
lastRates map[string]RateCache
current types.IOMetrics
wantDiskReadRate float64
wantDiskWriteRate float64
wantNetInRate float64
wantNetOutRate float64
}{
{
name: "first call for guest returns -1 for all rates",
guestID: "vm-100",
previous: map[string]types.IOMetrics{},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Now()},
wantDiskReadRate: -1,
wantDiskWriteRate: -1,
wantNetInRate: -1,
wantNetOutRate: -1,
},
{
name: "stale data with cached rates returns cached rates",
guestID: "vm-101",
previous: map[string]types.IOMetrics{
"vm-101": {DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Now().Add(-5 * time.Second)},
},
lastRates: map[string]RateCache{
"vm-101": {DiskReadRate: 100, DiskWriteRate: 200, NetInRate: 300, NetOutRate: 400},
},
current: types.IOMetrics{DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Now()},
wantDiskReadRate: 100,
wantDiskWriteRate: 200,
wantNetInRate: 300,
wantNetOutRate: 400,
},
{
name: "stale data without cached rates returns zeros",
guestID: "vm-102",
previous: map[string]types.IOMetrics{
"vm-102": {DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Now().Add(-5 * time.Second)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Now()},
wantDiskReadRate: 0,
wantDiskWriteRate: 0,
wantNetInRate: 0,
wantNetOutRate: 0,
},
{
name: "zero time difference with cached rates returns cached rates",
guestID: "vm-103",
previous: map[string]types.IOMetrics{
"vm-103": {DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{
"vm-103": {DiskReadRate: 50, DiskWriteRate: 100, NetInRate: 150, NetOutRate: 200},
},
current: types.IOMetrics{DiskRead: 2000, DiskWrite: 3000, NetworkIn: 4000, NetworkOut: 5000, Timestamp: time.Unix(1000, 0)},
wantDiskReadRate: 50,
wantDiskWriteRate: 100,
wantNetInRate: 150,
wantNetOutRate: 200,
},
{
name: "zero time difference without cached rates returns zeros",
guestID: "vm-104",
previous: map[string]types.IOMetrics{
"vm-104": {DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 2000, DiskWrite: 3000, NetworkIn: 4000, NetworkOut: 5000, Timestamp: time.Unix(1000, 0)},
wantDiskReadRate: 0,
wantDiskWriteRate: 0,
wantNetInRate: 0,
wantNetOutRate: 0,
},
{
name: "negative time difference with cached rates returns cached rates",
guestID: "vm-105",
previous: map[string]types.IOMetrics{
"vm-105": {DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Unix(2000, 0)},
},
lastRates: map[string]RateCache{
"vm-105": {DiskReadRate: 75, DiskWriteRate: 125, NetInRate: 175, NetOutRate: 225},
},
current: types.IOMetrics{DiskRead: 2000, DiskWrite: 3000, NetworkIn: 4000, NetworkOut: 5000, Timestamp: time.Unix(1000, 0)},
wantDiskReadRate: 75,
wantDiskWriteRate: 125,
wantNetInRate: 175,
wantNetOutRate: 225,
},
{
name: "normal rate calculation",
guestID: "vm-106",
previous: map[string]types.IOMetrics{
"vm-106": {DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 6000, DiskWrite: 12000, NetworkIn: 18000, NetworkOut: 24000, Timestamp: time.Unix(1010, 0)},
wantDiskReadRate: 500, // (6000-1000)/10 = 500
wantDiskWriteRate: 1000, // (12000-2000)/10 = 1000
wantNetInRate: 1500, // (18000-3000)/10 = 1500
wantNetOutRate: 2000, // (24000-4000)/10 = 2000
},
{
name: "counter rollover on disk read returns zero for that metric",
guestID: "vm-107",
previous: map[string]types.IOMetrics{
"vm-107": {DiskRead: 5000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 1000, DiskWrite: 12000, NetworkIn: 18000, NetworkOut: 24000, Timestamp: time.Unix(1010, 0)},
wantDiskReadRate: 0, // counter decreased, return 0
wantDiskWriteRate: 1000, // (12000-2000)/10 = 1000
wantNetInRate: 1500, // (18000-3000)/10 = 1500
wantNetOutRate: 2000, // (24000-4000)/10 = 2000
},
{
name: "counter rollover on disk write returns zero for that metric",
guestID: "vm-108",
previous: map[string]types.IOMetrics{
"vm-108": {DiskRead: 1000, DiskWrite: 12000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 6000, DiskWrite: 2000, NetworkIn: 18000, NetworkOut: 24000, Timestamp: time.Unix(1010, 0)},
wantDiskReadRate: 500, // (6000-1000)/10 = 500
wantDiskWriteRate: 0, // counter decreased, return 0
wantNetInRate: 1500,
wantNetOutRate: 2000,
},
{
name: "counter rollover on network in returns zero for that metric",
guestID: "vm-109",
previous: map[string]types.IOMetrics{
"vm-109": {DiskRead: 1000, DiskWrite: 2000, NetworkIn: 18000, NetworkOut: 4000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 6000, DiskWrite: 12000, NetworkIn: 3000, NetworkOut: 24000, Timestamp: time.Unix(1010, 0)},
wantDiskReadRate: 500,
wantDiskWriteRate: 1000,
wantNetInRate: 0, // counter decreased, return 0
wantNetOutRate: 2000,
},
{
name: "counter rollover on network out returns zero for that metric",
guestID: "vm-110",
previous: map[string]types.IOMetrics{
"vm-110": {DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 24000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 6000, DiskWrite: 12000, NetworkIn: 18000, NetworkOut: 4000, Timestamp: time.Unix(1010, 0)},
wantDiskReadRate: 500,
wantDiskWriteRate: 1000,
wantNetInRate: 1500,
wantNetOutRate: 0, // counter decreased, return 0
},
{
name: "all counters rollover returns zeros",
guestID: "vm-111",
previous: map[string]types.IOMetrics{
"vm-111": {DiskRead: 6000, DiskWrite: 12000, NetworkIn: 18000, NetworkOut: 24000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Unix(1010, 0)},
wantDiskReadRate: 0,
wantDiskWriteRate: 0,
wantNetInRate: 0,
wantNetOutRate: 0,
},
{
name: "mixed scenario: disk read stale, others changed",
guestID: "vm-112",
previous: map[string]types.IOMetrics{
"vm-112": {DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 1000, DiskWrite: 12000, NetworkIn: 18000, NetworkOut: 24000, Timestamp: time.Unix(1010, 0)},
wantDiskReadRate: 0, // no change: (1000-1000)/10 = 0
wantDiskWriteRate: 1000,
wantNetInRate: 1500,
wantNetOutRate: 2000,
},
{
name: "zero values with time elapsed",
guestID: "vm-113",
previous: map[string]types.IOMetrics{
"vm-113": {DiskRead: 0, DiskWrite: 0, NetworkIn: 0, NetworkOut: 0, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 5000, DiskWrite: 10000, NetworkIn: 15000, NetworkOut: 20000, Timestamp: time.Unix(1005, 0)},
wantDiskReadRate: 1000, // 5000/5 = 1000
wantDiskWriteRate: 2000, // 10000/5 = 2000
wantNetInRate: 3000, // 15000/5 = 3000
wantNetOutRate: 4000, // 20000/5 = 4000
},
{
name: "fractional time difference",
guestID: "vm-114",
previous: map[string]types.IOMetrics{
"vm-114": {DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 1500, DiskWrite: 2500, NetworkIn: 3500, NetworkOut: 4500, Timestamp: time.Unix(1000, 500000000)},
wantDiskReadRate: 1000, // 500/0.5 = 1000
wantDiskWriteRate: 1000, // 500/0.5 = 1000
wantNetInRate: 1000, // 500/0.5 = 1000
wantNetOutRate: 1000, // 500/0.5 = 1000
},
{
name: "large values",
guestID: "vm-115",
previous: map[string]types.IOMetrics{
"vm-115": {DiskRead: 1000000000, DiskWrite: 2000000000, NetworkIn: 3000000000, NetworkOut: 4000000000, Timestamp: time.Unix(1000, 0)},
},
lastRates: map[string]RateCache{},
current: types.IOMetrics{DiskRead: 1100000000, DiskWrite: 2200000000, NetworkIn: 3300000000, NetworkOut: 4400000000, Timestamp: time.Unix(1100, 0)},
wantDiskReadRate: 1000000, // 100000000/100 = 1000000
wantDiskWriteRate: 2000000, // 200000000/100 = 2000000
wantNetInRate: 3000000, // 300000000/100 = 3000000
wantNetOutRate: 4000000, // 400000000/100 = 4000000
},
// A guest seen for the first time has no baseline sample, so every rate
// comes back as the -1 "no data yet" sentinel.
func TestCalculateRates_FirstCallReturnsNegativeOnes(t *testing.T) {
	tracker := NewRateTracker()
	sample := types.IOMetrics{
		DiskRead:   1000,
		DiskWrite:  2000,
		NetworkIn:  3000,
		NetworkOut: 4000,
		Timestamp:  time.Now(),
	}
	d, w, ni, no := tracker.CalculateRates("vm-100", sample)
	if d != -1 || w != -1 || ni != -1 || no != -1 {
		t.Errorf("first call: got (%v, %v, %v, %v), want (-1, -1, -1, -1)", d, w, ni, no)
	}
}
// Unchanged counters at a later timestamp mean Proxmox served cached data;
// the tracker should fall back to its previously computed rates.
func TestCalculateRates_StaleDataReturnsCachedRates(t *testing.T) {
	tracker := NewRateTracker()
	start := time.Unix(1000, 0)

	// Baseline sample.
	tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   1000,
		DiskWrite:  2000,
		NetworkIn:  3000,
		NetworkOut: 4000,
		Timestamp:  start,
	})

	// Real update 10s later establishes rates of 500/1000/1500/2000 B/s.
	tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   6000,
		DiskWrite:  12000,
		NetworkIn:  18000,
		NetworkOut: 24000,
		Timestamp:  start.Add(10 * time.Second),
	})

	// Identical counters again, only the timestamp moved: stale data.
	d, w, ni, no := tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   6000,
		DiskWrite:  12000,
		NetworkIn:  18000,
		NetworkOut: 24000,
		Timestamp:  start.Add(20 * time.Second),
	})
	if d != 500 || w != 1000 || ni != 1500 || no != 2000 {
		t.Errorf("stale data: got (%v, %v, %v, %v), want (500, 1000, 1500, 2000)", d, w, ni, no)
	}
}
// Stale data with no cached rates on record yields zeros rather than the
// -1 sentinel or garbage.
func TestCalculateRates_StaleDataWithoutCachedRatesReturnsZeros(t *testing.T) {
	tracker := NewRateTracker()
	start := time.Unix(1000, 0)
	seed := types.IOMetrics{
		DiskRead:   1000,
		DiskWrite:  2000,
		NetworkIn:  3000,
		NetworkOut: 4000,
		Timestamp:  start,
	}
	tracker.CalculateRates("vm-100", seed)

	// Identical counters 10s later; no rates were ever cached for this guest.
	repeat := seed
	repeat.Timestamp = start.Add(10 * time.Second)
	d, w, ni, no := tracker.CalculateRates("vm-100", repeat)
	if d != 0 || w != 0 || ni != 0 || no != 0 {
		t.Errorf("stale without cache: got (%v, %v, %v, %v), want (0, 0, 0, 0)", d, w, ni, no)
	}
}
// With only two samples in the ring the window spans a single interval,
// so the rate is a plain delta-over-time.
func TestCalculateRates_NormalRateCalculation(t *testing.T) {
	tracker := NewRateTracker()
	start := time.Unix(1000, 0)
	tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   1000,
		DiskWrite:  2000,
		NetworkIn:  3000,
		NetworkOut: 4000,
		Timestamp:  start,
	})
	d, w, ni, no := tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   6000,
		DiskWrite:  12000,
		NetworkIn:  18000,
		NetworkOut: 24000,
		Timestamp:  start.Add(10 * time.Second),
	})
	if d != 500 || w != 1000 || ni != 1500 || no != 2000 {
		t.Errorf("normal rate: got (%v, %v, %v, %v), want (500, 1000, 1500, 2000)", d, w, ni, no)
	}
}
// A counter that goes backwards (rollover/reset) must report a zero rate
// for that metric only; the other metrics keep their computed rates.
func TestCalculateRates_CounterRolloverReturnsZero(t *testing.T) {
	tracker := NewRateTracker()
	start := time.Unix(1000, 0)
	tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   5000,
		DiskWrite:  2000,
		NetworkIn:  3000,
		NetworkOut: 4000,
		Timestamp:  start,
	})
	// DiskRead decreases; everything else advances normally.
	d, w, ni, no := tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   1000,
		DiskWrite:  12000,
		NetworkIn:  18000,
		NetworkOut: 24000,
		Timestamp:  start.Add(10 * time.Second),
	})
	if d != 0 {
		t.Errorf("DiskRead rollover: got %v, want 0", d)
	}
	if w != 1000 || ni != 1500 || no != 2000 {
		t.Errorf("other rates: got (%v, %v, %v), want (1000, 1500, 2000)", w, ni, no)
	}
}
// When every counter rolls over at once, all four rates should be zero.
func TestCalculateRates_AllCountersRollover(t *testing.T) {
	tracker := NewRateTracker()
	start := time.Unix(1000, 0)
	tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   6000,
		DiskWrite:  12000,
		NetworkIn:  18000,
		NetworkOut: 24000,
		Timestamp:  start,
	})
	// All counters decrease relative to the seed.
	d, w, ni, no := tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   1000,
		DiskWrite:  2000,
		NetworkIn:  3000,
		NetworkOut: 4000,
		Timestamp:  start.Add(10 * time.Second),
	})
	if d != 0 || w != 0 || ni != 0 || no != 0 {
		t.Errorf("all rollover: got (%v, %v, %v, %v), want (0, 0, 0, 0)", d, w, ni, no)
	}
}
// Sub-second polling intervals must divide by the fractional duration,
// not a truncated whole number of seconds.
func TestCalculateRates_FractionalTimeDifference(t *testing.T) {
	tracker := NewRateTracker()
	start := time.Unix(1000, 0)
	tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   1000,
		DiskWrite:  2000,
		NetworkIn:  3000,
		NetworkOut: 4000,
		Timestamp:  start,
	})
	// Each counter grows by 500 bytes over half a second: 500/0.5 = 1000 B/s.
	d, w, ni, no := tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   1500,
		DiskWrite:  2500,
		NetworkIn:  3500,
		NetworkOut: 4500,
		Timestamp:  start.Add(500 * time.Millisecond),
	})
	if d != 1000 || w != 1000 || ni != 1000 || no != 1000 {
		t.Errorf("fractional time: got (%v, %v, %v, %v), want (1000, 1000, 1000, 1000)", d, w, ni, no)
	}
}
// Gigabyte-scale cumulative counters should not lose precision in the
// float64 rate computation.
func TestCalculateRates_LargeValues(t *testing.T) {
	tracker := NewRateTracker()
	start := time.Unix(1000, 0)
	tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   1000000000,
		DiskWrite:  2000000000,
		NetworkIn:  3000000000,
		NetworkOut: 4000000000,
		Timestamp:  start,
	})
	// Deltas of 100-400 MB over 100s → 1-4 MB/s.
	d, w, ni, no := tracker.CalculateRates("vm-100", types.IOMetrics{
		DiskRead:   1100000000,
		DiskWrite:  2200000000,
		NetworkIn:  3300000000,
		NetworkOut: 4400000000,
		Timestamp:  start.Add(100 * time.Second),
	})
	if d != 1000000 || w != 2000000 || ni != 3000000 || no != 4000000 {
		t.Errorf("large values: got (%v, %v, %v, %v), want (1000000, 2000000, 3000000, 4000000)", d, w, ni, no)
	}
}
func TestCalculateRates_WindowSmooths(t *testing.T) {
rt := NewRateTracker()
base := time.Unix(1000, 0)
// Simulate a steady 1000 bytes/sec download with Proxmox's lumpy counter updates.
// Over 4 intervals (40 seconds), 40000 bytes should arrive.
// But Proxmox distributes them unevenly across intervals.
// T=0: seed
rt.CalculateRates("vm-100", types.IOMetrics{
NetworkIn: 0, Timestamp: base,
})
// T=10: normal interval (10000 bytes in 10s = 1000 B/s)
rt.CalculateRates("vm-100", types.IOMetrics{
NetworkIn: 10000, Timestamp: base.Add(10 * time.Second),
})
// T=20: short-changed interval (only 5000 bytes reported)
rt.CalculateRates("vm-100", types.IOMetrics{
NetworkIn: 15000, Timestamp: base.Add(20 * time.Second),
})
// T=30: lumpy interval (15000 bytes — makes up for the deficit + normal)
// Without windowing, raw rate would be 15000/10 = 1500 B/s (50% spike).
// With windowing (oldest=T=0, current=T=30), rate = 30000/30 = 1000 B/s.
_, _, ni, _ := rt.CalculateRates("vm-100", types.IOMetrics{
NetworkIn: 30000, Timestamp: base.Add(30 * time.Second),
})
if ni != 1000 {
t.Errorf("windowed rate during lumpy interval: got %v, want 1000", ni)
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rt := &RateTracker{
previous: tt.previous,
lastRates: tt.lastRates,
}
// T=40: ring is now full (4 entries), oldest is T=10.
// Rate = (40000-10000)/(40-10) = 30000/30 = 1000 B/s
_, _, ni, _ = rt.CalculateRates("vm-100", types.IOMetrics{
NetworkIn: 40000, Timestamp: base.Add(40 * time.Second),
})
gotDiskRead, gotDiskWrite, gotNetIn, gotNetOut := rt.CalculateRates(tt.guestID, tt.current)
if gotDiskRead != tt.wantDiskReadRate {
t.Errorf("DiskReadRate = %v, want %v", gotDiskRead, tt.wantDiskReadRate)
}
if gotDiskWrite != tt.wantDiskWriteRate {
t.Errorf("DiskWriteRate = %v, want %v", gotDiskWrite, tt.wantDiskWriteRate)
}
if gotNetIn != tt.wantNetInRate {
t.Errorf("NetInRate = %v, want %v", gotNetIn, tt.wantNetInRate)
}
if gotNetOut != tt.wantNetOutRate {
t.Errorf("NetOutRate = %v, want %v", gotNetOut, tt.wantNetOutRate)
}
})
if ni != 1000 {
t.Errorf("windowed rate after ring full: got %v, want 1000", ni)
}
}
@ -263,102 +210,61 @@ func TestCalculateRates_MultipleGuestsTrackedIndependently(t *testing.T) {
baseTime := time.Unix(1000, 0)
// First call for guest A - should return -1 for all
metricsA1 := types.IOMetrics{
DiskRead: 1000,
DiskWrite: 2000,
NetworkIn: 3000,
NetworkOut: 4000,
Timestamp: baseTime,
}
diskReadA1, diskWriteA1, netInA1, netOutA1 := rt.CalculateRates("vm-100", metricsA1)
diskReadA1, diskWriteA1, netInA1, netOutA1 := rt.CalculateRates("vm-100", types.IOMetrics{
DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000,
Timestamp: baseTime,
})
if diskReadA1 != -1 || diskWriteA1 != -1 || netInA1 != -1 || netOutA1 != -1 {
t.Errorf("first call for vm-100: got (%v, %v, %v, %v), want (-1, -1, -1, -1)",
diskReadA1, diskWriteA1, netInA1, netOutA1)
}
// First call for guest B - should also return -1 for all
metricsB1 := types.IOMetrics{
DiskRead: 5000,
DiskWrite: 6000,
NetworkIn: 7000,
NetworkOut: 8000,
Timestamp: baseTime,
}
diskReadB1, diskWriteB1, netInB1, netOutB1 := rt.CalculateRates("vm-200", metricsB1)
diskReadB1, diskWriteB1, netInB1, netOutB1 := rt.CalculateRates("vm-200", types.IOMetrics{
DiskRead: 5000, DiskWrite: 6000, NetworkIn: 7000, NetworkOut: 8000,
Timestamp: baseTime,
})
if diskReadB1 != -1 || diskWriteB1 != -1 || netInB1 != -1 || netOutB1 != -1 {
t.Errorf("first call for vm-200: got (%v, %v, %v, %v), want (-1, -1, -1, -1)",
diskReadB1, diskWriteB1, netInB1, netOutB1)
}
// Second call for guest A - should calculate rates
metricsA2 := types.IOMetrics{
DiskRead: 11000,
DiskWrite: 22000,
NetworkIn: 33000,
NetworkOut: 44000,
Timestamp: baseTime.Add(10 * time.Second),
}
diskReadA2, diskWriteA2, netInA2, netOutA2 := rt.CalculateRates("vm-100", metricsA2)
// (11000-1000)/10 = 1000, (22000-2000)/10 = 2000, etc.
// Second call for guest A
diskReadA2, diskWriteA2, netInA2, netOutA2 := rt.CalculateRates("vm-100", types.IOMetrics{
DiskRead: 11000, DiskWrite: 22000, NetworkIn: 33000, NetworkOut: 44000,
Timestamp: baseTime.Add(10 * time.Second),
})
if diskReadA2 != 1000 || diskWriteA2 != 2000 || netInA2 != 3000 || netOutA2 != 4000 {
t.Errorf("second call for vm-100: got (%v, %v, %v, %v), want (1000, 2000, 3000, 4000)",
diskReadA2, diskWriteA2, netInA2, netOutA2)
}
// Second call for guest B - should calculate different rates
metricsB2 := types.IOMetrics{
DiskRead: 10000,
DiskWrite: 16000,
NetworkIn: 22000,
NetworkOut: 28000,
Timestamp: baseTime.Add(5 * time.Second),
}
diskReadB2, diskWriteB2, netInB2, netOutB2 := rt.CalculateRates("vm-200", metricsB2)
// (10000-5000)/5 = 1000, (16000-6000)/5 = 2000, etc.
// Second call for guest B - different rates
diskReadB2, diskWriteB2, netInB2, netOutB2 := rt.CalculateRates("vm-200", types.IOMetrics{
DiskRead: 10000, DiskWrite: 16000, NetworkIn: 22000, NetworkOut: 28000,
Timestamp: baseTime.Add(5 * time.Second),
})
if diskReadB2 != 1000 || diskWriteB2 != 2000 || netInB2 != 3000 || netOutB2 != 4000 {
t.Errorf("second call for vm-200: got (%v, %v, %v, %v), want (1000, 2000, 3000, 4000)",
diskReadB2, diskWriteB2, netInB2, netOutB2)
}
// Third call for guest A - should use new previous values, not affected by guest B
metricsA3 := types.IOMetrics{
DiskRead: 21000,
DiskWrite: 42000,
NetworkIn: 63000,
NetworkOut: 84000,
Timestamp: baseTime.Add(20 * time.Second),
}
diskReadA3, diskWriteA3, netInA3, netOutA3 := rt.CalculateRates("vm-100", metricsA3)
// (21000-11000)/10 = 1000, (42000-22000)/10 = 2000, etc.
if diskReadA3 != 1000 || diskWriteA3 != 2000 || netInA3 != 3000 || netOutA3 != 4000 {
t.Errorf("third call for vm-100: got (%v, %v, %v, %v), want (1000, 2000, 3000, 4000)",
diskReadA3, diskWriteA3, netInA3, netOutA3)
}
}
func TestCalculateRates_CachesRates(t *testing.T) {
rt := NewRateTracker()
baseTime := time.Unix(1000, 0)
// First call
metrics1 := types.IOMetrics{
DiskRead: 1000,
DiskWrite: 2000,
NetworkIn: 3000,
NetworkOut: 4000,
Timestamp: baseTime,
}
rt.CalculateRates("vm-100", metrics1)
// Seed
rt.CalculateRates("vm-100", types.IOMetrics{
DiskRead: 1000, DiskWrite: 2000, NetworkIn: 3000, NetworkOut: 4000,
Timestamp: baseTime,
})
// Second call with changed values - should cache the calculated rates
metrics2 := types.IOMetrics{
DiskRead: 11000,
DiskWrite: 22000,
NetworkIn: 33000,
NetworkOut: 44000,
Timestamp: baseTime.Add(10 * time.Second),
}
diskRead2, diskWrite2, netIn2, netOut2 := rt.CalculateRates("vm-100", metrics2)
// Calculate rates
diskRead2, diskWrite2, netIn2, netOut2 := rt.CalculateRates("vm-100", types.IOMetrics{
DiskRead: 11000, DiskWrite: 22000, NetworkIn: 33000, NetworkOut: 44000,
Timestamp: baseTime.Add(10 * time.Second),
})
// Verify rates are cached
cachedRates, exists := rt.lastRates["vm-100"]
@ -378,145 +284,73 @@ func TestCalculateRates_CachesRates(t *testing.T) {
t.Errorf("cached NetOutRate = %v, want %v", cachedRates.NetOutRate, netOut2)
}
// Third call with stale data - should return cached rates
metrics3 := types.IOMetrics{
DiskRead: 11000, // same as metrics2
DiskWrite: 22000,
NetworkIn: 33000,
NetworkOut: 44000,
Timestamp: baseTime.Add(15 * time.Second), // different time but same values
}
diskRead3, diskWrite3, netIn3, netOut3 := rt.CalculateRates("vm-100", metrics3)
// Stale data returns cached rates
diskRead3, diskWrite3, netIn3, netOut3 := rt.CalculateRates("vm-100", types.IOMetrics{
DiskRead: 11000, DiskWrite: 22000, NetworkIn: 33000, NetworkOut: 44000,
Timestamp: baseTime.Add(15 * time.Second),
})
if diskRead3 != diskRead2 || diskWrite3 != diskWrite2 || netIn3 != netIn2 || netOut3 != netOut2 {
t.Errorf("stale data call: got (%v, %v, %v, %v), want cached (%v, %v, %v, %v)",
diskRead3, diskWrite3, netIn3, netOut3, diskRead2, diskWrite2, netIn2, netOut2)
}
}
func TestCalculateRates_UpdatesPreviousMetrics(t *testing.T) {
func TestCalculateRates_DoesNotAddStaleDataToRing(t *testing.T) {
rt := NewRateTracker()
baseTime := time.Unix(1000, 0)
base := time.Unix(1000, 0)
// First call
metrics1 := types.IOMetrics{
DiskRead: 1000,
DiskWrite: 2000,
NetworkIn: 3000,
NetworkOut: 4000,
Timestamp: baseTime,
}
rt.CalculateRates("vm-100", metrics1)
// Seed
rt.CalculateRates("vm-100", types.IOMetrics{
DiskRead: 1000, NetworkIn: 3000, Timestamp: base,
})
// Verify previous metrics are stored
prev, exists := rt.previous["vm-100"]
if !exists {
t.Fatal("expected previous metrics to be stored for vm-100")
}
if prev.DiskRead != metrics1.DiskRead {
t.Errorf("previous DiskRead = %v, want %v", prev.DiskRead, metrics1.DiskRead)
}
// Real data
rt.CalculateRates("vm-100", types.IOMetrics{
DiskRead: 6000, NetworkIn: 18000, Timestamp: base.Add(10 * time.Second),
})
// Second call with changed values
metrics2 := types.IOMetrics{
DiskRead: 11000,
DiskWrite: 22000,
NetworkIn: 33000,
NetworkOut: 44000,
Timestamp: baseTime.Add(10 * time.Second),
}
rt.CalculateRates("vm-100", metrics2)
// Stale data — should not be added to ring
rt.CalculateRates("vm-100", types.IOMetrics{
DiskRead: 6000, NetworkIn: 18000, Timestamp: base.Add(20 * time.Second),
})
// Verify previous metrics are updated
prev2, exists := rt.previous["vm-100"]
if !exists {
t.Fatal("expected previous metrics to still be stored for vm-100")
}
if prev2.DiskRead != metrics2.DiskRead {
t.Errorf("updated previous DiskRead = %v, want %v", prev2.DiskRead, metrics2.DiskRead)
}
if prev2.DiskWrite != metrics2.DiskWrite {
t.Errorf("updated previous DiskWrite = %v, want %v", prev2.DiskWrite, metrics2.DiskWrite)
}
if prev2.NetworkIn != metrics2.NetworkIn {
t.Errorf("updated previous NetworkIn = %v, want %v", prev2.NetworkIn, metrics2.NetworkIn)
}
if prev2.NetworkOut != metrics2.NetworkOut {
t.Errorf("updated previous NetworkOut = %v, want %v", prev2.NetworkOut, metrics2.NetworkOut)
}
if !prev2.Timestamp.Equal(metrics2.Timestamp) {
t.Errorf("updated previous Timestamp = %v, want %v", prev2.Timestamp, metrics2.Timestamp)
}
}
func TestCalculateRates_DoesNotUpdatePreviousOnStaleData(t *testing.T) {
rt := NewRateTracker()
baseTime := time.Unix(1000, 0)
// First call
metrics1 := types.IOMetrics{
DiskRead: 1000,
DiskWrite: 2000,
NetworkIn: 3000,
NetworkOut: 4000,
Timestamp: baseTime,
}
rt.CalculateRates("vm-100", metrics1)
// Second call with stale data (same values, different timestamp)
metrics2 := types.IOMetrics{
DiskRead: 1000, // same
DiskWrite: 2000, // same
NetworkIn: 3000, // same
NetworkOut: 4000, // same
Timestamp: baseTime.Add(10 * time.Second), // different time
}
rt.CalculateRates("vm-100", metrics2)
// Previous should still be metrics1, not metrics2
prev, exists := rt.previous["vm-100"]
if !exists {
t.Fatal("expected previous metrics to be stored for vm-100")
}
if !prev.Timestamp.Equal(metrics1.Timestamp) {
t.Errorf("previous should not be updated on stale data: got timestamp %v, want %v", prev.Timestamp, metrics1.Timestamp)
// Ring should still have 2 entries (seed + one real update)
ring := rt.history["vm-100"]
if ring.count != 2 {
t.Errorf("ring count after stale data: got %d, want 2", ring.count)
}
}
func TestClear(t *testing.T) {
rt := NewRateTracker()
baseTime := time.Unix(1000, 0)
base := time.Unix(1000, 0)
// Add some data
metrics := types.IOMetrics{
DiskRead: 1000,
DiskWrite: 2000,
NetworkIn: 3000,
NetworkOut: 4000,
Timestamp: baseTime,
}
rt.CalculateRates("vm-100", metrics)
rt.CalculateRates("vm-200", metrics)
rt.CalculateRates("vm-100", types.IOMetrics{
DiskRead: 1000, Timestamp: base,
})
rt.CalculateRates("vm-200", types.IOMetrics{
DiskRead: 1000, Timestamp: base,
})
// Verify data exists
if len(rt.previous) != 2 {
t.Fatalf("expected 2 entries in previous, got %d", len(rt.previous))
if len(rt.history) != 2 {
t.Fatalf("expected 2 entries in history, got %d", len(rt.history))
}
// Clear
rt.Clear()
// Verify all data is cleared
if len(rt.previous) != 0 {
t.Errorf("expected previous to be empty after Clear, got %d entries", len(rt.previous))
if len(rt.history) != 0 {
t.Errorf("expected history to be empty after Clear, got %d entries", len(rt.history))
}
if len(rt.lastRates) != 0 {
t.Errorf("expected lastRates to be empty after Clear, got %d entries", len(rt.lastRates))
}
// Verify next call after clear returns -1 (like first call)
diskRead, diskWrite, netIn, netOut := rt.CalculateRates("vm-100", metrics)
if diskRead != -1 || diskWrite != -1 || netIn != -1 || netOut != -1 {
t.Errorf("after Clear, first call should return -1s: got (%v, %v, %v, %v)", diskRead, diskWrite, netIn, netOut)
// After clear, first call returns -1
d, w, ni, no := rt.CalculateRates("vm-100", types.IOMetrics{
DiskRead: 1000, Timestamp: base,
})
if d != -1 || w != -1 || ni != -1 || no != -1 {
t.Errorf("after Clear: got (%v, %v, %v, %v), want (-1, -1, -1, -1)", d, w, ni, no)
}
}
@ -524,53 +358,30 @@ func TestRateTrackerCleanup(t *testing.T) {
rt := NewRateTracker()
now := time.Now()
// Add metrics for three guests:
// - "active-guest" has recent data
// - "stale-guest" has old data (will be removed)
// - "mixed-guest" has recent data (last update is recent)
rt.CalculateRates("active-guest", types.IOMetrics{
DiskRead: 1000, Timestamp: now.Add(-1 * time.Hour),
})
rt.CalculateRates("stale-guest", types.IOMetrics{
DiskRead: 1000, Timestamp: now.Add(-48 * time.Hour),
})
activeMetrics := types.IOMetrics{
DiskRead: 1000,
DiskWrite: 2000,
NetworkIn: 3000,
NetworkOut: 4000,
Timestamp: now.Add(-1 * time.Hour), // Recent
if len(rt.history) != 2 {
t.Fatalf("expected 2 entries, got %d", len(rt.history))
}
staleMetrics := types.IOMetrics{
DiskRead: 1000,
DiskWrite: 2000,
NetworkIn: 3000,
NetworkOut: 4000,
Timestamp: now.Add(-48 * time.Hour), // Old
}
rt.CalculateRates("active-guest", activeMetrics)
rt.CalculateRates("stale-guest", staleMetrics)
// Verify both entries exist
if len(rt.previous) != 2 {
t.Fatalf("expected 2 entries in previous, got %d", len(rt.previous))
}
// Run cleanup with 24-hour cutoff
cutoff := now.Add(-24 * time.Hour)
removed := rt.Cleanup(cutoff)
// Verify stale entry was removed
if removed != 1 {
t.Errorf("expected 1 entry removed, got %d", removed)
}
if len(rt.previous) != 1 {
t.Errorf("expected 1 entry remaining in previous, got %d", len(rt.previous))
if len(rt.history) != 1 {
t.Errorf("expected 1 entry remaining, got %d", len(rt.history))
}
if _, exists := rt.previous["active-guest"]; !exists {
if _, exists := rt.history["active-guest"]; !exists {
t.Error("active-guest should still exist after cleanup")
}
if _, exists := rt.previous["stale-guest"]; exists {
if _, exists := rt.history["stale-guest"]; exists {
t.Error("stale-guest should be removed after cleanup")
}
}
@ -579,9 +390,7 @@ func TestRateTrackerCleanupEmpty(t *testing.T) {
rt := NewRateTracker()
cutoff := time.Now().Add(-24 * time.Hour)
// Cleanup on empty tracker should not panic
removed := rt.Cleanup(cutoff)
if removed != 0 {
t.Errorf("expected 0 entries removed from empty tracker, got %d", removed)
}