Pulse/internal/monitoring/task_queue_test.go
rcourtman b1ed1a2802 test: Add taskHeap.Less tiebreaker test case
When tasks have identical NextRun and Priority, the Less function
falls back to comparing InstanceName alphabetically. Add test to
cover this edge case branch, improving Less coverage to 100%.
2025-12-01 14:36:32 +00:00

847 lines
20 KiB
Go

package monitoring
import (
"testing"
"time"
)
func TestTaskQueue_Snapshot(t *testing.T) {
tests := []struct {
name string
tasks []ScheduledTask
wantDepth int
wantDueWithinSeconds int
wantPerType map[string]int
}{
{
name: "empty queue",
tasks: []ScheduledTask{},
wantDepth: 0,
wantDueWithinSeconds: 0,
wantPerType: map[string]int{},
},
{
name: "single task due soon (within 12 seconds)",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
},
wantDepth: 1,
wantDueWithinSeconds: 1,
wantPerType: map[string]int{
"pve": 1,
},
},
{
name: "single task NOT due soon (>12 seconds away)",
tasks: []ScheduledTask{
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(20 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
},
wantDepth: 1,
wantDueWithinSeconds: 0,
wantPerType: map[string]int{
"pbs": 1,
},
},
{
name: "multiple tasks of same type",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(15 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
},
wantDepth: 3,
wantDueWithinSeconds: 2, // pve-1 and pve-3 are within 12 seconds
wantPerType: map[string]int{
"pve": 3,
},
},
{
name: "multiple tasks of different types",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(8 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pmg-1",
InstanceType: InstanceTypePMG,
NextRun: time.Now().Add(25 * time.Second),
Interval: 45 * time.Second,
Priority: 1.0,
},
},
wantDepth: 3,
wantDueWithinSeconds: 2, // pve-1 and pbs-1 are within 12 seconds
wantPerType: map[string]int{
"pve": 1,
"pbs": 1,
"pmg": 1,
},
},
{
name: "boundary case: task exactly 12 seconds away",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(12 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
},
wantDepth: 1,
wantDueWithinSeconds: 1, // <= 12 seconds should be included
wantPerType: map[string]int{
"pve": 1,
},
},
{
name: "mix of due-soon and not-due-soon tasks",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(1 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(50 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(11 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-2",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(100 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pmg-1",
InstanceType: InstanceTypePMG,
NextRun: time.Now().Add(30 * time.Second),
Interval: 45 * time.Second,
Priority: 1.0,
},
},
wantDepth: 5,
wantDueWithinSeconds: 2, // pve-1 and pbs-1 are within 12 seconds
wantPerType: map[string]int{
"pve": 2,
"pbs": 2,
"pmg": 1,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
queue := NewTaskQueue()
// Add all tasks to the queue
for _, task := range tt.tasks {
queue.Upsert(task)
}
// Get snapshot
snapshot := queue.Snapshot()
// Verify depth
if snapshot.Depth != tt.wantDepth {
t.Errorf("Depth = %d, want %d", snapshot.Depth, tt.wantDepth)
}
// Verify DueWithinSeconds
if snapshot.DueWithinSeconds != tt.wantDueWithinSeconds {
t.Errorf("DueWithinSeconds = %d, want %d", snapshot.DueWithinSeconds, tt.wantDueWithinSeconds)
}
// Verify PerType map
if len(snapshot.PerType) != len(tt.wantPerType) {
t.Errorf("PerType has %d entries, want %d", len(snapshot.PerType), len(tt.wantPerType))
}
for typeStr, wantCount := range tt.wantPerType {
gotCount, ok := snapshot.PerType[typeStr]
if !ok {
t.Errorf("PerType missing entry for %s", typeStr)
continue
}
if gotCount != wantCount {
t.Errorf("PerType[%s] = %d, want %d", typeStr, gotCount, wantCount)
}
}
// Verify no extra keys in PerType
for typeStr := range snapshot.PerType {
if _, ok := tt.wantPerType[typeStr]; !ok {
t.Errorf("PerType has unexpected entry for %s", typeStr)
}
}
})
}
}
func TestTaskQueue_Upsert(t *testing.T) {
t.Run("insert into empty queue", func(t *testing.T) {
queue := NewTaskQueue()
task := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task)
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1", queue.Size())
}
verifyHeapInvariant(t, queue)
})
t.Run("upsert existing entry with different NextRun", func(t *testing.T) {
queue := NewTaskQueue()
task1 := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task1)
// Update same task with different NextRun
task2 := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task2)
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1 (not 2)", queue.Size())
}
// Verify the task was updated
queue.mu.Lock()
key := schedulerKey(InstanceTypePVE, "pve-1")
entry := queue.entries[key]
if !entry.task.NextRun.Equal(task2.NextRun) {
t.Errorf("NextRun not updated: got %v, want %v", entry.task.NextRun, task2.NextRun)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("insert multiple entries - verify heap ordering", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
if queue.Size() != 3 {
t.Errorf("Size() = %d, want 3", queue.Size())
}
// Verify heap root is earliest NextRun
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-1" {
t.Errorf("heap root = %s, want pve-1 (earliest NextRun)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert changes heap position - earlier NextRun", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Update pve-3 to have earliest NextRun
updatedTask := ScheduledTask{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(updatedTask)
if queue.Size() != 3 {
t.Errorf("Size() = %d, want 3", queue.Size())
}
// Verify pve-3 is now at heap root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-3" {
t.Errorf("heap root = %s, want pve-3 (updated to earliest)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert changes heap position - later NextRun", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Update pve-1 to have latest NextRun
updatedTask := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(40 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(updatedTask)
if queue.Size() != 3 {
t.Errorf("Size() = %d, want 3", queue.Size())
}
// Verify pve-2 is now at heap root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-2" {
t.Errorf("heap root = %s, want pve-2 (pve-1 moved down)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert with priority ordering", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
// Same NextRun, different priorities
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 0.5,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Higher priority should be at root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-2" {
t.Errorf("heap root = %s, want pve-2 (higher priority)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert with same time and priority - uses instance name for tiebreak", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
sameTime := now.Add(10 * time.Second)
// Same NextRun, same priorities - should use InstanceName for tiebreak
tasks := []ScheduledTask{
{
InstanceName: "pve-zebra",
InstanceType: InstanceTypePVE,
NextRun: sameTime,
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-alpha",
InstanceType: InstanceTypePVE,
NextRun: sameTime,
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-middle",
InstanceType: InstanceTypePVE,
NextRun: sameTime,
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// When NextRun and Priority are equal, alphabetically earlier name should be first
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-alpha" {
t.Errorf("heap root = %s, want pve-alpha (alphabetically first when time and priority equal)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
}
func TestTaskQueue_Remove(t *testing.T) {
t.Run("remove from empty queue", func(t *testing.T) {
queue := NewTaskQueue()
// Should not panic
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 0 {
t.Errorf("Size() = %d, want 0", queue.Size())
}
})
t.Run("remove non-existent key", func(t *testing.T) {
queue := NewTaskQueue()
task := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task)
// Remove different instance
queue.Remove(InstanceTypePVE, "pve-2")
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1 (pve-1 should still exist)", queue.Size())
}
verifyHeapInvariant(t, queue)
})
t.Run("remove only entry", func(t *testing.T) {
queue := NewTaskQueue()
task := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task)
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 0 {
t.Errorf("Size() = %d, want 0", queue.Size())
}
queue.mu.Lock()
if len(queue.entries) != 0 {
t.Errorf("entries map has %d items, want 0", len(queue.entries))
}
if len(queue.heap) != 0 {
t.Errorf("heap has %d items, want 0", len(queue.heap))
}
queue.mu.Unlock()
})
t.Run("remove entry from middle of queue", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove middle entry
queue.Remove(InstanceTypePVE, "pve-2")
if queue.Size() != 2 {
t.Errorf("Size() = %d, want 2", queue.Size())
}
// Verify pve-2 is not in entries map
queue.mu.Lock()
key := schedulerKey(InstanceTypePVE, "pve-2")
if _, exists := queue.entries[key]; exists {
t.Errorf("pve-2 still exists in entries map")
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("remove heap root", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove root (pve-1 has earliest NextRun)
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 2 {
t.Errorf("Size() = %d, want 2", queue.Size())
}
// Verify pve-2 is now the root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-2" {
t.Errorf("heap root = %s, want pve-2 (next earliest)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("remove all entries sequentially", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove each entry and verify invariant after each removal
queue.Remove(InstanceTypePVE, "pve-2")
if queue.Size() != 2 {
t.Errorf("Size after removing pve-2 = %d, want 2", queue.Size())
}
verifyHeapInvariant(t, queue)
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 1 {
t.Errorf("Size after removing pve-1 = %d, want 1", queue.Size())
}
verifyHeapInvariant(t, queue)
queue.Remove(InstanceTypePVE, "pve-3")
if queue.Size() != 0 {
t.Errorf("Size after removing pve-3 = %d, want 0", queue.Size())
}
verifyHeapInvariant(t, queue)
})
t.Run("remove different instance types", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: now.Add(20 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove PBS instance
queue.Remove(InstanceTypePBS, "pbs-1")
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1", queue.Size())
}
// Verify PVE instance still exists
queue.mu.Lock()
key := schedulerKey(InstanceTypePVE, "pve-1")
if _, exists := queue.entries[key]; !exists {
t.Errorf("pve-1 should still exist in entries map")
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
}
// verifyHeapInvariant checks that the heap maintains its invariants:
// 1. len(entries) matches heap size
// 2. Each entry's index matches its actual position in heap
// 3. Heap property: parent is less than or equal to children
func verifyHeapInvariant(t *testing.T, queue *TaskQueue) {
t.Helper()
queue.mu.Lock()
defer queue.mu.Unlock()
// Check entries count matches heap size
if len(queue.entries) != len(queue.heap) {
t.Errorf("entries count %d != heap size %d", len(queue.entries), len(queue.heap))
}
// Check each entry's index is correct
for _, entry := range queue.heap {
if entry.index < 0 || entry.index >= len(queue.heap) {
t.Errorf("entry %s has invalid index %d (heap size: %d)", entry.key(), entry.index, len(queue.heap))
continue
}
if queue.heap[entry.index] != entry {
t.Errorf("entry %s has index %d but is not at that position in heap", entry.key(), entry.index)
}
}
// Check all entries in map are also in heap
for key, entry := range queue.entries {
found := false
for _, heapEntry := range queue.heap {
if heapEntry == entry {
found = true
break
}
}
if !found {
t.Errorf("entry %s in map but not in heap", key)
}
}
// Check heap property: parent <= children
for i := 0; i < len(queue.heap); i++ {
leftChild := 2*i + 1
rightChild := 2*i + 2
if leftChild < len(queue.heap) {
if queue.heap.Less(leftChild, i) {
t.Errorf("heap violation: child at %d is less than parent at %d", leftChild, i)
}
}
if rightChild < len(queue.heap) {
if queue.heap.Less(rightChild, i) {
t.Errorf("heap violation: child at %d is less than parent at %d", rightChild, i)
}
}
}
}