fix(docker,metrics): preserve container metadata on update and reduce DB writes

Docker container URL preserved on update (#1054): container updates
recreate the container with a new runtime ID. The agent now includes
{oldContainerId, newContainerId} in the completion ACK payload; the
server uses this to copy persisted metadata (custom URLs, descriptions,
tags) to the new ID so nothing is lost. Migration is a copy, not a move,
so rollback scenarios still find metadata under the original ID.

Reduce metrics.db write amplification (#1124): add a UNIQUE index on
(resource_type, resource_id, metric_type, timestamp, tier) so rollup
reprocessing after a failed checkpoint uses INSERT OR IGNORE instead of
creating duplicate rows. Existing duplicates are deduplicated once on
startup if the index creation would otherwise fail. Also sets
wal_autocheckpoint(500) to checkpoint the WAL more frequently, preventing
unbounded WAL growth.

Fixes #1054
Fixes #1124
This commit is contained in:
rcourtman 2026-02-18 12:56:46 +00:00
parent 7522f6599c
commit 9d8f8b45b5
7 changed files with 277 additions and 14 deletions

View file

@ -25,9 +25,10 @@ type DockerAgentHandlers struct {
}
type dockerCommandAckRequest struct {
HostID string `json:"hostId"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
HostID string `json:"hostId"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
Payload map[string]any `json:"payload,omitempty"`
}
// errInvalidCommandStatus is returned when an unrecognized command status is provided.
@ -231,25 +232,45 @@ func (h *DockerAgentHandlers) HandleCommandAck(w http.ResponseWriter, r *http.Re
return
}
commandStatus, hostID, shouldRemove, err := h.getMonitor(r.Context()).AcknowledgeDockerHostCommand(commandID, req.HostID, status, req.Message)
mon := h.getMonitor(r.Context())
commandStatus, hostID, shouldRemove, err := mon.AcknowledgeDockerHostCommand(commandID, req.HostID, status, req.Message)
if err != nil {
writeErrorResponse(w, http.StatusBadRequest, "docker_command_ack_failed", err.Error(), nil)
return
}
// If a container update succeeded, migrate persisted container metadata (custom URLs, notes, tags)
// so the UI doesn't "lose" it when the container runtime ID changes.
if status == monitoring.DockerCommandStatusCompleted && commandStatus.Type == monitoring.DockerCommandTypeUpdateContainer && len(req.Payload) > 0 {
oldContainerID, _ := req.Payload["oldContainerId"].(string)
newContainerID, _ := req.Payload["newContainerId"].(string)
oldContainerID = strings.TrimSpace(oldContainerID)
newContainerID = strings.TrimSpace(newContainerID)
if oldContainerID != "" && newContainerID != "" && oldContainerID != newContainerID {
if err := mon.CopyDockerContainerMetadata(hostID, oldContainerID, newContainerID); err != nil {
log.Warn().
Err(err).
Str("dockerHostID", hostID).
Str("oldContainerID", oldContainerID).
Str("newContainerID", newContainerID).
Msg("Failed to migrate docker container metadata after update")
}
}
}
if shouldRemove {
if _, removeErr := h.getMonitor(r.Context()).RemoveDockerHost(hostID); removeErr != nil {
if _, removeErr := mon.RemoveDockerHost(hostID); removeErr != nil {
log.Error().Err(removeErr).Str("dockerHostID", hostID).Str("commandID", commandID).Msg("Failed to remove docker host after command completion")
} else {
// Clear the removal block since the agent has confirmed it stopped successfully.
// This allows immediate re-enrollment without waiting for the 24-hour TTL.
if reenrollErr := h.getMonitor(r.Context()).AllowDockerHostReenroll(hostID); reenrollErr != nil {
if reenrollErr := mon.AllowDockerHostReenroll(hostID); reenrollErr != nil {
log.Warn().Err(reenrollErr).Str("dockerHostID", hostID).Msg("Failed to clear removal block after successful stop")
}
}
}
go h.wsHub.BroadcastState(h.getMonitor(r.Context()).GetState().ToFrontend())
go h.wsHub.BroadcastState(mon.GetState().ToFrontend())
if err := utils.WriteJSONResponse(w, map[string]any{
"success": true,

View file

@ -935,6 +935,10 @@ func removeFileIfExists(path string) error {
}
// sendCommandAck reports the outcome of a control command back to the server
// without any extra payload. It delegates to sendCommandAckWithPayload with a
// nil payload so both code paths share a single implementation.
func (a *Agent) sendCommandAck(ctx context.Context, target TargetConfig, commandID, status, message string) error {
	return a.sendCommandAckWithPayload(ctx, target, commandID, status, message, nil)
}
func (a *Agent) sendCommandAckWithPayload(ctx context.Context, target TargetConfig, commandID, status, message string, payload map[string]any) error {
if a.hostID == "" {
return fmt.Errorf("host identifier unavailable; cannot acknowledge command")
}
@ -943,6 +947,7 @@ func (a *Agent) sendCommandAck(ctx context.Context, target TargetConfig, command
HostID: a.hostID,
Status: status,
Message: message,
Payload: payload,
}
body, err := jsonMarshalFn(ackPayload)

View file

@ -17,6 +17,8 @@ import (
type ContainerUpdateResult struct {
Success bool `json:"success"`
ContainerID string `json:"containerId"`
OldContainerID string `json:"oldContainerId,omitempty"`
NewContainerID string `json:"newContainerId,omitempty"`
ContainerName string `json:"containerName"`
OldImageDigest string `json:"oldImageDigest,omitempty"`
NewImageDigest string `json:"newImageDigest,omitempty"`
@ -66,7 +68,23 @@ func (a *Agent) handleUpdateContainerCommand(ctx context.Context, target TargetC
message = result.Error
}
if err := a.sendCommandAck(ctx, target, command.ID, status, message); err != nil {
var payload map[string]any
if result.Success && result.OldContainerID != "" && result.NewContainerID != "" && result.OldContainerID != result.NewContainerID {
// Provide a stable mapping so the server can migrate persisted metadata (custom URLs, notes, tags)
// from the old container runtime ID to the new one.
payload = map[string]any{
"oldContainerId": result.OldContainerID,
"newContainerId": result.NewContainerID,
}
}
var err error
if payload != nil {
err = a.sendCommandAckWithPayload(ctx, target, command.ID, status, message, payload)
} else {
err = a.sendCommandAck(ctx, target, command.ID, status, message)
}
if err != nil {
a.logger.Error().Err(err).Msg("Failed to send completion acknowledgement to Pulse")
}
@ -85,7 +103,8 @@ func (a *Agent) handleUpdateContainerCommand(ctx context.Context, target TargetC
// The progressFn callback is called at each step to report progress to Pulse.
func (a *Agent) updateContainerWithProgress(ctx context.Context, containerID string, progressFn func(step string)) ContainerUpdateResult {
result := ContainerUpdateResult{
ContainerID: containerID,
ContainerID: containerID,
OldContainerID: containerID,
}
// Helper to report progress (handles nil progressFn)
@ -207,6 +226,9 @@ func (a *Agent) updateContainerWithProgress(ctx context.Context, containerID str
}
newContainerID := createResp.ID
result.NewContainerID = newContainerID
// After a successful update, the resulting "current" container ID is the new container.
result.ContainerID = newContainerID
a.logger.Info().Str("newContainerId", newContainerID).Msg("New container created")
// 7. Connect to additional networks (if more than one)

View file

@ -534,4 +534,88 @@ func TestHandleUpdateContainerCommand(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
})
t.Run("success ack includes container id mapping payload", func(t *testing.T) {
var (
mu sync.Mutex
acks []agentsdocker.CommandAck
)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
var ack agentsdocker.CommandAck
_ = json.Unmarshal(body, &ack)
mu.Lock()
acks = append(acks, ack)
mu.Unlock()
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
agent := &Agent{
logger: zerolog.Nop(),
hostID: "host1",
httpClients: map[bool]*http.Client{
false: server.Client(),
},
docker: &fakeDockerClient{
containerInspectFn: func(_ context.Context, id string) (containertypes.InspectResponse, error) {
inspect := baseInspect()
if id == "new123" {
inspect.ContainerJSONBase.Image = "sha256:new0000000000"
}
return inspect, nil
},
imagePullFn: func(context.Context, string, image.PullOptions) (io.ReadCloser, error) {
return io.NopCloser(strings.NewReader("{}")), nil
},
containerStopFn: func(context.Context, string, containertypes.StopOptions) error {
return nil
},
containerRenameFn: func(context.Context, string, string) error {
return nil
},
containerCreateFn: func(context.Context, *containertypes.Config, *containertypes.HostConfig, *network.NetworkingConfig, *v1.Platform, string) (containertypes.CreateResponse, error) {
return containertypes.CreateResponse{ID: "new123"}, nil
},
containerStartFn: func(context.Context, string, containertypes.StartOptions) error {
return nil
},
},
}
cmd := agentsdocker.Command{
ID: "cmd6",
Type: agentsdocker.CommandTypeUpdateContainer,
Payload: map[string]any{
"containerId": "container1",
},
}
if err := agent.handleUpdateContainerCommand(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, cmd); err != nil {
t.Fatalf("unexpected error: %v", err)
}
mu.Lock()
defer mu.Unlock()
var completed *agentsdocker.CommandAck
for i := range acks {
if acks[i].Status == agentsdocker.CommandStatusCompleted {
completed = &acks[i]
break
}
}
if completed == nil {
t.Fatalf("expected a completed ack, got %d total acks", len(acks))
}
if completed.Payload == nil {
t.Fatalf("expected completed ack payload to be set")
}
if got, _ := completed.Payload["oldContainerId"].(string); got != "container1" {
t.Fatalf("oldContainerId = %q, want %q", got, "container1")
}
if got, _ := completed.Payload["newContainerId"].(string); got != "new123" {
t.Fatalf("newContainerId = %q, want %q", got, "new123")
}
})
}

View file

@ -0,0 +1,72 @@
package monitoring
import (
"fmt"
"slices"
"strings"
"github.com/rcourtman/pulse-go-rewrite/internal/config"
)
// CopyDockerContainerMetadata copies persisted container metadata from an old container runtime ID to a new one.
//
// Docker container updates typically recreate the container, producing a new runtime ID. Persisted metadata
// (custom URL, description, tags, notes) is keyed by resource ID and would otherwise be "lost" for the new
// container.
//
// This is intentionally a copy (not a move) so rollback-to-backup scenarios can still find metadata under
// the original container ID.
// CopyDockerContainerMetadata copies persisted container metadata from an old container runtime ID to a new one.
//
// Docker container updates typically recreate the container, producing a new runtime ID. Persisted metadata
// (custom URL, description, tags, notes) is keyed by resource ID and would otherwise be "lost" for the new
// container.
//
// This is intentionally a copy (not a move) so rollback-to-backup scenarios can still find metadata under
// the original container ID.
func (m *Monitor) CopyDockerContainerMetadata(hostID, oldContainerID, newContainerID string) error {
	if m == nil || m.dockerMetadataStore == nil {
		return nil
	}

	host := strings.TrimSpace(hostID)
	srcID := strings.TrimSpace(oldContainerID)
	dstID := strings.TrimSpace(newContainerID)
	if host == "" || srcID == "" || dstID == "" || srcID == dstID {
		return nil
	}

	srcKey := fmt.Sprintf("%s:container:%s", host, srcID)
	dstKey := fmt.Sprintf("%s:container:%s", host, dstID)

	src := m.dockerMetadataStore.Get(srcKey)
	if src == nil {
		return nil
	}
	// Nothing worth copying under the old ID.
	hasContent := src.CustomURL != "" || src.Description != "" || len(src.Tags) > 0 || len(src.Notes) > 0
	if !hasContent {
		return nil
	}

	dst := m.dockerMetadataStore.Get(dstKey)
	var result config.DockerMetadata
	if dst != nil {
		result = *dst
	}

	// Fill only the fields that are absent under the new ID, so any metadata
	// already stored for the new container is never clobbered.
	if result.CustomURL == "" {
		result.CustomURL = src.CustomURL
	}
	if result.Description == "" {
		result.Description = src.Description
	}
	if len(result.Tags) == 0 && len(src.Tags) > 0 {
		result.Tags = slices.Clone(src.Tags)
	}
	if len(result.Notes) == 0 && len(src.Notes) > 0 {
		result.Notes = slices.Clone(src.Notes)
	}

	// Skip the disk write entirely when the merge produced no change.
	if dst != nil &&
		result.CustomURL == dst.CustomURL &&
		result.Description == dst.Description &&
		slices.Equal(result.Tags, dst.Tags) &&
		slices.Equal(result.Notes, dst.Notes) {
		return nil
	}
	return m.dockerMetadataStore.Set(dstKey, &result)
}

View file

@ -15,9 +15,10 @@ type ReportResponse struct {
// CommandAck is sent by the agent to confirm the result of a control command.
type CommandAck struct {
HostID string `json:"hostId"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
HostID string `json:"hostId"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
Payload map[string]any `json:"payload,omitempty"`
}
const (

View file

@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@ -109,7 +110,7 @@ func NewStore(config StoreConfig) (*Store, error) {
"journal_mode(WAL)",
"synchronous(NORMAL)",
"auto_vacuum(INCREMENTAL)",
"wal_autocheckpoint(1000)",
"wal_autocheckpoint(500)",
},
}.Encode()
db, err := sql.Open("sqlite", dsn)
@ -198,10 +199,67 @@ func (s *Store) initSchema() error {
return fmt.Errorf("failed to create schema: %w", err)
}
// Ensure rollups (and any reprocessing after failed checkpoints) don't create duplicate rows.
// We enforce uniqueness on the natural key so we can use INSERT OR IGNORE for rollups.
if err := s.ensureMetricsUniqueIndex(); err != nil {
return err
}
log.Debug().Msg("Metrics schema initialized")
return nil
}
// ensureMetricsUniqueIndex guarantees a UNIQUE index over the metrics table's
// natural key so rollup writes can safely use INSERT OR IGNORE. Databases
// written by older versions may already contain duplicate rows, in which case
// the index creation fails: we then collapse duplicates (keeping the row with
// the lowest id per key) and retry the creation exactly once.
func (s *Store) ensureMetricsUniqueIndex() error {
	const createUniqueIndex = `
CREATE UNIQUE INDEX IF NOT EXISTS idx_metrics_unique
ON metrics(resource_type, resource_id, metric_type, timestamp, tier);
`
	_, createErr := s.db.Exec(createUniqueIndex)
	if createErr == nil {
		return nil
	}

	// Only a uniqueness violation warrants the dedupe-and-retry path; any
	// other failure is surfaced immediately.
	msg := strings.ToLower(createErr.Error())
	isDuplicate := strings.Contains(msg, "unique") ||
		strings.Contains(msg, "constraint") ||
		strings.Contains(msg, "duplicate")
	if !isDuplicate {
		return fmt.Errorf("failed to create unique index for metrics rollups: %w", createErr)
	}

	log.Warn().Err(createErr).Msg("Duplicate metrics detected; deduplicating before creating unique index")

	tx, err := s.db.Begin()
	if err != nil {
		return fmt.Errorf("begin dedupe transaction: %w", err)
	}
	defer tx.Rollback() // no-op once Commit succeeds

	// Retain the earliest row (lowest id) within each natural-key group.
	if _, err = tx.Exec(`
DELETE FROM metrics
WHERE id NOT IN (
SELECT MIN(id)
FROM metrics
GROUP BY resource_type, resource_id, metric_type, timestamp, tier
)
`); err != nil {
		return fmt.Errorf("dedupe metrics: %w", err)
	}
	if err = tx.Commit(); err != nil {
		return fmt.Errorf("commit dedupe: %w", err)
	}

	if _, err = s.db.Exec(createUniqueIndex); err != nil {
		return fmt.Errorf("failed to create unique index after dedupe: %w", err)
	}
	log.Info().Msg("Metrics deduplicated and unique index created")
	return nil
}
// migrateAutoVacuum ensures the database uses incremental auto-vacuum.
// SQLite cannot switch from NONE to INCREMENTAL without a full VACUUM to
// restructure the file, so we detect and convert on first run after upgrade.
@ -739,7 +797,7 @@ func (s *Store) rollupCandidate(resourceType, resourceID, metricType string, fro
// Aggregate data into buckets
_, err = tx.Exec(`
INSERT INTO metrics (resource_type, resource_id, metric_type, value, min_value, max_value, timestamp, tier)
INSERT OR IGNORE INTO metrics (resource_type, resource_id, metric_type, value, min_value, max_value, timestamp, tier)
SELECT
resource_type,
resource_id,