mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-04-28 03:20:11 +00:00
Self-heal stale Proxmox auto-register markers (#1267)
This commit is contained in:
parent
9c2a56d351
commit
93475f3941
4 changed files with 311 additions and 5 deletions
|
|
@ -5330,12 +5330,63 @@ type AutoRegisterRequest struct {
|
|||
SetupCode string `json:"setupCode,omitempty"` // One-time setup code for authentication (deprecated)
|
||||
AuthToken string `json:"authToken,omitempty"` // Direct auth token from URL (new approach)
|
||||
Source string `json:"source,omitempty"` // "agent" or "script" - indicates how the node was registered
|
||||
// CheckRegistration asks Pulse whether this Proxmox type already exists in config.
|
||||
// Used by agents to validate stale local registration marker files after server reinstalls.
|
||||
CheckRegistration bool `json:"checkRegistration,omitempty"`
|
||||
// New secure fields
|
||||
RequestToken bool `json:"requestToken,omitempty"` // If true, Pulse will generate and return a token
|
||||
Username string `json:"username,omitempty"` // Username for creating token (e.g., "root@pam")
|
||||
Password string `json:"password,omitempty"` // Password for authentication (never stored)
|
||||
}
|
||||
|
||||
func autoRegisterNodeMatchesHost(nodeHost, requestedHost string) bool {
|
||||
nodeHost = strings.TrimSpace(nodeHost)
|
||||
requestedHost = strings.TrimSpace(requestedHost)
|
||||
if nodeHost == "" || requestedHost == "" {
|
||||
return false
|
||||
}
|
||||
if nodeHost == requestedHost {
|
||||
return true
|
||||
}
|
||||
|
||||
requestedIP := extractHostIP(requestedHost)
|
||||
if requestedIP == "" {
|
||||
requestedIP = resolveHostnameToIP(requestedHost)
|
||||
}
|
||||
if requestedIP == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
nodeIP := extractHostIP(nodeHost)
|
||||
if nodeIP == "" {
|
||||
nodeIP = resolveHostnameToIP(nodeHost)
|
||||
}
|
||||
return nodeIP != "" && nodeIP == requestedIP
|
||||
}
|
||||
|
||||
func (h *ConfigHandlers) autoRegisteredNodeExists(ctx context.Context, req *AutoRegisterRequest) bool {
|
||||
if req == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
switch strings.ToLower(strings.TrimSpace(req.Type)) {
|
||||
case "pve":
|
||||
for _, node := range h.getConfig(ctx).PVEInstances {
|
||||
if autoRegisterNodeMatchesHost(node.Host, req.Host) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
case "pbs":
|
||||
for _, node := range h.getConfig(ctx).PBSInstances {
|
||||
if autoRegisterNodeMatchesHost(node.Host, req.Host) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func refreshClusterCredentialsFromAutoRegister(instance *config.PVEInstance, nodeConfig NodeConfigRequest, req *AutoRegisterRequest) {
|
||||
if instance == nil {
|
||||
return
|
||||
|
|
@ -5547,6 +5598,27 @@ func (h *ConfigHandlers) HandleAutoRegister(w http.ResponseWriter, r *http.Reque
|
|||
Str("serverName", req.ServerName).
|
||||
Msg("Processing auto-register request")
|
||||
|
||||
if req.CheckRegistration {
|
||||
if req.Type == "" {
|
||||
http.Error(w, "Missing required fields", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
host := strings.TrimSpace(req.Host)
|
||||
if host != "" {
|
||||
if normalizedHost, err := normalizeNodeHost(host, req.Type); err == nil {
|
||||
host = normalizedHost
|
||||
}
|
||||
}
|
||||
req.Host = host
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"registered": h.autoRegisteredNodeExists(r.Context(), &req),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Check if this is a new secure registration request
|
||||
if req.RequestToken {
|
||||
// New secure mode - generate token on Pulse side
|
||||
|
|
|
|||
|
|
@ -117,6 +117,67 @@ func TestHandleAutoRegisterAcceptsWithSetupToken(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHandleAutoRegister_CheckRegistration(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
t.Setenv("PULSE_DATA_DIR", tempDir)
|
||||
|
||||
cfg := &config.Config{
|
||||
DataPath: tempDir,
|
||||
ConfigPath: tempDir,
|
||||
PVEInstances: []config.PVEInstance{
|
||||
{
|
||||
Name: "pve.local",
|
||||
Host: "https://pve.local:8006",
|
||||
TokenName: "pulse-monitor@pam!pulse-test",
|
||||
Source: "agent",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
handler := newTestConfigHandlers(t, cfg)
|
||||
|
||||
const tokenValue = "TEMP-TOKEN-CHECK"
|
||||
tokenHash := internalauth.HashAPIToken(tokenValue)
|
||||
handler.codeMutex.Lock()
|
||||
handler.setupCodes[tokenHash] = &SetupCode{
|
||||
ExpiresAt: time.Now().Add(5 * time.Minute),
|
||||
NodeType: "pve",
|
||||
}
|
||||
handler.codeMutex.Unlock()
|
||||
|
||||
reqBody := AutoRegisterRequest{
|
||||
Type: "pve",
|
||||
Host: "https://pve.local:8006",
|
||||
ServerName: "pve.local",
|
||||
AuthToken: tokenValue,
|
||||
CheckRegistration: true,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(reqBody)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to marshal request: %v", err)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/auto-register", bytes.NewReader(body))
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
handler.HandleAutoRegister(rec, req)
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("expected status 200, got %d, body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
|
||||
var resp struct {
|
||||
Registered bool `json:"registered"`
|
||||
}
|
||||
if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to decode response: %v", err)
|
||||
}
|
||||
if !resp.Registered {
|
||||
t.Fatalf("expected registered=true, got false")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleAutoRegisterAcceptsRecentlyUsedSetupToken(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
t.Setenv("PULSE_DATA_DIR", tempDir)
|
||||
|
|
|
|||
|
|
@ -40,6 +40,10 @@ type ProxmoxSetupResult struct {
|
|||
Registered bool // Whether the node was successfully registered with Pulse
|
||||
}
|
||||
|
||||
type registrationCheckResponse struct {
|
||||
Registered bool `json:"registered"`
|
||||
}
|
||||
|
||||
const (
|
||||
proxmoxUser = "pulse-monitor"
|
||||
proxmoxUserPVE = "pulse-monitor@pam"
|
||||
|
|
@ -342,10 +346,27 @@ func (p *ProxmoxSetup) RunAll(ctx context.Context) ([]*ProxmoxSetupResult, error
|
|||
|
||||
// runForType executes setup for a specific Proxmox type.
|
||||
func (p *ProxmoxSetup) runForType(ctx context.Context, ptype string) (*ProxmoxSetupResult, error) {
|
||||
hostURL := p.getHostURL(ptype)
|
||||
|
||||
// Check if this type is already registered
|
||||
if p.isTypeRegistered(ptype) {
|
||||
p.logger.Info().Str("type", ptype).Msg("Proxmox type already registered, skipping")
|
||||
return nil, nil
|
||||
registered, err := p.checkRegistrationWithPulse(ctx, ptype, hostURL)
|
||||
if err != nil {
|
||||
p.logger.Warn().
|
||||
Err(err).
|
||||
Str("type", ptype).
|
||||
Msg("Failed to verify Proxmox registration state with Pulse; keeping local marker behavior")
|
||||
return nil, nil
|
||||
}
|
||||
if registered {
|
||||
p.logger.Info().Str("type", ptype).Msg("Proxmox type already registered, skipping")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
p.logger.Info().
|
||||
Str("type", ptype).
|
||||
Str("host", hostURL).
|
||||
Msg("Local Proxmox registration marker exists but Pulse has no matching node; re-registering")
|
||||
}
|
||||
|
||||
p.logger.Info().Str("type", ptype).Msg("Setting up Proxmox type")
|
||||
|
|
@ -358,9 +379,6 @@ func (p *ProxmoxSetup) runForType(ctx context.Context, ptype string) (*ProxmoxSe
|
|||
|
||||
p.logger.Info().Str("type", ptype).Str("token_id", tokenID).Msg("Created Proxmox API token")
|
||||
|
||||
// Get the host URL for registration
|
||||
hostURL := p.getHostURL(ptype)
|
||||
|
||||
// Register with Pulse
|
||||
registered := false
|
||||
if err := p.registerWithPulse(ctx, ptype, hostURL, tokenID, tokenValue); err != nil {
|
||||
|
|
@ -380,6 +398,51 @@ func (p *ProxmoxSetup) runForType(ctx context.Context, ptype string) (*ProxmoxSe
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (p *ProxmoxSetup) checkRegistrationWithPulse(ctx context.Context, ptype, hostURL string) (bool, error) {
|
||||
payload := map[string]interface{}{
|
||||
"type": ptype,
|
||||
"host": hostURL,
|
||||
"serverName": p.hostname,
|
||||
"source": "agent",
|
||||
"checkRegistration": true,
|
||||
}
|
||||
if token := strings.TrimSpace(p.apiToken); token != "" {
|
||||
payload["authToken"] = token
|
||||
}
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("marshal registration check payload: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.pulseURL+"/api/auto-register", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("create registration check request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if token := strings.TrimSpace(p.apiToken); token != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
req.Header.Set("X-API-Token", token)
|
||||
}
|
||||
|
||||
resp, err := p.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("registration check request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
bodyBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||||
return false, fmt.Errorf("registration check returned HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(bodyBytes)))
|
||||
}
|
||||
|
||||
var result registrationCheckResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return false, fmt.Errorf("decode registration check response: %w", err)
|
||||
}
|
||||
return result.Registered, nil
|
||||
}
|
||||
|
||||
// detectProxmoxType checks for pvesh (PVE) or proxmox-backup-manager (PBS).
|
||||
// For backward compatibility, returns the first detected type.
|
||||
// Use detectProxmoxTypes() to get all detected types.
|
||||
|
|
@ -627,6 +690,10 @@ func (p *ProxmoxSetup) getIPThatReachesPulse() string {
|
|||
p.logger.Debug().Err(err).Str("target", host).Msg("Could not determine local IP for Pulse connection (routing check failed)")
|
||||
return ""
|
||||
}
|
||||
if conn == nil {
|
||||
p.logger.Debug().Str("target", host).Msg("Could not determine local IP for Pulse connection (routing check returned nil connection)")
|
||||
return ""
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
localAddr, ok := conn.LocalAddr().(*net.UDPAddr)
|
||||
|
|
|
|||
|
|
@ -572,6 +572,112 @@ func TestProxmoxSetup_RunAll(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestProxmoxSetup_RunAll_ReRegistersWhenPulseHasNoMatchingNode(t *testing.T) {
|
||||
mc := &mockCollector{}
|
||||
requestKinds := make([]string, 0, 2)
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var payload map[string]interface{}
|
||||
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
|
||||
t.Fatalf("decode payload: %v", err)
|
||||
}
|
||||
if check, _ := payload["checkRegistration"].(bool); check {
|
||||
requestKinds = append(requestKinds, "check")
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{"registered":false}`))
|
||||
return
|
||||
}
|
||||
requestKinds = append(requestKinds, "register")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
p := NewProxmoxSetup(zerolog.Nop(), server.Client(), mc, server.URL, "token", "", "host", "", false)
|
||||
mc.lookPathFn = func(file string) (string, error) {
|
||||
if file == "pvesh" {
|
||||
return "/bin/pvesh", nil
|
||||
}
|
||||
return "", os.ErrNotExist
|
||||
}
|
||||
mc.statFn = func(name string) (os.FileInfo, error) {
|
||||
if name == stateFilePath {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
mc.commandCombinedOutputFn = func(ctx context.Context, name string, arg ...string) (string, error) {
|
||||
if name == "pveum" && len(arg) > 2 && arg[1] == "token" {
|
||||
return "│ value │ v1 │", nil
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
mc.mkdirAllFn = func(p string, m os.FileMode) error { return nil }
|
||||
mc.writeFileFn = func(f string, d []byte, m os.FileMode) error { return nil }
|
||||
mc.dialTimeoutFn = func(n, a string, d time.Duration) (net.Conn, error) {
|
||||
return &mockConn{localAddr: &net.UDPAddr{IP: net.ParseIP("10.0.0.1")}}, nil
|
||||
}
|
||||
|
||||
results, err := p.RunAll(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("RunAll: %v", err)
|
||||
}
|
||||
if len(results) != 1 || !results[0].Registered || results[0].ProxmoxType != "pve" {
|
||||
t.Fatalf("expected one re-registered pve result, got %#v", results)
|
||||
}
|
||||
if strings.Join(requestKinds, ",") != "check,register" {
|
||||
t.Fatalf("unexpected request sequence: %v", requestKinds)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxmoxSetup_RunAll_SkipsWhenPulseConfirmsRegistration(t *testing.T) {
|
||||
mc := &mockCollector{}
|
||||
requestKinds := make([]string, 0, 1)
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var payload map[string]interface{}
|
||||
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
|
||||
t.Fatalf("decode payload: %v", err)
|
||||
}
|
||||
if check, _ := payload["checkRegistration"].(bool); check {
|
||||
requestKinds = append(requestKinds, "check")
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{"registered":true}`))
|
||||
return
|
||||
}
|
||||
requestKinds = append(requestKinds, "register")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
p := NewProxmoxSetup(zerolog.Nop(), server.Client(), mc, server.URL, "token", "", "host", "", false)
|
||||
mc.lookPathFn = func(file string) (string, error) {
|
||||
if file == "pvesh" {
|
||||
return "/bin/pvesh", nil
|
||||
}
|
||||
return "", os.ErrNotExist
|
||||
}
|
||||
mc.statFn = func(name string) (os.FileInfo, error) {
|
||||
if name == stateFilePath {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
mc.dialTimeoutFn = func(n, a string, d time.Duration) (net.Conn, error) {
|
||||
return &mockConn{localAddr: &net.UDPAddr{IP: net.ParseIP("10.0.0.1")}}, nil
|
||||
}
|
||||
|
||||
results, err := p.RunAll(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("RunAll: %v", err)
|
||||
}
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("expected no results when registration already exists, got %#v", results)
|
||||
}
|
||||
if strings.Join(requestKinds, ",") != "check" {
|
||||
t.Fatalf("unexpected request sequence: %v", requestKinds)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsTypeRegistered_Legacy(t *testing.T) {
|
||||
mc := &mockCollector{}
|
||||
p := &ProxmoxSetup{collector: mc}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue