Move DB record maintenance to storage interface

This commit is contained in:
Daniel 2020-05-05 21:34:19 +02:00
parent 4eb21405cc
commit bea130d755
16 changed files with 385 additions and 224 deletions

View file

@ -16,7 +16,10 @@ type Example struct {
}
var (
exampleDB = NewInterface(nil)
exampleDB = NewInterface(&Options{
Internal: true,
Local: true,
})
)
// GetExample gets an Example from the database.

View file

@ -1,8 +1,10 @@
package database
import (
"context"
"errors"
"sync"
"time"
"github.com/tevino/abool"
@ -226,7 +228,7 @@ func (c *Controller) readUnlockerAfterQuery(it *iterator.Iterator) {
}
// Maintain runs the Maintain method on the storage.
func (c *Controller) Maintain() error {
func (c *Controller) Maintain(ctx context.Context) error {
c.writeLock.RLock()
defer c.writeLock.RUnlock()
@ -234,11 +236,11 @@ func (c *Controller) Maintain() error {
return nil
}
return c.storage.Maintain()
return c.storage.Maintain(ctx)
}
// MaintainThorough runs the MaintainThorough method on the storage.
func (c *Controller) MaintainThorough() error {
func (c *Controller) MaintainThorough(ctx context.Context) error {
c.writeLock.RLock()
defer c.writeLock.RUnlock()
@ -246,7 +248,19 @@ func (c *Controller) MaintainThorough() error {
return nil
}
return c.storage.MaintainThorough()
return c.storage.MaintainThorough(ctx)
}
// MaintainRecordStates runs the record state lifecycle maintenance on the storage.
func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
c.writeLock.RLock()
defer c.writeLock.RUnlock()
if shuttingDown.IsSet() {
return nil
}
return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore)
}
// Shutdown shuts down the storage.

View file

@ -11,9 +11,11 @@ import (
"testing"
"time"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/database/storage"
q "github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
_ "github.com/safing/portbase/database/storage/badger"
_ "github.com/safing/portbase/database/storage/bbolt"
_ "github.com/safing/portbase/database/storage/fstree"
@ -24,117 +26,195 @@ func makeKey(dbName, key string) string {
return fmt.Sprintf("%s:%s", dbName, key)
}
func testDatabase(t *testing.T, storageType string) {
dbName := fmt.Sprintf("testing-%s", storageType)
_, err := Register(&Database{
Name: dbName,
Description: fmt.Sprintf("Unit Test Database for %s", storageType),
StorageType: storageType,
PrimaryAPI: "",
})
if err != nil {
t.Fatal(err)
}
func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaintenance bool) { //nolint:gocognit,gocyclo
t.Run(fmt.Sprintf("TestStorage_%s", storageType), func(t *testing.T) {
dbName := fmt.Sprintf("testing-%s", storageType)
fmt.Println(dbName)
_, err := Register(&Database{
Name: dbName,
Description: fmt.Sprintf("Unit Test Database for %s", storageType),
StorageType: storageType,
PrimaryAPI: "",
})
if err != nil {
t.Fatal(err)
}
dbController, err := getController(dbName)
if err != nil {
t.Fatal(err)
}
// hook
hook, err := RegisterHook(q.New(dbName).MustBeValid(), &HookBase{})
if err != nil {
t.Fatal(err)
}
// hook
hook, err := RegisterHook(q.New(dbName).MustBeValid(), &HookBase{})
if err != nil {
t.Fatal(err)
}
// interface
db := NewInterface(&Options{
Local: true,
Internal: true,
})
// interface
db := NewInterface(&Options{
Local: true,
Internal: true,
})
// sub
sub, err := db.Subscribe(q.New(dbName).MustBeValid())
if err != nil {
t.Fatal(err)
}
// sub
sub, err := db.Subscribe(q.New(dbName).MustBeValid())
if err != nil {
t.Fatal(err)
}
A := NewExample(makeKey(dbName, "A"), "Herbert", 411)
err = A.Save()
if err != nil {
t.Fatal(err)
}
A := NewExample(dbName+":A", "Herbert", 411)
err = A.Save()
if err != nil {
t.Fatal(err)
}
B := NewExample(makeKey(dbName, "B"), "Fritz", 347)
err = B.Save()
if err != nil {
t.Fatal(err)
}
B := NewExample(makeKey(dbName, "B"), "Fritz", 347)
err = B.Save()
if err != nil {
t.Fatal(err)
}
C := NewExample(makeKey(dbName, "C"), "Norbert", 217)
err = C.Save()
if err != nil {
t.Fatal(err)
}
C := NewExample(makeKey(dbName, "C"), "Norbert", 217)
err = C.Save()
if err != nil {
t.Fatal(err)
}
exists, err := db.Exists(makeKey(dbName, "A"))
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("record %s should exist!", makeKey(dbName, "A"))
}
exists, err := db.Exists(makeKey(dbName, "A"))
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("record %s should exist!", makeKey(dbName, "A"))
}
A1, err := GetExample(makeKey(dbName, "A"))
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(A, A1) {
log.Fatalf("A and A1 mismatch, A1: %v", A1)
}
A1, err := GetExample(makeKey(dbName, "A"))
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(A, A1) {
log.Fatalf("A and A1 mismatch, A1: %v", A1)
}
query, err := q.New(dbName).Where(
q.And(
q.Where("Name", q.EndsWith, "bert"),
q.Where("Score", q.GreaterThan, 100),
),
).Check()
if err != nil {
t.Fatal(err)
}
cnt := countRecords(t, db, q.New(dbName).Where(
q.And(
q.Where("Name", q.EndsWith, "bert"),
q.Where("Score", q.GreaterThan, 100),
),
))
if cnt != 2 {
t.Fatalf("expected two records, got %d", cnt)
}
it, err := db.Query(query)
if err != nil {
t.Fatal(err)
}
// test putmany
if testPutMany {
batchPut := db.PutMany(dbName)
records := []record.Record{A, B, C, nil} // nil is to signify finish
for _, r := range records {
err = batchPut(r)
if err != nil {
t.Fatal(err)
}
}
}
cnt := 0
for range it.Next {
cnt++
}
if it.Err() != nil {
t.Fatal(it.Err())
}
if cnt != 2 {
t.Fatalf("expected two records, got %d", cnt)
}
// test maintenance
if testRecordMaintenance {
now := time.Now().UTC()
nowUnix := now.Unix()
switch storageType {
case "bbolt", "hashmap":
batchPut := db.PutMany(dbName)
records := []record.Record{A, B, C, nil} // nil is to signify finish
for _, r := range records {
err = batchPut(r)
// we start with 3 records without expiry
cnt := countRecords(t, db, q.New(dbName))
if cnt != 3 {
t.Fatalf("expected three records, got %d", cnt)
}
// delete entry
A.Meta().Deleted = nowUnix - 61
err = A.Save()
if err != nil {
t.Fatal(err)
}
// expire entry
B.Meta().Expires = nowUnix - 1
err = B.Save()
if err != nil {
t.Fatal(err)
}
// one left
cnt = countRecords(t, db, q.New(dbName))
if cnt != 1 {
t.Fatalf("expected one record, got %d", cnt)
}
// run maintenance
err = dbController.MaintainRecordStates(context.TODO(), now.Add(-60*time.Second))
if err != nil {
t.Fatal(err)
}
// one left
cnt = countRecords(t, db, q.New(dbName))
if cnt != 1 {
t.Fatalf("expected one record, got %d", cnt)
}
// check status individually
_, err = dbController.storage.Get("A")
if err != storage.ErrNotFound {
t.Errorf("A should be deleted and purged, err=%s", err)
}
B1, err := dbController.storage.Get("B")
if err != nil {
t.Fatalf("should exist: %s, original meta: %+v", err, B.Meta())
}
if B1.Meta().Deleted == 0 {
t.Errorf("B should be deleted")
}
// delete last entry
C.Meta().Deleted = nowUnix - 1
err = C.Save()
if err != nil {
t.Fatal(err)
}
// run maintenance
err = dbController.MaintainRecordStates(context.TODO(), now)
if err != nil {
t.Fatal(err)
}
// check status individually
B2, err := dbController.storage.Get("B")
if err == nil {
t.Errorf("B should be deleted and purged, meta: %+v", B2.Meta())
} else if err != storage.ErrNotFound {
t.Errorf("B should be deleted and purged, err=%s", err)
}
C2, err := dbController.storage.Get("C")
if err == nil {
t.Errorf("C should be deleted and purged, meta: %+v", C2.Meta())
} else if err != storage.ErrNotFound {
t.Errorf("C should be deleted and purged, err=%s", err)
}
// none left
cnt = countRecords(t, db, q.New(dbName))
if cnt != 0 {
t.Fatalf("expected no records, got %d", cnt)
}
}
}
err = hook.Cancel()
if err != nil {
t.Fatal(err)
}
err = sub.Cancel()
if err != nil {
t.Fatal(err)
}
err = hook.Cancel()
if err != nil {
t.Fatal(err)
}
err = sub.Cancel()
if err != nil {
t.Fatal(err)
}
})
}
func TestDatabaseSystem(t *testing.T) {
@ -158,22 +238,22 @@ func TestDatabaseSystem(t *testing.T) {
}
defer os.RemoveAll(testDir) // clean up
testDatabase(t, "badger")
testDatabase(t, "bbolt")
testDatabase(t, "fstree")
testDatabase(t, "hashmap")
testDatabase(t, "bbolt", true, true)
testDatabase(t, "hashmap", true, true)
testDatabase(t, "fstree", false, false)
testDatabase(t, "badger", false, false)
err = MaintainRecordStates(context.TODO())
if err != nil {
t.Fatal(err)
}
err = Maintain()
err = Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = MaintainThorough()
err = MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}
@ -182,5 +262,25 @@ func TestDatabaseSystem(t *testing.T) {
if err != nil {
t.Fatal(err)
}
}
func countRecords(t *testing.T, db *Interface, query *q.Query) int {
_, err := query.Check()
if err != nil {
t.Fatal(err)
}
it, err := db.Query(query)
if err != nil {
t.Fatal(err)
}
cnt := 0
for range it.Next {
cnt++
}
if it.Err() != nil {
t.Fatal(it.Err())
}
return cnt
}

View file

@ -17,12 +17,12 @@ func startMaintenanceTasks() {
func maintainBasic(ctx context.Context, task *modules.Task) error {
log.Infof("database: running Maintain")
return database.Maintain()
return database.Maintain(ctx)
}
func maintainThorough(ctx context.Context, task *modules.Task) error {
log.Infof("database: running MaintainThorough")
return database.MaintainThorough()
return database.MaintainThorough(ctx)
}
func maintainRecords(ctx context.Context, task *modules.Task) error {

View file

@ -181,7 +181,7 @@ func (i *Interface) Put(r record.Record) (err error) {
return err
}
} else {
db, err = getController(r.DatabaseKey())
db, err = getController(r.DatabaseName())
if err != nil {
return err
}

View file

@ -3,20 +3,15 @@ package database
import (
"context"
"time"
"github.com/tevino/abool"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
)
// Maintain runs the Maintain method on all storages.
func Maintain() (err error) {
func Maintain(ctx context.Context) (err error) {
// copy, as we might use the very long
all := duplicateControllers()
for _, c := range all {
err = c.Maintain()
err = c.Maintain(ctx)
if err != nil {
return
}
@ -25,12 +20,12 @@ func Maintain() (err error) {
}
// MaintainThorough runs the MaintainThorough method on all storages.
func MaintainThorough() (err error) {
func MaintainThorough(ctx context.Context) (err error) {
// copy, as we might use the very long
all := duplicateControllers()
for _, c := range all {
err = c.MaintainThorough()
err = c.MaintainThorough(ctx)
if err != nil {
return
}
@ -39,100 +34,21 @@ func MaintainThorough() (err error) {
}
// MaintainRecordStates runs record state lifecycle maintenance on all storages.
func MaintainRecordStates(ctx context.Context) error { //nolint:gocognit
// TODO: Put this in the storage interface to correctly maintain on all storages.
// Storages might check for deletion and expiry in the query interface and not return anything here.
// listen for ctx cancel
stop := abool.New()
doneCh := make(chan struct{}) // for goroutine cleanup
defer close(doneCh)
go func() {
select {
case <-ctx.Done():
case <-doneCh:
}
stop.Set()
}()
func MaintainRecordStates(ctx context.Context) (err error) {
// delete immediately for now
// TODO: increase purge threshold when starting to sync DBs
purgeDeletedBefore := time.Now().UTC()
// copy, as we might use the very long
all := duplicateControllers()
now := time.Now().Unix()
thirtyDaysAgo := time.Now().Add(-30 * 24 * time.Hour).Unix()
for _, c := range all {
if stop.IsSet() {
return nil
}
if c.ReadOnly() || c.Injected() {
continue
}
q, err := query.New("").Check()
err = c.MaintainRecordStates(ctx, purgeDeletedBefore)
if err != nil {
return err
return
}
it, err := c.Query(q, true, true)
if err != nil {
return err
}
var toDelete []record.Record
var toExpire []record.Record
queryLoop:
for {
select {
case r := <-it.Next:
if r == nil {
break queryLoop
}
meta := r.Meta()
switch {
case meta.Deleted > 0 && meta.Deleted < thirtyDaysAgo:
toDelete = append(toDelete, r)
case meta.Expires > 0 && meta.Expires < now:
toExpire = append(toExpire, r)
}
case <-ctx.Done():
it.Cancel()
break queryLoop
}
}
if it.Err() != nil {
return err
}
if stop.IsSet() {
return nil
}
for _, r := range toDelete {
err := c.storage.Delete(r.DatabaseKey())
if err != nil {
return err
}
if stop.IsSet() {
return nil
}
}
for _, r := range toExpire {
r.Meta().Delete()
err := c.Put(r)
if err != nil {
return err
}
if stop.IsSet() {
return nil
}
}
}
return nil
return
}
func duplicateControllers() (all []*Controller) {

View file

@ -1,6 +1,7 @@
package badger
import (
"context"
"errors"
"fmt"
"time"
@ -193,19 +194,25 @@ func (b *Badger) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (b *Badger) Maintain() error {
func (b *Badger) Maintain(_ context.Context) error {
_ = b.db.RunValueLogGC(0.7)
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (b *Badger) MaintainThorough() (err error) {
func (b *Badger) MaintainThorough(_ context.Context) (err error) {
for err == nil {
err = b.db.RunValueLogGC(0.7)
}
return nil
}
// MaintainRecordStates maintains records states in the database.
func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
// TODO: implement MaintainRecordStates
return nil
}
// Shutdown shuts down the database.
func (b *Badger) Shutdown() error {
return b.db.Close()

View file

@ -2,6 +2,7 @@
package badger
import (
"context"
"io/ioutil"
"os"
"reflect"
@ -116,11 +117,11 @@ func TestBadger(t *testing.T) {
}
// maintenance
err = db.Maintain()
err = db.Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainThorough()
err = db.MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}

View file

@ -2,6 +2,7 @@ package bbolt
import (
"bytes"
"context"
"errors"
"fmt"
"path/filepath"
@ -235,15 +236,69 @@ func (b *BBolt) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (b *BBolt) Maintain() error {
func (b *BBolt) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (b *BBolt) MaintainThorough() (err error) {
func (b *BBolt) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
now := time.Now().Unix()
purgeThreshold := purgeDeletedBefore.Unix()
return b.db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
// Create a cursor for iteration.
c := bucket.Cursor()
for key, value := c.First(); key != nil; key, value = c.Next() {
// wrap value
wrapper, err := record.NewRawWrapper(b.name, string(key), value)
if err != nil {
return err
}
// check if we need to do maintenance
meta := wrapper.Meta()
switch {
case meta.Deleted > 0 && meta.Deleted < purgeThreshold:
// delete from storage
err = c.Delete()
if err != nil {
return err
}
case meta.Expires > 0 && meta.Expires < now:
// mark as deleted
meta.Deleted = meta.Expires
deleted, err := wrapper.MarshalRecord(wrapper)
if err != nil {
return err
}
err = bucket.Put(key, deleted)
if err != nil {
return err
}
// reposition cursor
c.Seek(key)
}
time.Sleep(100 * time.Millisecond)
// check if context is cancelled
select {
case <-ctx.Done():
return nil
default:
}
}
return nil
})
}
// Shutdown shuts down the database.
func (b *BBolt) Shutdown() error {
return b.db.Close()

View file

@ -2,11 +2,13 @@
package bbolt
import (
"context"
"io/ioutil"
"os"
"reflect"
"sync"
"testing"
"time"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
@ -144,11 +146,15 @@ func TestBBolt(t *testing.T) {
}
// maintenance
err = db.Maintain()
err = db.Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainThorough()
err = db.MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainRecordStates(context.TODO(), time.Now())
if err != nil {
t.Fatal(err)
}

View file

@ -5,6 +5,7 @@ It is primarily meant for easy testing or storing big files that can easily be a
package fstree
import (
"context"
"errors"
"fmt"
"io/ioutil"
@ -255,12 +256,18 @@ func (fst *FSTree) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (fst *FSTree) Maintain() error {
func (fst *FSTree) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (fst *FSTree) MaintainThorough() error {
func (fst *FSTree) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
// TODO: implement MaintainRecordStates
return nil
}

View file

@ -1,6 +1,7 @@
package hashmap
import (
"context"
"errors"
"fmt"
"sync"
@ -146,12 +147,44 @@ func (hm *HashMap) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (hm *HashMap) Maintain() error {
func (hm *HashMap) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (hm *HashMap) MaintainThorough() (err error) {
func (hm *HashMap) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
hm.dbLock.Lock()
defer hm.dbLock.Unlock()
now := time.Now().Unix()
purgeThreshold := purgeDeletedBefore.Unix()
for key, record := range hm.db {
meta := record.Meta()
switch {
case meta.Deleted > 0 && meta.Deleted < purgeThreshold:
// delete from storage
delete(hm.db, key)
case meta.Expires > 0 && meta.Expires < now:
// mark as deleted
record.Lock()
meta.Deleted = meta.Expires
record.Unlock()
}
// check if context is cancelled
select {
case <-ctx.Done():
return nil
default:
}
}
return nil
}

View file

@ -2,6 +2,7 @@
package hashmap
import (
"context"
"reflect"
"sync"
"testing"
@ -130,11 +131,11 @@ func TestHashMap(t *testing.T) {
}
// maintenance
err = db.Maintain()
err = db.Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainThorough()
err = db.MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}

View file

@ -1,7 +1,9 @@
package storage
import (
"context"
"errors"
"time"
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/database/query"
@ -54,12 +56,17 @@ func (i *InjectBase) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (i *InjectBase) Maintain() error {
func (i *InjectBase) Maintain(ctx context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (i *InjectBase) MaintainThorough() error {
func (i *InjectBase) MaintainThorough(ctx context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
return nil
}

View file

@ -1,6 +1,9 @@
package storage
import (
"context"
"time"
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
@ -15,8 +18,9 @@ type Interface interface {
ReadOnly() bool
Injected() bool
Maintain() error
MaintainThorough() error
Maintain(ctx context.Context) error
MaintainThorough(ctx context.Context) error
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error
Shutdown() error
}

View file

@ -1,7 +1,9 @@
package sinkhole
import (
"context"
"errors"
"time"
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/database/query"
@ -77,12 +79,17 @@ func (s *Sinkhole) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (s *Sinkhole) Maintain() error {
func (s *Sinkhole) Maintain(ctx context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (s *Sinkhole) MaintainThorough() (err error) {
func (s *Sinkhole) MaintainThorough(ctx context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
return nil
}