Reduce metrics store transaction churn (#1124)

rcourtman 2026-03-25 12:06:28 +00:00
parent 73bebf2f4f
commit 2acf2e9ef9
2 changed files with 74 additions and 5 deletions

@@ -49,8 +49,11 @@ type StoreConfig struct {
 // DefaultConfig returns sensible defaults for metrics storage
 func DefaultConfig(dataDir string) StoreConfig {
 	return StoreConfig{
-		DBPath:          filepath.Join(dataDir, "metrics.db"),
-		WriteBufferSize: 100,
+		DBPath: filepath.Join(dataDir, "metrics.db"),
+		// Large installs can enqueue hundreds of metric points per poll cycle.
+		// A larger buffer keeps those writes inside a single SQLite transaction
+		// more often, which materially reduces WAL churn on SSD-backed setups.
+		WriteBufferSize: 500,
 		FlushInterval:   5 * time.Second,
 		RetentionRaw:    2 * time.Hour,
 		RetentionMinute: 24 * time.Hour,
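
The larger default only changes what DefaultConfig hands back; a caller can still dial the buffer down before building the store. A minimal sketch (not part of this commit), assuming only the StoreConfig fields and NewStore signature visible in this diff, plus a zerolog logger for the error path:

	cfg := DefaultConfig("/var/lib/pulse") // hypothetical data directory
	cfg.WriteBufferSize = 100              // keep the pre-change batch size on tiny installs
	store, err := NewStore(cfg)
	if err != nil {
		log.Fatal().Err(err).Msg("open metrics store")
	}
	_ = store // use the store as before; only the batching behaviour differs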
@@ -110,7 +113,9 @@ func NewStore(config StoreConfig) (*Store, error) {
 			"journal_mode(WAL)",
 			"synchronous(NORMAL)",
 			"auto_vacuum(INCREMENTAL)",
-			"wal_autocheckpoint(500)",
+			// Checkpoint less aggressively so high-cardinality installs don't
+			// keep rewriting tiny WAL segments back into the main DB file.
+			"wal_autocheckpoint(4000)",
 		},
 	}.Encode()
 	db, err := sql.Open("sqlite", dsn)
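
Because the checkpoint threshold rides in on the DSN, it is easy to confirm at runtime which value actually took effect. A rough sketch (not from this commit), assuming db is the *sql.DB opened above and that the driver passes plain PRAGMA queries through:

	var pages int
	if err := db.QueryRow("PRAGMA wal_autocheckpoint").Scan(&pages); err == nil {
		// Expect 4000 after this change; SQLite's built-in default is 1000 pages.
		log.Debug().Int("wal_autocheckpoint", pages).Msg("Effective WAL checkpoint threshold")
	}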
@@ -406,6 +411,32 @@ func (s *Store) writeBatch(metrics []bufferedMetric) {
 	log.Debug().Int("count", len(metrics)).Msg("Wrote metrics batch")
 }
 
+// coalesceQueuedBatches drains any already-queued write batches so the worker
+// can commit them in a single SQLite transaction. This reduces WAL write
+// amplification when the in-memory buffer flushes multiple times during one
+// poll cycle.
+func (s *Store) coalesceQueuedBatches(initial []bufferedMetric) []bufferedMetric {
+	if len(initial) == 0 {
+		return nil
+	}
+	combined := initial
+	for {
+		select {
+		case next, ok := <-s.writeCh:
+			if !ok {
+				return combined
+			}
+			if len(next) == 0 {
+				continue
+			}
+			combined = append(combined, next...)
+		default:
+			return combined
+		}
+	}
+}
+
 // Query retrieves metrics for a resource within a time range, with optional downsampling
 func (s *Store) Query(resourceType, resourceID, metricType string, start, end time.Time, stepSecs int64) ([]MetricPoint, error) {
 	tiers := s.tierFallbacks(end.Sub(start))
@@ -673,13 +704,20 @@ func (s *Store) backgroundWorker() {
 			s.Flush()
 			// Process remaining writes
 			close(s.writeCh)
+			var remaining []bufferedMetric
 			for batch := range s.writeCh {
-				s.writeBatch(batch)
+				if len(batch) == 0 {
+					continue
+				}
+				remaining = append(remaining, batch...)
 			}
+			if len(remaining) > 0 {
+				s.writeBatch(remaining)
+			}
 			return
 		case batch := <-s.writeCh:
-			s.writeBatch(batch)
+			s.writeBatch(s.coalesceQueuedBatches(batch))
 		case <-flushTicker.C:
 			s.Flush()

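The payoff of both the coalescing helper and the shutdown drain above is that writeBatch is handed one large slice instead of several small ones, so each poll cycle costs fewer SQLite transactions and fewer WAL frames. writeBatch itself is not in this diff; the transaction-per-batch pattern it is assumed to follow looks roughly like this sketch, where s.db, the metrics_raw table, and its columns are all placeholders:

	// Hypothetical sketch only, not the real writeBatch body.
	tx, err := s.db.Begin()
	if err != nil {
		return
	}
	stmt, err := tx.Prepare("INSERT INTO metrics_raw (resource_type, resource_id, metric_type, value) VALUES (?, ?, ?, ?)")
	if err != nil {
		tx.Rollback()
		return
	}
	defer stmt.Close()
	for _, m := range metrics {
		// One Exec per metric, but only one Commit (and one WAL sync) for the whole batch.
		if _, err := stmt.Exec(m.resourceType, m.resourceID, m.metricType, m.value); err != nil {
			tx.Rollback()
			return
		}
	}
	tx.Commit()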
@@ -6,6 +6,37 @@ import (
 	"time"
 )
 
+func TestDefaultConfig_UsesLargerWriteBuffer(t *testing.T) {
+	cfg := DefaultConfig("/tmp/pulse")
+	if cfg.WriteBufferSize != 500 {
+		t.Fatalf("WriteBufferSize = %d, want 500", cfg.WriteBufferSize)
+	}
+}
+
+func TestStoreCoalesceQueuedBatches(t *testing.T) {
+	store := &Store{
+		writeCh: make(chan []bufferedMetric, 4),
+	}
+	initial := []bufferedMetric{
+		{resourceType: "vm", resourceID: "vm-1", metricType: "cpu", value: 10},
+	}
+	store.writeCh <- []bufferedMetric{
+		{resourceType: "vm", resourceID: "vm-1", metricType: "memory", value: 20},
+	}
+	store.writeCh <- []bufferedMetric{
+		{resourceType: "vm", resourceID: "vm-2", metricType: "cpu", value: 30},
+	}
+	combined := store.coalesceQueuedBatches(initial)
+	if len(combined) != 3 {
+		t.Fatalf("expected 3 combined metrics, got %d", len(combined))
+	}
+	if len(store.writeCh) != 0 {
+		t.Fatalf("expected queued batches to be drained, got %d remaining", len(store.writeCh))
+	}
+}
+
 func TestStoreWriteBatchSync(t *testing.T) {
 	dir := t.TempDir()
 	store, err := NewStore(DefaultConfig(dir))