mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-04-28 03:20:11 +00:00
fix: harden SQLite against I/O contention causing persistent lock errors
- Move all SQLite pragmas from db.Exec() to DSN parameters so every connection the pool creates gets busy_timeout and other settings. Previously only the first connection had these applied.
- Set MaxOpenConns(1) on audit, RBAC, and notification databases (metrics already had this). Fixes the potential for multiple connections where new ones lack busy_timeout.
- Increase busy_timeout from 5s to 30s across all databases to tolerate disk I/O pressure during backup windows.
- Fix nested query deadlocks in GetRoles(), GetUserAssignments(), and CancelByAlertIDs() that would deadlock with MaxOpenConns(1).
- Fix circuit breaker retryInterval not resetting on recovery, which caused the next trip to start at 5-minute backoff instead of 5s.

Related to #1156
This commit is contained in:
parent
ded1593048
commit
3b347b6548
5 changed files with 99 additions and 82 deletions
|
|
@@ -14,17 +14,18 @@ const (
|
|||
)
|
||||
|
||||
type circuitBreaker struct {
|
||||
mu sync.Mutex
|
||||
state breakerState
|
||||
failureCount int
|
||||
openedAt time.Time
|
||||
lastAttempt time.Time
|
||||
retryInterval time.Duration
|
||||
maxDelay time.Duration
|
||||
openThreshold int
|
||||
halfOpenWindow time.Duration
|
||||
stateSince time.Time
|
||||
lastTransition time.Time
|
||||
mu sync.Mutex
|
||||
state breakerState
|
||||
failureCount int
|
||||
openedAt time.Time
|
||||
lastAttempt time.Time
|
||||
retryInterval time.Duration
|
||||
baseRetryInterval time.Duration
|
||||
maxDelay time.Duration
|
||||
openThreshold int
|
||||
halfOpenWindow time.Duration
|
||||
stateSince time.Time
|
||||
lastTransition time.Time
|
||||
}
|
||||
|
||||
func newCircuitBreaker(openThreshold int, retryInterval, maxDelay, halfOpenWindow time.Duration) *circuitBreaker {
|
||||
|
|
@@ -42,13 +43,14 @@ func newCircuitBreaker(openThreshold int, retryInterval, maxDelay, halfOpenWindo
|
|||
}
|
||||
now := time.Now()
|
||||
return &circuitBreaker{
|
||||
state: breakerClosed,
|
||||
retryInterval: retryInterval,
|
||||
maxDelay: maxDelay,
|
||||
openThreshold: openThreshold,
|
||||
halfOpenWindow: halfOpenWindow,
|
||||
stateSince: now,
|
||||
lastTransition: now,
|
||||
state: breakerClosed,
|
||||
retryInterval: retryInterval,
|
||||
baseRetryInterval: retryInterval,
|
||||
maxDelay: maxDelay,
|
||||
openThreshold: openThreshold,
|
||||
halfOpenWindow: halfOpenWindow,
|
||||
stateSince: now,
|
||||
lastTransition: now,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@@ -86,6 +88,7 @@ func (b *circuitBreaker) recordSuccess() {
|
|||
now := time.Now()
|
||||
b.state = breakerClosed
|
||||
b.failureCount = 0
|
||||
b.retryInterval = b.baseRetryInterval
|
||||
b.stateSince = now
|
||||
b.lastTransition = now
|
||||
}
|
||||
|
|
|
|||
|
|
@@ -4,6 +4,7 @@ import (
|
|||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
|
@@ -72,26 +73,25 @@ func NewNotificationQueue(dataDir string) (*NotificationQueue, error) {
|
|||
|
||||
dbPath := filepath.Join(dataDir, "notification_queue.db")
|
||||
|
||||
db, err := sql.Open("sqlite", dbPath)
|
||||
// Open database with pragmas in DSN so every pool connection is configured
|
||||
dsn := dbPath + "?" + url.Values{
|
||||
"_pragma": []string{
|
||||
"busy_timeout(30000)",
|
||||
"journal_mode(WAL)",
|
||||
"synchronous(NORMAL)",
|
||||
"foreign_keys(ON)",
|
||||
"cache_size(-64000)",
|
||||
},
|
||||
}.Encode()
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open notification queue database: %w", err)
|
||||
}
|
||||
|
||||
// Configure SQLite for better concurrency and durability
|
||||
pragmas := []string{
|
||||
"PRAGMA journal_mode=WAL",
|
||||
"PRAGMA synchronous=NORMAL",
|
||||
"PRAGMA busy_timeout=5000",
|
||||
"PRAGMA foreign_keys=ON",
|
||||
"PRAGMA cache_size=-64000", // 64MB cache
|
||||
}
|
||||
|
||||
for _, pragma := range pragmas {
|
||||
if _, err := db.Exec(pragma); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("failed to set pragma: %w", err)
|
||||
}
|
||||
}
|
||||
// SQLite works best with a single writer connection
|
||||
db.SetMaxOpenConns(1)
|
||||
db.SetMaxIdleConns(1)
|
||||
db.SetConnMaxLifetime(0)
|
||||
|
||||
nq := &NotificationQueue{
|
||||
db: db,
|
||||
|
|
@@ -750,7 +750,6 @@ func (nq *NotificationQueue) CancelByAlertIDs(alertIDs []string) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to query notifications for cancellation: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var toCancelIDs []string
|
||||
alertIDSet := make(map[string]struct{})
|
||||
|
|
@@ -781,8 +780,10 @@ func (nq *NotificationQueue) CancelByAlertIDs(alertIDs []string) error {
|
|||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return fmt.Errorf("error iterating notifications for cancellation: %w", err)
|
||||
rowsErr := rows.Err()
|
||||
rows.Close() // Release connection before executing updates
|
||||
if rowsErr != nil {
|
||||
return fmt.Errorf("error iterating notifications for cancellation: %w", rowsErr)
|
||||
}
|
||||
|
||||
// Cancel the matched notifications (using direct SQL since we already hold the lock)
|
||||
|
|
|
|||
|
|
@@ -3,6 +3,7 @@ package audit
|
|||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
|
@@ -46,26 +47,25 @@ func NewSQLiteLogger(cfg SQLiteLoggerConfig) (*SQLiteLogger, error) {
|
|||
|
||||
dbPath := filepath.Join(auditDir, "audit.db")
|
||||
|
||||
db, err := sql.Open("sqlite", dbPath)
|
||||
// Open database with pragmas in DSN so every pool connection is configured
|
||||
dsn := dbPath + "?" + url.Values{
|
||||
"_pragma": []string{
|
||||
"busy_timeout(30000)",
|
||||
"journal_mode(WAL)",
|
||||
"synchronous(NORMAL)",
|
||||
"foreign_keys(ON)",
|
||||
"cache_size(-64000)",
|
||||
},
|
||||
}.Encode()
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open audit database: %w", err)
|
||||
}
|
||||
|
||||
// Configure SQLite for better concurrency and durability
|
||||
pragmas := []string{
|
||||
"PRAGMA journal_mode=WAL",
|
||||
"PRAGMA synchronous=NORMAL",
|
||||
"PRAGMA busy_timeout=5000",
|
||||
"PRAGMA foreign_keys=ON",
|
||||
"PRAGMA cache_size=-64000", // 64MB cache
|
||||
}
|
||||
|
||||
for _, pragma := range pragmas {
|
||||
if _, err := db.Exec(pragma); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err)
|
||||
}
|
||||
}
|
||||
// SQLite works best with a single writer connection
|
||||
db.SetMaxOpenConns(1)
|
||||
db.SetMaxIdleConns(1)
|
||||
db.SetConnMaxLifetime(0)
|
||||
|
||||
// Initialize signer
|
||||
signer, err := NewSigner(auditDir, cfg.CryptoMgr)
|
||||
|
|
|
|||
|
|
@@ -4,6 +4,7 @@ import (
|
|||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
|
@@ -43,26 +44,25 @@ func NewSQLiteManager(cfg SQLiteManagerConfig) (*SQLiteManager, error) {
|
|||
|
||||
dbPath := filepath.Join(rbacDir, "rbac.db")
|
||||
|
||||
db, err := sql.Open("sqlite", dbPath)
|
||||
// Open database with pragmas in DSN so every pool connection is configured
|
||||
dsn := dbPath + "?" + url.Values{
|
||||
"_pragma": []string{
|
||||
"busy_timeout(30000)",
|
||||
"journal_mode(WAL)",
|
||||
"synchronous(NORMAL)",
|
||||
"foreign_keys(ON)",
|
||||
"cache_size(-32000)",
|
||||
},
|
||||
}.Encode()
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open rbac database: %w", err)
|
||||
}
|
||||
|
||||
// Configure SQLite for better concurrency and durability
|
||||
pragmas := []string{
|
||||
"PRAGMA journal_mode=WAL",
|
||||
"PRAGMA synchronous=NORMAL",
|
||||
"PRAGMA busy_timeout=5000",
|
||||
"PRAGMA foreign_keys=ON",
|
||||
"PRAGMA cache_size=-32000", // 32MB cache
|
||||
}
|
||||
|
||||
for _, pragma := range pragmas {
|
||||
if _, err := db.Exec(pragma); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err)
|
||||
}
|
||||
}
|
||||
// SQLite works best with a single writer connection
|
||||
db.SetMaxOpenConns(1)
|
||||
db.SetMaxIdleConns(1)
|
||||
db.SetConnMaxLifetime(0)
|
||||
|
||||
retention := cfg.ChangeLogRetention
|
||||
if retention == 0 {
|
||||
|
|
@@ -256,8 +256,9 @@ func (m *SQLiteManager) GetRoles() []Role {
|
|||
log.Error().Err(err).Msg("Failed to query roles")
|
||||
return nil
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Collect roles first, then close rows before loading permissions
|
||||
// (avoids holding the connection during nested queries with MaxOpenConns=1)
|
||||
var roles []Role
|
||||
for rows.Next() {
|
||||
var role Role
|
||||
|
|
@@ -275,11 +276,14 @@ func (m *SQLiteManager) GetRoles() []Role {
|
|||
role.CreatedAt = time.Unix(createdAt, 0)
|
||||
role.UpdatedAt = time.Unix(updatedAt, 0)
|
||||
|
||||
// Load permissions
|
||||
role.Permissions = m.loadRolePermissions(role.ID)
|
||||
|
||||
roles = append(roles, role)
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
// Load permissions after releasing the connection
|
||||
for i := range roles {
|
||||
roles[i].Permissions = m.loadRolePermissions(roles[i].ID)
|
||||
}
|
||||
|
||||
return roles
|
||||
}
|
||||
|
|
@@ -518,21 +522,26 @@ func (m *SQLiteManager) GetUserAssignments() []UserRoleAssignment {
|
|||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
// Get unique usernames
|
||||
// Collect usernames first, then close rows before nested queries
|
||||
// (avoids holding the connection during nested queries with MaxOpenConns=1)
|
||||
rows, err := m.db.Query("SELECT DISTINCT username FROM rbac_user_assignments")
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to query user assignments")
|
||||
return nil
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var assignments []UserRoleAssignment
|
||||
var usernames []string
|
||||
for rows.Next() {
|
||||
var username string
|
||||
if err := rows.Scan(&username); err != nil {
|
||||
continue
|
||||
}
|
||||
usernames = append(usernames, username)
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
var assignments []UserRoleAssignment
|
||||
for _, username := range usernames {
|
||||
assignment := m.getUserAssignmentUnsafe(username)
|
||||
if len(assignment.RoleIDs) > 0 {
|
||||
assignments = append(assignments, assignment)
|
||||
|
|
|
|||
|
|
@@ -5,6 +5,7 @@ package metrics
|
|||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
|
@@ -101,16 +102,19 @@ func NewStore(config StoreConfig) (*Store, error) {
|
|||
return nil, fmt.Errorf("failed to create metrics directory: %w", err)
|
||||
}
|
||||
|
||||
// Open database
|
||||
db, err := sql.Open("sqlite", config.DBPath)
|
||||
// Open database with pragmas in DSN so every pool connection is configured
|
||||
dsn := config.DBPath + "?" + url.Values{
|
||||
"_pragma": []string{
|
||||
"busy_timeout(30000)",
|
||||
"journal_mode(WAL)",
|
||||
"synchronous(NORMAL)",
|
||||
},
|
||||
}.Encode()
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open metrics database: %w", err)
|
||||
}
|
||||
|
||||
// Set busy timeout and WAL mode explicitly
|
||||
_, _ = db.Exec("PRAGMA busy_timeout = 5000")
|
||||
_, _ = db.Exec("PRAGMA journal_mode = WAL")
|
||||
|
||||
// Configure connection pool (SQLite works best with single writer)
|
||||
db.SetMaxOpenConns(1)
|
||||
db.SetMaxIdleConns(1)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue