mirror of
https://github.com/safing/portbase
synced 2025-09-02 02:29:59 +00:00
Make shadow deletes conditional
Also, Move maintenance to separate interface.
This commit is contained in:
parent
c479430d46
commit
5bb73a7b4c
9 changed files with 83 additions and 45 deletions
|
@ -16,7 +16,8 @@ import (
|
|||
|
||||
// A Controller takes care of all the extra database logic.
|
||||
type Controller struct {
|
||||
storage storage.Interface
|
||||
storage storage.Interface
|
||||
shadowDelete bool
|
||||
|
||||
hooks []*RegisteredHook
|
||||
subscriptions []*Subscription
|
||||
|
@ -33,11 +34,12 @@ type Controller struct {
|
|||
}
|
||||
|
||||
// newController creates a new controller for a storage.
|
||||
func newController(storageInt storage.Interface) *Controller {
|
||||
func newController(storageInt storage.Interface, shadowDelete bool) *Controller {
|
||||
return &Controller{
|
||||
storage: storageInt,
|
||||
migrating: abool.NewBool(false),
|
||||
hibernating: abool.NewBool(false),
|
||||
storage: storageInt,
|
||||
shadowDelete: shadowDelete,
|
||||
migrating: abool.NewBool(false),
|
||||
hibernating: abool.NewBool(false),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -122,12 +124,21 @@ func (c *Controller) Put(r record.Record) (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
r, err = c.storage.Put(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if r == nil {
|
||||
return errors.New("storage returned nil record after successful put operation")
|
||||
if !c.shadowDelete && r.Meta().IsDeleted() {
|
||||
// Immediate delete.
|
||||
err = c.storage.Delete(r.DatabaseKey())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Put or shadow delete.
|
||||
r, err = c.storage.Put(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if r == nil {
|
||||
return errors.New("storage returned nil record after successful put operation")
|
||||
}
|
||||
}
|
||||
|
||||
// process subscriptions
|
||||
|
@ -161,7 +172,7 @@ func (c *Controller) PutMany() (chan<- record.Record, <-chan error) {
|
|||
}
|
||||
|
||||
if batcher, ok := c.storage.(storage.Batcher); ok {
|
||||
return batcher.PutMany()
|
||||
return batcher.PutMany(c.shadowDelete)
|
||||
}
|
||||
|
||||
errs := make(chan error, 1)
|
||||
|
@ -236,7 +247,10 @@ func (c *Controller) Maintain(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
return c.storage.Maintain(ctx)
|
||||
if maintenance, ok := c.storage.(storage.Maintenance); ok {
|
||||
return maintenance.Maintain(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MaintainThorough runs the MaintainThorough method on the storage.
|
||||
|
@ -248,7 +262,10 @@ func (c *Controller) MaintainThorough(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
return c.storage.MaintainThorough(ctx)
|
||||
if maintenance, ok := c.storage.(storage.Maintenance); ok {
|
||||
return maintenance.MaintainThorough(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MaintainRecordStates runs the record state lifecycle maintenance on the storage.
|
||||
|
@ -260,7 +277,10 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor
|
|||
return nil
|
||||
}
|
||||
|
||||
return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore)
|
||||
if maintenance, ok := c.storage.(storage.Maintenance); ok {
|
||||
return maintenance.MaintainRecordStates(ctx, purgeDeletedBefore)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown shuts down the storage.
|
||||
|
|
|
@ -51,7 +51,7 @@ func getController(name string) (*Controller, error) {
|
|||
return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err)
|
||||
}
|
||||
|
||||
controller = newController(storageInt)
|
||||
controller = newController(storageInt, registeredDB.ShadowDelete)
|
||||
controllers[name] = controller
|
||||
return controller, nil
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ func InjectDatabase(name string, storageInt storage.Interface) (*Controller, err
|
|||
return nil, fmt.Errorf(`database not of type "injected"`)
|
||||
}
|
||||
|
||||
controller := newController(storageInt)
|
||||
controller := newController(storageInt, false)
|
||||
controllers[name] = controller
|
||||
return controller, nil
|
||||
}
|
||||
|
|
|
@ -7,13 +7,13 @@ import (
|
|||
|
||||
// Database holds information about registered databases
|
||||
type Database struct {
|
||||
Name string
|
||||
Description string
|
||||
StorageType string
|
||||
PrimaryAPI string
|
||||
Registered time.Time
|
||||
LastUpdated time.Time
|
||||
LastLoaded time.Time
|
||||
Name string
|
||||
Description string
|
||||
StorageType string
|
||||
ShadowDelete bool // Whether deleted records should be kept until purged.
|
||||
Registered time.Time
|
||||
LastUpdated time.Time
|
||||
LastLoaded time.Time
|
||||
}
|
||||
|
||||
// MigrateTo migrates the database to another storage type.
|
||||
|
|
|
@ -31,10 +31,10 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint
|
|||
dbName := fmt.Sprintf("testing-%s", storageType)
|
||||
fmt.Println(dbName)
|
||||
_, err := Register(&Database{
|
||||
Name: dbName,
|
||||
Description: fmt.Sprintf("Unit Test Database for %s", storageType),
|
||||
StorageType: storageType,
|
||||
PrimaryAPI: "",
|
||||
Name: dbName,
|
||||
Description: fmt.Sprintf("Unit Test Database for %s", storageType),
|
||||
StorageType: storageType,
|
||||
ShadowDelete: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -49,8 +49,8 @@ func Register(new *Database) (*Database, error) {
|
|||
registeredDB.Description = new.Description
|
||||
save = true
|
||||
}
|
||||
if registeredDB.PrimaryAPI != new.PrimaryAPI {
|
||||
registeredDB.PrimaryAPI = new.PrimaryAPI
|
||||
if registeredDB.ShadowDelete != new.ShadowDelete {
|
||||
registeredDB.ShadowDelete = new.ShadowDelete
|
||||
save = true
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -107,7 +107,7 @@ func (b *BBolt) Put(r record.Record) (record.Record, error) {
|
|||
}
|
||||
|
||||
// PutMany stores many records in the database.
|
||||
func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) {
|
||||
func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
|
||||
batch := make(chan record.Record, 100)
|
||||
errs := make(chan error, 1)
|
||||
|
||||
|
@ -115,16 +115,26 @@ func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) {
|
|||
err := b.db.Batch(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket(bucketName)
|
||||
for r := range batch {
|
||||
// marshal
|
||||
data, txErr := r.MarshalRecord(r)
|
||||
if txErr != nil {
|
||||
return txErr
|
||||
}
|
||||
if !shadowDelete && r.Meta().IsDeleted() {
|
||||
// Immediate delete.
|
||||
txErr := bucket.Delete([]byte(r.DatabaseKey()))
|
||||
if txErr != nil {
|
||||
return txErr
|
||||
}
|
||||
} else {
|
||||
// Put or shadow delete.
|
||||
|
||||
// put
|
||||
txErr = bucket.Put([]byte(r.DatabaseKey()), data)
|
||||
if txErr != nil {
|
||||
return txErr
|
||||
// marshal
|
||||
data, txErr := r.MarshalRecord(r)
|
||||
if txErr != nil {
|
||||
return txErr
|
||||
}
|
||||
|
||||
// put
|
||||
txErr = bucket.Put([]byte(r.DatabaseKey()), data)
|
||||
if txErr != nil {
|
||||
return txErr
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -54,7 +54,7 @@ func (hm *HashMap) Put(r record.Record) (record.Record, error) {
|
|||
}
|
||||
|
||||
// PutMany stores many records in the database.
|
||||
func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) {
|
||||
func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
|
||||
hm.dbLock.Lock()
|
||||
defer hm.dbLock.Unlock()
|
||||
// we could lock for every record, but we want to have the same behaviour
|
||||
|
@ -66,7 +66,11 @@ func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) {
|
|||
// start handler
|
||||
go func() {
|
||||
for r := range batch {
|
||||
hm.db[r.DatabaseKey()] = r
|
||||
if !shadowDelete && r.Meta().IsDeleted() {
|
||||
delete(hm.db, r.DatabaseKey())
|
||||
} else {
|
||||
hm.db[r.DatabaseKey()] = r
|
||||
}
|
||||
}
|
||||
errs <- nil
|
||||
}()
|
||||
|
|
|
@ -28,7 +28,7 @@ func (i *InjectBase) Put(m record.Record) (record.Record, error) {
|
|||
}
|
||||
|
||||
// PutMany stores many records in the database.
|
||||
func (i *InjectBase) PutMany() (batch chan record.Record, err chan error) {
|
||||
func (i *InjectBase) PutMany(shadowDelete bool) (batch chan record.Record, err chan error) {
|
||||
batch = make(chan record.Record)
|
||||
err = make(chan error, 1)
|
||||
err <- errNotImplemented
|
||||
|
|
|
@ -18,13 +18,17 @@ type Interface interface {
|
|||
|
||||
ReadOnly() bool
|
||||
Injected() bool
|
||||
Shutdown() error
|
||||
}
|
||||
|
||||
// Maintenance defines the database storage API for backends that requ
|
||||
type Maintenance interface {
|
||||
Maintain(ctx context.Context) error
|
||||
MaintainThorough(ctx context.Context) error
|
||||
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error
|
||||
Shutdown() error
|
||||
}
|
||||
|
||||
// Batcher defines the database storage API for backends that support batch operations.
|
||||
type Batcher interface {
|
||||
PutMany() (batch chan<- record.Record, errs <-chan error)
|
||||
PutMany(shadowDelete bool) (batch chan<- record.Record, errs <-chan error)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue