Disable hourly DB wipe, improve DB maintainance

This commit is contained in:
Daniel 2020-04-27 08:19:33 +02:00
parent dd892c97d2
commit 9d526314a9
3 changed files with 68 additions and 13 deletions

View file

@ -1,6 +1,7 @@
package database package database
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
@ -162,7 +163,7 @@ func TestDatabaseSystem(t *testing.T) {
testDatabase(t, "fstree") testDatabase(t, "fstree")
testDatabase(t, "hashmap") testDatabase(t, "hashmap")
err = MaintainRecordStates() err = MaintainRecordStates(context.TODO())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View file

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/safing/portbase/database" "github.com/safing/portbase/database"
"github.com/safing/portbase/log"
"github.com/safing/portbase/modules" "github.com/safing/portbase/modules"
) )
@ -15,13 +16,16 @@ func startMaintenanceTasks() {
} }
func maintainBasic(ctx context.Context, task *modules.Task) error { func maintainBasic(ctx context.Context, task *modules.Task) error {
log.Infof("database: running Maintain")
return database.Maintain() return database.Maintain()
} }
func maintainThorough(ctx context.Context, task *modules.Task) error { func maintainThorough(ctx context.Context, task *modules.Task) error {
log.Infof("database: running MaintainThorough")
return database.MaintainThorough() return database.MaintainThorough()
} }
func maintainRecords(ctx context.Context, task *modules.Task) error { func maintainRecords(ctx context.Context, task *modules.Task) error {
return database.MaintainRecordStates() log.Infof("database: running MaintainRecordStates")
return database.MaintainRecordStates(ctx)
} }

View file

@ -1,16 +1,21 @@
package database package database
import ( import (
"context"
"time" "time"
"github.com/tevino/abool"
"github.com/safing/portbase/database/query" "github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record" "github.com/safing/portbase/database/record"
) )
// Maintain runs the Maintain method on all storages. // Maintain runs the Maintain method on all storages.
func Maintain() (err error) { func Maintain() (err error) {
controllers := duplicateControllers() // copy, as we might use the very long
for _, c := range controllers { all := duplicateControllers()
for _, c := range all {
err = c.Maintain() err = c.Maintain()
if err != nil { if err != nil {
return return
@ -21,7 +26,9 @@ func Maintain() (err error) {
// MaintainThorough runs the MaintainThorough method on all storages. // MaintainThorough runs the MaintainThorough method on all storages.
func MaintainThorough() (err error) { func MaintainThorough() (err error) {
// copy, as we might use the very long
all := duplicateControllers() all := duplicateControllers()
for _, c := range all { for _, c := range all {
err = c.MaintainThorough() err = c.MaintainThorough()
if err != nil { if err != nil {
@ -32,12 +39,32 @@ func MaintainThorough() (err error) {
} }
// MaintainRecordStates runs record state lifecycle maintenance on all storages. // MaintainRecordStates runs record state lifecycle maintenance on all storages.
func MaintainRecordStates() error { func MaintainRecordStates(ctx context.Context) error { //nolint:gocognit
// TODO: Put this in the storage interface to correctly maintain on all storages.
// Storages might check for deletion and expiry in the query interface and not return anything here.
// listen for ctx cancel
stop := abool.New()
doneCh := make(chan struct{}) // for goroutine cleanup
defer close(doneCh)
go func() {
select {
case <-ctx.Done():
case <-doneCh:
}
stop.Set()
}()
// copy, as we might use the very long
all := duplicateControllers() all := duplicateControllers()
now := time.Now().Unix() now := time.Now().Unix()
thirtyDaysAgo := time.Now().Add(-30 * 24 * time.Hour).Unix() thirtyDaysAgo := time.Now().Add(-30 * 24 * time.Hour).Unix()
for _, c := range all { for _, c := range all {
if stop.IsSet() {
return nil
}
if c.ReadOnly() || c.Injected() { if c.ReadOnly() || c.Injected() {
continue continue
@ -56,30 +83,52 @@ func MaintainRecordStates() error {
var toDelete []record.Record var toDelete []record.Record
var toExpire []record.Record var toExpire []record.Record
for r := range it.Next { queryLoop:
switch { for {
case r.Meta().Deleted < thirtyDaysAgo: select {
toDelete = append(toDelete, r) case r := <-it.Next:
case r.Meta().Expires < now: if r == nil {
toExpire = append(toExpire, r) break queryLoop
}
meta := r.Meta()
switch {
case meta.Deleted > 0 && meta.Deleted < thirtyDaysAgo:
toDelete = append(toDelete, r)
case meta.Expires > 0 && meta.Expires < now:
toExpire = append(toExpire, r)
}
case <-ctx.Done():
it.Cancel()
break queryLoop
} }
} }
if it.Err() != nil { if it.Err() != nil {
return err return err
} }
if stop.IsSet() {
return nil
}
for _, r := range toDelete { for _, r := range toDelete {
err := c.storage.Delete(r.DatabaseKey()) err := c.storage.Delete(r.DatabaseKey())
if err != nil { if err != nil {
return err return err
} }
if stop.IsSet() {
return nil
}
} }
for _, r := range toExpire { for _, r := range toExpire {
r.Meta().Delete() r.Meta().Delete()
err := c.Put(r) err := c.Put(r)
if err != nil { if err != nil {
return err return err
} }
if stop.IsSet() {
return nil
}
} }
} }
@ -87,9 +136,10 @@ func MaintainRecordStates() error {
} }
func duplicateControllers() (all []*Controller) { func duplicateControllers() (all []*Controller) {
controllersLock.Lock() controllersLock.RLock()
defer controllersLock.Unlock() defer controllersLock.RUnlock()
all = make([]*Controller, len(controllers))
for _, c := range controllers { for _, c := range controllers {
all = append(all, c) all = append(all, c)
} }