mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-05-19 07:54:10 +00:00
registerWithPulse() was a one-shot call at agent startup — if it failed (timing, transient network, Pulse not ready), the agent silently continued as a generic Host forever. Wrap the HTTP POST in a retry loop with exponential backoff (5s, 10s, 20s, 40s, 60s) and distinguish 4xx errors (no retry) from 5xx/network errors (retry).
This commit is contained in:
parent
9d81de9fa5
commit
00afaec2ae
2 changed files with 172 additions and 14 deletions
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
|
|
@ -27,6 +28,7 @@ type ProxmoxSetup struct {
|
|||
reportIP string
|
||||
insecureSkipVerify bool
|
||||
collector SystemCollector
|
||||
retryBackoffs []time.Duration // overridable for testing; nil uses defaults
|
||||
}
|
||||
|
||||
// ProxmoxSetupResult contains the result of a successful Proxmox setup.
|
||||
|
|
@ -640,6 +642,41 @@ func scoreIPv4(ip string) int {
|
|||
}
|
||||
}
|
||||
|
||||
// clientError is returned when the auto-register endpoint answers with a
// 4xx status. Such responses are treated as non-retryable.
type clientError struct {
	// statusCode is the HTTP status code of the response.
	statusCode int
	// body is the response body text carried into the error message.
	body string
}

// Error implements the error interface, reporting the HTTP status code
// followed by the captured response body.
func (ce *clientError) Error() string {
	return "auto-register returned HTTP " + fmt.Sprint(ce.statusCode) + ": " + ce.body
}
|
||||
|
||||
func (p *ProxmoxSetup) doRegisterRequest(ctx context.Context, body []byte) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.pulseURL+"/api/auto-register", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("X-API-Token", p.apiToken)
|
||||
|
||||
resp, err := p.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
bodyBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||||
bodyStr := strings.TrimSpace(string(bodyBytes))
|
||||
if resp.StatusCode < 500 {
|
||||
return &clientError{statusCode: resp.StatusCode, body: bodyStr}
|
||||
}
|
||||
return fmt.Errorf("auto-register returned HTTP %d: %s", resp.StatusCode, bodyStr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// registerWithPulse calls the auto-register endpoint to add the node.
|
||||
func (p *ProxmoxSetup) registerWithPulse(ctx context.Context, ptype, hostURL, tokenID, tokenValue string) error {
|
||||
payload := map[string]interface{}{
|
||||
|
|
@ -656,26 +693,50 @@ func (p *ProxmoxSetup) registerWithPulse(ctx context.Context, ptype, hostURL, to
|
|||
return fmt.Errorf("marshal payload: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.pulseURL+"/api/auto-register", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %w", err)
|
||||
backoffs := p.retryBackoffs
|
||||
if backoffs == nil {
|
||||
backoffs = []time.Duration{5 * time.Second, 10 * time.Second, 20 * time.Second, 40 * time.Second, 60 * time.Second}
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("X-API-Token", p.apiToken)
|
||||
var lastErr error
|
||||
|
||||
resp, err := p.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
for attempt := 0; attempt <= len(backoffs); attempt++ {
|
||||
if attempt > 0 {
|
||||
p.logger.Info().
|
||||
Int("attempt", attempt+1).
|
||||
Int("max_attempts", len(backoffs)+1).
|
||||
Str("type", ptype).
|
||||
Msg("Retrying Proxmox auto-registration with Pulse")
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(backoffs[attempt-1]):
|
||||
}
|
||||
}
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
bodyBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||||
return fmt.Errorf("auto-register returned HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(bodyBytes)))
|
||||
err := p.doRegisterRequest(ctx, body)
|
||||
if err == nil {
|
||||
if attempt > 0 {
|
||||
p.logger.Info().Int("attempt", attempt+1).Str("type", ptype).Msg("Proxmox auto-registration succeeded after retry")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
lastErr = err
|
||||
|
||||
// Don't retry client errors (4xx) - they won't self-resolve.
|
||||
var ce *clientError
|
||||
if errors.As(err, &ce) {
|
||||
return err
|
||||
}
|
||||
|
||||
p.logger.Warn().Err(err).
|
||||
Int("attempt", attempt+1).
|
||||
Int("max_attempts", len(backoffs)+1).
|
||||
Str("type", ptype).
|
||||
Msg("Proxmox auto-registration attempt failed")
|
||||
}
|
||||
|
||||
return nil
|
||||
return fmt.Errorf("all %d registration attempts failed: %w", len(backoffs)+1, lastErr)
|
||||
}
|
||||
|
||||
// isAlreadyRegistered checks if we've already done Proxmox setup.
|
||||
|
|
|
|||
|
|
@ -2,12 +2,14 @@ package hostagent
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -234,6 +236,99 @@ func TestProxmoxSetup_RunForType(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestRegisterWithPulseRetry(t *testing.T) {
|
||||
// Server returns 503 twice, then 200
|
||||
var attempt int32
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
n := atomic.AddInt32(&attempt, 1)
|
||||
if n <= 2 {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
p := &ProxmoxSetup{
|
||||
logger: zerolog.Nop(),
|
||||
httpClient: server.Client(),
|
||||
pulseURL: server.URL,
|
||||
apiToken: "test-token",
|
||||
hostname: "test-host",
|
||||
retryBackoffs: []time.Duration{10 * time.Millisecond, 10 * time.Millisecond, 10 * time.Millisecond, 10 * time.Millisecond, 10 * time.Millisecond},
|
||||
}
|
||||
|
||||
err := p.registerWithPulse(context.Background(), "pve", "https://10.0.0.1:8006", "user!token", "secret")
|
||||
if err != nil {
|
||||
t.Fatalf("expected success after retries, got: %v", err)
|
||||
}
|
||||
if atomic.LoadInt32(&attempt) != 3 {
|
||||
t.Errorf("expected 3 attempts, got %d", atomic.LoadInt32(&attempt))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterWithPulseNoRetryOn4xx(t *testing.T) {
|
||||
// Server returns 401 - should NOT retry
|
||||
var attempt int32
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&attempt, 1)
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
_, _ = w.Write([]byte("invalid token"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
p := &ProxmoxSetup{
|
||||
logger: zerolog.Nop(),
|
||||
httpClient: server.Client(),
|
||||
pulseURL: server.URL,
|
||||
apiToken: "bad-token",
|
||||
hostname: "test-host",
|
||||
retryBackoffs: []time.Duration{10 * time.Millisecond, 10 * time.Millisecond},
|
||||
}
|
||||
|
||||
err := p.registerWithPulse(context.Background(), "pve", "https://10.0.0.1:8006", "user!token", "secret")
|
||||
if err == nil {
|
||||
t.Fatal("expected error for 401")
|
||||
}
|
||||
if atomic.LoadInt32(&attempt) != 1 {
|
||||
t.Errorf("expected exactly 1 attempt (no retry on 4xx), got %d", atomic.LoadInt32(&attempt))
|
||||
}
|
||||
// Verify it's a clientError
|
||||
var ce *clientError
|
||||
if !errors.As(err, &ce) {
|
||||
t.Errorf("expected clientError, got %T: %v", err, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterWithPulseAllRetriesFail(t *testing.T) {
|
||||
// Server always returns 503
|
||||
var attempt int32
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&attempt, 1)
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
_, _ = w.Write([]byte("service unavailable"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
p := &ProxmoxSetup{
|
||||
logger: zerolog.Nop(),
|
||||
httpClient: server.Client(),
|
||||
pulseURL: server.URL,
|
||||
apiToken: "test-token",
|
||||
hostname: "test-host",
|
||||
retryBackoffs: []time.Duration{10 * time.Millisecond, 10 * time.Millisecond},
|
||||
}
|
||||
|
||||
err := p.registerWithPulse(context.Background(), "pve", "https://10.0.0.1:8006", "user!token", "secret")
|
||||
if err == nil {
|
||||
t.Fatal("expected error when all retries fail")
|
||||
}
|
||||
// 3 attempts: initial + 2 retries (len(retryBackoffs) = 2)
|
||||
if atomic.LoadInt32(&attempt) != 3 {
|
||||
t.Errorf("expected 3 attempts, got %d", atomic.LoadInt32(&attempt))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxmoxSetup_PVEPrivilegeProbe_FallsBackToGuestAgentAudit(t *testing.T) {
|
||||
mc := &mockCollector{}
|
||||
var pulseMonitorPrivs string
|
||||
|
|
@ -338,6 +433,8 @@ func TestProxmoxSetup_RunAll(t *testing.T) {
|
|||
return "", nil
|
||||
}
|
||||
|
||||
p.httpClient = &http.Client{Transport: &mockTransport{statusCode: 200}}
|
||||
|
||||
results, _ := p.RunAll(context.Background())
|
||||
if len(results) != 1 || results[0].ProxmoxType != "pbs" {
|
||||
t.Errorf("expected pbs result")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue