Add SQLite database storage backend
This commit is contained in:
parent fdca991166
commit c742c7dfd1
18 changed files with 1722 additions and 74 deletions
376 base/database/storage/sqlite/sqlite.go Normal file
@@ -0,0 +1,376 @@
package sqlite

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"path/filepath"
	"sync"
	"time"

	"github.com/aarondl/opt/omit"
	migrate "github.com/rubenv/sql-migrate"
	"github.com/safing/portmaster/base/database/accessor"
	"github.com/safing/portmaster/base/database/iterator"
	"github.com/safing/portmaster/base/database/query"
	"github.com/safing/portmaster/base/database/record"
	"github.com/safing/portmaster/base/database/storage"
	"github.com/safing/portmaster/base/database/storage/sqlite/models"
	"github.com/safing/portmaster/base/log"
	"github.com/safing/structures/dsd"
	"github.com/stephenafamo/bob"
	"github.com/stephenafamo/bob/dialect/sqlite"
	"github.com/stephenafamo/bob/dialect/sqlite/im"
	"github.com/stephenafamo/bob/dialect/sqlite/um"

	_ "modernc.org/sqlite"
)

// SQLite storage.
type SQLite struct {
	name string

	db   *sql.DB
	bob  bob.DB
	lock sync.RWMutex

	ctx       context.Context
	cancelCtx context.CancelFunc
}

func init() {
	_ = storage.Register("sqlite", func(name, location string) (storage.Interface, error) {
		return NewSQLite(name, location)
	})
}

// NewSQLite creates a sqlite database.
func NewSQLite(name, location string) (*SQLite, error) {
	dbFile := filepath.Join(location, "db.sqlite")

	// Open database file.
	db, err := sql.Open("sqlite", dbFile)
	if err != nil {
		return nil, fmt.Errorf("open sqlite: %w", err)
	}

	// Run migrations on database.
	n, err := migrate.Exec(db, "sqlite3", getMigrations(), migrate.Up)
	if err != nil {
		return nil, fmt.Errorf("migrate sqlite: %w", err)
	}
	log.Debugf("database/sqlite: ran %d migrations on %s database", n, name)

	// Return as bob database.
	ctx, cancelCtx := context.WithCancel(context.Background())
	return &SQLite{
		name:      name,
		db:        db,
		bob:       bob.NewDB(db),
		ctx:       ctx,
		cancelCtx: cancelCtx,
	}, nil
}

// Get returns a database record.
func (db *SQLite) Get(key string) (record.Record, error) {
	db.lock.RLock()
	defer db.lock.RUnlock()

	// Get record from database.
	r, err := models.FindRecord(db.ctx, db.bob, key)
	if err != nil {
		return nil, fmt.Errorf("%w: %s", storage.ErrNotFound, err)
	}

	// Return data in wrapper.
	return record.NewWrapperFromDatabase(db.name, key, getMeta(r), uint8(r.Format), r.Value)
}

// GetMeta returns the metadata of a database record.
func (db *SQLite) GetMeta(key string) (*record.Meta, error) {
	r, err := db.Get(key)
	if err != nil {
		return nil, err
	}

	return r.Meta(), nil
}

// Put stores a record in the database.
func (db *SQLite) Put(r record.Record) (record.Record, error) {
	r.Lock()
	defer r.Unlock()

	// Serialize to JSON.
	data, err := r.MarshalDataOnly(r, dsd.JSON)
	if err != nil {
		return nil, err
	}

	// Create structure for insert.
	m := r.Meta()
	setter := models.RecordSetter{
		Key:        omit.From(r.DatabaseKey()),
		Format:     omit.From(int16(dsd.JSON)),
		Value:      omit.From(data),
		Created:    omit.From(m.Created),
		Modified:   omit.From(m.Modified),
		Expires:    omit.From(m.Expires),
		Deleted:    omit.From(m.Deleted),
		Secret:     omit.From(m.IsSecret()),
		Crownjewel: omit.From(m.IsCrownJewel()),
	}

	// Lock for writing.
	db.lock.Lock()
	defer db.lock.Unlock()

	// Simulate upsert with custom selection on conflict.
	_, err = models.Records.Insert(
		&setter,
		im.OnConflict("key").DoUpdate(
			im.SetExcluded("format", "value", "created", "modified", "expires", "deleted", "secret", "crownjewel"),
		),
	).Exec(db.ctx, db.bob)
	if err != nil {
		return nil, err
	}

	return r, nil
}

// PutMany stores many records in the database.
func (db *SQLite) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
	db.lock.Lock()
	defer db.lock.Unlock()
	// We could lock for every record, but we want to have the same behaviour
	// as the other storage backends, especially for testing.

	batch := make(chan record.Record, 100)
	errs := make(chan error, 1)

	// Start handler.
	go func() {
		for r := range batch {
			_, err := db.Put(r)
			if err != nil {
				errs <- err
				return
			}
		}
		errs <- nil
	}()

	return batch, errs
}

// Delete deletes a record from the database.
func (db *SQLite) Delete(key string) error {
	// Lock for writing.
	db.lock.Lock()
	defer db.lock.Unlock()

	toDelete := &models.Record{Key: key}
	return toDelete.Delete(db.ctx, db.bob)
}

// Query returns an iterator for the supplied query.
func (db *SQLite) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
	_, err := q.Check()
	if err != nil {
		return nil, fmt.Errorf("invalid query: %w", err)
	}

	queryIter := iterator.New()

	go db.queryExecutor(queryIter, q, local, internal)
	return queryIter, nil
}

func (db *SQLite) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) {
	// Build query.
	var recordQuery *sqlite.ViewQuery[*models.Record, models.RecordSlice]
	if q.DatabaseKeyPrefix() != "" {
		recordQuery = models.Records.View.Query(
			models.SelectWhere.Records.Key.Like(q.DatabaseKeyPrefix() + "%"),
		)
	} else {
		recordQuery = models.Records.View.Query()
	}

	// Get all records from query.
	// TODO: This will load all records into memory. While this is efficient and
	// will not block others from using the database, this might be quite a strain
	// on the system memory. Monitor and see if this is an issue.
	db.lock.RLock()
	records, err := models.RecordsQuery.All(recordQuery, db.ctx, db.bob)
	db.lock.RUnlock()
	if err != nil {
		queryIter.Finish(err)
		return
	}

recordsLoop:
	for _, r := range records {
		// Check if key matches.
		if !q.MatchesKey(r.Key) {
			continue recordsLoop
		}

		// Check Meta.
		m := getMeta(r)
		if !m.CheckValidity() ||
			!m.CheckPermission(local, internal) {
			continue recordsLoop
		}

		// Check Data.
		if q.HasWhereCondition() {
			jsonData := string(r.Value)
			jsonAccess := accessor.NewJSONAccessor(&jsonData)
			if !q.MatchesAccessor(jsonAccess) {
				continue recordsLoop
			}
		}

		// Build database record.
		matched, _ := record.NewWrapperFromDatabase(db.name, r.Key, m, uint8(r.Format), r.Value)

		select {
		case <-queryIter.Done:
			break recordsLoop
		case queryIter.Next <- matched:
		default:
			select {
			case <-queryIter.Done:
				break recordsLoop
			case queryIter.Next <- matched:
			case <-time.After(1 * time.Second):
				err = errors.New("query timeout")
				break recordsLoop
			}
		}
	}

	queryIter.Finish(err)
}

// Purge deletes all records that match the given query. It returns the number of successful deletes and an error.
func (db *SQLite) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) {
	// Optimize for local and internal queries without where clause.
	if local && internal && !shadowDelete && !q.HasWhereCondition() {
		db.lock.Lock()
		defer db.lock.Unlock()

		// First count entries (SQLite does not support affected rows).
		n, err := models.Records.Query(
			models.SelectWhere.Records.Key.Like(q.DatabaseKeyPrefix()+"%"),
		).Count(db.ctx, db.bob)
		if err != nil || n == 0 {
			return int(n), err
		}

		// Delete entries.
		_, err = models.Records.Delete(
			models.DeleteWhere.Records.Key.Like(q.DatabaseKeyPrefix()+"%"),
		).Exec(db.ctx, db.bob)
		return int(n), err
	}

	// Otherwise, iterate over all entries and delete matching ones.

	// Create iterator to check all matching records.
	queryIter := iterator.New()
	defer queryIter.Cancel()
	go db.queryExecutor(queryIter, q, local, internal)

	// Delete all matching records.
	var deleted int
	for r := range queryIter.Next {
		db.Delete(r.DatabaseKey())
		deleted++
	}

	return deleted, nil
}

// ReadOnly returns whether the database is read only.
func (db *SQLite) ReadOnly() bool {
	return false
}

// Injected returns whether the database is injected.
func (db *SQLite) Injected() bool {
	return false
}

// MaintainRecordStates maintains record states in the database.
func (db *SQLite) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
	db.lock.Lock()
	defer db.lock.Unlock()

	now := time.Now().Unix()
	purgeThreshold := purgeDeletedBefore.Unix()

	// Option 1: Using shadow delete.
	if shadowDelete {
		// Mark expired records as deleted.
		models.Records.Update(
			um.SetCol("deleted").ToArg(now),
			models.UpdateWhere.Records.Deleted.EQ(0),
			models.UpdateWhere.Records.Expires.GT(0),
			models.UpdateWhere.Records.Expires.LT(now),
		).Exec(db.ctx, db.bob)

		// Purge deleted records before threshold.
		models.Records.Delete(
			models.DeleteWhere.Records.Deleted.GT(0),
			models.DeleteWhere.Records.Deleted.LT(purgeThreshold),
		).Exec(db.ctx, db.bob)
		return nil
	}

	// Option 2: Immediate delete.

	// Delete expired records.
	models.Records.Delete(
		models.DeleteWhere.Records.Expires.GT(0),
		models.DeleteWhere.Records.Expires.LT(now),
	).Exec(db.ctx, db.bob)
	// Delete shadow deleted records.
	models.Records.Delete(
		models.DeleteWhere.Records.Deleted.GT(0),
	).Exec(db.ctx, db.bob)

	return nil
}

// Maintain runs a light maintenance routine on the database.
func (db *SQLite) Maintain(ctx context.Context) error {
	db.lock.Lock()
	defer db.lock.Unlock()

	// Remove up to about 100KB of SQLite pages from the freelist on every run.
	// (Assuming 4KB page size.)
	_, err := db.db.ExecContext(ctx, "PRAGMA incremental_vacuum(25);")
	return err
}

// MaintainThorough runs a thorough maintenance routine on the database.
func (db *SQLite) MaintainThorough(ctx context.Context) error {
	db.lock.Lock()
	defer db.lock.Unlock()

	// Remove all pages from the freelist.
	_, err := db.db.ExecContext(ctx, "PRAGMA incremental_vacuum;")
	return err
}

// Shutdown shuts down the database.
func (db *SQLite) Shutdown() error {
	db.lock.Lock()
	defer db.lock.Unlock()
	db.cancelCtx()

	return db.bob.Close()
}
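Aside: the Put method above relies on SQLite's "INSERT ... ON CONFLICT ... DO UPDATE SET col = excluded.col" form, which bob generates from im.OnConflict("key").DoUpdate(im.SetExcluded(...)). The following is a minimal, self-contained sketch of that upsert pattern using plain database/sql and the modernc.org/sqlite driver; the table name and columns here are simplified assumptions for illustration only, not the schema introduced by this commit's migrations or the generated models package.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "modernc.org/sqlite"
)

func main() {
	db, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Hypothetical, simplified stand-in for the records table.
	if _, err := db.Exec(`CREATE TABLE records (key TEXT PRIMARY KEY, value BLOB, modified INTEGER)`); err != nil {
		log.Fatal(err)
	}

	// Insert-or-update on the primary key, mirroring the ON CONFLICT clause used in Put.
	upsert := `INSERT INTO records (key, value, modified) VALUES (?, ?, ?)
		ON CONFLICT (key) DO UPDATE SET
			value = excluded.value,
			modified = excluded.modified`

	// First write inserts, second write with the same key updates in place.
	if _, err := db.Exec(upsert, "core:test/a", []byte(`{"v":1}`), 1); err != nil {
		log.Fatal(err)
	}
	if _, err := db.Exec(upsert, "core:test/a", []byte(`{"v":2}`), 2); err != nil {
		log.Fatal(err)
	}

	var value []byte
	var modified int64
	if err := db.QueryRow(`SELECT value, modified FROM records WHERE key = ?`, "core:test/a").Scan(&value, &modified); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("value=%s modified=%d\n", value, modified) // value={"v":2} modified=2
}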