Add Purge method/interface to database system

Also, implement Purger interface in bbolt storage.
This commit is contained in:
Daniel 2020-09-24 15:02:51 +02:00
parent 362539692e
commit 82af986224
5 changed files with 158 additions and 3 deletions

View file

@ -244,7 +244,7 @@ func (c *Controller) Maintain(ctx context.Context) error {
defer c.writeLock.RUnlock()
if shuttingDown.IsSet() {
return nil
return ErrShuttingDown
}
if maintainer, ok := c.storage.(storage.Maintainer); ok {
@ -259,7 +259,7 @@ func (c *Controller) MaintainThorough(ctx context.Context) error {
defer c.writeLock.RUnlock()
if shuttingDown.IsSet() {
return nil
return ErrShuttingDown
}
if maintainer, ok := c.storage.(storage.Maintainer); ok {
@ -274,12 +274,26 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor
defer c.writeLock.RUnlock()
if shuttingDown.IsSet() {
return nil
return ErrShuttingDown
}
return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore)
}
func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal bool) (int, error) {
c.writeLock.RLock()
defer c.writeLock.RUnlock()
if shuttingDown.IsSet() {
return 0, ErrShuttingDown
}
if purger, ok := c.storage.(storage.Purger); ok {
return purger.Purge(ctx, q, local, internal, c.shadowDelete)
}
return 0, ErrNotImplemented
}
// Shutdown shuts down the storage.
func (c *Controller) Shutdown() error {
// acquire full locks

View file

@ -1,6 +1,7 @@
package database
import (
"context"
"errors"
"fmt"
"time"
@ -400,6 +401,22 @@ func (i *Interface) Query(q *query.Query) (*iterator.Iterator, error) {
return db.Query(q, i.options.Local, i.options.Internal)
}
// Purge deletes all records that match the given query. It returns the number
// of successful deletes and an error.
func (i *Interface) Purge(ctx context.Context, q *query.Query) (int, error) {
_, err := q.Check()
if err != nil {
return 0, err
}
db, err := getController(q.DatabaseName())
if err != nil {
return 0, err
}
return db.Purge(ctx, q, i.options.Local, i.options.Internal)
}
// Subscribe subscribes to updates matching the given query.
func (i *Interface) Subscribe(q *query.Query) (*Subscription, error) {
_, err := q.Check()

View file

@ -297,6 +297,93 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim
})
}
// Purge deletes all records that match the given query. It returns the number of successful deletes and an error.
func (b *BBolt) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) {
prefix := []byte(q.DatabaseKeyPrefix())
var cnt int
var done bool
for !done {
err := b.db.Update(func(tx *bbolt.Tx) error {
// Create a cursor for iteration.
bucket := tx.Bucket(bucketName)
c := bucket.Cursor()
for key, value := c.Seek(prefix); key != nil; key, value = c.Next() {
// Check if context has been cancelled.
select {
case <-ctx.Done():
done = true
return nil
default:
}
// Check if we still match the key prefix, if not, exit.
if !bytes.HasPrefix(key, prefix) {
done = true
return nil
}
// Wrap the value in a new wrapper to access the metadata.
wrapper, err := record.NewRawWrapper(b.name, string(key), value)
if err != nil {
return err
}
// Check if we have permission for this record.
if !wrapper.Meta().CheckPermission(local, internal) {
continue
}
// Check if record is already deleted.
if wrapper.Meta().IsDeleted() {
continue
}
// Check if the query matches this record.
if !q.MatchesRecord(wrapper) {
continue
}
// Delete record.
if shadowDelete {
// Shadow delete.
wrapper.Meta().Delete()
deleted, err := wrapper.MarshalRecord(wrapper)
if err != nil {
return err
}
err = bucket.Put(key, deleted)
if err != nil {
return err
}
// Reposition the cursor after we have edited the bucket.
c.Seek(key)
} else {
// Immediate delete.
err = c.Delete()
if err != nil {
return err
}
}
// Work in batches of 1000 changes in order to enable other operations in between.
cnt++
if cnt%1000 == 0 {
return nil
}
}
done = true
return nil
})
if err != nil {
return cnt, err
}
}
return cnt, nil
}
// Shutdown shuts down the database.
func (b *BBolt) Shutdown() error {
return b.db.Close()

View file

@ -19,6 +19,7 @@ var (
// Compile time interface checks.
_ storage.Interface = &BBolt{}
_ storage.Batcher = &BBolt{}
_ storage.Purger = &BBolt{}
)
type TestRecord struct {
@ -158,6 +159,37 @@ func TestBBolt(t *testing.T) {
t.Fatal(err)
}
// purging
purger, ok := db.(storage.Purger)
if ok {
n, err := purger.Purge(context.TODO(), query.New("test:path/to/").MustBeValid(), true, true, false)
if err != nil {
t.Fatal(err)
}
if n != 3 {
t.Fatalf("unexpected purge delete count: %d", n)
}
} else {
t.Fatal("should implement Purger")
}
// test query
q = query.New("test").MustBeValid()
it, err = db.Query(q, true, true)
if err != nil {
t.Fatal(err)
}
cnt = 0
for range it.Next {
cnt++
}
if it.Err() != nil {
t.Fatal(it.Err())
}
if cnt != 1 {
t.Fatalf("unexpected query result count: %d", cnt)
}
// shutdown
err = db.Shutdown()
if err != nil {

View file

@ -36,3 +36,8 @@ type Maintainer interface {
type Batcher interface {
PutMany(shadowDelete bool) (batch chan<- record.Record, errs <-chan error)
}
// Purger defines the database storage API for backends that support the purge operation.
type Purger interface {
Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error)
}