diff --git a/internal/monitoring/circuit_breaker.go b/internal/monitoring/circuit_breaker.go index 3bdb71c3e..1c156ba49 100644 --- a/internal/monitoring/circuit_breaker.go +++ b/internal/monitoring/circuit_breaker.go @@ -14,17 +14,18 @@ const ( ) type circuitBreaker struct { - mu sync.Mutex - state breakerState - failureCount int - openedAt time.Time - lastAttempt time.Time - retryInterval time.Duration - maxDelay time.Duration - openThreshold int - halfOpenWindow time.Duration - stateSince time.Time - lastTransition time.Time + mu sync.Mutex + state breakerState + failureCount int + openedAt time.Time + lastAttempt time.Time + retryInterval time.Duration + baseRetryInterval time.Duration + maxDelay time.Duration + openThreshold int + halfOpenWindow time.Duration + stateSince time.Time + lastTransition time.Time } func newCircuitBreaker(openThreshold int, retryInterval, maxDelay, halfOpenWindow time.Duration) *circuitBreaker { @@ -42,13 +43,14 @@ func newCircuitBreaker(openThreshold int, retryInterval, maxDelay, halfOpenWindo } now := time.Now() return &circuitBreaker{ - state: breakerClosed, - retryInterval: retryInterval, - maxDelay: maxDelay, - openThreshold: openThreshold, - halfOpenWindow: halfOpenWindow, - stateSince: now, - lastTransition: now, + state: breakerClosed, + retryInterval: retryInterval, + baseRetryInterval: retryInterval, + maxDelay: maxDelay, + openThreshold: openThreshold, + halfOpenWindow: halfOpenWindow, + stateSince: now, + lastTransition: now, } } @@ -86,6 +88,7 @@ func (b *circuitBreaker) recordSuccess() { now := time.Now() b.state = breakerClosed b.failureCount = 0 + b.retryInterval = b.baseRetryInterval b.stateSince = now b.lastTransition = now } diff --git a/internal/notifications/queue.go b/internal/notifications/queue.go index ca5f1502f..4bd892863 100644 --- a/internal/notifications/queue.go +++ b/internal/notifications/queue.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "fmt" + "net/url" "os" "path/filepath" "sync" @@ -72,26 +73,25 @@ func NewNotificationQueue(dataDir string) (*NotificationQueue, error) { dbPath := filepath.Join(dataDir, "notification_queue.db") - db, err := sql.Open("sqlite", dbPath) + // Open database with pragmas in DSN so every pool connection is configured + dsn := dbPath + "?" + url.Values{ + "_pragma": []string{ + "busy_timeout(30000)", + "journal_mode(WAL)", + "synchronous(NORMAL)", + "foreign_keys(ON)", + "cache_size(-64000)", + }, + }.Encode() + db, err := sql.Open("sqlite", dsn) if err != nil { return nil, fmt.Errorf("failed to open notification queue database: %w", err) } - // Configure SQLite for better concurrency and durability - pragmas := []string{ - "PRAGMA journal_mode=WAL", - "PRAGMA synchronous=NORMAL", - "PRAGMA busy_timeout=5000", - "PRAGMA foreign_keys=ON", - "PRAGMA cache_size=-64000", // 64MB cache - } - - for _, pragma := range pragmas { - if _, err := db.Exec(pragma); err != nil { - db.Close() - return nil, fmt.Errorf("failed to set pragma: %w", err) - } - } + // SQLite works best with a single writer connection + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + db.SetConnMaxLifetime(0) nq := &NotificationQueue{ db: db, @@ -750,7 +750,6 @@ func (nq *NotificationQueue) CancelByAlertIDs(alertIDs []string) error { if err != nil { return fmt.Errorf("failed to query notifications for cancellation: %w", err) } - defer rows.Close() var toCancelIDs []string alertIDSet := make(map[string]struct{}) @@ -781,8 +780,10 @@ func (nq *NotificationQueue) CancelByAlertIDs(alertIDs []string) error { } } - if err := rows.Err(); err != nil { - return fmt.Errorf("error iterating notifications for cancellation: %w", err) + rowsErr := rows.Err() + rows.Close() // Release connection before executing updates + if rowsErr != nil { + return fmt.Errorf("error iterating notifications for cancellation: %w", rowsErr) } // Cancel the matched notifications (using direct SQL since we already hold the lock) diff --git a/pkg/audit/sqlite_logger.go b/pkg/audit/sqlite_logger.go index 382f44a20..85e44fb00 100644 --- a/pkg/audit/sqlite_logger.go +++ b/pkg/audit/sqlite_logger.go @@ -3,6 +3,7 @@ package audit import ( "database/sql" "fmt" + "net/url" "os" "path/filepath" "strings" @@ -46,26 +47,25 @@ func NewSQLiteLogger(cfg SQLiteLoggerConfig) (*SQLiteLogger, error) { dbPath := filepath.Join(auditDir, "audit.db") - db, err := sql.Open("sqlite", dbPath) + // Open database with pragmas in DSN so every pool connection is configured + dsn := dbPath + "?" + url.Values{ + "_pragma": []string{ + "busy_timeout(30000)", + "journal_mode(WAL)", + "synchronous(NORMAL)", + "foreign_keys(ON)", + "cache_size(-64000)", + }, + }.Encode() + db, err := sql.Open("sqlite", dsn) if err != nil { return nil, fmt.Errorf("failed to open audit database: %w", err) } - // Configure SQLite for better concurrency and durability - pragmas := []string{ - "PRAGMA journal_mode=WAL", - "PRAGMA synchronous=NORMAL", - "PRAGMA busy_timeout=5000", - "PRAGMA foreign_keys=ON", - "PRAGMA cache_size=-64000", // 64MB cache - } - - for _, pragma := range pragmas { - if _, err := db.Exec(pragma); err != nil { - db.Close() - return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err) - } - } + // SQLite works best with a single writer connection + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + db.SetConnMaxLifetime(0) // Initialize signer signer, err := NewSigner(auditDir, cfg.CryptoMgr) diff --git a/pkg/auth/sqlite_manager.go b/pkg/auth/sqlite_manager.go index b2b90f336..70cc6db13 100644 --- a/pkg/auth/sqlite_manager.go +++ b/pkg/auth/sqlite_manager.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "fmt" + "net/url" "os" "path/filepath" "sync" @@ -43,26 +44,25 @@ func NewSQLiteManager(cfg SQLiteManagerConfig) (*SQLiteManager, error) { dbPath := filepath.Join(rbacDir, "rbac.db") - db, err := sql.Open("sqlite", dbPath) + // Open database with pragmas in DSN so every pool connection is configured + dsn := dbPath + "?" + url.Values{ + "_pragma": []string{ + "busy_timeout(30000)", + "journal_mode(WAL)", + "synchronous(NORMAL)", + "foreign_keys(ON)", + "cache_size(-32000)", + }, + }.Encode() + db, err := sql.Open("sqlite", dsn) if err != nil { return nil, fmt.Errorf("failed to open rbac database: %w", err) } - // Configure SQLite for better concurrency and durability - pragmas := []string{ - "PRAGMA journal_mode=WAL", - "PRAGMA synchronous=NORMAL", - "PRAGMA busy_timeout=5000", - "PRAGMA foreign_keys=ON", - "PRAGMA cache_size=-32000", // 32MB cache - } - - for _, pragma := range pragmas { - if _, err := db.Exec(pragma); err != nil { - db.Close() - return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err) - } - } + // SQLite works best with a single writer connection + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + db.SetConnMaxLifetime(0) retention := cfg.ChangeLogRetention if retention == 0 { @@ -256,8 +256,9 @@ func (m *SQLiteManager) GetRoles() []Role { log.Error().Err(err).Msg("Failed to query roles") return nil } - defer rows.Close() + // Collect roles first, then close rows before loading permissions + // (avoids holding the connection during nested queries with MaxOpenConns=1) var roles []Role for rows.Next() { var role Role @@ -275,11 +276,14 @@ func (m *SQLiteManager) GetRoles() []Role { role.CreatedAt = time.Unix(createdAt, 0) role.UpdatedAt = time.Unix(updatedAt, 0) - // Load permissions - role.Permissions = m.loadRolePermissions(role.ID) - roles = append(roles, role) } + rows.Close() + + // Load permissions after releasing the connection + for i := range roles { + roles[i].Permissions = m.loadRolePermissions(roles[i].ID) + } return roles } @@ -518,21 +522,26 @@ func (m *SQLiteManager) GetUserAssignments() []UserRoleAssignment { m.mu.RLock() defer m.mu.RUnlock() - // Get unique usernames + // Collect usernames first, then close rows before nested queries + // (avoids holding the connection during nested queries with MaxOpenConns=1) rows, err := m.db.Query("SELECT DISTINCT username FROM rbac_user_assignments") if err != nil { log.Error().Err(err).Msg("Failed to query user assignments") return nil } - defer rows.Close() - var assignments []UserRoleAssignment + var usernames []string for rows.Next() { var username string if err := rows.Scan(&username); err != nil { continue } + usernames = append(usernames, username) + } + rows.Close() + var assignments []UserRoleAssignment + for _, username := range usernames { assignment := m.getUserAssignmentUnsafe(username) if len(assignment.RoleIDs) > 0 { assignments = append(assignments, assignment) diff --git a/pkg/metrics/store.go b/pkg/metrics/store.go index 532242743..eab5bdfc3 100644 --- a/pkg/metrics/store.go +++ b/pkg/metrics/store.go @@ -5,6 +5,7 @@ package metrics import ( "database/sql" "fmt" + "net/url" "os" "path/filepath" "strconv" @@ -101,16 +102,19 @@ func NewStore(config StoreConfig) (*Store, error) { return nil, fmt.Errorf("failed to create metrics directory: %w", err) } - // Open database - db, err := sql.Open("sqlite", config.DBPath) + // Open database with pragmas in DSN so every pool connection is configured + dsn := config.DBPath + "?" + url.Values{ + "_pragma": []string{ + "busy_timeout(30000)", + "journal_mode(WAL)", + "synchronous(NORMAL)", + }, + }.Encode() + db, err := sql.Open("sqlite", dsn) if err != nil { return nil, fmt.Errorf("failed to open metrics database: %w", err) } - // Set busy timeout and WAL mode explicitly - _, _ = db.Exec("PRAGMA busy_timeout = 5000") - _, _ = db.Exec("PRAGMA journal_mode = WAL") - // Configure connection pool (SQLite works best with single writer) db.SetMaxOpenConns(1) db.SetMaxIdleConns(1)