diff --git a/base/database/controller.go b/base/database/controller.go index 4d95c01e..5d23c1c8 100644 --- a/base/database/controller.go +++ b/base/database/controller.go @@ -264,6 +264,20 @@ func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal return 0, ErrNotImplemented } +// PurgeOlderThan deletes all records last updated before the given time. +// It returns the number of successful deletes and an error. +func (c *Controller) PurgeOlderThan(ctx context.Context, prefix string, purgeBefore time.Time, local, internal bool) (int, error) { + if shuttingDown.IsSet() { + return 0, ErrShuttingDown + } + + if purger, ok := c.storage.(storage.PurgeOlderThan); ok { + return purger.PurgeOlderThan(ctx, prefix, purgeBefore, local, internal, c.shadowDelete) + } + + return 0, ErrNotImplemented +} + // Shutdown shuts down the storage. func (c *Controller) Shutdown() error { return c.storage.Shutdown() diff --git a/base/database/interface.go b/base/database/interface.go index ce9b8a97..04cd46ad 100644 --- a/base/database/interface.go +++ b/base/database/interface.go @@ -562,6 +562,27 @@ func (i *Interface) Purge(ctx context.Context, q *query.Query) (int, error) { return db.Purge(ctx, q, i.options.Local, i.options.Internal) } +// PurgeOlderThan deletes all records last updated before the given time. +// It returns the number of successful deletes and an error. +func (i *Interface) PurgeOlderThan(ctx context.Context, prefix string, purgeBefore time.Time) (int, error) { + dbName, dbKeyPrefix := record.ParseKey(prefix) + if dbName == "" { + return 0, errors.New("unknown database") + } + + db, err := getController(dbName) + if err != nil { + return 0, err + } + + // Check if database is read only before we add to the cache. + if db.ReadOnly() { + return 0, ErrReadOnly + } + + return db.PurgeOlderThan(ctx, dbKeyPrefix, purgeBefore, i.options.Local, i.options.Internal) +} + // Subscribe subscribes to updates matching the given query. func (i *Interface) Subscribe(q *query.Query) (*Subscription, error) { _, err := q.Check() diff --git a/base/database/storage/interface.go b/base/database/storage/interface.go index c329a0a6..7bdd84f8 100644 --- a/base/database/storage/interface.go +++ b/base/database/storage/interface.go @@ -46,3 +46,8 @@ type Batcher interface { type Purger interface { Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) } + +// PurgeOlderThan defines the database storage API for backends that support the PurgeOlderThan operation. +type PurgeOlderThan interface { + PurgeOlderThan(ctx context.Context, prefix string, purgeBefore time.Time, local, internal, shadowDelete bool) (int, error) +} diff --git a/base/database/storage/sqlite/sqlite.go b/base/database/storage/sqlite/sqlite.go index 454870ad..2cc15cb9 100644 --- a/base/database/storage/sqlite/sqlite.go +++ b/base/database/storage/sqlite/sqlite.go @@ -401,24 +401,62 @@ func (db *SQLite) Purge(ctx context.Context, q *query.Query, local, internal, sh } // Otherwise, iterate over all entries and delete matching ones. + + // TODO: Non-local, non-internal or content matching queries are not supported at the moment. return 0, storage.ErrNotImplemented +} - // Create iterator to check all matching records. +// PurgeOlderThan deletes all records last updated before the given time. It returns the number of successful deletes and an error. +func (db *SQLite) PurgeOlderThan(ctx context.Context, prefix string, purgeBefore time.Time, local, internal, shadowDelete bool) (int, error) { + db.wg.Add(1) + defer db.wg.Done() - // TODO: This is untested and also needs handling of shadowDelete. - // For now: Use only without where condition and with a local and internal db interface. - // queryIter := iterator.New() - // defer queryIter.Cancel() - // go db.queryExecutor(queryIter, q, local, internal) + purgeBeforeInt := purgeBefore.Unix() - // // Delete all matching records. - // var deleted int - // for r := range queryIter.Next { - // db.Delete(r.DatabaseKey()) - // deleted++ - // } + // Optimize for local and internal queries without where clause and without shadow delete. + if local && internal && !shadowDelete { + // First count entries (SQLite does not support affected rows) + n, err := models.Records.Query( + models.SelectWhere.Records.Key.Like(prefix+"%"), + models.SelectWhere.Records.Modified.LT(purgeBeforeInt), + ).Count(db.ctx, db.bob) + if err != nil || n == 0 { + return int(n), err + } - // return deleted, nil + // Delete entries. + _, err = models.Records.Delete( + models.DeleteWhere.Records.Key.Like(prefix+"%"), + models.DeleteWhere.Records.Modified.LT(purgeBeforeInt), + ).Exec(db.ctx, db.bob) + return int(n), err + } + + // Optimize for local and internal queries without where clause, but with shadow delete. + if local && internal && shadowDelete { + // First count entries (SQLite does not support affected rows) + n, err := models.Records.Query( + models.SelectWhere.Records.Key.Like(prefix+"%"), + models.SelectWhere.Records.Modified.LT(purgeBeforeInt), + ).Count(db.ctx, db.bob) + if err != nil || n == 0 { + return int(n), err + } + + // Mark purged records as deleted. + now := time.Now().Unix() + _, err = models.Records.Update( + um.SetCol("format").ToArg(nil), + um.SetCol("value").ToArg(nil), + um.SetCol("deleted").ToArg(now), + models.UpdateWhere.Records.Key.Like(prefix+"%"), + models.UpdateWhere.Records.Modified.LT(purgeBeforeInt), + ).Exec(db.ctx, db.bob) + return int(n), err + } + + // TODO: Non-local or non-internal queries are not supported at the moment. + return 0, storage.ErrNotImplemented } // ReadOnly returns whether the database is read only. diff --git a/base/database/storage/sqlite/sqlite_test.go b/base/database/storage/sqlite/sqlite_test.go index 799a230b..6586c8cc 100644 --- a/base/database/storage/sqlite/sqlite_test.go +++ b/base/database/storage/sqlite/sqlite_test.go @@ -98,17 +98,24 @@ func TestSQLite(t *testing.T) { qA := &TestRecord{} qA.SetKey("test:path/to/A") qA.UpdateMeta() + qB := &TestRecord{} qB.SetKey("test:path/to/B") qB.UpdateMeta() + // Set creation/modification in the past. + qB.Meta().Created = time.Now().Add(-time.Hour).Unix() + qB.Meta().Modified = time.Now().Add(-time.Hour).Unix() + qC := &TestRecord{} qC.SetKey("test:path/to/C") qC.UpdateMeta() // Set expiry in the past. qC.Meta().Expires = time.Now().Add(-time.Hour).Unix() + qZ := &TestRecord{} qZ.SetKey("test:z") qZ.UpdateMeta() + put, errs := db.PutMany(false) put <- qA put <- qB @@ -150,6 +157,15 @@ func TestSQLite(t *testing.T) { t.Fatal("should fail") } + // purge older than + n, err := db.PurgeOlderThan(t.Context(), "path/to/", time.Now().Add(-30*time.Minute), true, true, false) + if err != nil { + t.Fatal(err) + } + if n != 1 { + t.Fatalf("unexpected purge older than delete count: %d", n) + } + // maintenance err = db.MaintainRecordStates(t.Context(), time.Now().Add(-time.Minute), true) if err != nil { @@ -162,12 +178,12 @@ func TestSQLite(t *testing.T) { t.Fatal(err) } - // purging - n, err := db.Purge(t.Context(), query.New("test:path/to/").MustBeValid(), true, true, true) + // purge + n, err = db.Purge(t.Context(), query.New("test:path/to/").MustBeValid(), true, true, true) if err != nil { t.Fatal(err) } - if n != 2 { + if n != 1 { t.Fatalf("unexpected purge delete count: %d", n) } diff --git a/service/intel/filterlists/updater.go b/service/intel/filterlists/updater.go index 72f7b82e..4ffbec52 100644 --- a/service/intel/filterlists/updater.go +++ b/service/intel/filterlists/updater.go @@ -12,6 +12,7 @@ import ( "github.com/safing/portmaster/base/database" "github.com/safing/portmaster/base/database/query" + "github.com/safing/portmaster/base/database/storage" "github.com/safing/portmaster/base/log" "github.com/safing/portmaster/base/updater" "github.com/safing/portmaster/service/mgr" @@ -158,9 +159,25 @@ func performUpdate(ctx context.Context) error { func removeAllObsoleteFilterEntries(wc *mgr.WorkerCtx) error { log.Debugf("intel/filterlists: cleanup task started, removing obsolete filter list entries ...") - n, err := cache.Purge(wc.Ctx(), query.New(filterListKeyPrefix).Where( - // TODO(ppacher): remember the timestamp we started the last update - // and use that rather than "one hour ago" + + // TODO: Remember the timestamp we started the last update and use that rather than "one hour ago". + + // First try to purge with PurgeOlderThan. + n, err := cache.PurgeOlderThan(wc.Ctx(), filterListKeyPrefix, time.Now().Add(-time.Hour)) + switch { + case err == nil: + // Success! + log.Debugf("intel/filterlists: successfully removed %d obsolete entries", n) + return nil + case errors.Is(err, database.ErrNotImplemented) || errors.Is(err, storage.ErrNotImplemented): + // Try next method. + default: + // Return error. + return err + } + + // Try with regular purge. + n, err = cache.Purge(wc.Ctx(), query.New(filterListKeyPrefix).Where( query.Where("UpdatedAt", query.LessThan, time.Now().Add(-time.Hour).Unix()), )) if err != nil {