Pulse/internal/monitoring/monitor_pbs_coverage_test.go
rcourtman 9c3d96cab2 Add unified connections API (list + probe) with Disabled flag
Introduces GET /api/connections and POST /api/connections/probe as the
backend half of the one-ledger / one-editor connection redesign.

- GET /api/connections aggregates PVE/PBS/PMG/VMware/TrueNAS/agent rows
  into a unified Connection shape with derived state (active, paused,
  unauthorized, unreachable, stale, pending) computed from in-memory
  scheduler health plus agent Host.LastSeen. No new persisted state.
- POST /api/connections/probe fingerprints a host across the five
  supported products in parallel (2s dial + 1s read, 3s total, max 5
  concurrent). Admin-gated (RequireAdmin + ScopeSettingsWrite) to block
  unauthenticated SSRF against internal hosts.
- Disabled bool on PVEInstance/PBSInstance/PMGInstance (zero-value =
  enabled, preserves existing nodes.json); pollers skip disabled
  instances at client init, reconnect, and per-node iteration.
- NodeConfigRequest/Response gain Enabled; write path translates
  *bool -> Disabled so omitted field leaves state untouched.
- ConnectionsAPI frontend client (list/probe) typed off the Go shape.

Contracts updated: api-contracts, monitoring, agent-lifecycle,
performance-and-scalability, storage-recovery. Proofs added:
contract_test.go JSON snapshot for Connection and ProbeResponse,
monitoring guardrails for the Disabled-skip behavior, and a vitest
mock-client test for ConnectionsAPI.

Frontend editor / drawer / table rewrite lands in a separate block.
2026-04-19 11:42:53 +01:00

231 lines
7 KiB
Go

package monitoring
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"
"github.com/rcourtman/pulse-go-rewrite/internal/config"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
"github.com/rcourtman/pulse-go-rewrite/pkg/pbs"
)
// TestMonitor_PollPBSInstance_AuthFailure polls a PBS instance whose API
// rejects every request with 401 Unauthorized and checks that the resulting
// state snapshot does not report the instance's connection as healthy.
func TestMonitor_PollPBSInstance_AuthFailure(t *testing.T) {
	// Key under which the snapshot is inspected for this instance.
	const healthKey = "pbs-pbs-auth-fail"

	// Fake PBS API: whatever the client asks for, the answer is 401.
	rejectAll := func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusUnauthorized)
	}
	srv := httptest.NewServer(http.HandlerFunc(rejectAll))
	defer srv.Close()

	pbsClient, err := pbs.NewClient(pbs.ClientConfig{
		Host:       srv.URL,
		TokenName:  "root@pam!token",
		TokenValue: "secret",
	})
	if err != nil {
		t.Fatal(err)
	}

	// Only the fields this test populates are set; per the original author's
	// note, models.NewState() is relied upon to initialise the
	// connection-health storage.
	mon := &Monitor{
		config: &config.Config{
			PBSInstances: []config.PBSInstance{
				{Name: "pbs-auth-fail", Host: srv.URL, MonitorDatastores: true},
			},
		},
		state:           models.NewState(),
		authFailures:    make(map[string]int),
		lastAuthAttempt: make(map[string]time.Time),
		pollStatusMap:   make(map[string]*pollStatus),
		circuitBreakers: make(map[string]*circuitBreaker),
	}

	mon.pollPBSInstance(context.Background(), "pbs-auth-fail", pbsClient)

	// The authFailures map is not inspected here; the observable outcome that
	// is checked is the connection-health flag in the snapshot.
	//
	// NOTE(review): this lookup yields false both when the poller recorded
	// "unhealthy" and when it never wrote the key at all, so the assertion
	// cannot tell those cases apart — consider also asserting key presence
	// once the poller's behaviour is confirmed.
	health := mon.state.GetSnapshot().ConnectionHealth
	if health[healthKey] {
		t.Error("Expected connection health to be false")
	}
}
// TestMonitor_PollPBSInstance_DatastoreDetails polls a fake PBS API that
// serves two datastores and checks what ends up in the state snapshot.
//
// The fake API is deliberately uneven:
//   - the node status endpoint fails with 500;
//   - ds1 reports its size via "total"/"used"/"avail" and its namespace
//     listing fails with 500;
//   - ds2 reports its size via "total-space"/"used-space"/"avail-space" and
//     returns three namespaces, each named through a different field
//     ("ns", "path", "name").
//
// The test then asserts that the instance is present with two datastores and
// that ds2 carries a total of 200 and four namespaces.
func TestMonitor_PollPBSInstance_DatastoreDetails(t *testing.T) {
	// writeJSON sends payload as the response body. The encode error is
	// discarded on purpose: this is a best-effort test fixture and the
	// assertions below are what decide whether the test passes.
	writeJSON := func(w http.ResponseWriter, payload interface{}) {
		_ = json.NewEncoder(w).Encode(payload)
	}

	// Case order matters: the checks are substring matches, so e.g. the node
	// status path must be handled before the generic "/status" case.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		path := r.URL.Path
		switch {
		case strings.Contains(path, "/version"):
			writeJSON(w, map[string]interface{}{
				"data": map[string]interface{}{"version": "2.0"},
			})

		case strings.Contains(path, "/nodes/localhost/status"):
			// Node status is made to fail.
			w.WriteHeader(http.StatusInternalServerError)

		case strings.HasSuffix(path, "/admin/datastore"):
			// Datastore listing: only a small subset of fields is returned.
			writeJSON(w, map[string]interface{}{
				"data": []map[string]interface{}{
					{"store": "ds1", "comment": "comment1"},
					{"store": "ds2", "comment": "comment2"},
				},
			})

		case strings.Contains(path, "/status"):
			// Per-datastore status; the two stores use different key names.
			// Any other path reaching this case gets a null "data" value.
			var data map[string]interface{}
			switch {
			case strings.Contains(path, "ds1"):
				data = map[string]interface{}{"total": 100.0, "used": 50.0, "avail": 50.0}
			case strings.Contains(path, "ds2"):
				data = map[string]interface{}{"total-space": 200.0, "used-space": 100.0, "avail-space": 100.0, "deduplication-factor": 1.5}
			}
			writeJSON(w, map[string]interface{}{"data": data})

		case strings.Contains(path, "/rrd"):
			writeJSON(w, map[string]interface{}{"data": []interface{}{}})

		case strings.Contains(path, "/namespace") && strings.Contains(path, "ds1"):
			// ds1: namespace listing fails.
			w.WriteHeader(http.StatusInternalServerError)

		case strings.Contains(path, "/namespace") && strings.Contains(path, "ds2"):
			// ds2: each namespace is named through a different field.
			writeJSON(w, map[string]interface{}{
				"data": []map[string]interface{}{
					{"ns": "ns1"},
					{"path": "ns2"},
					{"name": "ns3"},
				},
			})

		default:
			// Anything else the client requests succeeds with an empty list.
			w.WriteHeader(http.StatusOK)
			writeJSON(w, map[string]interface{}{"data": []interface{}{}})
		}
	}))
	defer server.Close()

	client, err := pbs.NewClient(pbs.ClientConfig{Host: server.URL, TokenName: "root@pam!token", TokenValue: "val"})
	if err != nil {
		t.Fatalf("Failed to create client: %v", err)
	}

	m := &Monitor{
		config: &config.Config{
			PBSInstances: []config.PBSInstance{
				{Name: "pbs-details", Host: server.URL, MonitorDatastores: true},
			},
		},
		state:           models.NewState(),
		authFailures:    make(map[string]int),
		lastAuthAttempt: make(map[string]time.Time),
		pollStatusMap:   make(map[string]*pollStatus),
		circuitBreakers: make(map[string]*circuitBreaker),
	}

	m.pollPBSInstance(context.Background(), "pbs-details", client)

	// Locate the polled instance in the snapshot.
	snapshot := m.state.GetSnapshot()
	var inst *models.PBSInstance
	for _, candidate := range snapshot.PBSInstances {
		if candidate.Name == "pbs-details" {
			found := candidate
			inst = &found
			break
		}
	}
	if inst == nil {
		t.Fatal("Instance not found")
	}
	if len(inst.Datastores) != 2 {
		t.Errorf("Expected 2 datastores, got %d", len(inst.Datastores))
	}

	// Locate ds2, the store that reports its size via the "*-space" keys.
	var ds2 *models.PBSDatastore
	for _, candidate := range inst.Datastores {
		if candidate.Name == "ds2" {
			found := candidate
			ds2 = &found
			break
		}
	}
	if ds2 == nil {
		t.Fatal("DS2 not found")
	}
	if ds2.Total != 200 {
		t.Errorf("Expected DS2 total 200, got %d", ds2.Total)
	}
	// NOTE(review): the fake API returns three namespaces for ds2; the fourth
	// is presumably the root namespace added by the poller — confirm against
	// pollPBSInstance.
	if len(ds2.Namespaces) != 4 {
		t.Errorf("Expected 4 namespaces for DS2, got %d", len(ds2.Namespaces))
	}
}
// TestPBSAndPMGPollSkipDisabledInstances is a source-level guardrail for the
// PBS and PMG pollers. The unified connections ledger surfaces `Disabled` as
// `paused`, and the pollers must not drive live API calls or surface ingest
// while that flag is set, across restarts or reloads.
//
// The check is purely textual: it reads monitor_pbs_pmg.go from the current
// working directory and requires (a) at least two occurrences of the
// `if instanceCfg.Disabled {` guard and (b) both "Skipping ... poll" log
// messages. It does not execute the pollers.
func TestPBSAndPMGPollSkipDisabledInstances(t *testing.T) {
	const (
		pollerSource  = "monitor_pbs_pmg.go"
		disabledGuard = "if instanceCfg.Disabled {"
	)

	raw, err := os.ReadFile(pollerSource)
	if err != nil {
		t.Fatalf("failed to read monitor_pbs_pmg.go: %v", err)
	}
	src := string(raw)

	// One guard is expected per poll flow (PBS and PMG), hence at least two.
	guards := strings.Count(src, disabledGuard)
	if guards < 2 {
		t.Fatalf("monitor_pbs_pmg.go must contain the Disabled-skip guard in both PBS and PMG poll entry points; found %d", guards)
	}

	// Each flow must also log that it skipped the paused instance; the
	// presence of these messages is what this test treats as evidence of the
	// short-circuit.
	skipMessages := [...]string{
		"Skipping PBS poll: instance is paused",
		"Skipping PMG poll: instance is paused",
	}
	for idx := 0; idx < len(skipMessages); idx++ {
		if strings.Contains(src, skipMessages[idx]) {
			continue
		}
		t.Fatalf("monitor_pbs_pmg.go must emit debug-log %q when skipping a disabled instance so operators can correlate paused ledger rows with runtime behavior", skipMessages[idx])
	}
}