diff --git a/internal/api/router.go b/internal/api/router.go index 8fdcbfebd..9c0b28c3e 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -52,6 +52,7 @@ type Router struct { wsHub *websocket.Hub reloadFunc func() error updateManager *updates.Manager + updateHistory *updates.UpdateHistory exportLimiter *RateLimiter persistence *config.ConfigPersistence oidcMu sync.Mutex @@ -101,18 +102,27 @@ func NewRouter(cfg *config.Config, monitor *monitoring.Monitor, wsHub *websocket InitSessionStore(cfg.DataPath) InitCSRFStore(cfg.DataPath) + updateHistory, err := updates.NewUpdateHistory(cfg.DataPath) + if err != nil { + log.Error().Err(err).Msg("Failed to initialize update history") + } + projectRoot, err := os.Getwd() if err != nil { projectRoot = "." } + updateManager := updates.NewManager(cfg) + updateManager.SetHistory(updateHistory) + r := &Router{ mux: http.NewServeMux(), config: cfg, monitor: monitor, wsHub: wsHub, reloadFunc: reloadFunc, - updateManager: updates.NewManager(cfg), + updateManager: updateManager, + updateHistory: updateHistory, exportLimiter: NewRateLimiter(5, 1*time.Minute), // 5 attempts per minute persistence: config.NewConfigPersistence(cfg.DataPath), projectRoot: projectRoot, @@ -160,7 +170,7 @@ func (r *Router) setupRoutes() { guestMetadataHandler := NewGuestMetadataHandler(r.config.DataPath) dockerMetadataHandler := NewDockerMetadataHandler(r.config.DataPath) r.configHandlers = NewConfigHandlers(r.config, r.monitor, r.reloadFunc, r.wsHub, guestMetadataHandler, r.reloadSystemSettings) - updateHandlers := NewUpdateHandlers(r.updateManager, r.config.DataPath) + updateHandlers := NewUpdateHandlers(r.updateManager, r.updateHistory) r.dockerAgentHandlers = NewDockerAgentHandlers(r.monitor, r.wsHub) r.hostAgentHandlers = NewHostAgentHandlers(r.monitor, r.wsHub) r.temperatureProxyHandlers = NewTemperatureProxyHandlers(r.persistence) diff --git a/internal/api/updates.go b/internal/api/updates.go index a15a4f230..4d121588b 100644 --- a/internal/api/updates.go +++ b/internal/api/updates.go @@ -25,15 +25,7 @@ type UpdateHandlers struct { } // NewUpdateHandlers creates new update handlers -func NewUpdateHandlers(manager *updates.Manager, dataDir string) *UpdateHandlers { - // Initialize update history using configured data directory - // Empty string defaults to /var/lib/pulse for backward compatibility - history, err := updates.NewUpdateHistory(dataDir) - if err != nil { - log.Error().Err(err).Msg("Failed to initialize update history") - // Continue without history - handlers will check for nil - } - +func NewUpdateHandlers(manager *updates.Manager, history *updates.UpdateHistory) *UpdateHandlers { // Initialize updater registry registry := updates.NewUpdaterRegistry() @@ -108,7 +100,13 @@ func (h *UpdateHandlers) HandleApplyUpdate(w http.ResponseWriter, r *http.Reques // Start update in background with a new context (not request context which gets cancelled) go func() { ctx := context.Background() - if err := h.manager.ApplyUpdate(ctx, req.DownloadURL); err != nil { + applyReq := updates.ApplyUpdateRequest{ + DownloadURL: req.DownloadURL, + Channel: r.URL.Query().Get("channel"), + InitiatedBy: updates.InitiatedByUser, + InitiatedVia: updates.InitiatedViaUI, + } + if err := h.manager.ApplyUpdate(ctx, applyReq); err != nil { log.Error().Err(err).Msg("Failed to apply update") } }() diff --git a/internal/updates/history.go b/internal/updates/history.go index 9ee6793e4..97a6526a2 100644 --- a/internal/updates/history.go +++ b/internal/updates/history.go @@ -47,6 +47,7 @@ type InitiatedVia string const ( InitiatedViaUI InitiatedVia = "ui" + InitiatedViaAPI InitiatedVia = "api" InitiatedViaCLI InitiatedVia = "cli" InitiatedViaScript InitiatedVia = "script" InitiatedViaWebhook InitiatedVia = "webhook" @@ -54,23 +55,23 @@ const ( // UpdateHistoryEntry represents a single update event type UpdateHistoryEntry struct { - EventID string `json:"event_id"` - Timestamp time.Time `json:"timestamp"` - Action UpdateAction `json:"action"` - Channel string `json:"channel"` - VersionFrom string `json:"version_from"` - VersionTo string `json:"version_to"` - DeploymentType string `json:"deployment_type"` - InitiatedBy InitiatedBy `json:"initiated_by"` - InitiatedVia InitiatedVia `json:"initiated_via"` - Status UpdateStatusType `json:"status"` - DurationMs int64 `json:"duration_ms"` - BackupPath string `json:"backup_path,omitempty"` - LogPath string `json:"log_path,omitempty"` - Error *UpdateError `json:"error,omitempty"` - DownloadBytes int64 `json:"download_bytes,omitempty"` - RelatedEventID string `json:"related_event_id,omitempty"` - Notes string `json:"notes,omitempty"` + EventID string `json:"event_id"` + Timestamp time.Time `json:"timestamp"` + Action UpdateAction `json:"action"` + Channel string `json:"channel"` + VersionFrom string `json:"version_from"` + VersionTo string `json:"version_to"` + DeploymentType string `json:"deployment_type"` + InitiatedBy InitiatedBy `json:"initiated_by"` + InitiatedVia InitiatedVia `json:"initiated_via"` + Status UpdateStatusType `json:"status"` + DurationMs int64 `json:"duration_ms"` + BackupPath string `json:"backup_path,omitempty"` + LogPath string `json:"log_path,omitempty"` + Error *UpdateError `json:"error,omitempty"` + DownloadBytes int64 `json:"download_bytes,omitempty"` + RelatedEventID string `json:"related_event_id,omitempty"` + Notes string `json:"notes,omitempty"` } // UpdateError represents error information @@ -82,10 +83,10 @@ type UpdateError struct { // HistoryFilter represents filters for querying update history type HistoryFilter struct { - Status UpdateStatusType - Action UpdateAction + Status UpdateStatusType + Action UpdateAction DeploymentType string - Limit int + Limit int } // UpdateHistory manages the update history log diff --git a/internal/updates/manager.go b/internal/updates/manager.go index 40dad8b7c..0adab396b 100644 --- a/internal/updates/manager.go +++ b/internal/updates/manager.go @@ -14,6 +14,7 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "runtime" "strconv" "strings" @@ -68,6 +69,7 @@ var ( // Manager handles update operations type Manager struct { config *config.Config + history *UpdateHistory status UpdateStatus statusMu sync.RWMutex checkCache map[string]*UpdateInfo // keyed by channel @@ -78,6 +80,15 @@ type Manager struct { sseBroadcast *SSEBroadcaster } +// ApplyUpdateRequest describes an update request initiated via the API/UI. +type ApplyUpdateRequest struct { + DownloadURL string + Channel string + InitiatedBy InitiatedBy + InitiatedVia InitiatedVia + Notes string +} + // NewManager creates a new update manager func NewManager(cfg *config.Config) *Manager { m := &Manager{ @@ -103,6 +114,11 @@ func NewManager(cfg *config.Config) *Manager { return m } +// SetHistory wires an update history sink for recording update progress. +func (m *Manager) SetHistory(history *UpdateHistory) { + m.history = history +} + // GetProgressChannel returns the channel for update progress func (m *Manager) GetProgressChannel() <-chan UpdateStatus { return m.progressChan @@ -338,10 +354,13 @@ func (m *Manager) CheckForUpdatesWithChannel(ctx context.Context, channel string } // ApplyUpdate downloads and applies an update -func (m *Manager) ApplyUpdate(ctx context.Context, downloadURL string) error { +func (m *Manager) ApplyUpdate(ctx context.Context, req ApplyUpdateRequest) error { // Validate download URL (allow test server URLs when PULSE_UPDATE_SERVER is set) + if req.DownloadURL == "" { + return fmt.Errorf("download URL is required") + } if os.Getenv("PULSE_UPDATE_SERVER") == "" { - if !strings.HasPrefix(downloadURL, "https://github.com/rcourtman/Pulse/releases/download/") { + if !strings.HasPrefix(req.DownloadURL, "https://github.com/rcourtman/Pulse/releases/download/") { return fmt.Errorf("invalid download URL") } } @@ -358,7 +377,7 @@ func (m *Manager) ApplyUpdate(ctx context.Context, downloadURL string) error { } // Enqueue the update job - job, accepted := m.queue.Enqueue(downloadURL) + job, accepted := m.queue.Enqueue(req.DownloadURL) if !accepted { return fmt.Errorf("update already in progress") } @@ -371,6 +390,42 @@ func (m *Manager) ApplyUpdate(ctx context.Context, downloadURL string) error { m.updateStatus("downloading", 10, "Downloading update...") + channel := m.resolveChannel(req.Channel, currentInfo) + targetVersion := inferVersionFromDownloadURL(req.DownloadURL) + initiatedBy := req.InitiatedBy + if initiatedBy == "" { + initiatedBy = InitiatedByUser + } + initiatedVia := req.InitiatedVia + if initiatedVia == "" { + initiatedVia = InitiatedViaAPI + } + + start := time.Now() + eventID := m.createHistoryEntry(ctx, UpdateHistoryEntry{ + Action: ActionUpdate, + Channel: channel, + VersionFrom: currentInfo.Version, + VersionTo: targetVersion, + DeploymentType: currentInfo.DeploymentType, + InitiatedBy: initiatedBy, + InitiatedVia: initiatedVia, + Status: StatusInProgress, + Notes: req.Notes, + }) + + var runErr error + defer func() { + if eventID == "" { + return + } + status := StatusSuccess + if runErr != nil { + status = StatusFailed + } + m.completeHistoryEntry(ctx, eventID, status, start, runErr) + }() + // Create temp directory in a location we can write to // Try multiple locations in order of preference var tempDir string @@ -393,6 +448,8 @@ func (m *Manager) ApplyUpdate(ctx context.Context, downloadURL string) error { if err != nil { tempErr := fmt.Errorf("failed to create temp directory in any location: %w", err) m.updateStatus("error", 10, "Failed to create temp directory", tempErr) + runErr = tempErr + m.queue.MarkCompleted(job.ID, tempErr) return tempErr } } @@ -401,20 +458,28 @@ func (m *Manager) ApplyUpdate(ctx context.Context, downloadURL string) error { // Download update tarballPath := filepath.Join(tempDir, "update.tar.gz") - if err := m.downloadFile(ctx, downloadURL, tarballPath); err != nil { + downloadBytes, err := m.downloadFile(ctx, req.DownloadURL, tarballPath) + if err != nil { downloadErr := fmt.Errorf("failed to download update: %w", err) m.updateStatus("error", 20, "Failed to download update", downloadErr) m.queue.MarkCompleted(job.ID, downloadErr) - return downloadErr + runErr = downloadErr + return runErr + } + if downloadBytes > 0 { + m.updateHistoryEntry(ctx, eventID, func(entry *UpdateHistoryEntry) { + entry.DownloadBytes = downloadBytes + }) } // Verify checksum if available m.updateStatus("verifying", 30, "Verifying download...") - if err := m.verifyChecksum(ctx, downloadURL, tarballPath); err != nil { + if err := m.verifyChecksum(ctx, req.DownloadURL, tarballPath); err != nil { checksumErr := fmt.Errorf("checksum verification failed: %w", err) m.updateStatus("error", 30, "Failed to verify update checksum", checksumErr) m.queue.MarkCompleted(job.ID, checksumErr) - return checksumErr + runErr = checksumErr + return runErr } log.Info().Msg("Checksum verification passed") @@ -426,7 +491,8 @@ func (m *Manager) ApplyUpdate(ctx context.Context, downloadURL string) error { extractErr := fmt.Errorf("failed to extract update: %w", err) m.updateStatus("error", 40, "Failed to extract update", extractErr) m.queue.MarkCompleted(job.ID, extractErr) - return extractErr + runErr = extractErr + return runErr } m.updateStatus("backing-up", 60, "Creating backup...") @@ -437,24 +503,16 @@ func (m *Manager) ApplyUpdate(ctx context.Context, downloadURL string) error { backupErr := fmt.Errorf("failed to create backup: %w", err) m.updateStatus("error", 60, "Failed to create backup", backupErr) m.queue.MarkCompleted(job.ID, backupErr) - return backupErr + runErr = backupErr + return runErr } log.Info().Str("backup", backupPath).Msg("Created backup") + m.updateHistoryEntry(ctx, eventID, func(entry *UpdateHistoryEntry) { + entry.BackupPath = backupPath + }) m.updateStatus("applying", 80, "Applying update...") - // Extract version from download URL or use timestamp - version := "unknown" - if parts := strings.Split(downloadURL, "/"); len(parts) > 0 { - for _, part := range parts { - if strings.HasPrefix(part, "v") { - version = strings.TrimPrefix(part, "v") - version = strings.TrimSuffix(version, ".tar.gz") - break - } - } - } - // Apply the update files // With the new directory structure (/opt/pulse/bin/), the pulse user has write access log.Info().Msg("Applying update files") @@ -463,11 +521,12 @@ func (m *Manager) ApplyUpdate(ctx context.Context, downloadURL string) error { applyErr := fmt.Errorf("failed to apply update: %w", err) m.updateStatus("error", 80, "Failed to apply update", applyErr) m.queue.MarkCompleted(job.ID, applyErr) + runErr = applyErr // Attempt to restore backup if restoreErr := m.restoreBackup(backupPath); restoreErr != nil { log.Error().Err(restoreErr).Msg("Failed to restore backup") } - return applyErr + return runErr } m.updateStatus("restarting", 95, "Restarting service...") @@ -713,38 +772,112 @@ func (m *Manager) getLatestReleaseForChannel(ctx context.Context, channel string return nil, fmt.Errorf("no releases found for channel %s", channel) } +func (m *Manager) resolveChannel(requested string, currentInfo *VersionInfo) string { + if requested != "" { + return requested + } + if m.config != nil && m.config.UpdateChannel != "" { + return m.config.UpdateChannel + } + if currentInfo != nil && currentInfo.Channel != "" { + return currentInfo.Channel + } + return "stable" +} + +func (m *Manager) createHistoryEntry(ctx context.Context, entry UpdateHistoryEntry) string { + if m.history == nil { + return "" + } + eventID, err := m.history.CreateEntry(ctx, entry) + if err != nil { + log.Error().Err(err).Msg("Failed to create update history entry") + return "" + } + return eventID +} + +func (m *Manager) updateHistoryEntry(ctx context.Context, eventID string, updateFn func(entry *UpdateHistoryEntry)) { + if m.history == nil || eventID == "" { + return + } + if err := m.history.UpdateEntry(ctx, eventID, func(e *UpdateHistoryEntry) error { + updateFn(e) + return nil + }); err != nil { + log.Error().Err(err).Str("event_id", eventID).Msg("Failed to update history entry") + } +} + +func (m *Manager) completeHistoryEntry(ctx context.Context, eventID string, status UpdateStatusType, start time.Time, runErr error) { + if m.history == nil || eventID == "" { + return + } + if err := m.history.UpdateEntry(ctx, eventID, func(e *UpdateHistoryEntry) error { + e.Status = status + e.DurationMs = time.Since(start).Milliseconds() + if runErr != nil { + e.Error = &UpdateError{ + Message: runErr.Error(), + Code: "update_failed", + } + } else { + e.Error = nil + } + return nil + }); err != nil { + log.Error().Err(err).Str("event_id", eventID).Msg("Failed to finalize history entry") + } +} + +var versionInURLRegex = regexp.MustCompile(`v\d+\.\d+\.\d+(?:-[A-Za-z0-9\.]*\d[A-Za-z0-9\.]*)?`) + +func inferVersionFromDownloadURL(downloadURL string) string { + if downloadURL == "" { + return "" + } + if match := versionInURLRegex.FindString(downloadURL); match != "" { + return match + } + base := filepath.Base(downloadURL) + if match := versionInURLRegex.FindString(base); match != "" { + return match + } + return "" +} + // downloadFile downloads a file from URL to dest -func (m *Manager) downloadFile(ctx context.Context, url, dest string) error { +func (m *Manager) downloadFile(ctx context.Context, url, dest string) (int64, error) { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { - return err + return 0, err } client := &http.Client{Timeout: 5 * time.Minute} resp, err := client.Do(req) if err != nil { - return err + return 0, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return fmt.Errorf("download failed with status %d", resp.StatusCode) + return 0, fmt.Errorf("download failed with status %d", resp.StatusCode) } out, err := os.Create(dest) if err != nil { - return err + return 0, err } defer out.Close() // Copy with progress updates written, err := io.Copy(out, resp.Body) if err != nil { - return err + return 0, err } log.Info().Int64("bytes", written).Str("file", dest).Msg("Downloaded file") - return nil + return written, nil } // verifyChecksum downloads and verifies the SHA256 checksum of a file diff --git a/internal/updates/manager_test.go b/internal/updates/manager_test.go index 0caa8afbe..f95ea0902 100644 --- a/internal/updates/manager_test.go +++ b/internal/updates/manager_test.go @@ -11,6 +11,7 @@ import ( "os" "strings" "testing" + "time" "github.com/rcourtman/pulse-go-rewrite/internal/config" ) @@ -301,7 +302,7 @@ func TestApplyUpdateFailsOnChecksumError(t *testing.T) { downloadURL := server.URL + "/pulse-v0.0.1-linux-amd64.tar.gz" - err := manager.ApplyUpdate(context.Background(), downloadURL) + err := manager.ApplyUpdate(context.Background(), ApplyUpdateRequest{DownloadURL: downloadURL}) if err == nil { t.Fatalf("expected update to fail, got nil") } @@ -405,3 +406,80 @@ func TestVersionSemverOrdering(t *testing.T) { }) } } + +func TestManagerHistoryEntryLifecycle(t *testing.T) { + t.Setenv("PULSE_DATA_DIR", t.TempDir()) + + cfg := &config.Config{} + manager := NewManager(cfg) + + historyDir := t.TempDir() + history, err := NewUpdateHistory(historyDir) + if err != nil { + t.Fatalf("NewUpdateHistory: %v", err) + } + manager.SetHistory(history) + + ctx := context.Background() + eventID := manager.createHistoryEntry(ctx, UpdateHistoryEntry{ + Action: ActionUpdate, + Status: StatusInProgress, + VersionFrom: "v4.24.0", + VersionTo: "v4.25.0", + DeploymentType: "systemd", + Channel: "stable", + }) + if eventID == "" { + t.Fatalf("expected event ID") + } + + backupPath := "/tmp/pulse-backup" + manager.updateHistoryEntry(ctx, eventID, func(entry *UpdateHistoryEntry) { + entry.BackupPath = backupPath + entry.DownloadBytes = 2048 + }) + + start := time.Now().Add(-1500 * time.Millisecond) + manager.completeHistoryEntry(ctx, eventID, StatusSuccess, start, nil) + + entry, err := history.GetEntry(eventID) + if err != nil { + t.Fatalf("GetEntry: %v", err) + } + + if entry.Status != StatusSuccess { + t.Fatalf("unexpected status %s", entry.Status) + } + if entry.BackupPath != backupPath { + t.Fatalf("expected backup path %s, got %s", backupPath, entry.BackupPath) + } + if entry.DownloadBytes != 2048 { + t.Fatalf("expected download bytes 2048, got %d", entry.DownloadBytes) + } + if entry.DurationMs <= 0 { + t.Fatalf("expected positive duration, got %d", entry.DurationMs) + } + if entry.Error != nil { + t.Fatalf("expected no error, got %+v", entry.Error) + } +} + +func TestInferVersionFromDownloadURL(t *testing.T) { + tests := []struct { + url string + expected string + }{ + {"https://github.com/rcourtman/Pulse/releases/download/v4.25.0/pulse-v4.25.0-linux-amd64.tar.gz", "v4.25.0"}, + {"https://example.com/pulse-v4.25.0-rc.1-linux-arm64.tar.gz", "v4.25.0-rc.1"}, + {"https://example.com/assets/pulse.tar.gz", ""}, + {"pulse-v4.30.0-linux-amd64.tar.gz", "v4.30.0"}, + } + + for _, tt := range tests { + t.Run(tt.url, func(t *testing.T) { + if got := inferVersionFromDownloadURL(tt.url); got != tt.expected { + t.Fatalf("expected %s, got %s", tt.expected, got) + } + }) + } +}