Continue work on database module

This commit is contained in:
Daniel 2018-09-06 19:06:13 +02:00
parent 7ad09b60c1
commit b8e7f90dbe
25 changed files with 962 additions and 1073 deletions

View file

@ -1,42 +1,89 @@
package database package database
import ( import (
"github.com/Safing/portbase/database/record" "sync"
"time"
"github.com/tevino/abool"
"github.com/Safing/portbase/database/iterator"
"github.com/Safing/portbase/database/query"
"github.com/Safing/portbase/database/record"
"github.com/Safing/portbase/database/storage"
) )
// A Controller takes care of all the extra database logic.
type Controller struct { type Controller struct {
storage storage storage.Interface
writeLock sync.RWMutex writeLock sync.RWMutex
readLock sync.RWMutex readLock sync.RWMutex
migrating *abool.AtomicBool migrating *abool.AtomicBool
} }
func NewController() (*Controller, error) { // newController creates a new controller for a storage.
func newController(storageInt storage.Interface) (*Controller, error) {
return &Controller{
storage: storageInt,
migrating: abool.NewBool(false),
}, nil
} }
// Retrieve // Get return the record with the given key.
func (c *Controller) Exists(key string) (bool, error) {} func (c *Controller) Get(key string) (record.Record, error) {
func (c *Controller) Get(key string) (record.Record, error) {} r, err := c.storage.Get(key)
if err != nil {
return nil, err
}
// Modify if !r.Meta().CheckValidity(time.Now().Unix()) {
func (c *Controller) Create(model record.Record) error {} return nil, ErrNotFound
// create when not exists }
func (c *Controller) Update(model record.Record) error {}
// update, create if not exists. return r, nil
func (c *Controller) UpdateOrCreate(model record.Record) error {} }
func (c *Controller) Delete(key string) error {}
// Put saves a record in the database.
func (c *Controller) Put(r record.Record) error {
return c.storage.Put(r)
}
func (c *Controller) Delete(key string) error {
r, err := c.Get(key)
if err != nil {
return err
}
r.Meta().Deleted = time.Now().Unix()
return c.Put(r)
}
// Partial // Partial
// What happens if I mutate a value that does not yet exist? How would I know its type? // What happens if I mutate a value that does not yet exist? How would I know its type?
func (c *Controller) InsertPartial(key string, partialObject interface{}) {} func (c *Controller) InsertPartial(key string, partialObject interface{}) error {
func (c *Controller) InsertValue(key string, attribute string, value interface{}) {} return nil
}
func (c *Controller) InsertValue(key string, attribute string, value interface{}) error {
return nil
}
// Query // Query
func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {} func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
return nil, nil
}
// Meta // Meta
func (c *Controller) SetAbsoluteExpiry(key string, time int64) {} func (c *Controller) SetAbsoluteExpiry(key string, time int64) error {
func (c *Controller) SetRelativateExpiry(key string, duration int64) {} return nil
func (c *Controller) MakeCrownJewel(key string) {} }
func (c *Controller) MakeSecret(key string) {}
func (c *Controller) SetRelativateExpiry(key string, duration int64) error {
return nil
}
func (c *Controller) MakeCrownJewel(key string) error {
return nil
}
func (c *Controller) MakeSecret(key string) error {
return nil
}

View file

@ -4,17 +4,13 @@ package database
import ( import (
"errors" "errors"
"fmt"
"os"
"path"
"strings"
"github.com/Safing/safing-core/database/dbutils"
"github.com/Safing/safing-core/log"
"github.com/Safing/safing-core/meta"
) )
var ErrNotFound = errors.New("database: entry could not be found") // Errors
var (
ErrNotFound = errors.New("database: entry could not be found")
ErrPermissionDenied = errors.New("database: access to record denied")
)
func init() { func init() {
// if strings.HasSuffix(os.Args[0], ".test") { // if strings.HasSuffix(os.Args[0], ".test") {
@ -59,84 +55,66 @@ func init() {
// return db.Close() // return db.Close()
// } // }
func Get(key *ds.Key) (Model, error) { // func Get(key *ds.Key) (Model, error) {
data, err := db.Get(*key) // data, err := db.Get(*key)
if err != nil { // if err != nil {
switch err { // switch err {
case ds.ErrNotFound: // case ds.ErrNotFound:
return nil, ErrNotFound // return nil, ErrNotFound
default: // default:
return nil, err // return nil, err
} // }
} // }
model, ok := data.(Model) // model, ok := data.(Model)
if !ok { // if !ok {
return nil, errors.New("database did not return model") // return nil, errors.New("database did not return model")
} // }
return model, nil // return model, nil
} // }
func GetAndEnsureModel(namespace *ds.Key, name string, model Model) (Model, error) { // func Has(key ds.Key) (exists bool, err error) {
newKey := namespace.ChildString(getTypeName(model)).Instance(name) // return db.Has(key)
// }
data, err := Get(&newKey) //
if err != nil { // func Create(key ds.Key, model Model) (err error) {
return nil, err // handleCreateSubscriptions(model)
} // err = db.Put(key, model)
// if err != nil {
newModel, err := EnsureModel(data, model) // log.Tracef("database: failed to create entry %s: %s", key, err)
if err != nil { // }
return nil, err // return err
} // }
//
newModel.SetKey(&newKey) // func Update(key ds.Key, model Model) (err error) {
// handleUpdateSubscriptions(model)
return newModel, nil // err = db.Put(key, model)
} // if err != nil {
// log.Tracef("database: failed to update entry %s: %s", key, err)
func Has(key ds.Key) (exists bool, err error) { // }
return db.Has(key) // return err
} // }
//
func Create(key ds.Key, model Model) (err error) { // func Delete(key ds.Key) (err error) {
handleCreateSubscriptions(model) // handleDeleteSubscriptions(&key)
err = db.Put(key, model) // return db.Delete(key)
if err != nil { // }
log.Tracef("database: failed to create entry %s: %s", key, err) //
} // func Query(q dsq.Query) (dsq.Results, error) {
return err // return db.Query(q)
} // }
//
func Update(key ds.Key, model Model) (err error) { // func RawGet(key ds.Key) (*dbutils.Wrapper, error) {
handleUpdateSubscriptions(model) // data, err := db.Get(key)
err = db.Put(key, model) // if err != nil {
if err != nil { // return nil, err
log.Tracef("database: failed to update entry %s: %s", key, err) // }
} // wrapped, ok := data.(*dbutils.Wrapper)
return err // if !ok {
} // return nil, errors.New("returned data is not a wrapper")
// }
func Delete(key ds.Key) (err error) { // return wrapped, nil
handleDeleteSubscriptions(&key) // }
return db.Delete(key) //
} // func RawPut(key ds.Key, value interface{}) error {
// return db.Put(key, value)
func Query(q dsq.Query) (dsq.Results, error) { // }
return db.Query(q)
}
func RawGet(key ds.Key) (*dbutils.Wrapper, error) {
data, err := db.Get(key)
if err != nil {
return nil, err
}
wrapped, ok := data.(*dbutils.Wrapper)
if !ok {
return nil, errors.New("returned data is not a wrapper")
}
return wrapped, nil
}
func RawPut(key ds.Key, value interface{}) error {
return db.Put(key, value)
}

View file

@ -1,56 +1,95 @@
package database package database
import (
"errors"
"sync"
"fmt"
"path"
var ( "github.com/Safing/portbase/database/storage"
databases = make(map[string]*Controller) "github.com/Safing/portbase/database/record"
databasesLock sync.Mutex
) )
func getDatabase(name string) *Controller { var (
databasesLock.Lock() databases = make(map[string]*Controller)
defer databasesLock.Unlock() databasesLock sync.Mutex
storage, ok := databases[name] )
if ok {
return
}
}
func databaseExists(name string) (exists bool) { func splitKeyAndGetDatabase(key string) (dbKey string, db *Controller, err error) {
// check if folder exists var dbName string
return true dbName, dbKey = record.ParseKey(key)
} db, err = getDatabase(dbName)
// CreateDatabase creates a new database with given name and type.
func CreateDatabase(name string, storageType string) error {
databasesLock.Lock()
defer databasesLock.Unlock()
_, ok := databases[name]
if ok {
return errors.New("database with this name already loaded.")
}
if databaseExists(name) {
return errors.New("database with this name already exists.")
}
iface, err := startDatabase(name)
if err != nil { if err != nil {
return err return "", nil, err
} }
databases[name] = iface return
return nil }
func getDatabase(name string) (*Controller, error) {
if !initialized.IsSet() {
return nil, errors.New("database not initialized")
}
databasesLock.Lock()
defer databasesLock.Unlock()
// return database if already started
db, ok := databases[name]
if ok {
return db, nil
}
registryLock.Lock()
defer registryLock.Unlock()
// check if database exists at all
registeredDB, ok := registry[name]
if !ok {
return nil, fmt.Errorf(`database "%s" not registered`, name)
}
// start database
storageInt, err := storage.StartDatabase(name, registeredDB.StorageType, path.Join(rootDir, name, registeredDB.StorageType))
if err != nil {
return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err)
}
db, err = newController(storageInt)
if err != nil {
return nil, fmt.Errorf(`could not create controller for database %s: %s`, name, err)
}
databases[name] = db
return db, nil
} }
// InjectDatabase injects an already running database into the system. // InjectDatabase injects an already running database into the system.
func InjectDatabase(name string, iface *storage.Interface) error { func InjectDatabase(name string, storageInt storage.Interface) error {
databasesLock.Lock() databasesLock.Lock()
defer databasesLock.Unlock() defer databasesLock.Unlock()
_, ok := databases[name]
if ok { _, ok := databases[name]
return errors.New("database with this name already loaded.") if ok {
return errors.New(`database "%s" already loaded`)
}
registryLock.Lock()
defer registryLock.Unlock()
// check if database is registered
registeredDB, ok := registry[name]
if !ok {
return fmt.Errorf(`database "%s" not registered`, name)
} }
if databaseExists(name) { if registeredDB.StorageType != "injected" {
return errors.New("database with this name already exists.") return fmt.Errorf(`database not of type "injected"`)
} }
databases[name] = iface
return nil db, err := newController(storageInt)
if err != nil {
return fmt.Errorf(`could not create controller for database %s: %s`, name, err)
}
databases[name] = db
return nil
} }

29
database/dbmodule/db.go Normal file
View file

@ -0,0 +1,29 @@
package dbmodule
import (
"github.com/Safing/portbase/database"
)
var (
databaseDir string
)
func init() {
flag.StringVar(&databaseDir, "db", "", "set database directory")
modules.Register("database", prep, start, stop)
}
func prep() error {
if databaseDir == "" {
return errors.New("no database location specified, set with `-db=/path/to/db`")
}
}
func start() error {
return database.Initialize(databaseDir)
}
func stop() {
return database.Shutdown()
}

View file

@ -1,84 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package database
import (
"errors"
"fmt"
"strings"
dsq "github.com/ipfs/go-datastore/query"
)
type FilterMaxDepth struct {
MaxDepth int
}
func (f FilterMaxDepth) Filter(entry dsq.Entry) bool {
return strings.Count(entry.Key, "/") <= f.MaxDepth
}
type FilterKeyLength struct {
Length int
}
func (f FilterKeyLength) Filter(entry dsq.Entry) bool {
return len(entry.Key) == f.Length
}
func EasyQueryIterator(subscriptionKey string) (dsq.Results, error) {
query := dsq.Query{}
namespaces := strings.Split(subscriptionKey, "/")[1:]
lastSpace := ""
if len(namespaces) != 0 {
lastSpace = namespaces[len(namespaces)-1]
}
switch {
case lastSpace == "":
// get all children
query.Prefix = subscriptionKey
case strings.HasPrefix(lastSpace, "*"):
// get children to defined depth
query.Prefix = strings.Trim(subscriptionKey, "*")
query.Filters = []dsq.Filter{
FilterMaxDepth{len(lastSpace) + len(namespaces) - 1},
}
case strings.Contains(lastSpace, ":"):
query.Prefix = subscriptionKey
query.Filters = []dsq.Filter{
FilterKeyLength{len(query.Prefix)},
}
default:
// get only from this location and this type
query.Prefix = subscriptionKey + ":"
query.Filters = []dsq.Filter{
FilterMaxDepth{len(namespaces)},
}
}
// log.Tracef("easyquery: %s has prefix %s", subscriptionKey, query.Prefix)
results, err := db.Query(query)
if err != nil {
return nil, errors.New(fmt.Sprintf("easyquery: %s", err))
}
return results, nil
}
func EasyQuery(subscriptionKey string) (*[]dsq.Entry, error) {
results, err := EasyQueryIterator(subscriptionKey)
if err != nil {
return nil, err
}
entries, err := results.Rest()
if err != nil {
return nil, errors.New(fmt.Sprintf("easyquery: %s", err))
}
return &entries, nil
}

View file

@ -1,68 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package database
import (
"testing"
datastore "github.com/ipfs/go-datastore"
)
func testQuery(t *testing.T, queryString string, expecting []string) {
entries, err := EasyQuery(queryString)
if err != nil {
t.Errorf("error in query %s: %s", queryString, err)
}
totalExcepted := len(expecting)
total := 0
fail := false
keys := datastore.EntryKeys(*entries)
resultLoop:
for _, key := range keys {
total++
for _, expectedName := range expecting {
if key.Name() == expectedName {
continue resultLoop
}
}
fail = true
break
}
if !fail && total == totalExcepted {
return
}
t.Errorf("Query %s got %s, expected %s", queryString, keys, expecting)
}
func TestEasyQuery(t *testing.T) {
// setup test data
(&(TestingModel{})).CreateInNamespace("EasyQuery", "1")
(&(TestingModel{})).CreateInNamespace("EasyQuery", "2")
(&(TestingModel{})).CreateInNamespace("EasyQuery", "3")
(&(TestingModel{})).CreateInNamespace("EasyQuery/A", "4")
(&(TestingModel{})).CreateInNamespace("EasyQuery/A/B", "5")
(&(TestingModel{})).CreateInNamespace("EasyQuery/A/B/C", "6")
(&(TestingModel{})).CreateInNamespace("EasyQuery/A/B/C/D", "7")
(&(TestingModel{})).CreateWithTypeName("EasyQuery", "ConfigModel", "X")
(&(TestingModel{})).CreateWithTypeName("EasyQuery", "ConfigModel", "Y")
(&(TestingModel{})).CreateWithTypeName("EasyQuery/A", "ConfigModel", "Z")
testQuery(t, "/Tests/EasyQuery/TestingModel", []string{"1", "2", "3"})
testQuery(t, "/Tests/EasyQuery/TestingModel:1", []string{"1"})
testQuery(t, "/Tests/EasyQuery/ConfigModel", []string{"X", "Y"})
testQuery(t, "/Tests/EasyQuery/ConfigModel:Y", []string{"Y"})
testQuery(t, "/Tests/EasyQuery/A/", []string{"Z", "4", "5", "6", "7"})
testQuery(t, "/Tests/EasyQuery/A/B/**", []string{"5", "6"})
}

View file

@ -1,27 +1,69 @@
package database package database
import (
"github.com/Safing/portbase/database/record"
)
// Interface provides a method to access the database with attached options. // Interface provides a method to access the database with attached options.
type Interface struct {} type Interface struct {
options *Options
}
// Options holds options that may be set for an Interface instance. // Options holds options that may be set for an Interface instance.
type Options struct { type Options struct {
Local bool Local bool
Internal bool Internal bool
AlwaysMakeSecret bool AlwaysMakeSecret bool
AlwaysMakeCrownjewel bool AlwaysMakeCrownjewel bool
} }
// NewInterface returns a new Interface to the database. // NewInterface returns a new Interface to the database.
func NewInterface(opts *Options) *Interface { func NewInterface(opts *Options) *Interface {
return &Interface{ if opts == nil {
local: local, opts = &Options{}
internal: internal, }
}
return &Interface{
options: opts,
}
} }
// Exists return whether a record with the given key exists.
func (i *Interface) Exists(key string) (bool, error) {
_, err := i.getRecord(key)
if err != nil {
if err == ErrNotFound {
return false, nil
}
return false, err
}
return true, nil
}
// Get return the record with the given key.
func (i *Interface) Get(key string) (record.Record, error) { func (i *Interface) Get(key string) (record.Record, error) {
r, err := i.getRecord(key)
if err != nil {
return nil, err
}
controller if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) {
return nil, ErrPermissionDenied
}
return nil, nil return r, nil
}
func (i *Interface) getRecord(key string) (record.Record, error) {
dbKey, db, err := splitKeyAndGetDatabase(key)
if err != nil {
return nil, err
}
r, err := db.Get(dbKey)
if err != nil {
return nil, err
}
return r, nil
} }

14
database/location.go Normal file
View file

@ -0,0 +1,14 @@
package database
import (
"path"
)
var (
rootDir string
)
// getLocation returns the storage location for the given name and type.
func getLocation(name, storageType string) (location string, err error) {
return path.Join(rootDir, name, storageType), nil
}

View file

@ -1,64 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package database
import (
"fmt"
"sync"
"github.com/Safing/safing-core/database/dbutils"
"github.com/Safing/safing-core/formats/dsd"
)
// Model Registration
var (
registeredModels = make(map[string]func() Model)
registeredModelsLock sync.RWMutex
)
func RegisterModel(model Model, constructor func() Model) {
registeredModelsLock.Lock()
defer registeredModelsLock.Unlock()
registeredModels[fmt.Sprintf("%T", model)] = constructor
}
func NewModel(model Model) (Model, error) {
registeredModelsLock.RLock()
defer registeredModelsLock.RUnlock()
constructor, ok := registeredModels[fmt.Sprintf("%T", model)]
if !ok {
return nil, fmt.Errorf("database: cannot create new %T, not registered", model)
}
return constructor(), nil
}
func EnsureModel(uncertain, model Model) (Model, error) {
wrappedObj, ok := uncertain.(*dbutils.Wrapper)
if !ok {
return uncertain, nil
}
newModel, err := NewModel(model)
if err != nil {
return nil, err
}
_, err = dsd.Load(wrappedObj.Data, &newModel)
if err != nil {
return nil, fmt.Errorf("database: failed to unwrap %T: %s", model, err)
}
newModel.SetKey(wrappedObj.GetKey())
model = newModel
return newModel, nil
}
func SilentEnsureModel(uncertain, model Model) Model {
obj, err := EnsureModel(uncertain, model)
if err != nil {
return nil
}
return obj
}
func NewMismatchError(got, expected interface{}) error {
return fmt.Errorf("database: entry (%T) does not match expected model (%T)", got, expected)
}

View file

@ -1,108 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package database
import (
"testing"
datastore "github.com/ipfs/go-datastore"
)
type TestingModel struct {
Base
Name string
Value string
}
var testingModel *TestingModel
func init() {
RegisterModel(testingModel, func() Model { return new(TestingModel) })
}
func (m *TestingModel) Create(name string) error {
return m.CreateObject(&Tests, name, m)
}
func (m *TestingModel) CreateInNamespace(namespace string, name string) error {
testsNamescace := Tests.ChildString(namespace)
return m.CreateObject(&testsNamescace, name, m)
}
func (m *TestingModel) CreateWithTypeName(namespace string, typeName string, name string) error {
customNamespace := Tests.ChildString(namespace).ChildString(typeName).Instance(name)
m.dbKey = &customNamespace
handleCreateSubscriptions(m)
return Create(*m.dbKey, m)
}
func (m *TestingModel) Save() error {
return m.SaveObject(m)
}
func GetTestingModel(name string) (*TestingModel, error) {
return GetTestingModelFromNamespace(&Tests, name)
}
func GetTestingModelFromNamespace(namespace *datastore.Key, name string) (*TestingModel, error) {
object, err := GetAndEnsureModel(namespace, name, testingModel)
if err != nil {
return nil, err
}
model, ok := object.(*TestingModel)
if !ok {
return nil, NewMismatchError(object, testingModel)
}
return model, nil
}
func TestModel(t *testing.T) {
// create
m := TestingModel{
Name: "a",
Value: "b",
}
// newKey := datastore.NewKey("/Tests/TestingModel:test")
// m.dbKey = &newKey
// err := Put(*m.dbKey, m)
err := m.Create("")
if err != nil {
t.Errorf("database test: could not create object: %s", err)
}
// get
o, err := GetTestingModel(m.dbKey.Name())
if err != nil {
t.Errorf("database test: failed to get model: %s (%s)", err, m.dbKey.Name())
}
// check fetched object
if o.Name != "a" || o.Value != "b" {
t.Errorf("database test: values do not match: got Name=%s and Value=%s", o.Name, o.Value)
}
// o, err := Get(*m.dbKey)
// if err != nil {
// t.Errorf("database: could not get object: %s", err)
// }
// n, ok := o.(*TestingModel)
// if !ok {
// t.Errorf("database: wrong type, got type %T from %s", o, m.dbKey.String())
// }
// save
o.Value = "c"
err = o.Save()
if err != nil {
t.Errorf("database test: could not save object: %s", err)
}
// delete
err = o.Delete()
if err != nil {
t.Errorf("database test: could not delete object: %s", err)
}
}

View file

@ -1,37 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package database
import (
"time"
"github.com/Safing/safing-core/formats/dsd"
"github.com/Safing/safing-core/log"
dsq "github.com/ipfs/go-datastore/query"
)
func init() {
// go dumper()
}
func dumper() {
for {
time.Sleep(10 * time.Second)
result, err := db.Query(dsq.Query{Prefix: "/Run/Process"})
if err != nil {
log.Warningf("Query failed: %s", err)
continue
}
log.Infof("Dumping all processes:")
for model, ok := result.NextSync(); ok; model, ok = result.NextSync() {
bytes, err := dsd.Dump(model, dsd.AUTO)
if err != nil {
log.Warningf("Error dumping: %s", err)
continue
}
log.Info(string(bytes))
}
log.Infof("END")
}
}

View file

@ -85,3 +85,8 @@ func (b *Base) MarshalRecord() ([]byte, error) {
return c.CompileData(), nil return c.CompileData(), nil
} }
// IsWrapped returns whether the record is a Wrapper.
func (b *Base) IsWrapped() bool {
return false
}

View file

@ -32,10 +32,10 @@ import (
var ( var (
testMeta = &Meta{ testMeta = &Meta{
created: time.Now().Unix(), Created: time.Now().Unix(),
modified: time.Now().Unix(), Modified: time.Now().Unix(),
expires: time.Now().Unix(), Expires: time.Now().Unix(),
deleted: time.Now().Unix(), Deleted: time.Now().Unix(),
secret: true, secret: true,
cronjewel: true, cronjewel: true,
} }
@ -65,10 +65,10 @@ func BenchmarkMetaSerializeContainer(b *testing.B) {
// Start benchmark // Start benchmark
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
c := container.New() c := container.New()
c.AppendNumber(uint64(testMeta.created)) c.AppendNumber(uint64(testMeta.Created))
c.AppendNumber(uint64(testMeta.modified)) c.AppendNumber(uint64(testMeta.Modified))
c.AppendNumber(uint64(testMeta.expires)) c.AppendNumber(uint64(testMeta.Expires))
c.AppendNumber(uint64(testMeta.deleted)) c.AppendNumber(uint64(testMeta.Deleted))
switch { switch {
case testMeta.secret && testMeta.cronjewel: case testMeta.secret && testMeta.cronjewel:
c.AppendNumber(3) c.AppendNumber(3)
@ -87,10 +87,10 @@ func BenchmarkMetaUnserializeContainer(b *testing.B) {
// Setup // Setup
c := container.New() c := container.New()
c.AppendNumber(uint64(testMeta.created)) c.AppendNumber(uint64(testMeta.Created))
c.AppendNumber(uint64(testMeta.modified)) c.AppendNumber(uint64(testMeta.Modified))
c.AppendNumber(uint64(testMeta.expires)) c.AppendNumber(uint64(testMeta.Expires))
c.AppendNumber(uint64(testMeta.deleted)) c.AppendNumber(uint64(testMeta.Deleted))
switch { switch {
case testMeta.secret && testMeta.cronjewel: case testMeta.secret && testMeta.cronjewel:
c.AppendNumber(3) c.AppendNumber(3)
@ -113,25 +113,25 @@ func BenchmarkMetaUnserializeContainer(b *testing.B) {
var num uint64 var num uint64
c := container.New(encodedData) c := container.New(encodedData)
num, err = c.GetNextN64() num, err = c.GetNextN64()
newMeta.created = int64(num) newMeta.Created = int64(num)
if err != nil { if err != nil {
b.Errorf("could not decode: %s", err) b.Errorf("could not decode: %s", err)
return return
} }
num, err = c.GetNextN64() num, err = c.GetNextN64()
newMeta.modified = int64(num) newMeta.Modified = int64(num)
if err != nil { if err != nil {
b.Errorf("could not decode: %s", err) b.Errorf("could not decode: %s", err)
return return
} }
num, err = c.GetNextN64() num, err = c.GetNextN64()
newMeta.expires = int64(num) newMeta.Expires = int64(num)
if err != nil { if err != nil {
b.Errorf("could not decode: %s", err) b.Errorf("could not decode: %s", err)
return return
} }
num, err = c.GetNextN64() num, err = c.GetNextN64()
newMeta.deleted = int64(num) newMeta.Deleted = int64(num)
if err != nil { if err != nil {
b.Errorf("could not decode: %s", err) b.Errorf("could not decode: %s", err)
return return
@ -166,22 +166,22 @@ func BenchmarkMetaSerializeVarInt(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
encoded := make([]byte, 33) encoded := make([]byte, 33)
offset := 0 offset := 0
data := varint.Pack64(uint64(testMeta.created)) data := varint.Pack64(uint64(testMeta.Created))
for _, part := range data { for _, part := range data {
encoded[offset] = part encoded[offset] = part
offset++ offset++
} }
data = varint.Pack64(uint64(testMeta.modified)) data = varint.Pack64(uint64(testMeta.Modified))
for _, part := range data { for _, part := range data {
encoded[offset] = part encoded[offset] = part
offset++ offset++
} }
data = varint.Pack64(uint64(testMeta.expires)) data = varint.Pack64(uint64(testMeta.Expires))
for _, part := range data { for _, part := range data {
encoded[offset] = part encoded[offset] = part
offset++ offset++
} }
data = varint.Pack64(uint64(testMeta.deleted)) data = varint.Pack64(uint64(testMeta.Deleted))
for _, part := range data { for _, part := range data {
encoded[offset] = part encoded[offset] = part
offset++ offset++
@ -207,22 +207,22 @@ func BenchmarkMetaUnserializeVarInt(b *testing.B) {
// Setup // Setup
encoded := make([]byte, 33) encoded := make([]byte, 33)
offset := 0 offset := 0
data := varint.Pack64(uint64(testMeta.created)) data := varint.Pack64(uint64(testMeta.Created))
for _, part := range data { for _, part := range data {
encoded[offset] = part encoded[offset] = part
offset++ offset++
} }
data = varint.Pack64(uint64(testMeta.modified)) data = varint.Pack64(uint64(testMeta.Modified))
for _, part := range data { for _, part := range data {
encoded[offset] = part encoded[offset] = part
offset++ offset++
} }
data = varint.Pack64(uint64(testMeta.expires)) data = varint.Pack64(uint64(testMeta.Expires))
for _, part := range data { for _, part := range data {
encoded[offset] = part encoded[offset] = part
offset++ offset++
} }
data = varint.Pack64(uint64(testMeta.deleted)) data = varint.Pack64(uint64(testMeta.Deleted))
for _, part := range data { for _, part := range data {
encoded[offset] = part encoded[offset] = part
offset++ offset++
@ -254,7 +254,7 @@ func BenchmarkMetaUnserializeVarInt(b *testing.B) {
b.Error(err) b.Error(err)
return return
} }
testMeta.created = int64(num) testMeta.Created = int64(num)
offset += n offset += n
num, n, err = varint.Unpack64(encodedData[offset:]) num, n, err = varint.Unpack64(encodedData[offset:])
@ -262,7 +262,7 @@ func BenchmarkMetaUnserializeVarInt(b *testing.B) {
b.Error(err) b.Error(err)
return return
} }
testMeta.modified = int64(num) testMeta.Modified = int64(num)
offset += n offset += n
num, n, err = varint.Unpack64(encodedData[offset:]) num, n, err = varint.Unpack64(encodedData[offset:])
@ -270,7 +270,7 @@ func BenchmarkMetaUnserializeVarInt(b *testing.B) {
b.Error(err) b.Error(err)
return return
} }
testMeta.expires = int64(num) testMeta.Expires = int64(num)
offset += n offset += n
num, n, err = varint.Unpack64(encodedData[offset:]) num, n, err = varint.Unpack64(encodedData[offset:])
@ -278,7 +278,7 @@ func BenchmarkMetaUnserializeVarInt(b *testing.B) {
b.Error(err) b.Error(err)
return return
} }
testMeta.deleted = int64(num) testMeta.Deleted = int64(num)
offset += n offset += n
switch encodedData[offset] { switch encodedData[offset] {

View file

@ -33,78 +33,78 @@ func (d *Meta) GenCodeMarshal(buf []byte) ([]byte, error) {
{ {
buf[0+0] = byte(d.created >> 0) buf[0+0] = byte(d.Created >> 0)
buf[1+0] = byte(d.created >> 8) buf[1+0] = byte(d.Created >> 8)
buf[2+0] = byte(d.created >> 16) buf[2+0] = byte(d.Created >> 16)
buf[3+0] = byte(d.created >> 24) buf[3+0] = byte(d.Created >> 24)
buf[4+0] = byte(d.created >> 32) buf[4+0] = byte(d.Created >> 32)
buf[5+0] = byte(d.created >> 40) buf[5+0] = byte(d.Created >> 40)
buf[6+0] = byte(d.created >> 48) buf[6+0] = byte(d.Created >> 48)
buf[7+0] = byte(d.created >> 56) buf[7+0] = byte(d.Created >> 56)
} }
{ {
buf[0+8] = byte(d.modified >> 0) buf[0+8] = byte(d.Modified >> 0)
buf[1+8] = byte(d.modified >> 8) buf[1+8] = byte(d.Modified >> 8)
buf[2+8] = byte(d.modified >> 16) buf[2+8] = byte(d.Modified >> 16)
buf[3+8] = byte(d.modified >> 24) buf[3+8] = byte(d.Modified >> 24)
buf[4+8] = byte(d.modified >> 32) buf[4+8] = byte(d.Modified >> 32)
buf[5+8] = byte(d.modified >> 40) buf[5+8] = byte(d.Modified >> 40)
buf[6+8] = byte(d.modified >> 48) buf[6+8] = byte(d.Modified >> 48)
buf[7+8] = byte(d.modified >> 56) buf[7+8] = byte(d.Modified >> 56)
} }
{ {
buf[0+16] = byte(d.expires >> 0) buf[0+16] = byte(d.Expires >> 0)
buf[1+16] = byte(d.expires >> 8) buf[1+16] = byte(d.Expires >> 8)
buf[2+16] = byte(d.expires >> 16) buf[2+16] = byte(d.Expires >> 16)
buf[3+16] = byte(d.expires >> 24) buf[3+16] = byte(d.Expires >> 24)
buf[4+16] = byte(d.expires >> 32) buf[4+16] = byte(d.Expires >> 32)
buf[5+16] = byte(d.expires >> 40) buf[5+16] = byte(d.Expires >> 40)
buf[6+16] = byte(d.expires >> 48) buf[6+16] = byte(d.Expires >> 48)
buf[7+16] = byte(d.expires >> 56) buf[7+16] = byte(d.Expires >> 56)
} }
{ {
buf[0+24] = byte(d.deleted >> 0) buf[0+24] = byte(d.Deleted >> 0)
buf[1+24] = byte(d.deleted >> 8) buf[1+24] = byte(d.Deleted >> 8)
buf[2+24] = byte(d.deleted >> 16) buf[2+24] = byte(d.Deleted >> 16)
buf[3+24] = byte(d.deleted >> 24) buf[3+24] = byte(d.Deleted >> 24)
buf[4+24] = byte(d.deleted >> 32) buf[4+24] = byte(d.Deleted >> 32)
buf[5+24] = byte(d.deleted >> 40) buf[5+24] = byte(d.Deleted >> 40)
buf[6+24] = byte(d.deleted >> 48) buf[6+24] = byte(d.Deleted >> 48)
buf[7+24] = byte(d.deleted >> 56) buf[7+24] = byte(d.Deleted >> 56)
} }
{ {
@ -134,22 +134,22 @@ func (d *Meta) GenCodeUnmarshal(buf []byte) (uint64, error) {
{ {
d.created = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56) d.Created = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56)
} }
{ {
d.modified = 0 | (int64(buf[0+8]) << 0) | (int64(buf[1+8]) << 8) | (int64(buf[2+8]) << 16) | (int64(buf[3+8]) << 24) | (int64(buf[4+8]) << 32) | (int64(buf[5+8]) << 40) | (int64(buf[6+8]) << 48) | (int64(buf[7+8]) << 56) d.Modified = 0 | (int64(buf[0+8]) << 0) | (int64(buf[1+8]) << 8) | (int64(buf[2+8]) << 16) | (int64(buf[3+8]) << 24) | (int64(buf[4+8]) << 32) | (int64(buf[5+8]) << 40) | (int64(buf[6+8]) << 48) | (int64(buf[7+8]) << 56)
} }
{ {
d.expires = 0 | (int64(buf[0+16]) << 0) | (int64(buf[1+16]) << 8) | (int64(buf[2+16]) << 16) | (int64(buf[3+16]) << 24) | (int64(buf[4+16]) << 32) | (int64(buf[5+16]) << 40) | (int64(buf[6+16]) << 48) | (int64(buf[7+16]) << 56) d.Expires = 0 | (int64(buf[0+16]) << 0) | (int64(buf[1+16]) << 8) | (int64(buf[2+16]) << 16) | (int64(buf[3+16]) << 24) | (int64(buf[4+16]) << 32) | (int64(buf[5+16]) << 40) | (int64(buf[6+16]) << 48) | (int64(buf[7+16]) << 56)
} }
{ {
d.deleted = 0 | (int64(buf[0+24]) << 0) | (int64(buf[1+24]) << 8) | (int64(buf[2+24]) << 16) | (int64(buf[3+24]) << 24) | (int64(buf[4+24]) << 32) | (int64(buf[5+24]) << 40) | (int64(buf[6+24]) << 48) | (int64(buf[7+24]) << 56) d.Deleted = 0 | (int64(buf[0+24]) << 0) | (int64(buf[1+24]) << 8) | (int64(buf[2+24]) << 16) | (int64(buf[3+24]) << 24) | (int64(buf[4+24]) << 32) | (int64(buf[5+24]) << 40) | (int64(buf[6+24]) << 48) | (int64(buf[7+24]) << 56)
} }
{ {

View file

@ -8,10 +8,10 @@ import (
var ( var (
genCodeTestMeta = &Meta{ genCodeTestMeta = &Meta{
created: time.Now().Unix(), Created: time.Now().Unix(),
modified: time.Now().Unix(), Modified: time.Now().Unix(),
expires: time.Now().Unix(), Expires: time.Now().Unix(),
deleted: time.Now().Unix(), Deleted: time.Now().Unix(),
secret: true, secret: true,
cronjewel: true, cronjewel: true,
} }

View file

@ -4,39 +4,39 @@ import "time"
// Meta holds // Meta holds
type Meta struct { type Meta struct {
created int64 Created int64
modified int64 Modified int64
expires int64 Expires int64
deleted int64 Deleted int64
secret bool // secrets must not be sent to the UI, only synced between nodes secret bool // secrets must not be sent to the UI, only synced between nodes
cronjewel bool // crownjewels must never leave the instance, but may be read by the UI cronjewel bool // crownjewels must never leave the instance, but may be read by the UI
} }
// SetAbsoluteExpiry sets an absolute expiry time (in seconds), that is not affected when the record is updated. // SetAbsoluteExpiry sets an absolute expiry time (in seconds), that is not affected when the record is updated.
func (m *Meta) SetAbsoluteExpiry(seconds int64) { func (m *Meta) SetAbsoluteExpiry(seconds int64) {
m.expires = seconds m.Expires = seconds
m.deleted = 0 m.Deleted = 0
} }
// SetRelativateExpiry sets a relative expiry time (ie. TTL in seconds) that is automatically updated whenever the record is updated/saved. // SetRelativateExpiry sets a relative expiry time (ie. TTL in seconds) that is automatically updated whenever the record is updated/saved.
func (m *Meta) SetRelativateExpiry(seconds int64) { func (m *Meta) SetRelativateExpiry(seconds int64) {
if seconds >= 0 { if seconds >= 0 {
m.deleted = -seconds m.Deleted = -seconds
} }
} }
// GetAbsoluteExpiry returns the absolute expiry time. // GetAbsoluteExpiry returns the absolute expiry time.
func (m *Meta) GetAbsoluteExpiry() int64 { func (m *Meta) GetAbsoluteExpiry() int64 {
return m.expires return m.Expires
} }
// GetRelativeExpiry returns the current relative expiry time - ie. seconds until expiry. // GetRelativeExpiry returns the current relative expiry time - ie. seconds until expiry.
func (m *Meta) GetRelativeExpiry() int64 { func (m *Meta) GetRelativeExpiry() int64 {
if m.deleted < 0 { if m.Deleted < 0 {
return -m.deleted return -m.Deleted
} }
abs := m.expires - time.Now().Unix() abs := m.Expires - time.Now().Unix()
if abs < 0 { if abs < 0 {
return 0 return 0
} }
@ -56,30 +56,38 @@ func (m *Meta) MakeSecret() {
// Update updates the internal meta states and should be called before writing the record to the database. // Update updates the internal meta states and should be called before writing the record to the database.
func (m *Meta) Update() { func (m *Meta) Update() {
now := time.Now().Unix() now := time.Now().Unix()
m.modified = now m.Modified = now
if m.created == 0 { if m.Created == 0 {
m.created = now m.Created = now
} }
if m.deleted < 0 { if m.Deleted < 0 {
m.expires = now - m.deleted m.Expires = now - m.Deleted
} }
} }
// Reset resets all metadata, except for the secret and crownjewel status. // Reset resets all metadata, except for the secret and crownjewel status.
func (m *Meta) Reset() { func (m *Meta) Reset() {
m.created = 0 m.Created = 0
m.modified = 0 m.Modified = 0
m.expires = 0 m.Expires = 0
m.deleted = 0 m.Deleted = 0
} }
// CheckScope checks whether the current database record exists for the given scope. // CheckValidity checks whether the database record is valid.
func (m *Meta) CheckScope(now int64, local, internal bool) (recordExists bool) { func (m *Meta) CheckValidity(now int64) (valid bool) {
switch { switch {
case m.deleted > 0: case m.Deleted > 0:
return false return false
case m.expires < now: case m.Expires < now:
return false return false
default:
return true
}
}
// CheckPermission checks whether the database record may be accessed with the following scope.
func (m *Meta) CheckPermission(local, internal bool) (permitted bool) {
switch {
case !local && m.cronjewel: case !local && m.cronjewel:
return false return false
case !internal && m.secret: case !internal && m.secret:

View file

@ -16,4 +16,6 @@ type Record interface {
Lock() Lock()
Unlock() Unlock()
IsWrapped() bool
} }

View file

@ -118,3 +118,25 @@ func (w *Wrapper) Lock() {
func (w *Wrapper) Unlock() { func (w *Wrapper) Unlock() {
w.lock.Unlock() w.lock.Unlock()
} }
// IsWrapped returns whether the record is a Wrapper.
func (w *Wrapper) IsWrapped() bool {
return true
}
func Unwrap(wrapped, new Record) (Record, error) {
wrapper, ok := wrapped.(*Wrapper)
if !ok {
return nil, fmt.Errorf("cannot unwrap %T", wrapped)
}
_, err := dsd.Load(wrapper.Data, new)
if err != nil {
return nil, fmt.Errorf("database: failed to unwrap %T: %s", new, err)
}
new.SetKey(wrapped.Key())
new.SetMeta(wrapped.Meta())
return new, nil
}

145
database/registry.go Normal file
View file

@ -0,0 +1,145 @@
package database
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"sync"
"github.com/tevino/abool"
)
// RegisteredDatabase holds information about registered databases
type RegisteredDatabase struct {
Name string
Description string
StorageType string
PrimaryAPI string
}
// Equal returns whether this instance equals another.
func (r *RegisteredDatabase) Equal(o *RegisteredDatabase) bool {
if r.Name != o.Name ||
r.Description != o.Description ||
r.StorageType != o.StorageType ||
r.PrimaryAPI != o.PrimaryAPI {
return false
}
return true
}
const (
registryFileName = "databases.json"
)
var (
initialized = abool.NewBool(false)
registry map[string]*RegisteredDatabase
registryLock sync.Mutex
)
// RegisterDatabase registers a new database.
func RegisterDatabase(new *RegisteredDatabase) error {
if !initialized.IsSet() {
return errors.New("database not initialized")
}
registryLock.Lock()
defer registryLock.Unlock()
registeredDB, ok := registry[new.Name]
if !ok || !new.Equal(registeredDB) {
registry[new.Name] = new
return saveRegistry()
}
return nil
}
// Initialize initialized the database
func Initialize(location string) error {
if initialized.SetToIf(false, true) {
rootDir = location
err := checkRootDir()
if err != nil {
return fmt.Errorf("could not create/open database directory (%s): %s", rootDir, err)
}
err = loadRegistry()
if err != nil {
return fmt.Errorf("could not load database registry (%s): %s", path.Join(rootDir, registryFileName), err)
}
return nil
}
return errors.New("database already initialized")
}
func checkRootDir() error {
// open dir
dir, err := os.Open(rootDir)
if err != nil {
if err == os.ErrNotExist {
return os.MkdirAll(rootDir, 0700)
}
return err
}
defer dir.Close()
fileInfo, err := dir.Stat()
if err != nil {
return err
}
if fileInfo.Mode().Perm() != 0700 {
return dir.Chmod(0700)
}
return nil
}
func loadRegistry() error {
registryLock.Lock()
defer registryLock.Unlock()
// read file
filePath := path.Join(rootDir, registryFileName)
data, err := ioutil.ReadFile(filePath)
if err != nil {
if err == os.ErrNotExist {
registry = make(map[string]*RegisteredDatabase)
return nil
}
return err
}
// parse
new := make(map[string]*RegisteredDatabase)
err = json.Unmarshal(data, new)
if err != nil {
return err
}
// set
registry = new
return nil
}
func saveRegistry() error {
registryLock.Lock()
defer registryLock.Unlock()
// marshal
data, err := json.Marshal(registry)
if err != nil {
return err
}
// write file
filePath := path.Join(rootDir, registryFileName)
return ioutil.WriteFile(filePath, data, 0600)
}

View file

@ -2,13 +2,12 @@ package storage
import ( import (
"github.com/Safing/portbase/database/iterator" "github.com/Safing/portbase/database/iterator"
"github.com/Safing/portbase/database/record"
"github.com/Safing/portbase/database/query" "github.com/Safing/portbase/database/query"
"github.com/Safing/portbase/database/record"
) )
// Interface defines the database storage API. // Interface defines the database storage API.
type Interface interface { type Interface interface {
Exists(key string) (bool, error)
Get(key string) (record.Record, error) Get(key string) (record.Record, error)
Put(m record.Record) error Put(m record.Record) error
Delete(key string) error Delete(key string) error

View file

@ -28,6 +28,11 @@ func Register(name string, factory Factory) error {
return nil return nil
} }
// CreateDatabase starts a new database with the given name and storageType at location.
func CreateDatabase(name, storageType, location string) (Interface, error) {
return nil, nil
}
// StartDatabase starts a new database with the given name and storageType at location. // StartDatabase starts a new database with the given name and storageType at location.
func StartDatabase(name, storageType, location string) (Interface, error) { func StartDatabase(name, storageType, location string) (Interface, error) {
storagesLock.Lock() storagesLock.Lock()

View file

@ -2,271 +2,271 @@
package database package database
import ( // import (
"fmt" // "fmt"
"strings" // "strings"
"sync" // "sync"
//
"github.com/Safing/safing-core/modules" // "github.com/Safing/portbase/database/record"
"github.com/Safing/safing-core/taskmanager" // "github.com/Safing/portbase/modules"
// "github.com/Safing/portbase/taskmanager"
"github.com/ipfs/go-datastore" //
"github.com/tevino/abool" // "github.com/tevino/abool"
) // )
//
var subscriptionModule *modules.Module // var subscriptionModule *modules.Module
var subscriptions []*Subscription // var subscriptions []*Subscription
var subLock sync.Mutex // var subLock sync.Mutex
//
var databaseUpdate chan Model // var databaseUpdate chan Model
var databaseCreate chan Model // var databaseCreate chan Model
var databaseDelete chan *datastore.Key // var databaseDelete chan string
//
var workIsWaiting chan *struct{} // var workIsWaiting chan *struct{}
var workIsWaitingFlag *abool.AtomicBool // var workIsWaitingFlag *abool.AtomicBool
var forceProcessing chan *struct{} // var forceProcessing chan *struct{}
//
type Subscription struct { // type Subscription struct {
typeAndLocation map[string]bool // typeAndLocation map[string]bool
exactObject map[string]bool // exactObject map[string]bool
children map[string]uint8 // children map[string]uint8
Created chan Model // Created chan record.Record
Updated chan Model // Updated chan record.Record
Deleted chan *datastore.Key // Deleted chan string
} // }
//
func NewSubscription() *Subscription { // func NewSubscription() *Subscription {
subLock.Lock() // subLock.Lock()
defer subLock.Unlock() // defer subLock.Unlock()
sub := &Subscription{ // sub := &Subscription{
typeAndLocation: make(map[string]bool), // typeAndLocation: make(map[string]bool),
exactObject: make(map[string]bool), // exactObject: make(map[string]bool),
children: make(map[string]uint8), // children: make(map[string]uint8),
Created: make(chan Model, 128), // Created: make(chan record.Record, 128),
Updated: make(chan Model, 128), // Updated: make(chan record.Record, 128),
Deleted: make(chan *datastore.Key, 128), // Deleted: make(chan string, 128),
} // }
subscriptions = append(subscriptions, sub) // subscriptions = append(subscriptions, sub)
return sub // return sub
} // }
//
func (sub *Subscription) Subscribe(subKey string) { // func (sub *Subscription) Subscribe(subKey string) {
subLock.Lock() // subLock.Lock()
defer subLock.Unlock() // defer subLock.Unlock()
//
namespaces := strings.Split(subKey, "/")[1:] // namespaces := strings.Split(subKey, "/")[1:]
lastSpace := "" // lastSpace := ""
if len(namespaces) != 0 { // if len(namespaces) != 0 {
lastSpace = namespaces[len(namespaces)-1] // lastSpace = namespaces[len(namespaces)-1]
} // }
//
switch { // switch {
case lastSpace == "": // case lastSpace == "":
// save key without leading "/" // // save key without leading "/"
// save with depth 255 to get all // // save with depth 255 to get all
sub.children[strings.Trim(subKey, "/")] = 0xFF // sub.children[strings.Trim(subKey, "/")] = 0xFF
case strings.HasPrefix(lastSpace, "*"): // case strings.HasPrefix(lastSpace, "*"):
// save key without leading or trailing "/" or "*" // // save key without leading or trailing "/" or "*"
// save full wanted depth - this makes comparison easier // // save full wanted depth - this makes comparison easier
sub.children[strings.Trim(subKey, "/*")] = uint8(len(lastSpace) + len(namespaces) - 1) // sub.children[strings.Trim(subKey, "/*")] = uint8(len(lastSpace) + len(namespaces) - 1)
case strings.Contains(lastSpace, ":"): // case strings.Contains(lastSpace, ":"):
sub.exactObject[subKey] = true // sub.exactObject[subKey] = true
default: // default:
sub.typeAndLocation[subKey] = true // sub.typeAndLocation[subKey] = true
} // }
} // }
//
func (sub *Subscription) Unsubscribe(subKey string) { // func (sub *Subscription) Unsubscribe(subKey string) {
subLock.Lock() // subLock.Lock()
defer subLock.Unlock() // defer subLock.Unlock()
//
namespaces := strings.Split(subKey, "/")[1:] // namespaces := strings.Split(subKey, "/")[1:]
lastSpace := "" // lastSpace := ""
if len(namespaces) != 0 { // if len(namespaces) != 0 {
lastSpace = namespaces[len(namespaces)-1] // lastSpace = namespaces[len(namespaces)-1]
} // }
//
switch { // switch {
case lastSpace == "": // case lastSpace == "":
delete(sub.children, strings.Trim(subKey, "/")) // delete(sub.children, strings.Trim(subKey, "/"))
case strings.HasPrefix(lastSpace, "*"): // case strings.HasPrefix(lastSpace, "*"):
delete(sub.children, strings.Trim(subKey, "/*")) // delete(sub.children, strings.Trim(subKey, "/*"))
case strings.Contains(lastSpace, ":"): // case strings.Contains(lastSpace, ":"):
delete(sub.exactObject, subKey) // delete(sub.exactObject, subKey)
default: // default:
delete(sub.typeAndLocation, subKey) // delete(sub.typeAndLocation, subKey)
} // }
} // }
//
func (sub *Subscription) Destroy() { // func (sub *Subscription) Destroy() {
subLock.Lock() // subLock.Lock()
defer subLock.Unlock() // defer subLock.Unlock()
//
for k, v := range subscriptions { // for k, v := range subscriptions {
if v.Created == sub.Created { // if v.Created == sub.Created {
defer func() { // defer func() {
subscriptions = append(subscriptions[:k], subscriptions[k+1:]...) // subscriptions = append(subscriptions[:k], subscriptions[k+1:]...)
}() // }()
close(sub.Created) // close(sub.Created)
close(sub.Updated) // close(sub.Updated)
close(sub.Deleted) // close(sub.Deleted)
return // return
} // }
} // }
} // }
//
func (sub *Subscription) Subscriptions() *[]string { // func (sub *Subscription) Subscriptions() *[]string {
subStrings := make([]string, 0) // subStrings := make([]string, 0)
for subString := range sub.exactObject { // for subString := range sub.exactObject {
subStrings = append(subStrings, subString) // subStrings = append(subStrings, subString)
} // }
for subString := range sub.typeAndLocation { // for subString := range sub.typeAndLocation {
subStrings = append(subStrings, subString) // subStrings = append(subStrings, subString)
} // }
for subString, depth := range sub.children { // for subString, depth := range sub.children {
if depth == 0xFF { // if depth == 0xFF {
subStrings = append(subStrings, fmt.Sprintf("/%s/", subString)) // subStrings = append(subStrings, fmt.Sprintf("/%s/", subString))
} else { // } else {
subStrings = append(subStrings, fmt.Sprintf("/%s/%s", subString, strings.Repeat("*", int(depth)-len(strings.Split(subString, "/"))))) // subStrings = append(subStrings, fmt.Sprintf("/%s/%s", subString, strings.Repeat("*", int(depth)-len(strings.Split(subString, "/")))))
} // }
} // }
return &subStrings // return &subStrings
} // }
//
func (sub *Subscription) String() string { // func (sub *Subscription) String() string {
return fmt.Sprintf("<Subscription [%s]>", strings.Join(*sub.Subscriptions(), " ")) // return fmt.Sprintf("<Subscription [%s]>", strings.Join(*sub.Subscriptions(), " "))
} // }
//
func (sub *Subscription) send(key *datastore.Key, model Model, created bool) { // func (sub *Subscription) send(key string, rec record.Record, created bool) {
if model == nil { // if rec == nil {
sub.Deleted <- key // sub.Deleted <- key
} else if created { // } else if created {
sub.Created <- model // sub.Created <- rec
} else { // } else {
sub.Updated <- model // sub.Updated <- rec
} // }
} // }
//
func process(key *datastore.Key, model Model, created bool) { // func process(key string, rec record.Record, created bool) {
subLock.Lock() // subLock.Lock()
defer subLock.Unlock() // defer subLock.Unlock()
//
stringRep := key.String() // stringRep := key.String()
// "/Comedy/MontyPython/Actor:JohnCleese" // // "/Comedy/MontyPython/Actor:JohnCleese"
typeAndLocation := key.Path().String() // typeAndLocation := key.Path().String()
// "/Comedy/MontyPython/Actor" // // "/Comedy/MontyPython/Actor"
namespaces := key.Namespaces() // namespaces := key.Namespaces()
// ["Comedy", "MontyPython", "Actor:JohnCleese"] // // ["Comedy", "MontyPython", "Actor:JohnCleese"]
depth := uint8(len(namespaces)) // depth := uint8(len(namespaces))
// 3 // // 3
//
subscriptionLoop: // subscriptionLoop:
for _, sub := range subscriptions { // for _, sub := range subscriptions {
if _, ok := sub.exactObject[stringRep]; ok { // if _, ok := sub.exactObject[stringRep]; ok {
sub.send(key, model, created) // sub.send(key, rec, created)
continue subscriptionLoop // continue subscriptionLoop
} // }
if _, ok := sub.typeAndLocation[typeAndLocation]; ok { // if _, ok := sub.typeAndLocation[typeAndLocation]; ok {
sub.send(key, model, created) // sub.send(key, rec, created)
continue subscriptionLoop // continue subscriptionLoop
} // }
for i := 0; i < len(namespaces); i++ { // for i := 0; i < len(namespaces); i++ {
if subscribedDepth, ok := sub.children[strings.Join(namespaces[:i], "/")]; ok { // if subscribedDepth, ok := sub.children[strings.Join(namespaces[:i], "/")]; ok {
if subscribedDepth >= depth { // if subscribedDepth >= depth {
sub.send(key, model, created) // sub.send(key, rec, created)
continue subscriptionLoop // continue subscriptionLoop
} // }
} // }
} // }
} // }
//
} // }
//
func init() { // func init() {
subscriptionModule = modules.Register("Database:Subscriptions", 128) // subscriptionModule = modules.Register("Database:Subscriptions", 128)
subscriptions = make([]*Subscription, 0) // subscriptions = make([]*Subscription, 0)
subLock = sync.Mutex{} // subLock = sync.Mutex{}
//
databaseUpdate = make(chan Model, 32) // databaseUpdate = make(chan Model, 32)
databaseCreate = make(chan Model, 32) // databaseCreate = make(chan Model, 32)
databaseDelete = make(chan *datastore.Key, 32) // databaseDelete = make(chan string, 32)
//
workIsWaiting = make(chan *struct{}, 0) // workIsWaiting = make(chan *struct{}, 0)
workIsWaitingFlag = abool.NewBool(false) // workIsWaitingFlag = abool.NewBool(false)
forceProcessing = make(chan *struct{}, 0) // forceProcessing = make(chan *struct{}, 0)
//
go run() // go run()
} // }
//
func run() { // func run() {
for { // for {
select { // select {
case <-subscriptionModule.Stop: // case <-subscriptionModule.Stop:
subscriptionModule.StopComplete() // subscriptionModule.StopComplete()
return // return
case <-workIsWaiting: // case <-workIsWaiting:
work() // work()
} // }
} // }
} // }
//
func work() { // func work() {
defer workIsWaitingFlag.UnSet() // defer workIsWaitingFlag.UnSet()
//
// wait // // wait
select { // select {
case <-taskmanager.StartMediumPriorityMicroTask(): // case <-taskmanager.StartMediumPriorityMicroTask():
defer taskmanager.EndMicroTask() // defer taskmanager.EndMicroTask()
case <-forceProcessing: // case <-forceProcessing:
} // }
//
// work // // work
for { // for {
select { // select {
case model := <-databaseCreate: // case rec := <-databaseCreate:
process(model.GetKey(), model, true) // process(rec.GetKey(), rec, true)
case model := <-databaseUpdate: // case rec := <-databaseUpdate:
process(model.GetKey(), model, false) // process(rec.GetKey(), rec, false)
case key := <-databaseDelete: // case key := <-databaseDelete:
process(key, nil, false) // process(key, nil, false)
default: // default:
return // return
} // }
} // }
} // }
//
func handleCreateSubscriptions(model Model) { // func handleCreateSubscriptions(rec record.Record) {
select { // select {
case databaseCreate <- model: // case databaseCreate <- rec:
default: // default:
forceProcessing <- nil // forceProcessing <- nil
databaseCreate <- model // databaseCreate <- rec
} // }
if workIsWaitingFlag.SetToIf(false, true) { // if workIsWaitingFlag.SetToIf(false, true) {
workIsWaiting <- nil // workIsWaiting <- nil
} // }
} // }
//
func handleUpdateSubscriptions(model Model) { // func handleUpdateSubscriptions(rec record.Record) {
select { // select {
case databaseUpdate <- model: // case databaseUpdate <- rec:
default: // default:
forceProcessing <- nil // forceProcessing <- nil
databaseUpdate <- model // databaseUpdate <- rec
} // }
if workIsWaitingFlag.SetToIf(false, true) { // if workIsWaitingFlag.SetToIf(false, true) {
workIsWaiting <- nil // workIsWaiting <- nil
} // }
} // }
//
func handleDeleteSubscriptions(key *datastore.Key) { // func handleDeleteSubscriptions(key string) {
select { // select {
case databaseDelete <- key: // case databaseDelete <- key:
default: // default:
forceProcessing <- nil // forceProcessing <- nil
databaseDelete <- key // databaseDelete <- key
} // }
if workIsWaitingFlag.SetToIf(false, true) { // if workIsWaitingFlag.SetToIf(false, true) {
workIsWaiting <- nil // workIsWaiting <- nil
} // }
} // }

View file

@ -2,102 +2,102 @@
package database package database
import ( // import (
"strconv" // "strconv"
"strings" // "strings"
"sync" // "sync"
"testing" // "testing"
) // )
//
var subTestWg sync.WaitGroup // var subTestWg sync.WaitGroup
//
func waitForSubs(t *testing.T, sub *Subscription, highest int) { // func waitForSubs(t *testing.T, sub *Subscription, highest int) {
defer subTestWg.Done() // defer subTestWg.Done()
expecting := 1 // expecting := 1
var subbedModel Model // var subbedModel Model
forLoop: // forLoop:
for { // for {
select { // select {
case subbedModel = <-sub.Created: // case subbedModel = <-sub.Created:
case subbedModel = <-sub.Updated: // case subbedModel = <-sub.Updated:
} // }
t.Logf("got model from subscription: %s", subbedModel.GetKey().String()) // t.Logf("got model from subscription: %s", subbedModel.GetKey().String())
if !strings.HasPrefix(subbedModel.GetKey().Name(), "sub") { // if !strings.HasPrefix(subbedModel.GetKey().Name(), "sub") {
// not a model that we use for testing, other tests might be interfering // // not a model that we use for testing, other tests might be interfering
continue forLoop // continue forLoop
} // }
number, err := strconv.Atoi(strings.TrimPrefix(subbedModel.GetKey().Name(), "sub")) // number, err := strconv.Atoi(strings.TrimPrefix(subbedModel.GetKey().Name(), "sub"))
if err != nil || number != expecting { // if err != nil || number != expecting {
t.Errorf("test subscription: got unexpected model %s, expected sub%d", subbedModel.GetKey().String(), expecting) // t.Errorf("test subscription: got unexpected model %s, expected sub%d", subbedModel.GetKey().String(), expecting)
continue forLoop // continue forLoop
} // }
if number == highest { // if number == highest {
return // return
} // }
expecting++ // expecting++
} // }
} // }
//
func TestSubscriptions(t *testing.T) { // func TestSubscriptions(t *testing.T) {
//
// create subscription // // create subscription
sub := NewSubscription() // sub := NewSubscription()
//
// FIRST TEST // // FIRST TEST
//
subTestWg.Add(1) // subTestWg.Add(1)
go waitForSubs(t, sub, 3) // go waitForSubs(t, sub, 3)
sub.Subscribe("/Tests/") // sub.Subscribe("/Tests/")
t.Log(sub.String()) // t.Log(sub.String())
//
(&(TestingModel{})).CreateInNamespace("", "sub1") // (&(TestingModel{})).CreateInNamespace("", "sub1")
(&(TestingModel{})).CreateInNamespace("A", "sub2") // (&(TestingModel{})).CreateInNamespace("A", "sub2")
(&(TestingModel{})).CreateInNamespace("A/B/C/D/E", "sub3") // (&(TestingModel{})).CreateInNamespace("A/B/C/D/E", "sub3")
//
subTestWg.Wait() // subTestWg.Wait()
//
// SECOND TEST // // SECOND TEST
//
subTestWg.Add(1) // subTestWg.Add(1)
go waitForSubs(t, sub, 3) // go waitForSubs(t, sub, 3)
sub.Unsubscribe("/Tests/") // sub.Unsubscribe("/Tests/")
sub.Subscribe("/Tests/A/****") // sub.Subscribe("/Tests/A/****")
t.Log(sub.String()) // t.Log(sub.String())
//
(&(TestingModel{})).CreateInNamespace("", "subX") // (&(TestingModel{})).CreateInNamespace("", "subX")
(&(TestingModel{})).CreateInNamespace("A", "sub1") // (&(TestingModel{})).CreateInNamespace("A", "sub1")
(&(TestingModel{})).CreateInNamespace("A/B/C/D", "sub2") // (&(TestingModel{})).CreateInNamespace("A/B/C/D", "sub2")
(&(TestingModel{})).CreateInNamespace("A/B/C/D/E", "subX") // (&(TestingModel{})).CreateInNamespace("A/B/C/D/E", "subX")
(&(TestingModel{})).CreateInNamespace("A", "sub3") // (&(TestingModel{})).CreateInNamespace("A", "sub3")
//
subTestWg.Wait() // subTestWg.Wait()
//
// THIRD TEST // // THIRD TEST
//
subTestWg.Add(1) // subTestWg.Add(1)
go waitForSubs(t, sub, 3) // go waitForSubs(t, sub, 3)
sub.Unsubscribe("/Tests/A/****") // sub.Unsubscribe("/Tests/A/****")
sub.Subscribe("/Tests/TestingModel:sub1") // sub.Subscribe("/Tests/TestingModel:sub1")
sub.Subscribe("/Tests/TestingModel:sub1/TestingModel") // sub.Subscribe("/Tests/TestingModel:sub1/TestingModel")
t.Log(sub.String()) // t.Log(sub.String())
//
(&(TestingModel{})).CreateInNamespace("", "sub1") // (&(TestingModel{})).CreateInNamespace("", "sub1")
(&(TestingModel{})).CreateInNamespace("", "subX") // (&(TestingModel{})).CreateInNamespace("", "subX")
(&(TestingModel{})).CreateInNamespace("TestingModel:sub1", "sub2") // (&(TestingModel{})).CreateInNamespace("TestingModel:sub1", "sub2")
(&(TestingModel{})).CreateInNamespace("TestingModel:sub1/A", "subX") // (&(TestingModel{})).CreateInNamespace("TestingModel:sub1/A", "subX")
(&(TestingModel{})).CreateInNamespace("TestingModel:sub1", "sub3") // (&(TestingModel{})).CreateInNamespace("TestingModel:sub1", "sub3")
//
subTestWg.Wait() // subTestWg.Wait()
//
// FINAL STUFF // // FINAL STUFF
//
model := &TestingModel{} // model := &TestingModel{}
model.CreateInNamespace("Invalid", "subX") // model.CreateInNamespace("Invalid", "subX")
model.Save() // model.Save()
//
sub.Destroy() // sub.Destroy()
//
// time.Sleep(1 * time.Second) // // time.Sleep(1 * time.Second)
// pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) // // pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
//
} // }

View file

@ -1,17 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package database
import (
"github.com/ipfs/go-datastore"
"github.com/Safing/safing-core/database/dbutils"
)
func NewWrapper(key *datastore.Key, data []byte) (*dbutils.Wrapper, error) {
return dbutils.NewWrapper(key, data)
}
func DumpModel(uncertain interface{}, storageType uint8) ([]byte, error) {
return dbutils.DumpModel(uncertain, storageType)
}

View file

@ -1,68 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package database
import (
"testing"
"github.com/Safing/safing-core/formats/dsd"
)
func TestWrapper(t *testing.T) {
// create Model
new := &TestingModel{
Name: "a",
Value: "b",
}
newTwo := &TestingModel{
Name: "c",
Value: "d",
}
// dump
bytes, err := DumpModel(new, dsd.JSON)
if err != nil {
panic(err)
}
bytesTwo, err := DumpModel(newTwo, dsd.JSON)
if err != nil {
panic(err)
}
// wrap
wrapped, err := NewWrapper(nil, bytes)
if err != nil {
panic(err)
}
wrappedTwo, err := NewWrapper(nil, bytesTwo)
if err != nil {
panic(err)
}
// model definition for unwrapping
var model *TestingModel
// unwrap
myModel, ok := SilentEnsureModel(wrapped, model).(*TestingModel)
if !ok {
panic("received model does not match expected model")
}
if myModel.Name != "a" || myModel.Value != "b" {
panic("model value mismatch")
}
// verbose unwrap
genericModel, err := EnsureModel(wrappedTwo, model)
if err != nil {
panic(err)
}
myModelTwo, ok := genericModel.(*TestingModel)
if !ok {
panic("received model does not match expected model")
}
if myModelTwo.Name != "c" || myModelTwo.Value != "d" {
panic("model value mismatch")
}
}