diff --git a/database/controller.go b/database/controller.go index fd6ddbc..902d7fc 100644 --- a/database/controller.go +++ b/database/controller.go @@ -244,7 +244,7 @@ func (c *Controller) Maintain(ctx context.Context) error { defer c.writeLock.RUnlock() if shuttingDown.IsSet() { - return nil + return ErrShuttingDown } if maintainer, ok := c.storage.(storage.Maintainer); ok { @@ -259,7 +259,7 @@ func (c *Controller) MaintainThorough(ctx context.Context) error { defer c.writeLock.RUnlock() if shuttingDown.IsSet() { - return nil + return ErrShuttingDown } if maintainer, ok := c.storage.(storage.Maintainer); ok { @@ -274,12 +274,26 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor defer c.writeLock.RUnlock() if shuttingDown.IsSet() { - return nil + return ErrShuttingDown } return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore) } +func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal bool) (int, error) { + c.writeLock.RLock() + defer c.writeLock.RUnlock() + + if shuttingDown.IsSet() { + return 0, ErrShuttingDown + } + + if purger, ok := c.storage.(storage.Purger); ok { + return purger.Purge(ctx, q, local, internal, c.shadowDelete) + } + return 0, ErrNotImplemented +} + // Shutdown shuts down the storage. func (c *Controller) Shutdown() error { // acquire full locks diff --git a/database/interface.go b/database/interface.go index 7e15767..02bae88 100644 --- a/database/interface.go +++ b/database/interface.go @@ -1,6 +1,7 @@ package database import ( + "context" "errors" "fmt" "time" @@ -400,6 +401,22 @@ func (i *Interface) Query(q *query.Query) (*iterator.Iterator, error) { return db.Query(q, i.options.Local, i.options.Internal) } +// Purge deletes all records that match the given query. It returns the number +// of successful deletes and an error. +func (i *Interface) Purge(ctx context.Context, q *query.Query) (int, error) { + _, err := q.Check() + if err != nil { + return 0, err + } + + db, err := getController(q.DatabaseName()) + if err != nil { + return 0, err + } + + return db.Purge(ctx, q, i.options.Local, i.options.Internal) +} + // Subscribe subscribes to updates matching the given query. func (i *Interface) Subscribe(q *query.Query) (*Subscription, error) { _, err := q.Check() diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index 06e4d8f..ca88864 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -297,6 +297,93 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim }) } +// Purge deletes all records that match the given query. It returns the number of successful deletes and an error. +func (b *BBolt) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) { + prefix := []byte(q.DatabaseKeyPrefix()) + + var cnt int + var done bool + for !done { + err := b.db.Update(func(tx *bbolt.Tx) error { + // Create a cursor for iteration. + bucket := tx.Bucket(bucketName) + c := bucket.Cursor() + for key, value := c.Seek(prefix); key != nil; key, value = c.Next() { + // Check if context has been cancelled. + select { + case <-ctx.Done(): + done = true + return nil + default: + } + + // Check if we still match the key prefix, if not, exit. + if !bytes.HasPrefix(key, prefix) { + done = true + return nil + } + + // Wrap the value in a new wrapper to access the metadata. + wrapper, err := record.NewRawWrapper(b.name, string(key), value) + if err != nil { + return err + } + + // Check if we have permission for this record. + if !wrapper.Meta().CheckPermission(local, internal) { + continue + } + + // Check if record is already deleted. + if wrapper.Meta().IsDeleted() { + continue + } + + // Check if the query matches this record. + if !q.MatchesRecord(wrapper) { + continue + } + + // Delete record. + if shadowDelete { + // Shadow delete. + wrapper.Meta().Delete() + deleted, err := wrapper.MarshalRecord(wrapper) + if err != nil { + return err + } + err = bucket.Put(key, deleted) + if err != nil { + return err + } + + // Reposition the cursor after we have edited the bucket. + c.Seek(key) + } else { + // Immediate delete. + err = c.Delete() + if err != nil { + return err + } + } + + // Work in batches of 1000 changes in order to enable other operations in between. + cnt++ + if cnt%1000 == 0 { + return nil + } + } + done = true + return nil + }) + if err != nil { + return cnt, err + } + } + + return cnt, nil +} + // Shutdown shuts down the database. func (b *BBolt) Shutdown() error { return b.db.Close() diff --git a/database/storage/bbolt/bbolt_test.go b/database/storage/bbolt/bbolt_test.go index 783c67f..7897362 100644 --- a/database/storage/bbolt/bbolt_test.go +++ b/database/storage/bbolt/bbolt_test.go @@ -19,6 +19,7 @@ var ( // Compile time interface checks. _ storage.Interface = &BBolt{} _ storage.Batcher = &BBolt{} + _ storage.Purger = &BBolt{} ) type TestRecord struct { @@ -158,6 +159,37 @@ func TestBBolt(t *testing.T) { t.Fatal(err) } + // purging + purger, ok := db.(storage.Purger) + if ok { + n, err := purger.Purge(context.TODO(), query.New("test:path/to/").MustBeValid(), true, true, false) + if err != nil { + t.Fatal(err) + } + if n != 3 { + t.Fatalf("unexpected purge delete count: %d", n) + } + } else { + t.Fatal("should implement Purger") + } + + // test query + q = query.New("test").MustBeValid() + it, err = db.Query(q, true, true) + if err != nil { + t.Fatal(err) + } + cnt = 0 + for range it.Next { + cnt++ + } + if it.Err() != nil { + t.Fatal(it.Err()) + } + if cnt != 1 { + t.Fatalf("unexpected query result count: %d", cnt) + } + // shutdown err = db.Shutdown() if err != nil { diff --git a/database/storage/interface.go b/database/storage/interface.go index 432ffd3..39cffd6 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -36,3 +36,8 @@ type Maintainer interface { type Batcher interface { PutMany(shadowDelete bool) (batch chan<- record.Record, errs <-chan error) } + +// Purger defines the database storage API for backends that support the purge operation. +type Purger interface { + Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) +}