Use waitgroup instead of mutex for sqlite storage

This commit is contained in:
Daniel 2025-02-28 11:38:27 +01:00
parent c04213219b
commit 71f6f09384

View file

@ -33,9 +33,9 @@ import (
type SQLite struct {
name string
db *sql.DB
bob bob.DB
lock sync.RWMutex
db *sql.DB
bob bob.DB
wg sync.WaitGroup
ctx context.Context
cancelCtx context.CancelFunc
@ -102,8 +102,8 @@ func openSQLite(name, location string, printStmts bool) (*SQLite, error) {
// Get returns a database record.
func (db *SQLite) Get(key string) (record.Record, error) {
db.lock.RLock()
defer db.lock.RUnlock()
db.wg.Add(1)
defer db.wg.Done()
// Get record from database.
r, err := models.FindRecord(db.ctx, db.bob, key)
@ -137,6 +137,9 @@ func (db *SQLite) Put(r record.Record) (record.Record, error) {
}
func (db *SQLite) putRecord(r record.Record, tx *bob.Tx) (record.Record, error) {
db.wg.Add(1)
defer db.wg.Done()
// Lock record if in a transaction.
if tx != nil {
r.Lock()
@ -170,10 +173,6 @@ func (db *SQLite) putRecord(r record.Record, tx *bob.Tx) (record.Record, error)
Crownjewel: omit.From(m.IsCrownJewel()),
}
// Lock for writing.
db.lock.Lock()
defer db.lock.Unlock()
// Simulate upsert with custom selection on conflict.
dbQuery := models.Records.Insert(
&setter,
@ -197,10 +196,8 @@ func (db *SQLite) putRecord(r record.Record, tx *bob.Tx) (record.Record, error)
// PutMany stores many records in the database.
func (db *SQLite) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
db.lock.Lock()
defer db.lock.Unlock()
// we could lock for every record, but we want to have the same behaviour
// as the other storage backends, especially for testing.
db.wg.Add(1)
defer db.wg.Done()
batch := make(chan record.Record, 100)
errs := make(chan error, 1)
@ -245,9 +242,8 @@ func (db *SQLite) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error
// Delete deletes a record from the database.
func (db *SQLite) Delete(key string) error {
// Lock for writing.
db.lock.Lock()
defer db.lock.Unlock()
db.wg.Add(1)
defer db.wg.Done()
toDelete := &models.Record{Key: key}
return toDelete.Delete(db.ctx, db.bob)
@ -255,6 +251,9 @@ func (db *SQLite) Delete(key string) error {
// Query returns a an iterator for the supplied query.
func (db *SQLite) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
db.wg.Add(1)
defer db.wg.Done()
_, err := q.Check()
if err != nil {
return nil, fmt.Errorf("invalid query: %w", err)
@ -267,6 +266,9 @@ func (db *SQLite) Query(q *query.Query, local, internal bool) (*iterator.Iterato
}
func (db *SQLite) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) {
db.wg.Add(1)
defer db.wg.Done()
// Build query.
var recordQuery *sqlite.ViewQuery[*models.Record, models.RecordSlice]
if q.DatabaseKeyPrefix() != "" {
@ -278,9 +280,7 @@ func (db *SQLite) queryExecutor(queryIter *iterator.Iterator, q *query.Query, lo
}
// Get cursor to go over all records in the query.
db.lock.RLock()
cursor, err := models.RecordsQuery.Cursor(recordQuery, db.ctx, db.bob)
db.lock.RUnlock()
if err != nil {
queryIter.Finish(err)
return
@ -352,11 +352,11 @@ recordsLoop:
// Purge deletes all records that match the given query. It returns the number of successful deletes and an error.
func (db *SQLite) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) {
db.wg.Add(1)
defer db.wg.Done()
// Optimize for local and internal queries without where clause and without shadow delete.
if local && internal && !shadowDelete && !q.HasWhereCondition() {
db.lock.Lock()
defer db.lock.Unlock()
// First count entries (SQLite does not support affected rows)
n, err := models.Records.Query(
models.SelectWhere.Records.Key.Like(q.DatabaseKeyPrefix()+"%"),
@ -374,9 +374,6 @@ func (db *SQLite) Purge(ctx context.Context, q *query.Query, local, internal, sh
// Optimize for local and internal queries without where clause, but with shadow delete.
if local && internal && shadowDelete && !q.HasWhereCondition() {
db.lock.Lock()
defer db.lock.Unlock()
// First count entries (SQLite does not support affected rows)
n, err := models.Records.Query(
models.SelectWhere.Records.Key.Like(q.DatabaseKeyPrefix()+"%"),
@ -429,8 +426,8 @@ func (db *SQLite) Injected() bool {
// MaintainRecordStates maintains records states in the database.
func (db *SQLite) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
db.lock.Lock()
defer db.lock.Unlock()
db.wg.Add(1)
defer db.wg.Done()
now := time.Now().Unix()
purgeThreshold := purgeDeletedBefore.Unix()
@ -471,8 +468,8 @@ func (db *SQLite) MaintainRecordStates(ctx context.Context, purgeDeletedBefore t
}
func (db *SQLite) Maintain(ctx context.Context) error {
db.lock.Lock()
defer db.lock.Unlock()
db.wg.Add(1)
defer db.wg.Done()
// Remove up to about 100KB of SQLite pages from the freelist on every run.
// (Assuming 4KB page size.)
@ -481,8 +478,8 @@ func (db *SQLite) Maintain(ctx context.Context) error {
}
func (db *SQLite) MaintainThorough(ctx context.Context) error {
db.lock.Lock()
defer db.lock.Unlock()
db.wg.Add(1)
defer db.wg.Done()
// Remove all pages from the freelist.
_, err := db.db.ExecContext(ctx, "PRAGMA incremental_vacuum;")
@ -491,8 +488,7 @@ func (db *SQLite) MaintainThorough(ctx context.Context) error {
// Shutdown shuts down the database.
func (db *SQLite) Shutdown() error {
db.lock.Lock()
defer db.lock.Unlock()
db.wg.Wait()
db.cancelCtx()
return db.bob.Close()