Merge branch 'develop' into feature/ui-revamp

This commit is contained in:
Patrick Pacher 2020-09-29 09:18:36 +02:00
commit baf6301eb5
No known key found for this signature in database
GPG key ID: E8CD2DA160925A6D
20 changed files with 386 additions and 185 deletions

View file

@ -134,7 +134,6 @@ func registerAsDatabase() error {
Name: "config", Name: "config",
Description: "Configuration Manager", Description: "Configuration Manager",
StorageType: "injected", StorageType: "injected",
PrimaryAPI: "",
}) })
if err != nil { if err != nil {
return err return err

View file

@ -16,7 +16,8 @@ import (
// A Controller takes care of all the extra database logic. // A Controller takes care of all the extra database logic.
type Controller struct { type Controller struct {
storage storage.Interface storage storage.Interface
shadowDelete bool
hooks []*RegisteredHook hooks []*RegisteredHook
subscriptions []*Subscription subscriptions []*Subscription
@ -28,11 +29,12 @@ type Controller struct {
} }
// newController creates a new controller for a storage. // newController creates a new controller for a storage.
func newController(storageInt storage.Interface) *Controller { func newController(storageInt storage.Interface, shadowDelete bool) *Controller {
return &Controller{ return &Controller{
storage: storageInt, storage: storageInt,
migrating: abool.NewBool(false), shadowDelete: shadowDelete,
hibernating: abool.NewBool(false), migrating: abool.NewBool(false),
hibernating: abool.NewBool(false),
} }
} }
@ -117,12 +119,21 @@ func (c *Controller) Put(r record.Record) (err error) {
} }
} }
r, err = c.storage.Put(r) if !c.shadowDelete && r.Meta().IsDeleted() {
if err != nil { // Immediate delete.
return err err = c.storage.Delete(r.DatabaseKey())
} if err != nil {
if r == nil { return err
return errors.New("storage returned nil record after successful put operation") }
} else {
// Put or shadow delete.
r, err = c.storage.Put(r)
if err != nil {
return err
}
if r == nil {
return errors.New("storage returned nil record after successful put operation")
}
} }
// process subscriptions // process subscriptions
@ -156,7 +167,7 @@ func (c *Controller) PutMany() (chan<- record.Record, <-chan error) {
} }
if batcher, ok := c.storage.(storage.Batcher); ok { if batcher, ok := c.storage.(storage.Batcher); ok {
return batcher.PutMany() return batcher.PutMany(c.shadowDelete)
} }
errs := make(chan error, 1) errs := make(chan error, 1)
@ -230,10 +241,13 @@ func (c *Controller) Maintain(ctx context.Context) error {
defer c.exclusiveAccess.RUnlock() defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return nil return ErrShuttingDown
} }
return c.storage.Maintain(ctx) if maintainer, ok := c.storage.(storage.Maintainer); ok {
return maintainer.Maintain(ctx)
}
return nil
} }
// MaintainThorough runs the MaintainThorough method on the storage. // MaintainThorough runs the MaintainThorough method on the storage.
@ -242,10 +256,13 @@ func (c *Controller) MaintainThorough(ctx context.Context) error {
defer c.exclusiveAccess.RUnlock() defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return nil return ErrShuttingDown
} }
return c.storage.MaintainThorough(ctx) if maintainer, ok := c.storage.(storage.Maintainer); ok {
return maintainer.MaintainThorough(ctx)
}
return nil
} }
// MaintainRecordStates runs the record state lifecycle maintenance on the storage. // MaintainRecordStates runs the record state lifecycle maintenance on the storage.
@ -254,10 +271,25 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor
defer c.exclusiveAccess.RUnlock() defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return nil return ErrShuttingDown
} }
return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore) return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore, c.shadowDelete)
}
// Purge deletes all records that match the given query. It returns the number of successful deletes and an error.
func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal bool) (int, error) {
c.writeLock.RLock()
defer c.writeLock.RUnlock()
if shuttingDown.IsSet() {
return 0, ErrShuttingDown
}
if purger, ok := c.storage.(storage.Purger); ok {
return purger.Purge(ctx, q, local, internal, c.shadowDelete)
}
return 0, ErrNotImplemented
} }
// Shutdown shuts down the storage. // Shutdown shuts down the storage.

View file

@ -51,7 +51,7 @@ func getController(name string) (*Controller, error) {
return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err) return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err)
} }
controller = newController(storageInt) controller = newController(storageInt, registeredDB.ShadowDelete)
controllers[name] = controller controllers[name] = controller
return controller, nil return controller, nil
} }
@ -82,7 +82,7 @@ func InjectDatabase(name string, storageInt storage.Interface) (*Controller, err
return nil, fmt.Errorf(`database not of type "injected"`) return nil, fmt.Errorf(`database not of type "injected"`)
} }
controller := newController(storageInt) controller := newController(storageInt, false)
controllers[name] = controller controllers[name] = controller
return controller, nil return controller, nil
} }

View file

@ -7,13 +7,13 @@ import (
// Database holds information about registered databases // Database holds information about registered databases
type Database struct { type Database struct {
Name string Name string
Description string Description string
StorageType string StorageType string
PrimaryAPI string ShadowDelete bool // Whether deleted records should be kept until purged.
Registered time.Time Registered time.Time
LastUpdated time.Time LastUpdated time.Time
LastLoaded time.Time LastLoaded time.Time
} }
// MigrateTo migrates the database to another storage type. // MigrateTo migrates the database to another storage type.

View file

@ -26,15 +26,15 @@ func makeKey(dbName, key string) string {
return fmt.Sprintf("%s:%s", dbName, key) return fmt.Sprintf("%s:%s", dbName, key)
} }
func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaintenance bool) { //nolint:gocognit,gocyclo func testDatabase(t *testing.T, storageType string, shadowDelete bool) { //nolint:gocognit,gocyclo
t.Run(fmt.Sprintf("TestStorage_%s", storageType), func(t *testing.T) { t.Run(fmt.Sprintf("TestStorage_%s_%v", storageType, shadowDelete), func(t *testing.T) {
dbName := fmt.Sprintf("testing-%s", storageType) dbName := fmt.Sprintf("testing-%s-%v", storageType, shadowDelete)
fmt.Println(dbName) fmt.Println(dbName)
_, err := Register(&Database{ _, err := Register(&Database{
Name: dbName, Name: dbName,
Description: fmt.Sprintf("Unit Test Database for %s", storageType), Description: fmt.Sprintf("Unit Test Database for %s", storageType),
StorageType: storageType, StorageType: storageType,
PrimaryAPI: "", ShadowDelete: shadowDelete,
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -107,7 +107,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint
} }
// test putmany // test putmany
if testPutMany { if _, ok := dbController.storage.(storage.Batcher); ok {
batchPut := db.PutMany(dbName) batchPut := db.PutMany(dbName)
records := []record.Record{A, B, C, nil} // nil is to signify finish records := []record.Record{A, B, C, nil} // nil is to signify finish
for _, r := range records { for _, r := range records {
@ -119,7 +119,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint
} }
// test maintenance // test maintenance
if testRecordMaintenance { if _, ok := dbController.storage.(storage.Maintainer); ok {
now := time.Now().UTC() now := time.Now().UTC()
nowUnix := now.Unix() nowUnix := now.Unix()
@ -238,10 +238,13 @@ func TestDatabaseSystem(t *testing.T) {
} }
defer os.RemoveAll(testDir) // clean up defer os.RemoveAll(testDir) // clean up
testDatabase(t, "bbolt", true, true) for _, shadowDelete := range []bool{false, true} {
testDatabase(t, "hashmap", true, true) testDatabase(t, "bbolt", shadowDelete)
testDatabase(t, "fstree", false, false) testDatabase(t, "hashmap", shadowDelete)
testDatabase(t, "badger", false, false) testDatabase(t, "fstree", shadowDelete)
// testDatabase(t, "badger", shadowDelete)
// TODO: Fix badger tests
}
err = MaintainRecordStates(context.TODO()) err = MaintainRecordStates(context.TODO())
if err != nil { if err != nil {

View file

@ -1,6 +1,7 @@
package database package database
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"time" "time"
@ -400,6 +401,22 @@ func (i *Interface) Query(q *query.Query) (*iterator.Iterator, error) {
return db.Query(q, i.options.Local, i.options.Internal) return db.Query(q, i.options.Local, i.options.Internal)
} }
// Purge deletes all records that match the given query. It returns the number
// of successful deletes and an error.
func (i *Interface) Purge(ctx context.Context, q *query.Query) (int, error) {
_, err := q.Check()
if err != nil {
return 0, err
}
db, err := getController(q.DatabaseName())
if err != nil {
return 0, err
}
return db.Purge(ctx, q, i.options.Local, i.options.Internal)
}
// Subscribe subscribes to updates matching the given query. // Subscribe subscribes to updates matching the given query.
func (i *Interface) Subscribe(q *query.Query) (*Subscription, error) { func (i *Interface) Subscribe(q *query.Query) (*Subscription, error) {
_, err := q.Check() _, err := q.Check()

View file

@ -49,8 +49,8 @@ func Register(new *Database) (*Database, error) {
registeredDB.Description = new.Description registeredDB.Description = new.Description
save = true save = true
} }
if registeredDB.PrimaryAPI != new.PrimaryAPI { if registeredDB.ShadowDelete != new.ShadowDelete {
registeredDB.PrimaryAPI = new.PrimaryAPI registeredDB.ShadowDelete = new.ShadowDelete
save = true save = true
} }
} else { } else {

View file

@ -208,7 +208,7 @@ func (b *Badger) MaintainThorough(_ context.Context) (err error) {
} }
// MaintainRecordStates maintains records states in the database. // MaintainRecordStates maintains records states in the database.
func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
// TODO: implement MaintainRecordStates // TODO: implement MaintainRecordStates
return nil return nil
} }

View file

@ -11,6 +11,13 @@ import (
"github.com/safing/portbase/database/query" "github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record" "github.com/safing/portbase/database/record"
"github.com/safing/portbase/database/storage"
)
var (
// Compile time interface checks.
_ storage.Interface = &Badger{}
_ storage.Maintainer = &Badger{}
) )
type TestRecord struct { type TestRecord struct {
@ -117,13 +124,18 @@ func TestBadger(t *testing.T) {
} }
// maintenance // maintenance
err = db.Maintain(context.TODO()) maintainer, ok := db.(storage.Maintainer)
if err != nil { if ok {
t.Fatal(err) err = maintainer.Maintain(context.TODO())
} if err != nil {
err = db.MaintainThorough(context.TODO()) t.Fatal(err)
if err != nil { }
t.Fatal(err) err = maintainer.MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}
} else {
t.Fatal("should implement Maintainer")
} }
// shutdown // shutdown

View file

@ -107,7 +107,7 @@ func (b *BBolt) Put(r record.Record) (record.Record, error) {
} }
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) { func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
batch := make(chan record.Record, 100) batch := make(chan record.Record, 100)
errs := make(chan error, 1) errs := make(chan error, 1)
@ -115,16 +115,26 @@ func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) {
err := b.db.Batch(func(tx *bbolt.Tx) error { err := b.db.Batch(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName) bucket := tx.Bucket(bucketName)
for r := range batch { for r := range batch {
// marshal if !shadowDelete && r.Meta().IsDeleted() {
data, txErr := r.MarshalRecord(r) // Immediate delete.
if txErr != nil { txErr := bucket.Delete([]byte(r.DatabaseKey()))
return txErr if txErr != nil {
} return txErr
}
} else {
// Put or shadow delete.
// put // marshal
txErr = bucket.Put([]byte(r.DatabaseKey()), data) data, txErr := r.MarshalRecord(r)
if txErr != nil { if txErr != nil {
return txErr return txErr
}
// put
txErr = bucket.Put([]byte(r.DatabaseKey()), data)
if txErr != nil {
return txErr
}
} }
} }
return nil return nil
@ -235,18 +245,8 @@ func (b *BBolt) Injected() bool {
return false return false
} }
// Maintain runs a light maintenance operation on the database.
func (b *BBolt) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (b *BBolt) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database. // MaintainRecordStates maintains records states in the database.
func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { //nolint:gocognit
now := time.Now().Unix() now := time.Now().Unix()
purgeThreshold := purgeDeletedBefore.Unix() purgeThreshold := purgeDeletedBefore.Unix()
@ -255,6 +255,13 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim
// Create a cursor for iteration. // Create a cursor for iteration.
c := bucket.Cursor() c := bucket.Cursor()
for key, value := c.First(); key != nil; key, value = c.Next() { for key, value := c.First(); key != nil; key, value = c.Next() {
// check if context is cancelled
select {
case <-ctx.Done():
return nil
default:
}
// wrap value // wrap value
wrapper, err := record.NewRawWrapper(b.name, string(key), value) wrapper, err := record.NewRawWrapper(b.name, string(key), value)
if err != nil { if err != nil {
@ -264,39 +271,135 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim
// check if we need to do maintenance // check if we need to do maintenance
meta := wrapper.Meta() meta := wrapper.Meta()
switch { switch {
case meta.Deleted > 0 && meta.Deleted < purgeThreshold: case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now:
if shadowDelete {
// mark as deleted
meta.Deleted = meta.Expires
deleted, err := wrapper.MarshalRecord(wrapper)
if err != nil {
return err
}
err = bucket.Put(key, deleted)
if err != nil {
return err
}
// Cursor repositioning is required after modifying data.
// While the documentation states that this is also required after a
// delete, this actually makes the cursor skip a record with the
// following c.Next() call of the loop.
// Docs/Issue: https://github.com/boltdb/bolt/issues/426#issuecomment-141982984
c.Seek(key)
continue
}
// Immediately delete expired entries if shadowDelete is disabled.
fallthrough
case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold):
// delete from storage // delete from storage
err = c.Delete() err = c.Delete()
if err != nil { if err != nil {
return err return err
} }
case meta.Expires > 0 && meta.Expires < now:
// mark as deleted
meta.Deleted = meta.Expires
deleted, err := wrapper.MarshalRecord(wrapper)
if err != nil {
return err
}
err = bucket.Put(key, deleted)
if err != nil {
return err
}
// reposition cursor
c.Seek(key)
}
// check if context is cancelled
select {
case <-ctx.Done():
return nil
default:
} }
} }
return nil return nil
}) })
} }
// Purge deletes all records that match the given query. It returns the number of successful deletes and an error.
func (b *BBolt) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) { //nolint:gocognit
prefix := []byte(q.DatabaseKeyPrefix())
var cnt int
var done bool
for !done {
err := b.db.Update(func(tx *bbolt.Tx) error {
// Create a cursor for iteration.
bucket := tx.Bucket(bucketName)
c := bucket.Cursor()
for key, value := c.Seek(prefix); key != nil; key, value = c.Next() {
// Check if context has been cancelled.
select {
case <-ctx.Done():
done = true
return nil
default:
}
// Check if we still match the key prefix, if not, exit.
if !bytes.HasPrefix(key, prefix) {
done = true
return nil
}
// Wrap the value in a new wrapper to access the metadata.
wrapper, err := record.NewRawWrapper(b.name, string(key), value)
if err != nil {
return err
}
// Check if we have permission for this record.
if !wrapper.Meta().CheckPermission(local, internal) {
continue
}
// Check if record is already deleted.
if wrapper.Meta().IsDeleted() {
continue
}
// Check if the query matches this record.
if !q.MatchesRecord(wrapper) {
continue
}
// Delete record.
if shadowDelete {
// Shadow delete.
wrapper.Meta().Delete()
deleted, err := wrapper.MarshalRecord(wrapper)
if err != nil {
return err
}
err = bucket.Put(key, deleted)
if err != nil {
return err
}
// Cursor repositioning is required after modifying data.
// While the documentation states that this is also required after a
// delete, this actually makes the cursor skip a record with the
// following c.Next() call of the loop.
// Docs/Issue: https://github.com/boltdb/bolt/issues/426#issuecomment-141982984
c.Seek(key)
} else {
// Immediate delete.
err = c.Delete()
if err != nil {
return err
}
}
// Work in batches of 1000 changes in order to enable other operations in between.
cnt++
if cnt%1000 == 0 {
return nil
}
}
done = true
return nil
})
if err != nil {
return cnt, err
}
}
return cnt, nil
}
// Shutdown shuts down the database. // Shutdown shuts down the database.
func (b *BBolt) Shutdown() error { func (b *BBolt) Shutdown() error {
return b.db.Close() return b.db.Close()

View file

@ -12,6 +12,14 @@ import (
"github.com/safing/portbase/database/query" "github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record" "github.com/safing/portbase/database/record"
"github.com/safing/portbase/database/storage"
)
var (
// Compile time interface checks.
_ storage.Interface = &BBolt{}
_ storage.Batcher = &BBolt{}
_ storage.Purger = &BBolt{}
) )
type TestRecord struct { type TestRecord struct {
@ -146,18 +154,47 @@ func TestBBolt(t *testing.T) {
} }
// maintenance // maintenance
err = db.Maintain(context.TODO()) err = db.MaintainRecordStates(context.TODO(), time.Now(), true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = db.MaintainThorough(context.TODO())
// maintenance
err = db.MaintainRecordStates(context.TODO(), time.Now(), false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = db.MaintainRecordStates(context.TODO(), time.Now())
// purging
purger, ok := db.(storage.Purger)
if ok {
n, err := purger.Purge(context.TODO(), query.New("test:path/to/").MustBeValid(), true, true, false)
if err != nil {
t.Fatal(err)
}
if n != 3 {
t.Fatalf("unexpected purge delete count: %d", n)
}
} else {
t.Fatal("should implement Purger")
}
// test query
q = query.New("test").MustBeValid()
it, err = db.Query(q, true, true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
cnt = 0
for range it.Next {
cnt++
}
if it.Err() != nil {
t.Fatal(it.Err())
}
if cnt != 1 {
t.Fatalf("unexpected query result count: %d", cnt)
}
// shutdown // shutdown
err = db.Shutdown() err = db.Shutdown()

View file

@ -255,18 +255,8 @@ func (fst *FSTree) Injected() bool {
return false return false
} }
// Maintain runs a light maintenance operation on the database.
func (fst *FSTree) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (fst *FSTree) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database. // MaintainRecordStates maintains records states in the database.
func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
// TODO: implement MaintainRecordStates // TODO: implement MaintainRecordStates
return nil return nil
} }
@ -279,6 +269,7 @@ func (fst *FSTree) Shutdown() error {
// writeFile mirrors ioutil.WriteFile, replacing an existing file with the same // writeFile mirrors ioutil.WriteFile, replacing an existing file with the same
// name atomically. This is not atomic on Windows, but still an improvement. // name atomically. This is not atomic on Windows, but still an improvement.
// TODO: Replace with github.com/google/renamio.WriteFile as soon as it is fixed on Windows. // TODO: Replace with github.com/google/renamio.WriteFile as soon as it is fixed on Windows.
// TODO: This has become a wont-fix. Explore other options.
// This function is forked from https://github.com/google/renameio/blob/a368f9987532a68a3d676566141654a81aa8100b/writefile.go. // This function is forked from https://github.com/google/renameio/blob/a368f9987532a68a3d676566141654a81aa8100b/writefile.go.
func writeFile(filename string, data []byte, perm os.FileMode) error { func writeFile(filename string, data []byte, perm os.FileMode) error {
t, err := renameio.TempFile("", filename) t, err := renameio.TempFile("", filename)

View file

@ -0,0 +1,8 @@
package fstree
import "github.com/safing/portbase/database/storage"
var (
// Compile time interface checks.
_ storage.Interface = &FSTree{}
)

View file

@ -54,7 +54,7 @@ func (hm *HashMap) Put(r record.Record) (record.Record, error) {
} }
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) { func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
hm.dbLock.Lock() hm.dbLock.Lock()
defer hm.dbLock.Unlock() defer hm.dbLock.Unlock()
// we could lock for every record, but we want to have the same behaviour // we could lock for every record, but we want to have the same behaviour
@ -66,7 +66,11 @@ func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) {
// start handler // start handler
go func() { go func() {
for r := range batch { for r := range batch {
hm.db[r.DatabaseKey()] = r if !shadowDelete && r.Meta().IsDeleted() {
delete(hm.db, r.DatabaseKey())
} else {
hm.db[r.DatabaseKey()] = r
}
} }
errs <- nil errs <- nil
}() }()
@ -145,18 +149,8 @@ func (hm *HashMap) Injected() bool {
return false return false
} }
// Maintain runs a light maintenance operation on the database.
func (hm *HashMap) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (hm *HashMap) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database. // MaintainRecordStates maintains records states in the database.
func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
hm.dbLock.Lock() hm.dbLock.Lock()
defer hm.dbLock.Unlock() defer hm.dbLock.Unlock()
@ -164,24 +158,31 @@ func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore
purgeThreshold := purgeDeletedBefore.Unix() purgeThreshold := purgeDeletedBefore.Unix()
for key, record := range hm.db { for key, record := range hm.db {
meta := record.Meta()
switch {
case meta.Deleted > 0 && meta.Deleted < purgeThreshold:
// delete from storage
delete(hm.db, key)
case meta.Expires > 0 && meta.Expires < now:
// mark as deleted
record.Lock()
meta.Deleted = meta.Expires
record.Unlock()
}
// check if context is cancelled // check if context is cancelled
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
default: default:
} }
meta := record.Meta()
switch {
case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now:
if shadowDelete {
// mark as deleted
record.Lock()
meta.Deleted = meta.Expires
record.Unlock()
continue
}
// Immediately delete expired entries if shadowDelete is disabled.
fallthrough
case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold):
// delete from storage
delete(hm.db, key)
}
} }
return nil return nil

View file

@ -2,15 +2,22 @@
package hashmap package hashmap
import ( import (
"context"
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
"github.com/safing/portbase/database/storage"
"github.com/safing/portbase/database/query" "github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record" "github.com/safing/portbase/database/record"
) )
var (
// Compile time interface checks.
_ storage.Interface = &HashMap{}
_ storage.Batcher = &HashMap{}
)
type TestRecord struct { type TestRecord struct {
record.Base record.Base
sync.Mutex sync.Mutex
@ -130,16 +137,6 @@ func TestHashMap(t *testing.T) {
t.Fatal("should fail") t.Fatal("should fail")
} }
// maintenance
err = db.Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}
// shutdown // shutdown
err = db.Shutdown() err = db.Shutdown()
if err != nil { if err != nil {

View file

@ -17,6 +17,9 @@ var (
// InjectBase is a dummy base structure to reduce boilerplate code for injected storage interfaces. // InjectBase is a dummy base structure to reduce boilerplate code for injected storage interfaces.
type InjectBase struct{} type InjectBase struct{}
// Compile time interface check
var _ Interface = &InjectBase{}
// Get returns a database record. // Get returns a database record.
func (i *InjectBase) Get(key string) (record.Record, error) { func (i *InjectBase) Get(key string) (record.Record, error) {
return nil, errNotImplemented return nil, errNotImplemented
@ -27,14 +30,6 @@ func (i *InjectBase) Put(m record.Record) (record.Record, error) {
return nil, errNotImplemented return nil, errNotImplemented
} }
// PutMany stores many records in the database.
func (i *InjectBase) PutMany() (batch chan record.Record, err chan error) {
batch = make(chan record.Record)
err = make(chan error, 1)
err <- errNotImplemented
return
}
// Delete deletes a record from the database. // Delete deletes a record from the database.
func (i *InjectBase) Delete(key string) error { func (i *InjectBase) Delete(key string) error {
return errNotImplemented return errNotImplemented
@ -55,18 +50,8 @@ func (i *InjectBase) Injected() bool {
return true return true
} }
// Maintain runs a light maintenance operation on the database.
func (i *InjectBase) Maintain(ctx context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (i *InjectBase) MaintainThorough(ctx context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database. // MaintainRecordStates maintains records states in the database.
func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
return nil return nil
} }

View file

@ -11,20 +11,33 @@ import (
// Interface defines the database storage API. // Interface defines the database storage API.
type Interface interface { type Interface interface {
// Primary Interface
Get(key string) (record.Record, error) Get(key string) (record.Record, error)
Put(m record.Record) (record.Record, error) Put(m record.Record) (record.Record, error)
Delete(key string) error Delete(key string) error
Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error)
// Information and Control
ReadOnly() bool ReadOnly() bool
Injected() bool Injected() bool
Shutdown() error
// Mandatory Record Maintenance
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error
}
// Maintainer defines the database storage API for backends that require regular maintenance.
type Maintainer interface {
Maintain(ctx context.Context) error Maintain(ctx context.Context) error
MaintainThorough(ctx context.Context) error MaintainThorough(ctx context.Context) error
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error
Shutdown() error
} }
// Batcher defines the database storage API for backends that support batch operations. // Batcher defines the database storage API for backends that support batch operations.
type Batcher interface { type Batcher interface {
PutMany() (batch chan<- record.Record, errs <-chan error) PutMany(shadowDelete bool) (batch chan<- record.Record, errs <-chan error)
}
// Purger defines the database storage API for backends that support the purge operation.
type Purger interface {
Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error)
} }

View file

@ -16,6 +16,13 @@ type Sinkhole struct {
name string name string
} }
var (
// Compile time interface check
_ storage.Interface = &Sinkhole{}
_ storage.Maintainer = &Sinkhole{}
_ storage.Batcher = &Sinkhole{}
)
func init() { func init() {
_ = storage.Register("sinkhole", NewSinkhole) _ = storage.Register("sinkhole", NewSinkhole)
} }
@ -43,7 +50,7 @@ func (s *Sinkhole) Put(r record.Record) (record.Record, error) {
} }
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (s *Sinkhole) PutMany() (chan<- record.Record, <-chan error) { func (s *Sinkhole) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
batch := make(chan record.Record, 100) batch := make(chan record.Record, 100)
errs := make(chan error, 1) errs := make(chan error, 1)
@ -89,7 +96,7 @@ func (s *Sinkhole) MaintainThorough(ctx context.Context) error {
} }
// MaintainRecordStates maintains records states in the database. // MaintainRecordStates maintains records states in the database.
func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
return nil return nil
} }

View file

@ -38,7 +38,6 @@ func registerAsDatabase() error {
Name: "notifications", Name: "notifications",
Description: "Notifications", Description: "Notifications",
StorageType: "injected", StorageType: "injected",
PrimaryAPI: "",
}) })
if err != nil { if err != nil {
return err return err

View file

@ -38,7 +38,7 @@ func Run() int {
} }
if printStackOnExit { if printStackOnExit {
printStackTo(os.Stdout) printStackTo(os.Stdout, "PRINTING STACK ON EXIT (STARTUP ERROR)")
} }
_ = modules.Shutdown() _ = modules.Shutdown()
@ -67,7 +67,7 @@ signalLoop:
case sig := <-signalCh: case sig := <-signalCh:
// only print and continue to wait if SIGUSR1 // only print and continue to wait if SIGUSR1
if sig == sigUSR1 { if sig == sigUSR1 {
_ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) printStackTo(os.Stderr, "PRINTING STACK ON REQUEST")
continue signalLoop continue signalLoop
} }
@ -83,21 +83,19 @@ signalLoop:
if forceCnt > 0 { if forceCnt > 0 {
fmt.Printf(" <INTERRUPT> again, but already shutting down. %d more to force.\n", forceCnt) fmt.Printf(" <INTERRUPT> again, but already shutting down. %d more to force.\n", forceCnt)
} else { } else {
fmt.Fprintln(os.Stderr, "===== FORCED EXIT =====") printStackTo(os.Stderr, "PRINTING STACK ON FORCED EXIT")
printStackTo(os.Stderr)
os.Exit(1) os.Exit(1)
} }
} }
}() }()
if printStackOnExit { if printStackOnExit {
printStackTo(os.Stdout) printStackTo(os.Stdout, "PRINTING STACK ON EXIT")
} }
go func() { go func() {
time.Sleep(3 * time.Minute) time.Sleep(3 * time.Minute)
fmt.Fprintln(os.Stderr, "===== TAKING TOO LONG FOR SHUTDOWN =====") printStackTo(os.Stderr, "PRINTING STACK - TAKING TOO LONG FOR SHUTDOWN")
printStackTo(os.Stderr)
os.Exit(1) os.Exit(1)
}() }()
@ -131,13 +129,12 @@ func inputSignals(signalCh chan os.Signal) {
} }
} }
func printStackTo(writer io.Writer) { func printStackTo(writer io.Writer, msg string) {
fmt.Fprintln(writer, "=== PRINTING TRACES ===") _, err := fmt.Fprintf(writer, "===== %s =====\n", msg)
fmt.Fprintln(writer, "=== GOROUTINES ===") if err == nil {
_ = pprof.Lookup("goroutine").WriteTo(writer, 1) err = pprof.Lookup("goroutine").WriteTo(writer, 1)
fmt.Fprintln(writer, "=== BLOCKING ===") }
_ = pprof.Lookup("block").WriteTo(writer, 1) if err != nil {
fmt.Fprintln(writer, "=== MUTEXES ===") log.Errorf("main: failed to write stack trace: %s", err)
_ = pprof.Lookup("mutex").WriteTo(writer, 1) }
fmt.Fprintln(writer, "=== END TRACES ===")
} }