diff --git a/internal/api/docker_agents.go b/internal/api/docker_agents.go index e8c451fb0..55227b200 100644 --- a/internal/api/docker_agents.go +++ b/internal/api/docker_agents.go @@ -25,9 +25,10 @@ type DockerAgentHandlers struct { } type dockerCommandAckRequest struct { - HostID string `json:"hostId"` - Status string `json:"status"` - Message string `json:"message,omitempty"` + HostID string `json:"hostId"` + Status string `json:"status"` + Message string `json:"message,omitempty"` + Payload map[string]any `json:"payload,omitempty"` } // errInvalidCommandStatus is returned when an unrecognized command status is provided. @@ -231,25 +232,45 @@ func (h *DockerAgentHandlers) HandleCommandAck(w http.ResponseWriter, r *http.Re return } - commandStatus, hostID, shouldRemove, err := h.getMonitor(r.Context()).AcknowledgeDockerHostCommand(commandID, req.HostID, status, req.Message) + mon := h.getMonitor(r.Context()) + commandStatus, hostID, shouldRemove, err := mon.AcknowledgeDockerHostCommand(commandID, req.HostID, status, req.Message) if err != nil { writeErrorResponse(w, http.StatusBadRequest, "docker_command_ack_failed", err.Error(), nil) return } + // If a container update succeeded, migrate persisted container metadata (custom URLs, notes, tags) + // so the UI doesn't "lose" it when the container runtime ID changes. + if status == monitoring.DockerCommandStatusCompleted && commandStatus.Type == monitoring.DockerCommandTypeUpdateContainer && len(req.Payload) > 0 { + oldContainerID, _ := req.Payload["oldContainerId"].(string) + newContainerID, _ := req.Payload["newContainerId"].(string) + oldContainerID = strings.TrimSpace(oldContainerID) + newContainerID = strings.TrimSpace(newContainerID) + if oldContainerID != "" && newContainerID != "" && oldContainerID != newContainerID { + if err := mon.CopyDockerContainerMetadata(hostID, oldContainerID, newContainerID); err != nil { + log.Warn(). + Err(err). + Str("dockerHostID", hostID). + Str("oldContainerID", oldContainerID). + Str("newContainerID", newContainerID). + Msg("Failed to migrate docker container metadata after update") + } + } + } + if shouldRemove { - if _, removeErr := h.getMonitor(r.Context()).RemoveDockerHost(hostID); removeErr != nil { + if _, removeErr := mon.RemoveDockerHost(hostID); removeErr != nil { log.Error().Err(removeErr).Str("dockerHostID", hostID).Str("commandID", commandID).Msg("Failed to remove docker host after command completion") } else { // Clear the removal block since the agent has confirmed it stopped successfully. // This allows immediate re-enrollment without waiting for the 24-hour TTL. - if reenrollErr := h.getMonitor(r.Context()).AllowDockerHostReenroll(hostID); reenrollErr != nil { + if reenrollErr := mon.AllowDockerHostReenroll(hostID); reenrollErr != nil { log.Warn().Err(reenrollErr).Str("dockerHostID", hostID).Msg("Failed to clear removal block after successful stop") } } } - go h.wsHub.BroadcastState(h.getMonitor(r.Context()).GetState().ToFrontend()) + go h.wsHub.BroadcastState(mon.GetState().ToFrontend()) if err := utils.WriteJSONResponse(w, map[string]any{ "success": true, diff --git a/internal/dockeragent/agent.go b/internal/dockeragent/agent.go index 2ce6a3946..9c3703866 100644 --- a/internal/dockeragent/agent.go +++ b/internal/dockeragent/agent.go @@ -935,6 +935,10 @@ func removeFileIfExists(path string) error { } func (a *Agent) sendCommandAck(ctx context.Context, target TargetConfig, commandID, status, message string) error { + return a.sendCommandAckWithPayload(ctx, target, commandID, status, message, nil) +} + +func (a *Agent) sendCommandAckWithPayload(ctx context.Context, target TargetConfig, commandID, status, message string, payload map[string]any) error { if a.hostID == "" { return fmt.Errorf("host identifier unavailable; cannot acknowledge command") } @@ -943,6 +947,7 @@ func (a *Agent) sendCommandAck(ctx context.Context, target TargetConfig, command HostID: a.hostID, Status: status, Message: message, + Payload: payload, } body, err := jsonMarshalFn(ackPayload) diff --git a/internal/dockeragent/container_update.go b/internal/dockeragent/container_update.go index d22e049d4..329c161ea 100644 --- a/internal/dockeragent/container_update.go +++ b/internal/dockeragent/container_update.go @@ -17,6 +17,8 @@ import ( type ContainerUpdateResult struct { Success bool `json:"success"` ContainerID string `json:"containerId"` + OldContainerID string `json:"oldContainerId,omitempty"` + NewContainerID string `json:"newContainerId,omitempty"` ContainerName string `json:"containerName"` OldImageDigest string `json:"oldImageDigest,omitempty"` NewImageDigest string `json:"newImageDigest,omitempty"` @@ -66,7 +68,23 @@ func (a *Agent) handleUpdateContainerCommand(ctx context.Context, target TargetC message = result.Error } - if err := a.sendCommandAck(ctx, target, command.ID, status, message); err != nil { + var payload map[string]any + if result.Success && result.OldContainerID != "" && result.NewContainerID != "" && result.OldContainerID != result.NewContainerID { + // Provide a stable mapping so the server can migrate persisted metadata (custom URLs, notes, tags) + // from the old container runtime ID to the new one. + payload = map[string]any{ + "oldContainerId": result.OldContainerID, + "newContainerId": result.NewContainerID, + } + } + + var err error + if payload != nil { + err = a.sendCommandAckWithPayload(ctx, target, command.ID, status, message, payload) + } else { + err = a.sendCommandAck(ctx, target, command.ID, status, message) + } + if err != nil { a.logger.Error().Err(err).Msg("Failed to send completion acknowledgement to Pulse") } @@ -85,7 +103,8 @@ func (a *Agent) handleUpdateContainerCommand(ctx context.Context, target TargetC // The progressFn callback is called at each step to report progress to Pulse. func (a *Agent) updateContainerWithProgress(ctx context.Context, containerID string, progressFn func(step string)) ContainerUpdateResult { result := ContainerUpdateResult{ - ContainerID: containerID, + ContainerID: containerID, + OldContainerID: containerID, } // Helper to report progress (handles nil progressFn) @@ -207,6 +226,9 @@ func (a *Agent) updateContainerWithProgress(ctx context.Context, containerID str } newContainerID := createResp.ID + result.NewContainerID = newContainerID + // After a successful update, the resulting "current" container ID is the new container. + result.ContainerID = newContainerID a.logger.Info().Str("newContainerId", newContainerID).Msg("New container created") // 7. Connect to additional networks (if more than one) diff --git a/internal/dockeragent/container_update_test.go b/internal/dockeragent/container_update_test.go index 83d69ef14..d1662362e 100644 --- a/internal/dockeragent/container_update_test.go +++ b/internal/dockeragent/container_update_test.go @@ -534,4 +534,88 @@ func TestHandleUpdateContainerCommand(t *testing.T) { t.Fatalf("unexpected error: %v", err) } }) + + t.Run("success ack includes container id mapping payload", func(t *testing.T) { + var ( + mu sync.Mutex + acks []agentsdocker.CommandAck + ) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + var ack agentsdocker.CommandAck + _ = json.Unmarshal(body, &ack) + mu.Lock() + acks = append(acks, ack) + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + agent := &Agent{ + logger: zerolog.Nop(), + hostID: "host1", + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + docker: &fakeDockerClient{ + containerInspectFn: func(_ context.Context, id string) (containertypes.InspectResponse, error) { + inspect := baseInspect() + if id == "new123" { + inspect.ContainerJSONBase.Image = "sha256:new0000000000" + } + return inspect, nil + }, + imagePullFn: func(context.Context, string, image.PullOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("{}")), nil + }, + containerStopFn: func(context.Context, string, containertypes.StopOptions) error { + return nil + }, + containerRenameFn: func(context.Context, string, string) error { + return nil + }, + containerCreateFn: func(context.Context, *containertypes.Config, *containertypes.HostConfig, *network.NetworkingConfig, *v1.Platform, string) (containertypes.CreateResponse, error) { + return containertypes.CreateResponse{ID: "new123"}, nil + }, + containerStartFn: func(context.Context, string, containertypes.StartOptions) error { + return nil + }, + }, + } + + cmd := agentsdocker.Command{ + ID: "cmd6", + Type: agentsdocker.CommandTypeUpdateContainer, + Payload: map[string]any{ + "containerId": "container1", + }, + } + + if err := agent.handleUpdateContainerCommand(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, cmd); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + mu.Lock() + defer mu.Unlock() + + var completed *agentsdocker.CommandAck + for i := range acks { + if acks[i].Status == agentsdocker.CommandStatusCompleted { + completed = &acks[i] + break + } + } + if completed == nil { + t.Fatalf("expected a completed ack, got %d total acks", len(acks)) + } + if completed.Payload == nil { + t.Fatalf("expected completed ack payload to be set") + } + if got, _ := completed.Payload["oldContainerId"].(string); got != "container1" { + t.Fatalf("oldContainerId = %q, want %q", got, "container1") + } + if got, _ := completed.Payload["newContainerId"].(string); got != "new123" { + t.Fatalf("newContainerId = %q, want %q", got, "new123") + } + }) } diff --git a/internal/monitoring/docker_metadata_migration.go b/internal/monitoring/docker_metadata_migration.go new file mode 100644 index 000000000..3cbf1dbbd --- /dev/null +++ b/internal/monitoring/docker_metadata_migration.go @@ -0,0 +1,72 @@ +package monitoring + +import ( + "fmt" + "slices" + "strings" + + "github.com/rcourtman/pulse-go-rewrite/internal/config" +) + +// CopyDockerContainerMetadata copies persisted container metadata from an old container runtime ID to a new one. +// +// Docker container updates typically recreate the container, producing a new runtime ID. Persisted metadata +// (custom URL, description, tags, notes) is keyed by resource ID and would otherwise be "lost" for the new +// container. +// +// This is intentionally a copy (not a move) so rollback-to-backup scenarios can still find metadata under +// the original container ID. +func (m *Monitor) CopyDockerContainerMetadata(hostID, oldContainerID, newContainerID string) error { + if m == nil || m.dockerMetadataStore == nil { + return nil + } + + hostID = strings.TrimSpace(hostID) + oldContainerID = strings.TrimSpace(oldContainerID) + newContainerID = strings.TrimSpace(newContainerID) + if hostID == "" || oldContainerID == "" || newContainerID == "" || oldContainerID == newContainerID { + return nil + } + + oldKey := fmt.Sprintf("%s:container:%s", hostID, oldContainerID) + newKey := fmt.Sprintf("%s:container:%s", hostID, newContainerID) + + oldMeta := m.dockerMetadataStore.Get(oldKey) + if oldMeta == nil { + return nil + } + if oldMeta.CustomURL == "" && oldMeta.Description == "" && len(oldMeta.Tags) == 0 && len(oldMeta.Notes) == 0 { + return nil + } + + newMeta := m.dockerMetadataStore.Get(newKey) + var merged config.DockerMetadata + if newMeta != nil { + merged = *newMeta + } + + // Merge missing fields from old -> new, so we don't clobber any metadata already present under the new ID. + if merged.CustomURL == "" { + merged.CustomURL = oldMeta.CustomURL + } + if merged.Description == "" { + merged.Description = oldMeta.Description + } + if len(merged.Tags) == 0 && len(oldMeta.Tags) > 0 { + merged.Tags = append([]string(nil), oldMeta.Tags...) + } + if len(merged.Notes) == 0 && len(oldMeta.Notes) > 0 { + merged.Notes = append([]string(nil), oldMeta.Notes...) + } + + // Avoid an unnecessary disk write if nothing changed. + if newMeta != nil && + merged.CustomURL == newMeta.CustomURL && + merged.Description == newMeta.Description && + slices.Equal(merged.Tags, newMeta.Tags) && + slices.Equal(merged.Notes, newMeta.Notes) { + return nil + } + + return m.dockerMetadataStore.Set(newKey, &merged) +} diff --git a/pkg/agents/docker/command.go b/pkg/agents/docker/command.go index 26a3ccaa3..ca9bb396d 100644 --- a/pkg/agents/docker/command.go +++ b/pkg/agents/docker/command.go @@ -15,9 +15,10 @@ type ReportResponse struct { // CommandAck is sent by the agent to confirm the result of a control command. type CommandAck struct { - HostID string `json:"hostId"` - Status string `json:"status"` - Message string `json:"message,omitempty"` + HostID string `json:"hostId"` + Status string `json:"status"` + Message string `json:"message,omitempty"` + Payload map[string]any `json:"payload,omitempty"` } const ( diff --git a/pkg/metrics/store.go b/pkg/metrics/store.go index 55e6497fc..4b8565d27 100644 --- a/pkg/metrics/store.go +++ b/pkg/metrics/store.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "sync" "time" @@ -109,7 +110,7 @@ func NewStore(config StoreConfig) (*Store, error) { "journal_mode(WAL)", "synchronous(NORMAL)", "auto_vacuum(INCREMENTAL)", - "wal_autocheckpoint(1000)", + "wal_autocheckpoint(500)", }, }.Encode() db, err := sql.Open("sqlite", dsn) @@ -198,10 +199,67 @@ func (s *Store) initSchema() error { return fmt.Errorf("failed to create schema: %w", err) } + // Ensure rollups (and any reprocessing after failed checkpoints) don't create duplicate rows. + // We enforce uniqueness on the natural key so we can use INSERT OR IGNORE for rollups. + if err := s.ensureMetricsUniqueIndex(); err != nil { + return err + } + log.Debug().Msg("Metrics schema initialized") return nil } +func (s *Store) ensureMetricsUniqueIndex() error { + const createUniqueIndex = ` + CREATE UNIQUE INDEX IF NOT EXISTS idx_metrics_unique + ON metrics(resource_type, resource_id, metric_type, timestamp, tier); + ` + + _, err := s.db.Exec(createUniqueIndex) + if err == nil { + return nil + } + + // If the DB already contains duplicates (from older versions), creating the unique index + // will fail. Deduplicate and retry once. + lower := strings.ToLower(err.Error()) + if !strings.Contains(lower, "unique") && !strings.Contains(lower, "constraint") && !strings.Contains(lower, "duplicate") { + return fmt.Errorf("failed to create unique index for metrics rollups: %w", err) + } + + log.Warn().Err(err).Msg("Duplicate metrics detected; deduplicating before creating unique index") + + tx, txErr := s.db.Begin() + if txErr != nil { + return fmt.Errorf("begin dedupe transaction: %w", txErr) + } + defer tx.Rollback() + + // Keep the earliest row (lowest id) for each natural key. + _, txErr = tx.Exec(` + DELETE FROM metrics + WHERE id NOT IN ( + SELECT MIN(id) + FROM metrics + GROUP BY resource_type, resource_id, metric_type, timestamp, tier + ) + `) + if txErr != nil { + return fmt.Errorf("dedupe metrics: %w", txErr) + } + + if txErr := tx.Commit(); txErr != nil { + return fmt.Errorf("commit dedupe: %w", txErr) + } + + if _, err := s.db.Exec(createUniqueIndex); err != nil { + return fmt.Errorf("failed to create unique index after dedupe: %w", err) + } + + log.Info().Msg("Metrics deduplicated and unique index created") + return nil +} + // migrateAutoVacuum ensures the database uses incremental auto-vacuum. // SQLite cannot switch from NONE to INCREMENTAL without a full VACUUM to // restructure the file, so we detect and convert on first run after upgrade. @@ -739,7 +797,7 @@ func (s *Store) rollupCandidate(resourceType, resourceID, metricType string, fro // Aggregate data into buckets _, err = tx.Exec(` - INSERT INTO metrics (resource_type, resource_id, metric_type, value, min_value, max_value, timestamp, tier) + INSERT OR IGNORE INTO metrics (resource_type, resource_id, metric_type, value, min_value, max_value, timestamp, tier) SELECT resource_type, resource_id,