diff --git a/pkg/metrics/store.go b/pkg/metrics/store.go
index 4b8565d27..3a37e14ce 100644
--- a/pkg/metrics/store.go
+++ b/pkg/metrics/store.go
@@ -49,8 +49,11 @@ type StoreConfig struct {
 // DefaultConfig returns sensible defaults for metrics storage
 func DefaultConfig(dataDir string) StoreConfig {
 	return StoreConfig{
-		DBPath:          filepath.Join(dataDir, "metrics.db"),
-		WriteBufferSize: 100,
+		DBPath: filepath.Join(dataDir, "metrics.db"),
+		// Large installs can enqueue hundreds of metric points per poll cycle.
+		// A larger buffer keeps those writes inside a single SQLite transaction
+		// more often, which materially reduces WAL churn on SSD-backed setups.
+		WriteBufferSize: 500,
 		FlushInterval:   5 * time.Second,
 		RetentionRaw:    2 * time.Hour,
 		RetentionMinute: 24 * time.Hour,
@@ -110,7 +113,9 @@ func NewStore(config StoreConfig) (*Store, error) {
 			"journal_mode(WAL)",
 			"synchronous(NORMAL)",
 			"auto_vacuum(INCREMENTAL)",
-			"wal_autocheckpoint(500)",
+			// Checkpoint less aggressively so high-cardinality installs don't
+			// keep rewriting tiny WAL segments back into the main DB file.
+			"wal_autocheckpoint(4000)",
 		},
 	}.Encode()
 	db, err := sql.Open("sqlite", dsn)
@@ -406,6 +411,32 @@ func (s *Store) writeBatch(metrics []bufferedMetric) {
 	log.Debug().Int("count", len(metrics)).Msg("Wrote metrics batch")
 }
 
+// coalesceQueuedBatches drains any already-queued write batches so the worker
+// can commit them in a single SQLite transaction. This reduces WAL write
+// amplification when the in-memory buffer flushes multiple times during one
+// poll cycle.
+func (s *Store) coalesceQueuedBatches(initial []bufferedMetric) []bufferedMetric {
+	if len(initial) == 0 {
+		return nil
+	}
+
+	combined := initial
+	for {
+		select {
+		case next, ok := <-s.writeCh:
+			if !ok {
+				return combined
+			}
+			if len(next) == 0 {
+				continue
+			}
+			combined = append(combined, next...)
+		default:
+			return combined
+		}
+	}
+}
+
 // Query retrieves metrics for a resource within a time range, with optional downsampling
 func (s *Store) Query(resourceType, resourceID, metricType string, start, end time.Time, stepSecs int64) ([]MetricPoint, error) {
 	tiers := s.tierFallbacks(end.Sub(start))
@@ -673,13 +704,20 @@ func (s *Store) backgroundWorker() {
 			s.Flush()
 			// Process remaining writes
 			close(s.writeCh)
+			var remaining []bufferedMetric
 			for batch := range s.writeCh {
-				s.writeBatch(batch)
+				if len(batch) == 0 {
+					continue
+				}
+				remaining = append(remaining, batch...)
+			}
+			if len(remaining) > 0 {
+				s.writeBatch(remaining)
 			}
 			return
 		case batch := <-s.writeCh:
-			s.writeBatch(batch)
+			s.writeBatch(s.coalesceQueuedBatches(batch))
 		case <-flushTicker.C:
 			s.Flush()
diff --git a/pkg/metrics/store_additional_test.go b/pkg/metrics/store_additional_test.go
index 638552b73..4a6152397 100644
--- a/pkg/metrics/store_additional_test.go
+++ b/pkg/metrics/store_additional_test.go
@@ -6,6 +6,37 @@ import (
 	"time"
 )
 
+func TestDefaultConfig_UsesLargerWriteBuffer(t *testing.T) {
+	cfg := DefaultConfig("/tmp/pulse")
+	if cfg.WriteBufferSize != 500 {
+		t.Fatalf("WriteBufferSize = %d, want 500", cfg.WriteBufferSize)
+	}
+}
+
+func TestStoreCoalesceQueuedBatches(t *testing.T) {
+	store := &Store{
+		writeCh: make(chan []bufferedMetric, 4),
+	}
+
+	initial := []bufferedMetric{
+		{resourceType: "vm", resourceID: "vm-1", metricType: "cpu", value: 10},
+	}
+	store.writeCh <- []bufferedMetric{
+		{resourceType: "vm", resourceID: "vm-1", metricType: "memory", value: 20},
+	}
+	store.writeCh <- []bufferedMetric{
+		{resourceType: "vm", resourceID: "vm-2", metricType: "cpu", value: 30},
+	}
+
+	combined := store.coalesceQueuedBatches(initial)
+	if len(combined) != 3 {
+		t.Fatalf("expected 3 combined metrics, got %d", len(combined))
+	}
+	if len(store.writeCh) != 0 {
+		t.Fatalf("expected queued batches to be drained, got %d remaining", len(store.writeCh))
+	}
+}
+
 func TestStoreWriteBatchSync(t *testing.T) {
 	dir := t.TempDir()
 	store, err := NewStore(DefaultConfig(dir))
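
Note for reviewers: the standalone sketch below (hypothetical names, not part of the patch) isolates the non-blocking drain pattern that coalesceQueuedBatches relies on. A select with a default case pops whatever batches are already buffered in the channel and returns the moment it is empty or closed, so the worker never blocks waiting for a new batch.

package main

import "fmt"

// drain mirrors coalesceQueuedBatches with plain ints: it appends every
// batch already sitting in ch onto initial, returning as soon as the
// channel is empty (default case fires) or closed (ok == false).
func drain(ch chan []int, initial []int) []int {
	combined := initial
	for {
		select {
		case next, ok := <-ch:
			if !ok {
				return combined // channel closed: nothing more to drain
			}
			combined = append(combined, next...)
		default:
			return combined // channel empty: stop without blocking
		}
	}
}

func main() {
	ch := make(chan []int, 4)
	ch <- []int{2, 3}
	ch <- []int{4}
	// Three queued slices collapse into one: prints [1 2 3 4].
	fmt.Println(drain(ch, []int{1}))
}

Because the default case wins whenever the channel has nothing buffered, each worker wakeup pays at most len(writeCh) extra receives, and the single writeBatch call that follows commits everything in one transaction.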