Release to master

This commit is contained in:
Daniel 2020-05-08 11:07:39 +02:00
commit 5ce9236979
27 changed files with 657 additions and 305 deletions

View file

@ -16,7 +16,10 @@ type Example struct {
}
var (
exampleDB = NewInterface(nil)
exampleDB = NewInterface(&Options{
Internal: true,
Local: true,
})
)
// GetExample gets an Example from the database.

View file

@ -1,8 +1,10 @@
package database
import (
"context"
"errors"
"sync"
"time"
"github.com/tevino/abool"
@ -226,7 +228,7 @@ func (c *Controller) readUnlockerAfterQuery(it *iterator.Iterator) {
}
// Maintain runs the Maintain method on the storage.
func (c *Controller) Maintain() error {
func (c *Controller) Maintain(ctx context.Context) error {
c.writeLock.RLock()
defer c.writeLock.RUnlock()
@ -234,11 +236,11 @@ func (c *Controller) Maintain() error {
return nil
}
return c.storage.Maintain()
return c.storage.Maintain(ctx)
}
// MaintainThorough runs the MaintainThorough method on the storage.
func (c *Controller) MaintainThorough() error {
func (c *Controller) MaintainThorough(ctx context.Context) error {
c.writeLock.RLock()
defer c.writeLock.RUnlock()
@ -246,7 +248,19 @@ func (c *Controller) MaintainThorough() error {
return nil
}
return c.storage.MaintainThorough()
return c.storage.MaintainThorough(ctx)
}
// MaintainRecordStates runs the record state lifecycle maintenance on the storage.
func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
c.writeLock.RLock()
defer c.writeLock.RUnlock()
if shuttingDown.IsSet() {
return nil
}
return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore)
}
// Shutdown shuts down the storage.

View file

@ -11,9 +11,11 @@ import (
"testing"
"time"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/database/storage"
q "github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
_ "github.com/safing/portbase/database/storage/badger"
_ "github.com/safing/portbase/database/storage/bbolt"
_ "github.com/safing/portbase/database/storage/fstree"
@ -24,117 +26,195 @@ func makeKey(dbName, key string) string {
return fmt.Sprintf("%s:%s", dbName, key)
}
func testDatabase(t *testing.T, storageType string) {
dbName := fmt.Sprintf("testing-%s", storageType)
_, err := Register(&Database{
Name: dbName,
Description: fmt.Sprintf("Unit Test Database for %s", storageType),
StorageType: storageType,
PrimaryAPI: "",
})
if err != nil {
t.Fatal(err)
}
func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaintenance bool) { //nolint:gocognit,gocyclo
t.Run(fmt.Sprintf("TestStorage_%s", storageType), func(t *testing.T) {
dbName := fmt.Sprintf("testing-%s", storageType)
fmt.Println(dbName)
_, err := Register(&Database{
Name: dbName,
Description: fmt.Sprintf("Unit Test Database for %s", storageType),
StorageType: storageType,
PrimaryAPI: "",
})
if err != nil {
t.Fatal(err)
}
dbController, err := getController(dbName)
if err != nil {
t.Fatal(err)
}
// hook
hook, err := RegisterHook(q.New(dbName).MustBeValid(), &HookBase{})
if err != nil {
t.Fatal(err)
}
// hook
hook, err := RegisterHook(q.New(dbName).MustBeValid(), &HookBase{})
if err != nil {
t.Fatal(err)
}
// interface
db := NewInterface(&Options{
Local: true,
Internal: true,
})
// interface
db := NewInterface(&Options{
Local: true,
Internal: true,
})
// sub
sub, err := db.Subscribe(q.New(dbName).MustBeValid())
if err != nil {
t.Fatal(err)
}
// sub
sub, err := db.Subscribe(q.New(dbName).MustBeValid())
if err != nil {
t.Fatal(err)
}
A := NewExample(makeKey(dbName, "A"), "Herbert", 411)
err = A.Save()
if err != nil {
t.Fatal(err)
}
A := NewExample(dbName+":A", "Herbert", 411)
err = A.Save()
if err != nil {
t.Fatal(err)
}
B := NewExample(makeKey(dbName, "B"), "Fritz", 347)
err = B.Save()
if err != nil {
t.Fatal(err)
}
B := NewExample(makeKey(dbName, "B"), "Fritz", 347)
err = B.Save()
if err != nil {
t.Fatal(err)
}
C := NewExample(makeKey(dbName, "C"), "Norbert", 217)
err = C.Save()
if err != nil {
t.Fatal(err)
}
C := NewExample(makeKey(dbName, "C"), "Norbert", 217)
err = C.Save()
if err != nil {
t.Fatal(err)
}
exists, err := db.Exists(makeKey(dbName, "A"))
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("record %s should exist!", makeKey(dbName, "A"))
}
exists, err := db.Exists(makeKey(dbName, "A"))
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("record %s should exist!", makeKey(dbName, "A"))
}
A1, err := GetExample(makeKey(dbName, "A"))
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(A, A1) {
log.Fatalf("A and A1 mismatch, A1: %v", A1)
}
A1, err := GetExample(makeKey(dbName, "A"))
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(A, A1) {
log.Fatalf("A and A1 mismatch, A1: %v", A1)
}
query, err := q.New(dbName).Where(
q.And(
q.Where("Name", q.EndsWith, "bert"),
q.Where("Score", q.GreaterThan, 100),
),
).Check()
if err != nil {
t.Fatal(err)
}
cnt := countRecords(t, db, q.New(dbName).Where(
q.And(
q.Where("Name", q.EndsWith, "bert"),
q.Where("Score", q.GreaterThan, 100),
),
))
if cnt != 2 {
t.Fatalf("expected two records, got %d", cnt)
}
it, err := db.Query(query)
if err != nil {
t.Fatal(err)
}
// test putmany
if testPutMany {
batchPut := db.PutMany(dbName)
records := []record.Record{A, B, C, nil} // nil is to signify finish
for _, r := range records {
err = batchPut(r)
if err != nil {
t.Fatal(err)
}
}
}
cnt := 0
for range it.Next {
cnt++
}
if it.Err() != nil {
t.Fatal(it.Err())
}
if cnt != 2 {
t.Fatalf("expected two records, got %d", cnt)
}
// test maintenance
if testRecordMaintenance {
now := time.Now().UTC()
nowUnix := now.Unix()
switch storageType {
case "bbolt", "hashmap":
batchPut := db.PutMany(dbName)
records := []record.Record{A, B, C, nil} // nil is to signify finish
for _, r := range records {
err = batchPut(r)
// we start with 3 records without expiry
cnt := countRecords(t, db, q.New(dbName))
if cnt != 3 {
t.Fatalf("expected three records, got %d", cnt)
}
// delete entry
A.Meta().Deleted = nowUnix - 61
err = A.Save()
if err != nil {
t.Fatal(err)
}
// expire entry
B.Meta().Expires = nowUnix - 1
err = B.Save()
if err != nil {
t.Fatal(err)
}
// one left
cnt = countRecords(t, db, q.New(dbName))
if cnt != 1 {
t.Fatalf("expected one record, got %d", cnt)
}
// run maintenance
err = dbController.MaintainRecordStates(context.TODO(), now.Add(-60*time.Second))
if err != nil {
t.Fatal(err)
}
// one left
cnt = countRecords(t, db, q.New(dbName))
if cnt != 1 {
t.Fatalf("expected one record, got %d", cnt)
}
// check status individually
_, err = dbController.storage.Get("A")
if err != storage.ErrNotFound {
t.Errorf("A should be deleted and purged, err=%s", err)
}
B1, err := dbController.storage.Get("B")
if err != nil {
t.Fatalf("should exist: %s, original meta: %+v", err, B.Meta())
}
if B1.Meta().Deleted == 0 {
t.Errorf("B should be deleted")
}
// delete last entry
C.Meta().Deleted = nowUnix - 1
err = C.Save()
if err != nil {
t.Fatal(err)
}
// run maintenance
err = dbController.MaintainRecordStates(context.TODO(), now)
if err != nil {
t.Fatal(err)
}
// check status individually
B2, err := dbController.storage.Get("B")
if err == nil {
t.Errorf("B should be deleted and purged, meta: %+v", B2.Meta())
} else if err != storage.ErrNotFound {
t.Errorf("B should be deleted and purged, err=%s", err)
}
C2, err := dbController.storage.Get("C")
if err == nil {
t.Errorf("C should be deleted and purged, meta: %+v", C2.Meta())
} else if err != storage.ErrNotFound {
t.Errorf("C should be deleted and purged, err=%s", err)
}
// none left
cnt = countRecords(t, db, q.New(dbName))
if cnt != 0 {
t.Fatalf("expected no records, got %d", cnt)
}
}
}
err = hook.Cancel()
if err != nil {
t.Fatal(err)
}
err = sub.Cancel()
if err != nil {
t.Fatal(err)
}
err = hook.Cancel()
if err != nil {
t.Fatal(err)
}
err = sub.Cancel()
if err != nil {
t.Fatal(err)
}
})
}
func TestDatabaseSystem(t *testing.T) {
@ -158,22 +238,22 @@ func TestDatabaseSystem(t *testing.T) {
}
defer os.RemoveAll(testDir) // clean up
testDatabase(t, "badger")
testDatabase(t, "bbolt")
testDatabase(t, "fstree")
testDatabase(t, "hashmap")
testDatabase(t, "bbolt", true, true)
testDatabase(t, "hashmap", true, true)
testDatabase(t, "fstree", false, false)
testDatabase(t, "badger", false, false)
err = MaintainRecordStates(context.TODO())
if err != nil {
t.Fatal(err)
}
err = Maintain()
err = Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = MaintainThorough()
err = MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}
@ -182,5 +262,25 @@ func TestDatabaseSystem(t *testing.T) {
if err != nil {
t.Fatal(err)
}
}
func countRecords(t *testing.T, db *Interface, query *q.Query) int {
_, err := query.Check()
if err != nil {
t.Fatal(err)
}
it, err := db.Query(query)
if err != nil {
t.Fatal(err)
}
cnt := 0
for range it.Next {
cnt++
}
if it.Err() != nil {
t.Fatal(it.Err())
}
return cnt
}

View file

@ -17,12 +17,12 @@ func startMaintenanceTasks() {
func maintainBasic(ctx context.Context, task *modules.Task) error {
log.Infof("database: running Maintain")
return database.Maintain()
return database.Maintain(ctx)
}
func maintainThorough(ctx context.Context, task *modules.Task) error {
log.Infof("database: running MaintainThorough")
return database.MaintainThorough()
return database.MaintainThorough(ctx)
}
func maintainRecords(ctx context.Context, task *modules.Task) error {

View file

@ -181,7 +181,7 @@ func (i *Interface) Put(r record.Record) (err error) {
return err
}
} else {
db, err = getController(r.DatabaseKey())
db, err = getController(r.DatabaseName())
if err != nil {
return err
}

View file

@ -3,20 +3,15 @@ package database
import (
"context"
"time"
"github.com/tevino/abool"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
)
// Maintain runs the Maintain method on all storages.
func Maintain() (err error) {
func Maintain(ctx context.Context) (err error) {
// copy, as we might use the very long
all := duplicateControllers()
for _, c := range all {
err = c.Maintain()
err = c.Maintain(ctx)
if err != nil {
return
}
@ -25,12 +20,12 @@ func Maintain() (err error) {
}
// MaintainThorough runs the MaintainThorough method on all storages.
func MaintainThorough() (err error) {
func MaintainThorough(ctx context.Context) (err error) {
// copy, as we might use the very long
all := duplicateControllers()
for _, c := range all {
err = c.MaintainThorough()
err = c.MaintainThorough(ctx)
if err != nil {
return
}
@ -39,100 +34,21 @@ func MaintainThorough() (err error) {
}
// MaintainRecordStates runs record state lifecycle maintenance on all storages.
func MaintainRecordStates(ctx context.Context) error { //nolint:gocognit
// TODO: Put this in the storage interface to correctly maintain on all storages.
// Storages might check for deletion and expiry in the query interface and not return anything here.
// listen for ctx cancel
stop := abool.New()
doneCh := make(chan struct{}) // for goroutine cleanup
defer close(doneCh)
go func() {
select {
case <-ctx.Done():
case <-doneCh:
}
stop.Set()
}()
func MaintainRecordStates(ctx context.Context) (err error) {
// delete immediately for now
// TODO: increase purge threshold when starting to sync DBs
purgeDeletedBefore := time.Now().UTC()
// copy, as we might use the very long
all := duplicateControllers()
now := time.Now().Unix()
thirtyDaysAgo := time.Now().Add(-30 * 24 * time.Hour).Unix()
for _, c := range all {
if stop.IsSet() {
return nil
}
if c.ReadOnly() || c.Injected() {
continue
}
q, err := query.New("").Check()
err = c.MaintainRecordStates(ctx, purgeDeletedBefore)
if err != nil {
return err
return
}
it, err := c.Query(q, true, true)
if err != nil {
return err
}
var toDelete []record.Record
var toExpire []record.Record
queryLoop:
for {
select {
case r := <-it.Next:
if r == nil {
break queryLoop
}
meta := r.Meta()
switch {
case meta.Deleted > 0 && meta.Deleted < thirtyDaysAgo:
toDelete = append(toDelete, r)
case meta.Expires > 0 && meta.Expires < now:
toExpire = append(toExpire, r)
}
case <-ctx.Done():
it.Cancel()
break queryLoop
}
}
if it.Err() != nil {
return err
}
if stop.IsSet() {
return nil
}
for _, r := range toDelete {
err := c.storage.Delete(r.DatabaseKey())
if err != nil {
return err
}
if stop.IsSet() {
return nil
}
}
for _, r := range toExpire {
r.Meta().Delete()
err := c.Put(r)
if err != nil {
return err
}
if stop.IsSet() {
return nil
}
}
}
return nil
return
}
func duplicateControllers() (all []*Controller) {

View file

@ -1,6 +1,7 @@
package badger
import (
"context"
"errors"
"fmt"
"time"
@ -193,19 +194,25 @@ func (b *Badger) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (b *Badger) Maintain() error {
func (b *Badger) Maintain(_ context.Context) error {
_ = b.db.RunValueLogGC(0.7)
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (b *Badger) MaintainThorough() (err error) {
func (b *Badger) MaintainThorough(_ context.Context) (err error) {
for err == nil {
err = b.db.RunValueLogGC(0.7)
}
return nil
}
// MaintainRecordStates maintains records states in the database.
func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
// TODO: implement MaintainRecordStates
return nil
}
// Shutdown shuts down the database.
func (b *Badger) Shutdown() error {
return b.db.Close()

View file

@ -2,6 +2,7 @@
package badger
import (
"context"
"io/ioutil"
"os"
"reflect"
@ -116,11 +117,11 @@ func TestBadger(t *testing.T) {
}
// maintenance
err = db.Maintain()
err = db.Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainThorough()
err = db.MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}

View file

@ -2,6 +2,7 @@ package bbolt
import (
"bytes"
"context"
"errors"
"fmt"
"path/filepath"
@ -235,15 +236,69 @@ func (b *BBolt) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (b *BBolt) Maintain() error {
func (b *BBolt) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (b *BBolt) MaintainThorough() (err error) {
func (b *BBolt) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
now := time.Now().Unix()
purgeThreshold := purgeDeletedBefore.Unix()
return b.db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
// Create a cursor for iteration.
c := bucket.Cursor()
for key, value := c.First(); key != nil; key, value = c.Next() {
// wrap value
wrapper, err := record.NewRawWrapper(b.name, string(key), value)
if err != nil {
return err
}
// check if we need to do maintenance
meta := wrapper.Meta()
switch {
case meta.Deleted > 0 && meta.Deleted < purgeThreshold:
// delete from storage
err = c.Delete()
if err != nil {
return err
}
case meta.Expires > 0 && meta.Expires < now:
// mark as deleted
meta.Deleted = meta.Expires
deleted, err := wrapper.MarshalRecord(wrapper)
if err != nil {
return err
}
err = bucket.Put(key, deleted)
if err != nil {
return err
}
// reposition cursor
c.Seek(key)
}
time.Sleep(100 * time.Millisecond)
// check if context is cancelled
select {
case <-ctx.Done():
return nil
default:
}
}
return nil
})
}
// Shutdown shuts down the database.
func (b *BBolt) Shutdown() error {
return b.db.Close()

View file

@ -2,11 +2,13 @@
package bbolt
import (
"context"
"io/ioutil"
"os"
"reflect"
"sync"
"testing"
"time"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
@ -144,11 +146,15 @@ func TestBBolt(t *testing.T) {
}
// maintenance
err = db.Maintain()
err = db.Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainThorough()
err = db.MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainRecordStates(context.TODO(), time.Now())
if err != nil {
t.Fatal(err)
}

View file

@ -5,6 +5,7 @@ It is primarily meant for easy testing or storing big files that can easily be a
package fstree
import (
"context"
"errors"
"fmt"
"io/ioutil"
@ -255,12 +256,18 @@ func (fst *FSTree) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (fst *FSTree) Maintain() error {
func (fst *FSTree) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (fst *FSTree) MaintainThorough() error {
func (fst *FSTree) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
// TODO: implement MaintainRecordStates
return nil
}

View file

@ -1,6 +1,7 @@
package hashmap
import (
"context"
"errors"
"fmt"
"sync"
@ -146,12 +147,44 @@ func (hm *HashMap) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (hm *HashMap) Maintain() error {
func (hm *HashMap) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (hm *HashMap) MaintainThorough() (err error) {
func (hm *HashMap) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
hm.dbLock.Lock()
defer hm.dbLock.Unlock()
now := time.Now().Unix()
purgeThreshold := purgeDeletedBefore.Unix()
for key, record := range hm.db {
meta := record.Meta()
switch {
case meta.Deleted > 0 && meta.Deleted < purgeThreshold:
// delete from storage
delete(hm.db, key)
case meta.Expires > 0 && meta.Expires < now:
// mark as deleted
record.Lock()
meta.Deleted = meta.Expires
record.Unlock()
}
// check if context is cancelled
select {
case <-ctx.Done():
return nil
default:
}
}
return nil
}

View file

@ -2,6 +2,7 @@
package hashmap
import (
"context"
"reflect"
"sync"
"testing"
@ -130,11 +131,11 @@ func TestHashMap(t *testing.T) {
}
// maintenance
err = db.Maintain()
err = db.Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainThorough()
err = db.MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}

View file

@ -1,7 +1,9 @@
package storage
import (
"context"
"errors"
"time"
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/database/query"
@ -54,12 +56,17 @@ func (i *InjectBase) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (i *InjectBase) Maintain() error {
func (i *InjectBase) Maintain(ctx context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (i *InjectBase) MaintainThorough() error {
func (i *InjectBase) MaintainThorough(ctx context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
return nil
}

View file

@ -1,6 +1,9 @@
package storage
import (
"context"
"time"
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
@ -15,8 +18,9 @@ type Interface interface {
ReadOnly() bool
Injected() bool
Maintain() error
MaintainThorough() error
Maintain(ctx context.Context) error
MaintainThorough(ctx context.Context) error
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error
Shutdown() error
}

View file

@ -1,7 +1,9 @@
package sinkhole
import (
"context"
"errors"
"time"
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/database/query"
@ -77,12 +79,17 @@ func (s *Sinkhole) Injected() bool {
}
// Maintain runs a light maintenance operation on the database.
func (s *Sinkhole) Maintain() error {
func (s *Sinkhole) Maintain(ctx context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (s *Sinkhole) MaintainThorough() (err error) {
func (s *Sinkhole) MaintainThorough(ctx context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
return nil
}

View file

@ -101,6 +101,8 @@ func SetDatabaseKeySpace(keySpace string) {
}
func printGraph() {
fmt.Println("subsystems dependency graph:")
// unmark subsystems module
module.Disable()
// mark roots
@ -111,6 +113,15 @@ func printGraph() {
for _, sub := range subsystems {
printModuleGraph("", sub.module, true)
}
fmt.Println("\nsubsystem module groups:")
_ = start() // no errors for what we need here
for _, sub := range subsystems {
fmt.Printf("├── %s\n", sub.Name)
for _, mod := range sub.Modules[1:] {
fmt.Printf("│ ├── %s\n", mod.Name)
}
}
}
func printModuleGraph(prefix string, module *modules.Module, root bool) {

View file

@ -67,6 +67,48 @@ func Get(id string) *Notification {
return nil
}
// NotifyInfo is a helper method for quickly showing a info
// notification. The notification is already shown. If id is
// an empty string a new UUIDv4 will be generated.
func NotifyInfo(id, msg string, actions ...Action) *Notification {
return notify(Info, id, msg, actions...)
}
// NotifyWarn is a helper method for quickly showing a warning
// notification. The notification is already shown. If id is
// an empty string a new UUIDv4 will be generated.
func NotifyWarn(id, msg string, actions ...Action) *Notification {
return notify(Warning, id, msg, actions...)
}
// NotifyPrompt is a helper method for quickly showing a prompt
// notification. The notification is already shown. If id is
// an empty string a new UUIDv4 will be generated.
func NotifyPrompt(id, msg string, actions ...Action) *Notification {
return notify(Prompt, id, msg, actions...)
}
func notify(nType uint8, id string, msg string, actions ...Action) *Notification {
acts := make([]*Action, len(actions))
for idx := range actions {
a := actions[idx]
acts[idx] = &a
}
if id == "" {
id = uuid.NewV4().String()
}
n := Notification{
ID: id,
Message: msg,
Type: nType,
AvailableActions: acts,
}
return n.Save()
}
// Save saves the notification and returns it.
func (n *Notification) Save() *Notification {
notsLock.Lock()

View file

@ -1,6 +1,7 @@
package updater
// Export exports the list of resources. All resources must be locked when accessed.
// Export exports the list of resources. All resources must be
// locked when accessed.
func (reg *ResourceRegistry) Export() map[string]*Resource {
reg.RLock()
defer reg.RUnlock()

View file

@ -25,7 +25,7 @@ func (reg *ResourceRegistry) fetchFile(rv *ResourceVersion, tries int) error {
// create URL
downloadURL, err := joinURLandPath(reg.UpdateURLs[tries%len(reg.UpdateURLs)], rv.versionedPath())
if err != nil {
return fmt.Errorf("error build url (%s + %s): %s", reg.UpdateURLs[tries%len(reg.UpdateURLs)], rv.versionedPath(), err)
return fmt.Errorf("error build url (%s + %s): %w", reg.UpdateURLs[tries%len(reg.UpdateURLs)], rv.versionedPath(), err)
}
// check destination dir
@ -39,14 +39,14 @@ func (reg *ResourceRegistry) fetchFile(rv *ResourceVersion, tries int) error {
// open file for writing
atomicFile, err := renameio.TempFile(reg.tmpDir.Path, rv.storagePath())
if err != nil {
return fmt.Errorf("could not create temp file for download: %s", err)
return fmt.Errorf("could not create temp file for download: %w", err)
}
defer atomicFile.Cleanup() //nolint:errcheck // ignore error for now, tmp dir will be cleaned later again anyway
// start file download
resp, err := http.Get(downloadURL) //nolint:gosec // url is variable on purpose
if err != nil {
return fmt.Errorf("error fetching url (%s): %s", downloadURL, err)
return fmt.Errorf("error fetching url (%s): %w", downloadURL, err)
}
defer resp.Body.Close()
@ -57,7 +57,7 @@ func (reg *ResourceRegistry) fetchFile(rv *ResourceVersion, tries int) error {
// download and write file
n, err := io.Copy(atomicFile, resp.Body)
if err != nil {
return fmt.Errorf("failed downloading %s: %s", downloadURL, err)
return fmt.Errorf("failed downloading %s: %w", downloadURL, err)
}
if resp.ContentLength != n {
return fmt.Errorf("download unfinished, written %d out of %d bytes", n, resp.ContentLength)
@ -66,7 +66,7 @@ func (reg *ResourceRegistry) fetchFile(rv *ResourceVersion, tries int) error {
// finalize file
err = atomicFile.CloseAtomicallyReplace()
if err != nil {
return fmt.Errorf("%s: failed to finalize file %s: %s", reg.Name, rv.storagePath(), err)
return fmt.Errorf("%s: failed to finalize file %s: %w", reg.Name, rv.storagePath(), err)
}
// set permissions
if !onWindows {
@ -90,13 +90,13 @@ func (reg *ResourceRegistry) fetchData(downloadPath string, tries int) ([]byte,
// create URL
downloadURL, err := joinURLandPath(reg.UpdateURLs[tries%len(reg.UpdateURLs)], downloadPath)
if err != nil {
return nil, fmt.Errorf("error build url (%s + %s): %s", reg.UpdateURLs[tries%len(reg.UpdateURLs)], downloadPath, err)
return nil, fmt.Errorf("error build url (%s + %s): %w", reg.UpdateURLs[tries%len(reg.UpdateURLs)], downloadPath, err)
}
// start file download
resp, err := http.Get(downloadURL) //nolint:gosec // url is variable on purpose
if err != nil {
return nil, fmt.Errorf("error fetching url (%s): %s", downloadURL, err)
return nil, fmt.Errorf("error fetching url (%s): %w", downloadURL, err)
}
defer resp.Body.Close()
@ -108,7 +108,7 @@ func (reg *ResourceRegistry) fetchData(downloadPath string, tries int) ([]byte,
buf := bytes.NewBuffer(make([]byte, 0, resp.ContentLength))
n, err := io.Copy(buf, resp.Body)
if err != nil {
return nil, fmt.Errorf("failed downloading %s: %s", downloadURL, err)
return nil, fmt.Errorf("failed downloading %s: %w", downloadURL, err)
}
if resp.ContentLength != n {
return nil, fmt.Errorf("download unfinished, written %d out of %d bytes", n, resp.ContentLength)

View file

@ -1,5 +1,7 @@
package updater
import "github.com/safing/portbase/log"
// File represents a file from the update system.
type File struct {
resource *Resource
@ -36,6 +38,7 @@ func (file *File) markActiveWithLocking() {
// update last used version
if file.resource.ActiveVersion != file.version {
log.Debugf("updater: setting active version of resource %s from %s to %s", file.resource.Identifier, file.resource.ActiveVersion, file.version.VersionNumber)
file.resource.ActiveVersion = file.version
}
}

View file

@ -37,7 +37,7 @@ func (reg *ResourceRegistry) GetFile(identifier string) (*File, error) {
// check download dir
err := reg.tmpDir.Ensure()
if err != nil {
return nil, fmt.Errorf("could not prepare tmp directory for download: %s", err)
return nil, fmt.Errorf("could not prepare tmp directory for download: %w", err)
}
// download file

16
updater/indexes.go Normal file
View file

@ -0,0 +1,16 @@
package updater
// Index describes an index file pulled by the updater.
type Index struct {
// Path is the path to the index file
// on the update server.
Path string
// Stable is set if the index file contains only stable
// releases.
Stable bool
// Beta is set if the index file contains beta
// releases.
Beta bool
}

View file

@ -20,6 +20,7 @@ type ResourceRegistry struct {
Name string
storageDir *utils.DirStructure
tmpDir *utils.DirStructure
indexes []Index
resources map[string]*Resource
UpdateURLs []string
@ -30,6 +31,14 @@ type ResourceRegistry struct {
Online bool
}
// AddIndex adds a new index to the resource registry.
func (reg *ResourceRegistry) AddIndex(idx Index) {
reg.Lock()
defer reg.Unlock()
reg.indexes = append(reg.indexes, idx)
}
// Initialize initializes a raw registry struct and makes it ready for usage.
func (reg *ResourceRegistry) Initialize(storageDir *utils.DirStructure) error {
// check if storage dir is available
@ -48,6 +57,18 @@ func (reg *ResourceRegistry) Initialize(storageDir *utils.DirStructure) error {
reg.tmpDir = storageDir.ChildDir("tmp", 0700)
reg.resources = make(map[string]*Resource)
// remove tmp dir to delete old entries
err = reg.Cleanup()
if err != nil {
log.Warningf("%s: failed to remove tmp dir: %s", reg.Name, err)
}
// (re-)create tmp dir
err = reg.tmpDir.Ensure()
if err != nil {
log.Warningf("%s: failed to create tmp dir: %s", reg.Name, err)
}
return nil
}

View file

@ -19,37 +19,90 @@ type Resource struct {
registry *ResourceRegistry
notifier *notifier
// Identifier is the unique identifier for that resource.
// It forms a file path using a forward-slash as the
// path separator.
Identifier string
Versions []*ResourceVersion
ActiveVersion *ResourceVersion
// Versions holds all available resource versions.
Versions []*ResourceVersion
// ActiveVersion is the last version of the resource
// that someone requested using GetFile().
ActiveVersion *ResourceVersion
// SelectedVersion is newest, selectable version of
// that resource that is available. A version
// is selectable if it's not blacklisted by the user.
// Note that it's not guaranteed that the selected version
// is available locally. In that case, GetFile will attempt
// to download the latest version from the updates servers
// specified in the resource registry.
SelectedVersion *ResourceVersion
ForceDownload bool
}
// ResourceVersion represents a single version of a resource.
type ResourceVersion struct {
resource *Resource
// VersionNumber is the string representation of the resource
// version.
VersionNumber string
semVer *semver.Version
Available bool
// Available indicates if this version is available locally.
Available bool
// StableRelease indicates that this version is part of
// a stable release index file.
StableRelease bool
BetaRelease bool
Blacklisted bool
// BetaRelease indicates that this version is part of
// a beta release index file.
BetaRelease bool
// Blacklisted may be set to true if this version should
// be skipped and not used. This is useful if the version
// is known to be broken.
Blacklisted bool
}
// Len is the number of elements in the collection. (sort.Interface for Versions)
func (rv *ResourceVersion) String() string {
return rv.VersionNumber
}
// isSelectable returns true if the version represented by rv is selectable.
// A version is selectable if it's not blacklisted and either already locally
// available or ready to be downloaded.
func (rv *ResourceVersion) isSelectable() bool {
return !rv.Blacklisted && (rv.Available || rv.resource.registry.Online)
}
// isBetaVersionNumber checks if rv is marked as a beta version by checking
// the version string. It does not honor the BetaRelease field of rv!
func (rv *ResourceVersion) isBetaVersionNumber() bool {
// "b" suffix check if for backwards compatibility
// new versions should use the pre-release suffix as
// declared by https://semver.org
// i.e. 1.2.3-beta
return strings.HasSuffix(rv.VersionNumber, "b") || strings.Contains(rv.semVer.Prerelease(), "beta")
}
// Len is the number of elements in the collection.
// It implements sort.Interface for ResourceVersion.
func (res *Resource) Len() int {
return len(res.Versions)
}
// Less reports whether the element with index i should sort before the element with index j. (sort.Interface for Versions)
// Less reports whether the element with index i should
// sort before the element with index j.
// It implements sort.Interface for ResourceVersions.
func (res *Resource) Less(i, j int) bool {
return res.Versions[i].semVer.GreaterThan(res.Versions[j].semVer)
}
// Swap swaps the elements with indexes i and j. (sort.Interface for Versions)
// Swap swaps the elements with indexes i and j.
// It implements sort.Interface for ResourceVersions.
func (res *Resource) Swap(i, j int) {
res.Versions[i], res.Versions[j] = res.Versions[j], res.Versions[i]
}
@ -64,6 +117,20 @@ func (res *Resource) available() bool {
return false
}
// inUse returns true if the resource is currently in use.
func (res *Resource) inUse() bool {
return res.ActiveVersion != nil
}
// AnyVersionAvailable returns true if any version of
// res is locally available.
func (res *Resource) AnyVersionAvailable() bool {
res.Lock()
defer res.Unlock()
return res.available()
}
func (reg *ResourceRegistry) newResource(identifier string) *Resource {
return &Resource{
registry: reg,
@ -154,18 +221,24 @@ func (res *Resource) GetFile() *File {
}
}
//nolint:gocognit // function already kept as simlpe as possible
//nolint:gocognit // function already kept as simple as possible
func (res *Resource) selectVersion() {
sort.Sort(res)
// export after we finish
defer func() {
if res.ActiveVersion != nil && // resource has already been used
log.Debugf("updater: selected version %s for resource %s", res.SelectedVersion, res.Identifier)
if res.inUse() &&
res.SelectedVersion != res.ActiveVersion && // new selected version does not match previously selected version
res.notifier != nil {
res.notifier.markAsUpgradeable()
res.notifier = nil
log.Debugf("updater: active version of %s is %s, update available", res.Identifier, res.ActiveVersion.VersionNumber)
}
}()
if len(res.Versions) == 0 {
@ -190,7 +263,7 @@ func (res *Resource) selectVersion() {
if res.registry.Beta {
for _, rv := range res.Versions {
if rv.BetaRelease {
if !rv.Blacklisted && (rv.Available || rv.resource.registry.Online) {
if rv.isSelectable() {
res.SelectedVersion = rv
return
}
@ -202,7 +275,7 @@ func (res *Resource) selectVersion() {
// 3) Stable release
for _, rv := range res.Versions {
if rv.StableRelease {
if !rv.Blacklisted && (rv.Available || rv.resource.registry.Online) {
if rv.isSelectable() {
res.SelectedVersion = rv
return
}
@ -212,7 +285,7 @@ func (res *Resource) selectVersion() {
// 4) Latest stable release
for _, rv := range res.Versions {
if !strings.HasSuffix(rv.VersionNumber, "b") && !rv.Blacklisted && (rv.Available || rv.resource.registry.Online) {
if !rv.isBetaVersionNumber() && rv.isSelectable() {
res.SelectedVersion = rv
return
}
@ -220,7 +293,7 @@ func (res *Resource) selectVersion() {
// 5) Latest of any type
for _, rv := range res.Versions {
if !rv.Blacklisted && (rv.Available || rv.resource.registry.Online) {
if rv.isSelectable() {
res.SelectedVersion = rv
return
}
@ -228,6 +301,7 @@ func (res *Resource) selectVersion() {
// 6) Default to newest
res.SelectedVersion = res.Versions[0]
log.Warningf("updater: falling back to version %s for %s because we failed to find a selectable one", res.SelectedVersion, res.Identifier)
}
// Blacklist blacklists the specified version and selects a new version.
@ -235,7 +309,7 @@ func (res *Resource) Blacklist(version string) error {
res.Lock()
defer res.Unlock()
// count already blacklisted entries
// count available and valid versions
valid := 0
for _, rv := range res.Versions {
if rv.VersionNumber == "0" {
@ -262,7 +336,11 @@ func (res *Resource) Blacklist(version string) error {
return errors.New("could not find version")
}
// Purge deletes old updates, retaining a certain amount, specified by the keep parameter. Will at least keep 2 updates per resource. After purging, new versions will be selected.
// Purge deletes old updates, retaining a certain amount, specified by
// the keep parameter. Purge will always keep at least 2 versions so
// specifying a smaller keep value will have no effect. Note that
// blacklisted versions are not counted for the keep parameter.
// After purging a new version will be selected.
func (res *Resource) Purge(keep int) {
res.Lock()
defer res.Unlock()

View file

@ -13,7 +13,11 @@ import (
"github.com/safing/portbase/utils"
)
// ScanStorage scans root within the storage dir and adds found resources to the registry. If an error occurred, it is logged and the last error is returned. Everything that was found despite errors is added to the registry anyway. Leave root empty to scan the full storage dir.
// ScanStorage scans root within the storage dir and adds found
// resources to the registry. If an error occurred, it is logged
// and the last error is returned. Everything that was found
// despite errors is added to the registry anyway. Leave root
// empty to scan the full storage dir.
func (reg *ResourceRegistry) ScanStorage(root string) error {
var lastError error
@ -34,7 +38,7 @@ func (reg *ResourceRegistry) ScanStorage(root string) error {
// walk fs
_ = filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
lastError = fmt.Errorf("%s: could not read %s: %s", reg.Name, path, err)
lastError = fmt.Errorf("%s: could not read %s: %w", reg.Name, path, err)
log.Warning(lastError.Error())
return nil
}
@ -42,7 +46,7 @@ func (reg *ResourceRegistry) ScanStorage(root string) error {
// get relative path to storage
relativePath, err := filepath.Rel(reg.storageDir.Path, path)
if err != nil {
lastError = fmt.Errorf("%s: could not get relative path of %s: %s", reg.Name, path, err)
lastError = fmt.Errorf("%s: could not get relative path of %s: %w", reg.Name, path, err)
log.Warning(lastError.Error())
return nil
}
@ -62,7 +66,7 @@ func (reg *ResourceRegistry) ScanStorage(root string) error {
// save
err = reg.AddResource(identifier, version, true, false, false)
if err != nil {
lastError = fmt.Errorf("%s: could not get add resource %s v%s: %s", reg.Name, identifier, version, err)
lastError = fmt.Errorf("%s: could not get add resource %s v%s: %w", reg.Name, identifier, version, err)
log.Warning(lastError.Error())
}
return nil
@ -71,29 +75,42 @@ func (reg *ResourceRegistry) ScanStorage(root string) error {
return lastError
}
// LoadIndexes loads the current release indexes from disk and will fetch a new version if not available and online.
// LoadIndexes loads the current release indexes from disk
// or will fetch a new version if not available and the
// registry is marked as online.
func (reg *ResourceRegistry) LoadIndexes() error {
err := reg.loadIndexFile("stable.json", true, false)
if err != nil {
err = reg.downloadIndex("stable.json", true, false)
if err != nil {
return err
var firstErr error
for _, idx := range reg.getIndexes() {
err := reg.loadIndexFile(idx)
if err == nil {
log.Debugf("%s: loaded index %s", reg.Name, idx.Path)
} else if reg.Online {
// try to download the index file if a local disk version
// does not exist or we don't have permission to read it.
if os.IsNotExist(err) || os.IsPermission(err) {
err = reg.downloadIndex(idx)
}
}
if err != nil && firstErr == nil {
firstErr = err
}
}
err = reg.loadIndexFile("beta.json", false, true)
if err != nil {
err = reg.downloadIndex("beta.json", false, true)
if err != nil {
return err
}
}
return nil
return firstErr
}
func (reg *ResourceRegistry) loadIndexFile(name string, stableRelease, betaRelease bool) error {
data, err := ioutil.ReadFile(filepath.Join(reg.storageDir.Path, name))
func (reg *ResourceRegistry) getIndexes() []Index {
reg.RLock()
defer reg.RUnlock()
indexes := make([]Index, len(reg.indexes))
copy(indexes, reg.indexes)
return indexes
}
func (reg *ResourceRegistry) loadIndexFile(idx Index) error {
path := filepath.FromSlash(idx.Path)
data, err := ioutil.ReadFile(filepath.Join(reg.storageDir.Path, path))
if err != nil {
return err
}
@ -105,26 +122,26 @@ func (reg *ResourceRegistry) loadIndexFile(name string, stableRelease, betaRelea
}
if len(releases) == 0 {
return fmt.Errorf("%s is empty", name)
return fmt.Errorf("%s is empty", path)
}
err = reg.AddResources(releases, false, stableRelease, betaRelease)
err = reg.AddResources(releases, false, idx.Stable, idx.Beta)
if err != nil {
log.Warningf("%s: failed to add resource: %s", reg.Name, err)
}
return nil
}
// CreateSymlinks creates a directory structure with unversions symlinks to the given updates list.
// CreateSymlinks creates a directory structure with unversioned symlinks to the given updates list.
func (reg *ResourceRegistry) CreateSymlinks(symlinkRoot *utils.DirStructure) error {
err := os.RemoveAll(symlinkRoot.Path)
if err != nil {
return fmt.Errorf("failed to wipe symlink root: %s", err)
return fmt.Errorf("failed to wipe symlink root: %w", err)
}
err = symlinkRoot.Ensure()
if err != nil {
return fmt.Errorf("failed to create symlink root: %s", err)
return fmt.Errorf("failed to create symlink root: %w", err)
}
reg.RLock()
@ -141,17 +158,17 @@ func (reg *ResourceRegistry) CreateSymlinks(symlinkRoot *utils.DirStructure) err
err = symlinkRoot.EnsureAbsPath(linkPathDir)
if err != nil {
return fmt.Errorf("failed to create dir for link: %s", err)
return fmt.Errorf("failed to create dir for link: %w", err)
}
relativeTargetPath, err := filepath.Rel(linkPathDir, targetPath)
if err != nil {
return fmt.Errorf("failed to get relative target path: %s", err)
return fmt.Errorf("failed to get relative target path: %w", err)
}
err = os.Symlink(relativeTargetPath, linkPath)
if err != nil {
return fmt.Errorf("failed to link %s: %s", res.Identifier, err)
return fmt.Errorf("failed to link %s: %w", res.Identifier, err)
}
}

View file

@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"github.com/safing/portbase/utils"
@ -13,53 +12,61 @@ import (
"github.com/safing/portbase/log"
)
// UpdateIndexes downloads the current update indexes.
// UpdateIndexes downloads all indexes and returns the first error encountered.
func (reg *ResourceRegistry) UpdateIndexes() error {
err := reg.downloadIndex("stable.json", true, false)
if err != nil {
return err
var firstErr error
for _, idx := range reg.getIndexes() {
if err := reg.downloadIndex(idx); err != nil {
if firstErr == nil {
firstErr = err
}
}
}
return reg.downloadIndex("beta.json", false, true)
return firstErr
}
func (reg *ResourceRegistry) downloadIndex(name string, stableRelease, betaRelease bool) error {
func (reg *ResourceRegistry) downloadIndex(idx Index) error {
var err error
var data []byte
// download new index
for tries := 0; tries < 3; tries++ {
data, err = reg.fetchData(name, tries)
data, err = reg.fetchData(idx.Path, tries)
if err == nil {
break
}
}
if err != nil {
return fmt.Errorf("failed to download index %s: %s", name, err)
return fmt.Errorf("failed to download index %s: %w", idx.Path, err)
}
// parse
new := make(map[string]string)
err = json.Unmarshal(data, &new)
if err != nil {
return fmt.Errorf("failed to parse index %s: %s", name, err)
return fmt.Errorf("failed to parse index %s: %w", idx.Path, err)
}
// check for content
if len(new) == 0 {
return fmt.Errorf("index %s is empty", name)
return fmt.Errorf("index %s is empty", idx.Path)
}
// add resources to registry
_ = reg.AddResources(new, false, stableRelease, betaRelease)
// save index
err = ioutil.WriteFile(filepath.Join(reg.storageDir.Path, name), data, 0644)
err = reg.AddResources(new, false, idx.Stable, idx.Beta)
if err != nil {
log.Warningf("%s: failed to save updated index %s: %s", reg.Name, name, err)
log.Warningf("%s: failed to add resources: %s", reg.Name, err)
}
log.Infof("%s: updated index %s", reg.Name, name)
// save index
err = ioutil.WriteFile(filepath.Join(reg.storageDir.Path, idx.Path), data, 0644)
if err != nil {
log.Warningf("%s: failed to save updated index %s: %s", reg.Name, idx.Path, err)
}
log.Infof("%s: updated index %s", reg.Name, idx.Path)
return nil
}
@ -72,7 +79,7 @@ func (reg *ResourceRegistry) DownloadUpdates(ctx context.Context) error {
res.Lock()
// check if we want to download
if res.ActiveVersion != nil || // resource is currently being used
if res.inUse() ||
res.available() || // resource was used in the past
utils.StringInSlice(reg.MandatoryUpdates, res.Identifier) { // resource is mandatory
@ -97,7 +104,7 @@ func (reg *ResourceRegistry) DownloadUpdates(ctx context.Context) error {
// check download dir
err := reg.tmpDir.Ensure()
if err != nil {
return fmt.Errorf("could not prepare tmp directory for download: %s", err)
return fmt.Errorf("could not prepare tmp directory for download: %w", err)
}
// download updates
@ -106,6 +113,7 @@ func (reg *ResourceRegistry) DownloadUpdates(ctx context.Context) error {
for tries := 0; tries < 3; tries++ {
err = reg.fetchFile(rv, tries)
if err == nil {
rv.Available = true
break
}
}
@ -115,11 +123,5 @@ func (reg *ResourceRegistry) DownloadUpdates(ctx context.Context) error {
}
log.Infof("%s: finished downloading updates", reg.Name)
// remove tmp folder after we are finished
err = os.RemoveAll(reg.tmpDir.Path)
if err != nil {
log.Tracef("%s: failed to remove tmp dir %s after downloading updates: %s", reg.Name, reg.tmpDir.Path, err)
}
return nil
}