Defer metrics store startup maintenance

This commit is contained in:
rcourtman 2026-04-01 12:44:17 +01:00
parent 63cc0e038b
commit 387a79bc2d
3 changed files with 103 additions and 23 deletions

View file

@@ -78,6 +78,12 @@ type writeRequest struct {
done chan struct{}
}
// maintenanceRequest carries a deferred maintenance task to be executed
// on the background worker goroutine (see backgroundWorker's select).
type maintenanceRequest struct {
run func()
}
var startupMaintenanceHook func()
// WriteMetric represents a metric sample to be written synchronously.
type WriteMetric struct {
ResourceType string
@@ -98,10 +104,11 @@ type Store struct {
buffer []bufferedMetric
// Background workers
writeCh chan writeRequest
stopCh chan struct{}
doneCh chan struct{}
stopOnce sync.Once
writeCh chan writeRequest
maintenanceCh chan maintenanceRequest
stopCh chan struct{}
doneCh chan struct{}
stopOnce sync.Once
}
// NewStore creates a new metrics store with the given configuration
@@ -141,12 +148,13 @@ func NewStore(config StoreConfig) (*Store, error) {
db.SetConnMaxLifetime(0)
store := &Store{
db: db,
config: config,
buffer: make([]bufferedMetric, 0, config.WriteBufferSize),
writeCh: make(chan writeRequest, 100), // Buffer for write batches
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
db: db,
config: config,
buffer: make([]bufferedMetric, 0, config.WriteBufferSize),
writeCh: make(chan writeRequest, 100), // Buffer for write batches
maintenanceCh: make(chan maintenanceRequest, 1),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
// Initialize schema
@@ -155,18 +163,9 @@ func NewStore(config StoreConfig) (*Store, error) {
return nil, fmt.Errorf("failed to initialize schema: %w", err)
}
// Clean up stale data from previous runs before starting the background worker.
// This prevents accumulation if Pulse was restarted before hourly retention ran.
// Runs BEFORE auto-vacuum migration so the VACUUM operates on a much smaller
// dataset (e.g. 60MB of live data instead of 5GB of stale + live).
store.runRetention()
// Migrate existing databases to incremental auto-vacuum. This is a one-time
// operation that restructures the file so deleted pages can be reclaimed.
store.migrateAutoVacuum()
// Start background workers
go store.backgroundWorker()
store.enqueueMaintenance(store.runStartupMaintenance)
log.Info().
Str("path", config.DBPath).
@@ -384,6 +383,35 @@ func (s *Store) enqueueWrite(req writeRequest) {
}
}
// enqueueMaintenance schedules run for execution on the background worker
// goroutine. A nil task is a no-op. The maintenance channel holds a single
// pending request, so a second enqueue while one is already queued is
// dropped via the non-blocking send rather than stalling the caller.
func (s *Store) enqueueMaintenance(run func()) {
	if run == nil {
		return
	}
	req := maintenanceRequest{run: run}
	select {
	case s.maintenanceCh <- req:
		// Queued; the background worker will invoke it.
	default:
		log.Debug().Msg("Metrics maintenance queue full, skipping duplicate request")
	}
}
// runStartupMaintenance performs the startup housekeeping that NewStore
// defers to the background worker: stale-data retention followed by the
// one-time auto-vacuum migration. Running it off the constructor path keeps
// NewStore fast even for large, long-lived databases.
func (s *Store) runStartupMaintenance() {
	began := time.Now()

	// Test seam: lets tests observe (and deliberately block) this work.
	if hook := startupMaintenanceHook; hook != nil {
		hook()
	}

	// Retention first: deleting stale rows before any deferred vacuum work
	// means the database only restructures live rows when a long-lived
	// installation restarts.
	s.runRetention()

	// One-time migration to incremental auto-vacuum, restructuring the file
	// so deleted pages can be reclaimed.
	s.migrateAutoVacuum()

	log.Info().Dur("duration", time.Since(began)).Msg("Deferred metrics startup maintenance completed")
}
func (s *Store) enqueueAndWait(req writeRequest) {
if req.done == nil {
req.done = make(chan struct{})
@@ -785,6 +813,11 @@ func (s *Store) backgroundWorker() {
}
s.processWriteRequests(s.coalesceQueuedRequests(req))
case maintenance := <-s.maintenanceCh:
if maintenance.run != nil {
maintenance.run()
}
case <-flushTicker.C:
s.flushBufferedAsync()

View file

@@ -55,7 +55,7 @@ func TestStoreWriteBatchSync(t *testing.T) {
}
defer store.Close()
ts := time.Unix(1000, 0)
ts := time.Now().UTC().Truncate(time.Second)
metrics := []WriteMetric{
{ResourceType: "vm", ResourceID: "v1", MetricType: "cpu", Value: 10.0, Timestamp: ts, Tier: TierRaw},
{ResourceType: "vm", ResourceID: "v1", MetricType: "mem", Value: 50.0, Timestamp: ts, Tier: TierRaw},
@@ -182,3 +182,50 @@ func TestStoreFlushMakesQueuedWritesVisible(t *testing.T) {
t.Fatalf("expected flushed metric to be immediately visible, got %v", points)
}
}
// TestNewStoreDefersStartupMaintenance verifies that NewStore returns
// without waiting for the retention/auto-vacuum work, which instead runs
// later on the background worker (observed via startupMaintenanceHook).
func TestNewStoreDefersStartupMaintenance(t *testing.T) {
	previousHook := startupMaintenanceHook
	defer func() {
		startupMaintenanceHook = previousHook
	}()

	started := make(chan struct{})
	release := make(chan struct{})
	startupMaintenanceHook = func() {
		close(started)
		<-release
	}

	dir := t.TempDir()
	cfg := DefaultConfig(dir)
	cfg.FlushInterval = time.Hour

	done := make(chan struct{})
	var (
		store *Store
		err   error
	)
	go func() {
		store, err = NewStore(cfg)
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(200 * time.Millisecond):
		t.Fatal("NewStore blocked on startup maintenance")
	}
	if err != nil {
		t.Fatalf("NewStore returned error: %v", err)
	}
	// Register cleanup immediately so the failure path below still releases
	// resources. Defers run LIFO: release is closed before Close, unblocking
	// the hook so Close cannot hang waiting on the background worker.
	defer store.Close()
	defer close(release)

	select {
	case <-started:
	case <-time.After(time.Second):
		t.Fatal("startup maintenance was not scheduled")
	}
}

View file

@@ -22,7 +22,7 @@ func TestStoreWriteBatchAndQuery(t *testing.T) {
}
defer store.Close()
ts := time.Unix(1000, 0)
ts := time.Now().UTC().Truncate(time.Second)
store.writeBatch([]bufferedMetric{
{resourceType: "vm", resourceID: "vm-101", metricType: "cpu", value: 1.5, timestamp: ts, tier: TierRaw},
{resourceType: "vm", resourceID: "vm-101", metricType: "cpu", value: 2.5, timestamp: ts.Add(1 * time.Second), tier: TierRaw},
@@ -78,7 +78,7 @@ func TestStoreSelectTierAndStats(t *testing.T) {
}
// Insert one point for each tier to verify stats aggregation.
ts := int64(1000)
ts := time.Now().UTC().Unix()
_, err = store.db.Exec(
`INSERT INTO metrics (resource_type, resource_id, metric_type, value, timestamp, tier) VALUES
('vm','vm-101','cpu',1.0,?, 'raw'),