package monitoring

import (
	"testing"
	"time"
)

// taskQueueWith returns a new TaskQueue with every given task upserted in
// argument order.
func taskQueueWith(tasks ...ScheduledTask) *TaskQueue {
	queue := NewTaskQueue()
	for _, task := range tasks {
		queue.Upsert(task)
	}
	return queue
}

// pveScheduledTask builds the PVE fixture used throughout these tests: a
// 30-second interval and priority 1.0, varying only in name and NextRun.
func pveScheduledTask(name string, nextRun time.Time) ScheduledTask {
	return ScheduledTask{
		InstanceName: name,
		InstanceType: InstanceTypePVE,
		NextRun:      nextRun,
		Interval:     30 * time.Second,
		Priority:     1.0,
	}
}

// assertTaskQueueRoot reports a test error unless the entry at heap index 0
// has InstanceName want. reason is appended to the failure message.
//
// An empty heap is reported as an error instead of being indexed, so a broken
// queue fails the test rather than panicking while the mutex is held.
func assertTaskQueueRoot(t *testing.T, queue *TaskQueue, want, reason string) {
	t.Helper()

	queue.mu.Lock()
	defer queue.mu.Unlock()

	if len(queue.heap) == 0 {
		t.Errorf("heap is empty, want root %s (%s)", want, reason)
		return
	}
	if got := queue.heap[0].task.InstanceName; got != want {
		t.Errorf("heap root = %s, want %s (%s)", got, want, reason)
	}
}

// TestTaskQueue_Snapshot checks the Depth, DueWithinSeconds and PerType fields
// returned by Snapshot for queues of varying size and composition.
func TestTaskQueue_Snapshot(t *testing.T) {
	// All NextRun offsets are relative to one clock reading taken up front.
	now := time.Now()

	tests := []struct {
		name                 string
		tasks                []ScheduledTask
		wantDepth            int
		wantDueWithinSeconds int
		wantPerType          map[string]int
	}{
		{
			name:                 "empty queue",
			tasks:                []ScheduledTask{},
			wantDepth:            0,
			wantDueWithinSeconds: 0,
			wantPerType:          map[string]int{},
		},
		{
			name: "single task due soon (within 12 seconds)",
			tasks: []ScheduledTask{
				{
					InstanceName: "pve-1",
					InstanceType: InstanceTypePVE,
					NextRun:      now.Add(5 * time.Second),
					Interval:     30 * time.Second,
					Priority:     1.0,
				},
			},
			wantDepth:            1,
			wantDueWithinSeconds: 1,
			wantPerType:          map[string]int{"pve": 1},
		},
		{
			name: "single task NOT due soon (>12 seconds away)",
			tasks: []ScheduledTask{
				{
					InstanceName: "pbs-1",
					InstanceType: InstanceTypePBS,
					NextRun:      now.Add(20 * time.Second),
					Interval:     60 * time.Second,
					Priority:     1.0,
				},
			},
			wantDepth:            1,
			wantDueWithinSeconds: 0,
			wantPerType:          map[string]int{"pbs": 1},
		},
		{
			name: "multiple tasks of same type",
			tasks: []ScheduledTask{
				{
					InstanceName: "pve-1",
					InstanceType: InstanceTypePVE,
					NextRun:      now.Add(5 * time.Second),
					Interval:     30 * time.Second,
					Priority:     1.0,
				},
				{
					InstanceName: "pve-2",
					InstanceType: InstanceTypePVE,
					NextRun:      now.Add(15 * time.Second),
					Interval:     30 * time.Second,
					Priority:     1.0,
				},
				{
					InstanceName: "pve-3",
					InstanceType: InstanceTypePVE,
					NextRun:      now.Add(10 * time.Second),
					Interval:     30 * time.Second,
					Priority:     1.0,
				},
			},
			wantDepth:            3,
			wantDueWithinSeconds: 2, // pve-1 and pve-3 are within 12 seconds
			wantPerType:          map[string]int{"pve": 3},
		},
		{
			name: "multiple tasks of different types",
			tasks: []ScheduledTask{
				{
					InstanceName: "pve-1",
					InstanceType: InstanceTypePVE,
					NextRun:      now.Add(5 * time.Second),
					Interval:     30 * time.Second,
					Priority:     1.0,
				},
				{
					InstanceName: "pbs-1",
					InstanceType: InstanceTypePBS,
					NextRun:      now.Add(8 * time.Second),
					Interval:     60 * time.Second,
					Priority:     1.0,
				},
				{
					InstanceName: "pmg-1",
					InstanceType: InstanceTypePMG,
					NextRun:      now.Add(25 * time.Second),
					Interval:     45 * time.Second,
					Priority:     1.0,
				},
			},
			wantDepth:            3,
			wantDueWithinSeconds: 2, // pve-1 and pbs-1 are within 12 seconds
			wantPerType:          map[string]int{"pve": 1, "pbs": 1, "pmg": 1},
		},
		{
			// NOTE(review): presumably Snapshot measures against its own,
			// later clock reading, so this task is marginally under 12
			// seconds away by then — confirm against Snapshot.
			name: "boundary case: task exactly 12 seconds away",
			tasks: []ScheduledTask{
				{
					InstanceName: "pve-1",
					InstanceType: InstanceTypePVE,
					NextRun:      now.Add(12 * time.Second),
					Interval:     30 * time.Second,
					Priority:     1.0,
				},
			},
			wantDepth:            1,
			wantDueWithinSeconds: 1, // <= 12 seconds should be included
			wantPerType:          map[string]int{"pve": 1},
		},
		{
			name: "mix of due-soon and not-due-soon tasks",
			tasks: []ScheduledTask{
				{
					InstanceName: "pve-1",
					InstanceType: InstanceTypePVE,
					NextRun:      now.Add(1 * time.Second),
					Interval:     30 * time.Second,
					Priority:     1.0,
				},
				{
					InstanceName: "pve-2",
					InstanceType: InstanceTypePVE,
					NextRun:      now.Add(50 * time.Second),
					Interval:     30 * time.Second,
					Priority:     1.0,
				},
				{
					InstanceName: "pbs-1",
					InstanceType: InstanceTypePBS,
					NextRun:      now.Add(11 * time.Second),
					Interval:     60 * time.Second,
					Priority:     1.0,
				},
				{
					InstanceName: "pbs-2",
					InstanceType: InstanceTypePBS,
					NextRun:      now.Add(100 * time.Second),
					Interval:     60 * time.Second,
					Priority:     1.0,
				},
				{
					InstanceName: "pmg-1",
					InstanceType: InstanceTypePMG,
					NextRun:      now.Add(30 * time.Second),
					Interval:     45 * time.Second,
					Priority:     1.0,
				},
			},
			wantDepth:            5,
			wantDueWithinSeconds: 2, // pve-1 and pbs-1 are within 12 seconds
			wantPerType:          map[string]int{"pve": 2, "pbs": 2, "pmg": 1},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			queue := taskQueueWith(tt.tasks...)
			snapshot := queue.Snapshot()

			if snapshot.Depth != tt.wantDepth {
				t.Errorf("Depth = %d, want %d", snapshot.Depth, tt.wantDepth)
			}
			if snapshot.DueWithinSeconds != tt.wantDueWithinSeconds {
				t.Errorf("DueWithinSeconds = %d, want %d", snapshot.DueWithinSeconds, tt.wantDueWithinSeconds)
			}

			// PerType must match the expectation exactly: same size, every
			// expected key present with the right count, and no extra keys.
			if len(snapshot.PerType) != len(tt.wantPerType) {
				t.Errorf("PerType has %d entries, want %d", len(snapshot.PerType), len(tt.wantPerType))
			}
			for typeStr, wantCount := range tt.wantPerType {
				gotCount, ok := snapshot.PerType[typeStr]
				if !ok {
					t.Errorf("PerType missing entry for %s", typeStr)
					continue
				}
				if gotCount != wantCount {
					t.Errorf("PerType[%s] = %d, want %d", typeStr, gotCount, wantCount)
				}
			}
			for typeStr := range snapshot.PerType {
				if _, ok := tt.wantPerType[typeStr]; !ok {
					t.Errorf("PerType has unexpected entry for %s", typeStr)
				}
			}
		})
	}
}

// TestTaskQueue_Upsert covers inserting new tasks, replacing an existing task
// in place, and the resulting heap root for NextRun, Priority and
// InstanceName orderings.
func TestTaskQueue_Upsert(t *testing.T) {
	t.Run("insert into empty queue", func(t *testing.T) {
		queue := taskQueueWith(pveScheduledTask("pve-1", time.Now().Add(10*time.Second)))

		if queue.Size() != 1 {
			t.Errorf("Size() = %d, want 1", queue.Size())
		}
		verifyHeapInvariant(t, queue)
	})

	t.Run("upsert existing entry with different NextRun", func(t *testing.T) {
		now := time.Now()
		queue := taskQueueWith(pveScheduledTask("pve-1", now.Add(10*time.Second)))

		// Same instance, different NextRun: must replace, not add.
		updated := pveScheduledTask("pve-1", now.Add(20*time.Second))
		queue.Upsert(updated)

		if queue.Size() != 1 {
			t.Errorf("Size() = %d, want 1 (not 2)", queue.Size())
		}

		// Inspect the stored entry under the lock. The presence check keeps
		// a missing key from turning into a panic on the field access.
		func() {
			queue.mu.Lock()
			defer queue.mu.Unlock()

			entry, ok := queue.entries[schedulerKey(InstanceTypePVE, "pve-1")]
			if !ok {
				t.Errorf("pve-1 missing from entries map after upsert")
				return
			}
			if !entry.task.NextRun.Equal(updated.NextRun) {
				t.Errorf("NextRun not updated: got %v, want %v", entry.task.NextRun, updated.NextRun)
			}
		}()

		verifyHeapInvariant(t, queue)
	})

	t.Run("insert multiple entries - verify heap ordering", func(t *testing.T) {
		now := time.Now()
		// Deliberately inserted out of NextRun order.
		queue := taskQueueWith(
			pveScheduledTask("pve-3", now.Add(30*time.Second)),
			pveScheduledTask("pve-1", now.Add(10*time.Second)),
			pveScheduledTask("pve-2", now.Add(20*time.Second)),
		)

		if queue.Size() != 3 {
			t.Errorf("Size() = %d, want 3", queue.Size())
		}
		assertTaskQueueRoot(t, queue, "pve-1", "earliest NextRun")
		verifyHeapInvariant(t, queue)
	})

	t.Run("upsert changes heap position - earlier NextRun", func(t *testing.T) {
		now := time.Now()
		queue := taskQueueWith(
			pveScheduledTask("pve-1", now.Add(10*time.Second)),
			pveScheduledTask("pve-2", now.Add(20*time.Second)),
			pveScheduledTask("pve-3", now.Add(30*time.Second)),
		)

		// Move pve-3 ahead of everything else.
		queue.Upsert(pveScheduledTask("pve-3", now.Add(5*time.Second)))

		if queue.Size() != 3 {
			t.Errorf("Size() = %d, want 3", queue.Size())
		}
		assertTaskQueueRoot(t, queue, "pve-3", "updated to earliest")
		verifyHeapInvariant(t, queue)
	})

	t.Run("upsert changes heap position - later NextRun", func(t *testing.T) {
		now := time.Now()
		queue := taskQueueWith(
			pveScheduledTask("pve-1", now.Add(10*time.Second)),
			pveScheduledTask("pve-2", now.Add(20*time.Second)),
			pveScheduledTask("pve-3", now.Add(30*time.Second)),
		)

		// Push the current earliest task behind everything else.
		queue.Upsert(pveScheduledTask("pve-1", now.Add(40*time.Second)))

		if queue.Size() != 3 {
			t.Errorf("Size() = %d, want 3", queue.Size())
		}
		assertTaskQueueRoot(t, queue, "pve-2", "pve-1 moved down")
		verifyHeapInvariant(t, queue)
	})

	t.Run("upsert with priority ordering", func(t *testing.T) {
		sameTime := time.Now().Add(10 * time.Second)

		// Same NextRun, different priorities.
		lowPriority := pveScheduledTask("pve-1", sameTime)
		lowPriority.Priority = 0.5
		queue := taskQueueWith(lowPriority, pveScheduledTask("pve-2", sameTime))

		assertTaskQueueRoot(t, queue, "pve-2", "higher priority")
		verifyHeapInvariant(t, queue)
	})

	t.Run("upsert with same time and priority - uses instance name for tiebreak", func(t *testing.T) {
		sameTime := time.Now().Add(10 * time.Second)

		// Same NextRun and Priority: only InstanceName differs.
		queue := taskQueueWith(
			pveScheduledTask("pve-zebra", sameTime),
			pveScheduledTask("pve-alpha", sameTime),
			pveScheduledTask("pve-middle", sameTime),
		)

		assertTaskQueueRoot(t, queue, "pve-alpha", "alphabetically first when time and priority equal")
		verifyHeapInvariant(t, queue)
	})
}

// TestTaskQueue_Remove covers removal from an empty queue, of a missing key,
// of the only entry, of a middle entry, of the heap root, of every entry in
// turn, and of one instance type while another remains.
func TestTaskQueue_Remove(t *testing.T) {
	t.Run("remove from empty queue", func(t *testing.T) {
		queue := NewTaskQueue()

		// Should not panic.
		queue.Remove(InstanceTypePVE, "pve-1")

		if queue.Size() != 0 {
			t.Errorf("Size() = %d, want 0", queue.Size())
		}
	})

	t.Run("remove non-existent key", func(t *testing.T) {
		queue := taskQueueWith(pveScheduledTask("pve-1", time.Now().Add(10*time.Second)))

		// Remove a different instance.
		queue.Remove(InstanceTypePVE, "pve-2")

		if queue.Size() != 1 {
			t.Errorf("Size() = %d, want 1 (pve-1 should still exist)", queue.Size())
		}
		verifyHeapInvariant(t, queue)
	})

	t.Run("remove only entry", func(t *testing.T) {
		queue := taskQueueWith(pveScheduledTask("pve-1", time.Now().Add(10*time.Second)))

		queue.Remove(InstanceTypePVE, "pve-1")

		if queue.Size() != 0 {
			t.Errorf("Size() = %d, want 0", queue.Size())
		}

		queue.mu.Lock()
		defer queue.mu.Unlock()
		if len(queue.entries) != 0 {
			t.Errorf("entries map has %d items, want 0", len(queue.entries))
		}
		if len(queue.heap) != 0 {
			t.Errorf("heap has %d items, want 0", len(queue.heap))
		}
	})

	t.Run("remove entry from middle of queue", func(t *testing.T) {
		now := time.Now()
		queue := taskQueueWith(
			pveScheduledTask("pve-1", now.Add(10*time.Second)),
			pveScheduledTask("pve-2", now.Add(20*time.Second)),
			pveScheduledTask("pve-3", now.Add(30*time.Second)),
		)

		queue.Remove(InstanceTypePVE, "pve-2")

		if queue.Size() != 2 {
			t.Errorf("Size() = %d, want 2", queue.Size())
		}

		queue.mu.Lock()
		_, exists := queue.entries[schedulerKey(InstanceTypePVE, "pve-2")]
		queue.mu.Unlock()
		if exists {
			t.Errorf("pve-2 still exists in entries map")
		}

		verifyHeapInvariant(t, queue)
	})

	t.Run("remove heap root", func(t *testing.T) {
		now := time.Now()
		queue := taskQueueWith(
			pveScheduledTask("pve-1", now.Add(10*time.Second)),
			pveScheduledTask("pve-2", now.Add(20*time.Second)),
			pveScheduledTask("pve-3", now.Add(30*time.Second)),
		)

		// pve-1 has the earliest NextRun of the three.
		queue.Remove(InstanceTypePVE, "pve-1")

		if queue.Size() != 2 {
			t.Errorf("Size() = %d, want 2", queue.Size())
		}
		assertTaskQueueRoot(t, queue, "pve-2", "next earliest")
		verifyHeapInvariant(t, queue)
	})

	t.Run("remove all entries sequentially", func(t *testing.T) {
		now := time.Now()
		queue := taskQueueWith(
			pveScheduledTask("pve-1", now.Add(10*time.Second)),
			pveScheduledTask("pve-2", now.Add(20*time.Second)),
			pveScheduledTask("pve-3", now.Add(30*time.Second)),
		)

		// Remove each entry and verify the invariant after each removal.
		queue.Remove(InstanceTypePVE, "pve-2")
		if queue.Size() != 2 {
			t.Errorf("Size after removing pve-2 = %d, want 2", queue.Size())
		}
		verifyHeapInvariant(t, queue)

		queue.Remove(InstanceTypePVE, "pve-1")
		if queue.Size() != 1 {
			t.Errorf("Size after removing pve-1 = %d, want 1", queue.Size())
		}
		verifyHeapInvariant(t, queue)

		queue.Remove(InstanceTypePVE, "pve-3")
		if queue.Size() != 0 {
			t.Errorf("Size after removing pve-3 = %d, want 0", queue.Size())
		}
		verifyHeapInvariant(t, queue)
	})

	t.Run("remove different instance types", func(t *testing.T) {
		now := time.Now()
		queue := taskQueueWith(
			pveScheduledTask("pve-1", now.Add(10*time.Second)),
			ScheduledTask{
				InstanceName: "pbs-1",
				InstanceType: InstanceTypePBS,
				NextRun:      now.Add(20 * time.Second),
				Interval:     60 * time.Second,
				Priority:     1.0,
			},
		)

		queue.Remove(InstanceTypePBS, "pbs-1")

		if queue.Size() != 1 {
			t.Errorf("Size() = %d, want 1", queue.Size())
		}

		queue.mu.Lock()
		_, exists := queue.entries[schedulerKey(InstanceTypePVE, "pve-1")]
		queue.mu.Unlock()
		if !exists {
			t.Errorf("pve-1 should still exist in entries map")
		}

		verifyHeapInvariant(t, queue)
	})
}

// verifyHeapInvariant checks that the queue's internal structures agree:
//  1. len(entries) matches the heap size.
//  2. Each heap entry's index field equals its actual position in the heap.
//  3. Every entry in the map is also present in the heap.
//  4. Heap property: no child compares Less than its parent.
//
// Violations are reported with t.Errorf so that all of them are listed.
func verifyHeapInvariant(t *testing.T, queue *TaskQueue) {
	t.Helper()

	queue.mu.Lock()
	defer queue.mu.Unlock()

	size := len(queue.heap)

	if len(queue.entries) != size {
		t.Errorf("entries count %d != heap size %d", len(queue.entries), size)
	}

	// Compare the stored index with the loop position directly, so an entry
	// recorded at the wrong slot is always detected.
	for pos, entry := range queue.heap {
		if entry.index < 0 || entry.index >= size {
			t.Errorf("entry %s has invalid index %d (heap size: %d)", entry.key(), entry.index, size)
			continue
		}
		if entry.index != pos {
			t.Errorf("entry %s has index %d but is not at that position in heap", entry.key(), entry.index)
		}
	}

	// Membership is checked by scanning rather than trusting entry.index,
	// which the loop above may already have reported as wrong.
	for key, entry := range queue.entries {
		found := false
		for _, heapEntry := range queue.heap {
			if heapEntry == entry {
				found = true
				break
			}
		}
		if !found {
			t.Errorf("entry %s in map but not in heap", key)
		}
	}

	// Walk children in index order; the parent of slot i is (i-1)/2.
	for child := 1; child < size; child++ {
		parent := (child - 1) / 2
		if queue.heap.Less(child, parent) {
			t.Errorf("heap violation: child at %d is less than parent at %d", child, parent)
		}
	}
}