From 387a79bc2d221f1742eb9edb0a663566a4e09d2d Mon Sep 17 00:00:00 2001 From: rcourtman Date: Wed, 1 Apr 2026 12:44:17 +0100 Subject: [PATCH] Defer metrics store startup maintenance --- pkg/metrics/store.go | 73 ++++++++++++++++++++-------- pkg/metrics/store_additional_test.go | 49 ++++++++++++++++++- pkg/metrics/store_test.go | 4 +- 3 files changed, 103 insertions(+), 23 deletions(-) diff --git a/pkg/metrics/store.go b/pkg/metrics/store.go index 2b181197b..171de2ba9 100644 --- a/pkg/metrics/store.go +++ b/pkg/metrics/store.go @@ -78,6 +78,12 @@ type writeRequest struct { done chan struct{} } +type maintenanceRequest struct { + run func() +} + +var startupMaintenanceHook func() + // WriteMetric represents a metric sample to be written synchronously. type WriteMetric struct { ResourceType string @@ -98,10 +104,11 @@ type Store struct { buffer []bufferedMetric // Background workers - writeCh chan writeRequest - stopCh chan struct{} - doneCh chan struct{} - stopOnce sync.Once + writeCh chan writeRequest + maintenanceCh chan maintenanceRequest + stopCh chan struct{} + doneCh chan struct{} + stopOnce sync.Once } // NewStore creates a new metrics store with the given configuration @@ -141,12 +148,13 @@ func NewStore(config StoreConfig) (*Store, error) { db.SetConnMaxLifetime(0) store := &Store{ - db: db, - config: config, - buffer: make([]bufferedMetric, 0, config.WriteBufferSize), - writeCh: make(chan writeRequest, 100), // Buffer for write batches - stopCh: make(chan struct{}), - doneCh: make(chan struct{}), + db: db, + config: config, + buffer: make([]bufferedMetric, 0, config.WriteBufferSize), + writeCh: make(chan writeRequest, 100), // Buffer for write batches + maintenanceCh: make(chan maintenanceRequest, 1), + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), } // Initialize schema @@ -155,18 +163,9 @@ func NewStore(config StoreConfig) (*Store, error) { return nil, fmt.Errorf("failed to initialize schema: %w", err) } - // Clean up stale data from previous runs before starting the background worker. - // This prevents accumulation if Pulse was restarted before hourly retention ran. - // Runs BEFORE auto-vacuum migration so the VACUUM operates on a much smaller - // dataset (e.g. 60MB of live data instead of 5GB of stale + live). - store.runRetention() - - // Migrate existing databases to incremental auto-vacuum. This is a one-time - // operation that restructures the file so deleted pages can be reclaimed. - store.migrateAutoVacuum() - // Start background workers go store.backgroundWorker() + store.enqueueMaintenance(store.runStartupMaintenance) log.Info(). Str("path", config.DBPath). @@ -384,6 +383,35 @@ func (s *Store) enqueueWrite(req writeRequest) { } } +func (s *Store) enqueueMaintenance(run func()) { + if run == nil { + return + } + + select { + case s.maintenanceCh <- maintenanceRequest{run: run}: + default: + log.Debug().Msg("Metrics maintenance queue full, skipping duplicate request") + } +} + +func (s *Store) runStartupMaintenance() { + start := time.Now() + if startupMaintenanceHook != nil { + startupMaintenanceHook() + } + + // Clean up stale data before any deferred vacuum work so the database + // only restructures live rows when a long-lived installation restarts. + s.runRetention() + + // Migrate existing databases to incremental auto-vacuum. This is a one-time + // operation that restructures the file so deleted pages can be reclaimed. + s.migrateAutoVacuum() + + log.Info().Dur("duration", time.Since(start)).Msg("Deferred metrics startup maintenance completed") +} + func (s *Store) enqueueAndWait(req writeRequest) { if req.done == nil { req.done = make(chan struct{}) @@ -785,6 +813,11 @@ func (s *Store) backgroundWorker() { } s.processWriteRequests(s.coalesceQueuedRequests(req)) + case maintenance := <-s.maintenanceCh: + if maintenance.run != nil { + maintenance.run() + } + case <-flushTicker.C: s.flushBufferedAsync() diff --git a/pkg/metrics/store_additional_test.go b/pkg/metrics/store_additional_test.go index b148407dd..aa16d6980 100644 --- a/pkg/metrics/store_additional_test.go +++ b/pkg/metrics/store_additional_test.go @@ -55,7 +55,7 @@ func TestStoreWriteBatchSync(t *testing.T) { } defer store.Close() - ts := time.Unix(1000, 0) + ts := time.Now().UTC().Truncate(time.Second) metrics := []WriteMetric{ {ResourceType: "vm", ResourceID: "v1", MetricType: "cpu", Value: 10.0, Timestamp: ts, Tier: TierRaw}, {ResourceType: "vm", ResourceID: "v1", MetricType: "mem", Value: 50.0, Timestamp: ts, Tier: TierRaw}, @@ -182,3 +182,50 @@ func TestStoreFlushMakesQueuedWritesVisible(t *testing.T) { t.Fatalf("expected flushed metric to be immediately visible, got %v", points) } } + +func TestNewStoreDefersStartupMaintenance(t *testing.T) { + previousHook := startupMaintenanceHook + defer func() { + startupMaintenanceHook = previousHook + }() + + started := make(chan struct{}) + release := make(chan struct{}) + startupMaintenanceHook = func() { + close(started) + <-release + } + + dir := t.TempDir() + cfg := DefaultConfig(dir) + cfg.FlushInterval = time.Hour + + done := make(chan struct{}) + var ( + store *Store + err error + ) + go func() { + store, err = NewStore(cfg) + close(done) + }() + + select { + case <-done: + case <-time.After(200 * time.Millisecond): + t.Fatal("NewStore blocked on startup maintenance") + } + + if err != nil { + t.Fatalf("NewStore returned error: %v", err) + } + + select { + case <-started: + case <-time.After(time.Second): + t.Fatal("startup maintenance was not scheduled") + } + + close(release) + defer store.Close() +} diff --git a/pkg/metrics/store_test.go b/pkg/metrics/store_test.go index 9efe1343f..0ecc3f0f1 100644 --- a/pkg/metrics/store_test.go +++ b/pkg/metrics/store_test.go @@ -22,7 +22,7 @@ func TestStoreWriteBatchAndQuery(t *testing.T) { } defer store.Close() - ts := time.Unix(1000, 0) + ts := time.Now().UTC().Truncate(time.Second) store.writeBatch([]bufferedMetric{ {resourceType: "vm", resourceID: "vm-101", metricType: "cpu", value: 1.5, timestamp: ts, tier: TierRaw}, {resourceType: "vm", resourceID: "vm-101", metricType: "cpu", value: 2.5, timestamp: ts.Add(1 * time.Second), tier: TierRaw}, @@ -78,7 +78,7 @@ func TestStoreSelectTierAndStats(t *testing.T) { } // Insert one point for each tier to verify stats aggregation. - ts := int64(1000) + ts := time.Now().UTC().Unix() _, err = store.db.Exec( `INSERT INTO metrics (resource_type, resource_id, metric_type, value, timestamp, tier) VALUES ('vm','vm-101','cpu',1.0,?, 'raw'),