From b8e7f90dbea0ad2ecbcd93e8db498a51fca1d1c2 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 6 Sep 2018 19:06:13 +0200 Subject: [PATCH] Continue work on database module --- database/controller.go | 95 +++-- database/database.go | 156 ++++---- database/databases.go | 123 +++--- database/dbmodule/db.go | 29 ++ database/easyquery.go | 84 ----- database/easyquery_test.go | 68 ---- database/interface.go | 64 +++- database/location.go | 14 + database/model.go | 64 ---- database/model_test.go | 108 ------ database/queries.go | 37 -- database/record/base.go | 5 + database/record/meta-bench_test.go | 56 +-- database/record/meta-gencode.go | 72 ++-- database/record/meta-gencode_test.go | 8 +- database/record/meta.go | 56 +-- database/record/record.go | 2 + database/record/wrapper.go | 22 ++ database/registry.go | 145 ++++++++ database/storage/interface.go | 3 +- database/storage/storages.go | 5 + database/subscriptions.go | 536 +++++++++++++-------------- database/subscriptions_test.go | 198 +++++----- database/wrapper.go | 17 - database/wrapper_test.go | 68 ---- 25 files changed, 962 insertions(+), 1073 deletions(-) create mode 100644 database/dbmodule/db.go delete mode 100644 database/easyquery.go delete mode 100644 database/easyquery_test.go create mode 100644 database/location.go delete mode 100644 database/model.go delete mode 100644 database/model_test.go delete mode 100644 database/queries.go create mode 100644 database/registry.go delete mode 100644 database/wrapper.go delete mode 100644 database/wrapper_test.go diff --git a/database/controller.go b/database/controller.go index 499b746..428f319 100644 --- a/database/controller.go +++ b/database/controller.go @@ -1,42 +1,89 @@ package database import ( - "github.com/Safing/portbase/database/record" + "sync" + "time" + + "github.com/tevino/abool" + + "github.com/Safing/portbase/database/iterator" + "github.com/Safing/portbase/database/query" + "github.com/Safing/portbase/database/record" + "github.com/Safing/portbase/database/storage" ) +// A Controller takes care of all the extra database logic. type Controller struct { - storage - writeLock sync.RWMutex - readLock sync.RWMutex - migrating *abool.AtomicBool + storage storage.Interface + writeLock sync.RWMutex + readLock sync.RWMutex + migrating *abool.AtomicBool } -func NewController() (*Controller, error) { - +// newController creates a new controller for a storage. +func newController(storageInt storage.Interface) (*Controller, error) { + return &Controller{ + storage: storageInt, + migrating: abool.NewBool(false), + }, nil } - // Retrieve -func (c *Controller) Exists(key string) (bool, error) {} -func (c *Controller) Get(key string) (record.Record, error) {} +// Get return the record with the given key. +func (c *Controller) Get(key string) (record.Record, error) { + r, err := c.storage.Get(key) + if err != nil { + return nil, err + } -// Modify -func (c *Controller) Create(model record.Record) error {} -// create when not exists -func (c *Controller) Update(model record.Record) error {} -// update, create if not exists. -func (c *Controller) UpdateOrCreate(model record.Record) error {} -func (c *Controller) Delete(key string) error {} + if !r.Meta().CheckValidity(time.Now().Unix()) { + return nil, ErrNotFound + } + + return r, nil +} + +// Put saves a record in the database. +func (c *Controller) Put(r record.Record) error { + return c.storage.Put(r) +} + +func (c *Controller) Delete(key string) error { + r, err := c.Get(key) + if err != nil { + return err + } + r.Meta().Deleted = time.Now().Unix() + return c.Put(r) +} // Partial // What happens if I mutate a value that does not yet exist? How would I know its type? -func (c *Controller) InsertPartial(key string, partialObject interface{}) {} -func (c *Controller) InsertValue(key string, attribute string, value interface{}) {} +func (c *Controller) InsertPartial(key string, partialObject interface{}) error { + return nil +} + +func (c *Controller) InsertValue(key string, attribute string, value interface{}) error { + return nil +} // Query -func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {} +func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { + return nil, nil +} // Meta -func (c *Controller) SetAbsoluteExpiry(key string, time int64) {} -func (c *Controller) SetRelativateExpiry(key string, duration int64) {} -func (c *Controller) MakeCrownJewel(key string) {} -func (c *Controller) MakeSecret(key string) {} +func (c *Controller) SetAbsoluteExpiry(key string, time int64) error { + return nil +} + +func (c *Controller) SetRelativateExpiry(key string, duration int64) error { + return nil +} + +func (c *Controller) MakeCrownJewel(key string) error { + return nil +} + +func (c *Controller) MakeSecret(key string) error { + return nil +} diff --git a/database/database.go b/database/database.go index 863604e..78373f4 100644 --- a/database/database.go +++ b/database/database.go @@ -4,17 +4,13 @@ package database import ( "errors" - "fmt" - "os" - "path" - "strings" - - "github.com/Safing/safing-core/database/dbutils" - "github.com/Safing/safing-core/log" - "github.com/Safing/safing-core/meta" ) -var ErrNotFound = errors.New("database: entry could not be found") +// Errors +var ( + ErrNotFound = errors.New("database: entry could not be found") + ErrPermissionDenied = errors.New("database: access to record denied") +) func init() { // if strings.HasSuffix(os.Args[0], ".test") { @@ -59,84 +55,66 @@ func init() { // return db.Close() // } -func Get(key *ds.Key) (Model, error) { - data, err := db.Get(*key) - if err != nil { - switch err { - case ds.ErrNotFound: - return nil, ErrNotFound - default: - return nil, err - } - } - model, ok := data.(Model) - if !ok { - return nil, errors.New("database did not return model") - } - return model, nil -} +// func Get(key *ds.Key) (Model, error) { +// data, err := db.Get(*key) +// if err != nil { +// switch err { +// case ds.ErrNotFound: +// return nil, ErrNotFound +// default: +// return nil, err +// } +// } +// model, ok := data.(Model) +// if !ok { +// return nil, errors.New("database did not return model") +// } +// return model, nil +// } -func GetAndEnsureModel(namespace *ds.Key, name string, model Model) (Model, error) { - newKey := namespace.ChildString(getTypeName(model)).Instance(name) - - data, err := Get(&newKey) - if err != nil { - return nil, err - } - - newModel, err := EnsureModel(data, model) - if err != nil { - return nil, err - } - - newModel.SetKey(&newKey) - - return newModel, nil -} - -func Has(key ds.Key) (exists bool, err error) { - return db.Has(key) -} - -func Create(key ds.Key, model Model) (err error) { - handleCreateSubscriptions(model) - err = db.Put(key, model) - if err != nil { - log.Tracef("database: failed to create entry %s: %s", key, err) - } - return err -} - -func Update(key ds.Key, model Model) (err error) { - handleUpdateSubscriptions(model) - err = db.Put(key, model) - if err != nil { - log.Tracef("database: failed to update entry %s: %s", key, err) - } - return err -} - -func Delete(key ds.Key) (err error) { - handleDeleteSubscriptions(&key) - return db.Delete(key) -} - -func Query(q dsq.Query) (dsq.Results, error) { - return db.Query(q) -} - -func RawGet(key ds.Key) (*dbutils.Wrapper, error) { - data, err := db.Get(key) - if err != nil { - return nil, err - } - wrapped, ok := data.(*dbutils.Wrapper) - if !ok { - return nil, errors.New("returned data is not a wrapper") - } - return wrapped, nil -} - -func RawPut(key ds.Key, value interface{}) error { - return db.Put(key, value) -} +// func Has(key ds.Key) (exists bool, err error) { +// return db.Has(key) +// } +// +// func Create(key ds.Key, model Model) (err error) { +// handleCreateSubscriptions(model) +// err = db.Put(key, model) +// if err != nil { +// log.Tracef("database: failed to create entry %s: %s", key, err) +// } +// return err +// } +// +// func Update(key ds.Key, model Model) (err error) { +// handleUpdateSubscriptions(model) +// err = db.Put(key, model) +// if err != nil { +// log.Tracef("database: failed to update entry %s: %s", key, err) +// } +// return err +// } +// +// func Delete(key ds.Key) (err error) { +// handleDeleteSubscriptions(&key) +// return db.Delete(key) +// } +// +// func Query(q dsq.Query) (dsq.Results, error) { +// return db.Query(q) +// } +// +// func RawGet(key ds.Key) (*dbutils.Wrapper, error) { +// data, err := db.Get(key) +// if err != nil { +// return nil, err +// } +// wrapped, ok := data.(*dbutils.Wrapper) +// if !ok { +// return nil, errors.New("returned data is not a wrapper") +// } +// return wrapped, nil +// } +// +// func RawPut(key ds.Key, value interface{}) error { +// return db.Put(key, value) +// } diff --git a/database/databases.go b/database/databases.go index b9cc0e9..6170c71 100644 --- a/database/databases.go +++ b/database/databases.go @@ -1,56 +1,95 @@ package database +import ( + "errors" + "sync" + "fmt" + "path" -var ( - databases = make(map[string]*Controller) - databasesLock sync.Mutex + "github.com/Safing/portbase/database/storage" + "github.com/Safing/portbase/database/record" ) -func getDatabase(name string) *Controller { - databasesLock.Lock() - defer databasesLock.Unlock() - storage, ok := databases[name] - if ok { - return - } -} +var ( + databases = make(map[string]*Controller) + databasesLock sync.Mutex +) -func databaseExists(name string) (exists bool) { - // check if folder exists - return true -} - -// CreateDatabase creates a new database with given name and type. -func CreateDatabase(name string, storageType string) error { - databasesLock.Lock() - defer databasesLock.Unlock() - _, ok := databases[name] - if ok { - return errors.New("database with this name already loaded.") - } - if databaseExists(name) { - return errors.New("database with this name already exists.") - } - - iface, err := startDatabase(name) +func splitKeyAndGetDatabase(key string) (dbKey string, db *Controller, err error) { + var dbName string + dbName, dbKey = record.ParseKey(key) + db, err = getDatabase(dbName) if err != nil { - return err + return "", nil, err } - databases[name] = iface - return nil + return +} + +func getDatabase(name string) (*Controller, error) { + if !initialized.IsSet() { + return nil, errors.New("database not initialized") + } + + databasesLock.Lock() + defer databasesLock.Unlock() + + // return database if already started + db, ok := databases[name] + if ok { + return db, nil + } + + registryLock.Lock() + defer registryLock.Unlock() + + // check if database exists at all + registeredDB, ok := registry[name] + if !ok { + return nil, fmt.Errorf(`database "%s" not registered`, name) + } + + // start database + storageInt, err := storage.StartDatabase(name, registeredDB.StorageType, path.Join(rootDir, name, registeredDB.StorageType)) + if err != nil { + return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err) + } + + db, err = newController(storageInt) + if err != nil { + return nil, fmt.Errorf(`could not create controller for database %s: %s`, name, err) + } + + databases[name] = db + return db, nil } // InjectDatabase injects an already running database into the system. -func InjectDatabase(name string, iface *storage.Interface) error { - databasesLock.Lock() - defer databasesLock.Unlock() - _, ok := databases[name] - if ok { - return errors.New("database with this name already loaded.") +func InjectDatabase(name string, storageInt storage.Interface) error { + databasesLock.Lock() + defer databasesLock.Unlock() + + _, ok := databases[name] + if ok { + return errors.New(`database "%s" already loaded`) + } + + registryLock.Lock() + defer registryLock.Unlock() + + // check if database is registered + registeredDB, ok := registry[name] + if !ok { + return fmt.Errorf(`database "%s" not registered`, name) } - if databaseExists(name) { - return errors.New("database with this name already exists.") + if registeredDB.StorageType != "injected" { + return fmt.Errorf(`database not of type "injected"`) } - databases[name] = iface - return nil + + db, err := newController(storageInt) + if err != nil { + return fmt.Errorf(`could not create controller for database %s: %s`, name, err) + } + + databases[name] = db + return nil } diff --git a/database/dbmodule/db.go b/database/dbmodule/db.go new file mode 100644 index 0000000..6c38c1f --- /dev/null +++ b/database/dbmodule/db.go @@ -0,0 +1,29 @@ +package dbmodule + +import ( + "github.com/Safing/portbase/database" +) + +var ( + databaseDir string +) + +func init() { + flag.StringVar(&databaseDir, "db", "", "set database directory") + + modules.Register("database", prep, start, stop) +} + +func prep() error { + if databaseDir == "" { + return errors.New("no database location specified, set with `-db=/path/to/db`") + } +} + +func start() error { + return database.Initialize(databaseDir) +} + +func stop() { + return database.Shutdown() +} diff --git a/database/easyquery.go b/database/easyquery.go deleted file mode 100644 index ee4db82..0000000 --- a/database/easyquery.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package database - -import ( - "errors" - "fmt" - "strings" - - dsq "github.com/ipfs/go-datastore/query" -) - -type FilterMaxDepth struct { - MaxDepth int -} - -func (f FilterMaxDepth) Filter(entry dsq.Entry) bool { - return strings.Count(entry.Key, "/") <= f.MaxDepth -} - -type FilterKeyLength struct { - Length int -} - -func (f FilterKeyLength) Filter(entry dsq.Entry) bool { - return len(entry.Key) == f.Length -} - -func EasyQueryIterator(subscriptionKey string) (dsq.Results, error) { - query := dsq.Query{} - - namespaces := strings.Split(subscriptionKey, "/")[1:] - lastSpace := "" - if len(namespaces) != 0 { - lastSpace = namespaces[len(namespaces)-1] - } - - switch { - case lastSpace == "": - // get all children - query.Prefix = subscriptionKey - case strings.HasPrefix(lastSpace, "*"): - // get children to defined depth - query.Prefix = strings.Trim(subscriptionKey, "*") - query.Filters = []dsq.Filter{ - FilterMaxDepth{len(lastSpace) + len(namespaces) - 1}, - } - case strings.Contains(lastSpace, ":"): - query.Prefix = subscriptionKey - query.Filters = []dsq.Filter{ - FilterKeyLength{len(query.Prefix)}, - } - default: - // get only from this location and this type - query.Prefix = subscriptionKey + ":" - query.Filters = []dsq.Filter{ - FilterMaxDepth{len(namespaces)}, - } - } - - // log.Tracef("easyquery: %s has prefix %s", subscriptionKey, query.Prefix) - - results, err := db.Query(query) - if err != nil { - return nil, errors.New(fmt.Sprintf("easyquery: %s", err)) - } - - return results, nil -} - -func EasyQuery(subscriptionKey string) (*[]dsq.Entry, error) { - - results, err := EasyQueryIterator(subscriptionKey) - if err != nil { - return nil, err - } - - entries, err := results.Rest() - if err != nil { - return nil, errors.New(fmt.Sprintf("easyquery: %s", err)) - } - - return &entries, nil -} diff --git a/database/easyquery_test.go b/database/easyquery_test.go deleted file mode 100644 index 914c0b8..0000000 --- a/database/easyquery_test.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package database - -import ( - "testing" - - datastore "github.com/ipfs/go-datastore" -) - -func testQuery(t *testing.T, queryString string, expecting []string) { - - entries, err := EasyQuery(queryString) - if err != nil { - t.Errorf("error in query %s: %s", queryString, err) - } - - totalExcepted := len(expecting) - total := 0 - fail := false - - keys := datastore.EntryKeys(*entries) - -resultLoop: - for _, key := range keys { - total++ - for _, expectedName := range expecting { - if key.Name() == expectedName { - continue resultLoop - } - } - fail = true - break - } - - if !fail && total == totalExcepted { - return - } - - t.Errorf("Query %s got %s, expected %s", queryString, keys, expecting) - -} - -func TestEasyQuery(t *testing.T) { - - // setup test data - (&(TestingModel{})).CreateInNamespace("EasyQuery", "1") - (&(TestingModel{})).CreateInNamespace("EasyQuery", "2") - (&(TestingModel{})).CreateInNamespace("EasyQuery", "3") - (&(TestingModel{})).CreateInNamespace("EasyQuery/A", "4") - (&(TestingModel{})).CreateInNamespace("EasyQuery/A/B", "5") - (&(TestingModel{})).CreateInNamespace("EasyQuery/A/B/C", "6") - (&(TestingModel{})).CreateInNamespace("EasyQuery/A/B/C/D", "7") - - (&(TestingModel{})).CreateWithTypeName("EasyQuery", "ConfigModel", "X") - (&(TestingModel{})).CreateWithTypeName("EasyQuery", "ConfigModel", "Y") - (&(TestingModel{})).CreateWithTypeName("EasyQuery/A", "ConfigModel", "Z") - - testQuery(t, "/Tests/EasyQuery/TestingModel", []string{"1", "2", "3"}) - testQuery(t, "/Tests/EasyQuery/TestingModel:1", []string{"1"}) - - testQuery(t, "/Tests/EasyQuery/ConfigModel", []string{"X", "Y"}) - testQuery(t, "/Tests/EasyQuery/ConfigModel:Y", []string{"Y"}) - - testQuery(t, "/Tests/EasyQuery/A/", []string{"Z", "4", "5", "6", "7"}) - testQuery(t, "/Tests/EasyQuery/A/B/**", []string{"5", "6"}) - -} diff --git a/database/interface.go b/database/interface.go index dd56955..c765559 100644 --- a/database/interface.go +++ b/database/interface.go @@ -1,27 +1,69 @@ package database +import ( + "github.com/Safing/portbase/database/record" +) + // Interface provides a method to access the database with attached options. -type Interface struct {} +type Interface struct { + options *Options +} // Options holds options that may be set for an Interface instance. type Options struct { - Local bool - Internal bool - AlwaysMakeSecret bool - AlwaysMakeCrownjewel bool + Local bool + Internal bool + AlwaysMakeSecret bool + AlwaysMakeCrownjewel bool } // NewInterface returns a new Interface to the database. func NewInterface(opts *Options) *Interface { - return &Interface{ - local: local, - internal: internal, - } + if opts == nil { + opts = &Options{} + } + + return &Interface{ + options: opts, + } } +// Exists return whether a record with the given key exists. +func (i *Interface) Exists(key string) (bool, error) { + _, err := i.getRecord(key) + if err != nil { + if err == ErrNotFound { + return false, nil + } + return false, err + } + return true, nil +} + +// Get return the record with the given key. func (i *Interface) Get(key string) (record.Record, error) { + r, err := i.getRecord(key) + if err != nil { + return nil, err + } - controller + if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { + return nil, ErrPermissionDenied + } - return nil, nil + return r, nil +} + +func (i *Interface) getRecord(key string) (record.Record, error) { + dbKey, db, err := splitKeyAndGetDatabase(key) + if err != nil { + return nil, err + } + + r, err := db.Get(dbKey) + if err != nil { + return nil, err + } + + return r, nil } diff --git a/database/location.go b/database/location.go new file mode 100644 index 0000000..c0fdb78 --- /dev/null +++ b/database/location.go @@ -0,0 +1,14 @@ +package database + +import ( + "path" +) + +var ( + rootDir string +) + +// getLocation returns the storage location for the given name and type. +func getLocation(name, storageType string) (location string, err error) { + return path.Join(rootDir, name, storageType), nil +} diff --git a/database/model.go b/database/model.go deleted file mode 100644 index 1aa7634..0000000 --- a/database/model.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package database - -import ( - "fmt" - "sync" - - "github.com/Safing/safing-core/database/dbutils" - "github.com/Safing/safing-core/formats/dsd" -) - -// Model Registration - -var ( - registeredModels = make(map[string]func() Model) - registeredModelsLock sync.RWMutex -) - -func RegisterModel(model Model, constructor func() Model) { - registeredModelsLock.Lock() - defer registeredModelsLock.Unlock() - registeredModels[fmt.Sprintf("%T", model)] = constructor -} - -func NewModel(model Model) (Model, error) { - registeredModelsLock.RLock() - defer registeredModelsLock.RUnlock() - constructor, ok := registeredModels[fmt.Sprintf("%T", model)] - if !ok { - return nil, fmt.Errorf("database: cannot create new %T, not registered", model) - } - return constructor(), nil -} - -func EnsureModel(uncertain, model Model) (Model, error) { - wrappedObj, ok := uncertain.(*dbutils.Wrapper) - if !ok { - return uncertain, nil - } - newModel, err := NewModel(model) - if err != nil { - return nil, err - } - _, err = dsd.Load(wrappedObj.Data, &newModel) - if err != nil { - return nil, fmt.Errorf("database: failed to unwrap %T: %s", model, err) - } - newModel.SetKey(wrappedObj.GetKey()) - model = newModel - return newModel, nil -} - -func SilentEnsureModel(uncertain, model Model) Model { - obj, err := EnsureModel(uncertain, model) - if err != nil { - return nil - } - return obj -} - -func NewMismatchError(got, expected interface{}) error { - return fmt.Errorf("database: entry (%T) does not match expected model (%T)", got, expected) -} diff --git a/database/model_test.go b/database/model_test.go deleted file mode 100644 index 9d1f486..0000000 --- a/database/model_test.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package database - -import ( - "testing" - - datastore "github.com/ipfs/go-datastore" -) - -type TestingModel struct { - Base - Name string - Value string -} - -var testingModel *TestingModel - -func init() { - RegisterModel(testingModel, func() Model { return new(TestingModel) }) -} - -func (m *TestingModel) Create(name string) error { - return m.CreateObject(&Tests, name, m) -} - -func (m *TestingModel) CreateInNamespace(namespace string, name string) error { - testsNamescace := Tests.ChildString(namespace) - return m.CreateObject(&testsNamescace, name, m) -} - -func (m *TestingModel) CreateWithTypeName(namespace string, typeName string, name string) error { - customNamespace := Tests.ChildString(namespace).ChildString(typeName).Instance(name) - - m.dbKey = &customNamespace - handleCreateSubscriptions(m) - return Create(*m.dbKey, m) -} - -func (m *TestingModel) Save() error { - return m.SaveObject(m) -} - -func GetTestingModel(name string) (*TestingModel, error) { - return GetTestingModelFromNamespace(&Tests, name) -} - -func GetTestingModelFromNamespace(namespace *datastore.Key, name string) (*TestingModel, error) { - object, err := GetAndEnsureModel(namespace, name, testingModel) - if err != nil { - return nil, err - } - model, ok := object.(*TestingModel) - if !ok { - return nil, NewMismatchError(object, testingModel) - } - return model, nil -} - -func TestModel(t *testing.T) { - - // create - m := TestingModel{ - Name: "a", - Value: "b", - } - // newKey := datastore.NewKey("/Tests/TestingModel:test") - // m.dbKey = &newKey - // err := Put(*m.dbKey, m) - err := m.Create("") - if err != nil { - t.Errorf("database test: could not create object: %s", err) - } - - // get - o, err := GetTestingModel(m.dbKey.Name()) - if err != nil { - t.Errorf("database test: failed to get model: %s (%s)", err, m.dbKey.Name()) - } - - // check fetched object - if o.Name != "a" || o.Value != "b" { - t.Errorf("database test: values do not match: got Name=%s and Value=%s", o.Name, o.Value) - } - - // o, err := Get(*m.dbKey) - // if err != nil { - // t.Errorf("database: could not get object: %s", err) - // } - // n, ok := o.(*TestingModel) - // if !ok { - // t.Errorf("database: wrong type, got type %T from %s", o, m.dbKey.String()) - // } - - // save - o.Value = "c" - err = o.Save() - if err != nil { - t.Errorf("database test: could not save object: %s", err) - } - - // delete - err = o.Delete() - if err != nil { - t.Errorf("database test: could not delete object: %s", err) - } - -} diff --git a/database/queries.go b/database/queries.go deleted file mode 100644 index 3c5ea13..0000000 --- a/database/queries.go +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package database - -import ( - "time" - - "github.com/Safing/safing-core/formats/dsd" - "github.com/Safing/safing-core/log" - - dsq "github.com/ipfs/go-datastore/query" -) - -func init() { - // go dumper() -} - -func dumper() { - for { - time.Sleep(10 * time.Second) - result, err := db.Query(dsq.Query{Prefix: "/Run/Process"}) - if err != nil { - log.Warningf("Query failed: %s", err) - continue - } - log.Infof("Dumping all processes:") - for model, ok := result.NextSync(); ok; model, ok = result.NextSync() { - bytes, err := dsd.Dump(model, dsd.AUTO) - if err != nil { - log.Warningf("Error dumping: %s", err) - continue - } - log.Info(string(bytes)) - } - log.Infof("END") - } -} diff --git a/database/record/base.go b/database/record/base.go index 30fcfc1..d1e2b04 100644 --- a/database/record/base.go +++ b/database/record/base.go @@ -85,3 +85,8 @@ func (b *Base) MarshalRecord() ([]byte, error) { return c.CompileData(), nil } + +// IsWrapped returns whether the record is a Wrapper. +func (b *Base) IsWrapped() bool { + return false +} diff --git a/database/record/meta-bench_test.go b/database/record/meta-bench_test.go index 2c62083..ca845c6 100644 --- a/database/record/meta-bench_test.go +++ b/database/record/meta-bench_test.go @@ -32,10 +32,10 @@ import ( var ( testMeta = &Meta{ - created: time.Now().Unix(), - modified: time.Now().Unix(), - expires: time.Now().Unix(), - deleted: time.Now().Unix(), + Created: time.Now().Unix(), + Modified: time.Now().Unix(), + Expires: time.Now().Unix(), + Deleted: time.Now().Unix(), secret: true, cronjewel: true, } @@ -65,10 +65,10 @@ func BenchmarkMetaSerializeContainer(b *testing.B) { // Start benchmark for i := 0; i < b.N; i++ { c := container.New() - c.AppendNumber(uint64(testMeta.created)) - c.AppendNumber(uint64(testMeta.modified)) - c.AppendNumber(uint64(testMeta.expires)) - c.AppendNumber(uint64(testMeta.deleted)) + c.AppendNumber(uint64(testMeta.Created)) + c.AppendNumber(uint64(testMeta.Modified)) + c.AppendNumber(uint64(testMeta.Expires)) + c.AppendNumber(uint64(testMeta.Deleted)) switch { case testMeta.secret && testMeta.cronjewel: c.AppendNumber(3) @@ -87,10 +87,10 @@ func BenchmarkMetaUnserializeContainer(b *testing.B) { // Setup c := container.New() - c.AppendNumber(uint64(testMeta.created)) - c.AppendNumber(uint64(testMeta.modified)) - c.AppendNumber(uint64(testMeta.expires)) - c.AppendNumber(uint64(testMeta.deleted)) + c.AppendNumber(uint64(testMeta.Created)) + c.AppendNumber(uint64(testMeta.Modified)) + c.AppendNumber(uint64(testMeta.Expires)) + c.AppendNumber(uint64(testMeta.Deleted)) switch { case testMeta.secret && testMeta.cronjewel: c.AppendNumber(3) @@ -113,25 +113,25 @@ func BenchmarkMetaUnserializeContainer(b *testing.B) { var num uint64 c := container.New(encodedData) num, err = c.GetNextN64() - newMeta.created = int64(num) + newMeta.Created = int64(num) if err != nil { b.Errorf("could not decode: %s", err) return } num, err = c.GetNextN64() - newMeta.modified = int64(num) + newMeta.Modified = int64(num) if err != nil { b.Errorf("could not decode: %s", err) return } num, err = c.GetNextN64() - newMeta.expires = int64(num) + newMeta.Expires = int64(num) if err != nil { b.Errorf("could not decode: %s", err) return } num, err = c.GetNextN64() - newMeta.deleted = int64(num) + newMeta.Deleted = int64(num) if err != nil { b.Errorf("could not decode: %s", err) return @@ -166,22 +166,22 @@ func BenchmarkMetaSerializeVarInt(b *testing.B) { for i := 0; i < b.N; i++ { encoded := make([]byte, 33) offset := 0 - data := varint.Pack64(uint64(testMeta.created)) + data := varint.Pack64(uint64(testMeta.Created)) for _, part := range data { encoded[offset] = part offset++ } - data = varint.Pack64(uint64(testMeta.modified)) + data = varint.Pack64(uint64(testMeta.Modified)) for _, part := range data { encoded[offset] = part offset++ } - data = varint.Pack64(uint64(testMeta.expires)) + data = varint.Pack64(uint64(testMeta.Expires)) for _, part := range data { encoded[offset] = part offset++ } - data = varint.Pack64(uint64(testMeta.deleted)) + data = varint.Pack64(uint64(testMeta.Deleted)) for _, part := range data { encoded[offset] = part offset++ @@ -207,22 +207,22 @@ func BenchmarkMetaUnserializeVarInt(b *testing.B) { // Setup encoded := make([]byte, 33) offset := 0 - data := varint.Pack64(uint64(testMeta.created)) + data := varint.Pack64(uint64(testMeta.Created)) for _, part := range data { encoded[offset] = part offset++ } - data = varint.Pack64(uint64(testMeta.modified)) + data = varint.Pack64(uint64(testMeta.Modified)) for _, part := range data { encoded[offset] = part offset++ } - data = varint.Pack64(uint64(testMeta.expires)) + data = varint.Pack64(uint64(testMeta.Expires)) for _, part := range data { encoded[offset] = part offset++ } - data = varint.Pack64(uint64(testMeta.deleted)) + data = varint.Pack64(uint64(testMeta.Deleted)) for _, part := range data { encoded[offset] = part offset++ @@ -254,7 +254,7 @@ func BenchmarkMetaUnserializeVarInt(b *testing.B) { b.Error(err) return } - testMeta.created = int64(num) + testMeta.Created = int64(num) offset += n num, n, err = varint.Unpack64(encodedData[offset:]) @@ -262,7 +262,7 @@ func BenchmarkMetaUnserializeVarInt(b *testing.B) { b.Error(err) return } - testMeta.modified = int64(num) + testMeta.Modified = int64(num) offset += n num, n, err = varint.Unpack64(encodedData[offset:]) @@ -270,7 +270,7 @@ func BenchmarkMetaUnserializeVarInt(b *testing.B) { b.Error(err) return } - testMeta.expires = int64(num) + testMeta.Expires = int64(num) offset += n num, n, err = varint.Unpack64(encodedData[offset:]) @@ -278,7 +278,7 @@ func BenchmarkMetaUnserializeVarInt(b *testing.B) { b.Error(err) return } - testMeta.deleted = int64(num) + testMeta.Deleted = int64(num) offset += n switch encodedData[offset] { diff --git a/database/record/meta-gencode.go b/database/record/meta-gencode.go index d79edff..c0a2142 100644 --- a/database/record/meta-gencode.go +++ b/database/record/meta-gencode.go @@ -33,78 +33,78 @@ func (d *Meta) GenCodeMarshal(buf []byte) ([]byte, error) { { - buf[0+0] = byte(d.created >> 0) + buf[0+0] = byte(d.Created >> 0) - buf[1+0] = byte(d.created >> 8) + buf[1+0] = byte(d.Created >> 8) - buf[2+0] = byte(d.created >> 16) + buf[2+0] = byte(d.Created >> 16) - buf[3+0] = byte(d.created >> 24) + buf[3+0] = byte(d.Created >> 24) - buf[4+0] = byte(d.created >> 32) + buf[4+0] = byte(d.Created >> 32) - buf[5+0] = byte(d.created >> 40) + buf[5+0] = byte(d.Created >> 40) - buf[6+0] = byte(d.created >> 48) + buf[6+0] = byte(d.Created >> 48) - buf[7+0] = byte(d.created >> 56) + buf[7+0] = byte(d.Created >> 56) } { - buf[0+8] = byte(d.modified >> 0) + buf[0+8] = byte(d.Modified >> 0) - buf[1+8] = byte(d.modified >> 8) + buf[1+8] = byte(d.Modified >> 8) - buf[2+8] = byte(d.modified >> 16) + buf[2+8] = byte(d.Modified >> 16) - buf[3+8] = byte(d.modified >> 24) + buf[3+8] = byte(d.Modified >> 24) - buf[4+8] = byte(d.modified >> 32) + buf[4+8] = byte(d.Modified >> 32) - buf[5+8] = byte(d.modified >> 40) + buf[5+8] = byte(d.Modified >> 40) - buf[6+8] = byte(d.modified >> 48) + buf[6+8] = byte(d.Modified >> 48) - buf[7+8] = byte(d.modified >> 56) + buf[7+8] = byte(d.Modified >> 56) } { - buf[0+16] = byte(d.expires >> 0) + buf[0+16] = byte(d.Expires >> 0) - buf[1+16] = byte(d.expires >> 8) + buf[1+16] = byte(d.Expires >> 8) - buf[2+16] = byte(d.expires >> 16) + buf[2+16] = byte(d.Expires >> 16) - buf[3+16] = byte(d.expires >> 24) + buf[3+16] = byte(d.Expires >> 24) - buf[4+16] = byte(d.expires >> 32) + buf[4+16] = byte(d.Expires >> 32) - buf[5+16] = byte(d.expires >> 40) + buf[5+16] = byte(d.Expires >> 40) - buf[6+16] = byte(d.expires >> 48) + buf[6+16] = byte(d.Expires >> 48) - buf[7+16] = byte(d.expires >> 56) + buf[7+16] = byte(d.Expires >> 56) } { - buf[0+24] = byte(d.deleted >> 0) + buf[0+24] = byte(d.Deleted >> 0) - buf[1+24] = byte(d.deleted >> 8) + buf[1+24] = byte(d.Deleted >> 8) - buf[2+24] = byte(d.deleted >> 16) + buf[2+24] = byte(d.Deleted >> 16) - buf[3+24] = byte(d.deleted >> 24) + buf[3+24] = byte(d.Deleted >> 24) - buf[4+24] = byte(d.deleted >> 32) + buf[4+24] = byte(d.Deleted >> 32) - buf[5+24] = byte(d.deleted >> 40) + buf[5+24] = byte(d.Deleted >> 40) - buf[6+24] = byte(d.deleted >> 48) + buf[6+24] = byte(d.Deleted >> 48) - buf[7+24] = byte(d.deleted >> 56) + buf[7+24] = byte(d.Deleted >> 56) } { @@ -134,22 +134,22 @@ func (d *Meta) GenCodeUnmarshal(buf []byte) (uint64, error) { { - d.created = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56) + d.Created = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56) } { - d.modified = 0 | (int64(buf[0+8]) << 0) | (int64(buf[1+8]) << 8) | (int64(buf[2+8]) << 16) | (int64(buf[3+8]) << 24) | (int64(buf[4+8]) << 32) | (int64(buf[5+8]) << 40) | (int64(buf[6+8]) << 48) | (int64(buf[7+8]) << 56) + d.Modified = 0 | (int64(buf[0+8]) << 0) | (int64(buf[1+8]) << 8) | (int64(buf[2+8]) << 16) | (int64(buf[3+8]) << 24) | (int64(buf[4+8]) << 32) | (int64(buf[5+8]) << 40) | (int64(buf[6+8]) << 48) | (int64(buf[7+8]) << 56) } { - d.expires = 0 | (int64(buf[0+16]) << 0) | (int64(buf[1+16]) << 8) | (int64(buf[2+16]) << 16) | (int64(buf[3+16]) << 24) | (int64(buf[4+16]) << 32) | (int64(buf[5+16]) << 40) | (int64(buf[6+16]) << 48) | (int64(buf[7+16]) << 56) + d.Expires = 0 | (int64(buf[0+16]) << 0) | (int64(buf[1+16]) << 8) | (int64(buf[2+16]) << 16) | (int64(buf[3+16]) << 24) | (int64(buf[4+16]) << 32) | (int64(buf[5+16]) << 40) | (int64(buf[6+16]) << 48) | (int64(buf[7+16]) << 56) } { - d.deleted = 0 | (int64(buf[0+24]) << 0) | (int64(buf[1+24]) << 8) | (int64(buf[2+24]) << 16) | (int64(buf[3+24]) << 24) | (int64(buf[4+24]) << 32) | (int64(buf[5+24]) << 40) | (int64(buf[6+24]) << 48) | (int64(buf[7+24]) << 56) + d.Deleted = 0 | (int64(buf[0+24]) << 0) | (int64(buf[1+24]) << 8) | (int64(buf[2+24]) << 16) | (int64(buf[3+24]) << 24) | (int64(buf[4+24]) << 32) | (int64(buf[5+24]) << 40) | (int64(buf[6+24]) << 48) | (int64(buf[7+24]) << 56) } { diff --git a/database/record/meta-gencode_test.go b/database/record/meta-gencode_test.go index baea558..7050e7d 100644 --- a/database/record/meta-gencode_test.go +++ b/database/record/meta-gencode_test.go @@ -8,10 +8,10 @@ import ( var ( genCodeTestMeta = &Meta{ - created: time.Now().Unix(), - modified: time.Now().Unix(), - expires: time.Now().Unix(), - deleted: time.Now().Unix(), + Created: time.Now().Unix(), + Modified: time.Now().Unix(), + Expires: time.Now().Unix(), + Deleted: time.Now().Unix(), secret: true, cronjewel: true, } diff --git a/database/record/meta.go b/database/record/meta.go index c561a17..8ef2810 100644 --- a/database/record/meta.go +++ b/database/record/meta.go @@ -4,39 +4,39 @@ import "time" // Meta holds type Meta struct { - created int64 - modified int64 - expires int64 - deleted int64 + Created int64 + Modified int64 + Expires int64 + Deleted int64 secret bool // secrets must not be sent to the UI, only synced between nodes cronjewel bool // crownjewels must never leave the instance, but may be read by the UI } // SetAbsoluteExpiry sets an absolute expiry time (in seconds), that is not affected when the record is updated. func (m *Meta) SetAbsoluteExpiry(seconds int64) { - m.expires = seconds - m.deleted = 0 + m.Expires = seconds + m.Deleted = 0 } // SetRelativateExpiry sets a relative expiry time (ie. TTL in seconds) that is automatically updated whenever the record is updated/saved. func (m *Meta) SetRelativateExpiry(seconds int64) { if seconds >= 0 { - m.deleted = -seconds + m.Deleted = -seconds } } // GetAbsoluteExpiry returns the absolute expiry time. func (m *Meta) GetAbsoluteExpiry() int64 { - return m.expires + return m.Expires } // GetRelativeExpiry returns the current relative expiry time - ie. seconds until expiry. func (m *Meta) GetRelativeExpiry() int64 { - if m.deleted < 0 { - return -m.deleted + if m.Deleted < 0 { + return -m.Deleted } - abs := m.expires - time.Now().Unix() + abs := m.Expires - time.Now().Unix() if abs < 0 { return 0 } @@ -56,30 +56,38 @@ func (m *Meta) MakeSecret() { // Update updates the internal meta states and should be called before writing the record to the database. func (m *Meta) Update() { now := time.Now().Unix() - m.modified = now - if m.created == 0 { - m.created = now + m.Modified = now + if m.Created == 0 { + m.Created = now } - if m.deleted < 0 { - m.expires = now - m.deleted + if m.Deleted < 0 { + m.Expires = now - m.Deleted } } // Reset resets all metadata, except for the secret and crownjewel status. func (m *Meta) Reset() { - m.created = 0 - m.modified = 0 - m.expires = 0 - m.deleted = 0 + m.Created = 0 + m.Modified = 0 + m.Expires = 0 + m.Deleted = 0 } -// CheckScope checks whether the current database record exists for the given scope. -func (m *Meta) CheckScope(now int64, local, internal bool) (recordExists bool) { +// CheckValidity checks whether the database record is valid. +func (m *Meta) CheckValidity(now int64) (valid bool) { switch { - case m.deleted > 0: + case m.Deleted > 0: return false - case m.expires < now: + case m.Expires < now: return false + default: + return true + } +} + +// CheckPermission checks whether the database record may be accessed with the following scope. +func (m *Meta) CheckPermission(local, internal bool) (permitted bool) { + switch { case !local && m.cronjewel: return false case !internal && m.secret: diff --git a/database/record/record.go b/database/record/record.go index c002f25..c40fca2 100644 --- a/database/record/record.go +++ b/database/record/record.go @@ -16,4 +16,6 @@ type Record interface { Lock() Unlock() + + IsWrapped() bool } diff --git a/database/record/wrapper.go b/database/record/wrapper.go index 3f73263..63b0367 100644 --- a/database/record/wrapper.go +++ b/database/record/wrapper.go @@ -118,3 +118,25 @@ func (w *Wrapper) Lock() { func (w *Wrapper) Unlock() { w.lock.Unlock() } + +// IsWrapped returns whether the record is a Wrapper. +func (w *Wrapper) IsWrapped() bool { + return true +} + +func Unwrap(wrapped, new Record) (Record, error) { + wrapper, ok := wrapped.(*Wrapper) + if !ok { + return nil, fmt.Errorf("cannot unwrap %T", wrapped) + } + + _, err := dsd.Load(wrapper.Data, new) + if err != nil { + return nil, fmt.Errorf("database: failed to unwrap %T: %s", new, err) + } + + new.SetKey(wrapped.Key()) + new.SetMeta(wrapped.Meta()) + + return new, nil +} diff --git a/database/registry.go b/database/registry.go new file mode 100644 index 0000000..94734af --- /dev/null +++ b/database/registry.go @@ -0,0 +1,145 @@ +package database + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "os" + "path" + "sync" + + "github.com/tevino/abool" +) + +// RegisteredDatabase holds information about registered databases +type RegisteredDatabase struct { + Name string + Description string + StorageType string + PrimaryAPI string +} + +// Equal returns whether this instance equals another. +func (r *RegisteredDatabase) Equal(o *RegisteredDatabase) bool { + if r.Name != o.Name || + r.Description != o.Description || + r.StorageType != o.StorageType || + r.PrimaryAPI != o.PrimaryAPI { + return false + } + return true +} + +const ( + registryFileName = "databases.json" +) + +var ( + initialized = abool.NewBool(false) + + registry map[string]*RegisteredDatabase + registryLock sync.Mutex +) + +// RegisterDatabase registers a new database. +func RegisterDatabase(new *RegisteredDatabase) error { + if !initialized.IsSet() { + return errors.New("database not initialized") + } + + registryLock.Lock() + defer registryLock.Unlock() + + registeredDB, ok := registry[new.Name] + if !ok || !new.Equal(registeredDB) { + registry[new.Name] = new + return saveRegistry() + } + + return nil +} + +// Initialize initialized the database +func Initialize(location string) error { + if initialized.SetToIf(false, true) { + rootDir = location + + err := checkRootDir() + if err != nil { + return fmt.Errorf("could not create/open database directory (%s): %s", rootDir, err) + } + + err = loadRegistry() + if err != nil { + return fmt.Errorf("could not load database registry (%s): %s", path.Join(rootDir, registryFileName), err) + } + + return nil + } + return errors.New("database already initialized") +} + +func checkRootDir() error { + // open dir + dir, err := os.Open(rootDir) + if err != nil { + if err == os.ErrNotExist { + return os.MkdirAll(rootDir, 0700) + } + return err + } + defer dir.Close() + + fileInfo, err := dir.Stat() + if err != nil { + return err + } + + if fileInfo.Mode().Perm() != 0700 { + return dir.Chmod(0700) + } + return nil +} + +func loadRegistry() error { + registryLock.Lock() + defer registryLock.Unlock() + + // read file + filePath := path.Join(rootDir, registryFileName) + data, err := ioutil.ReadFile(filePath) + if err != nil { + if err == os.ErrNotExist { + registry = make(map[string]*RegisteredDatabase) + return nil + } + return err + } + + // parse + new := make(map[string]*RegisteredDatabase) + err = json.Unmarshal(data, new) + if err != nil { + return err + } + + // set + registry = new + return nil +} + +func saveRegistry() error { + registryLock.Lock() + defer registryLock.Unlock() + + // marshal + data, err := json.Marshal(registry) + if err != nil { + return err + } + + // write file + filePath := path.Join(rootDir, registryFileName) + return ioutil.WriteFile(filePath, data, 0600) +} diff --git a/database/storage/interface.go b/database/storage/interface.go index ae19a90..3832ca4 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -2,13 +2,12 @@ package storage import ( "github.com/Safing/portbase/database/iterator" - "github.com/Safing/portbase/database/record" "github.com/Safing/portbase/database/query" + "github.com/Safing/portbase/database/record" ) // Interface defines the database storage API. type Interface interface { - Exists(key string) (bool, error) Get(key string) (record.Record, error) Put(m record.Record) error Delete(key string) error diff --git a/database/storage/storages.go b/database/storage/storages.go index f4a3c27..948af83 100644 --- a/database/storage/storages.go +++ b/database/storage/storages.go @@ -28,6 +28,11 @@ func Register(name string, factory Factory) error { return nil } +// CreateDatabase starts a new database with the given name and storageType at location. +func CreateDatabase(name, storageType, location string) (Interface, error) { + return nil, nil +} + // StartDatabase starts a new database with the given name and storageType at location. func StartDatabase(name, storageType, location string) (Interface, error) { storagesLock.Lock() diff --git a/database/subscriptions.go b/database/subscriptions.go index 684016b..460713e 100644 --- a/database/subscriptions.go +++ b/database/subscriptions.go @@ -2,271 +2,271 @@ package database -import ( - "fmt" - "strings" - "sync" - - "github.com/Safing/safing-core/modules" - "github.com/Safing/safing-core/taskmanager" - - "github.com/ipfs/go-datastore" - "github.com/tevino/abool" -) - -var subscriptionModule *modules.Module -var subscriptions []*Subscription -var subLock sync.Mutex - -var databaseUpdate chan Model -var databaseCreate chan Model -var databaseDelete chan *datastore.Key - -var workIsWaiting chan *struct{} -var workIsWaitingFlag *abool.AtomicBool -var forceProcessing chan *struct{} - -type Subscription struct { - typeAndLocation map[string]bool - exactObject map[string]bool - children map[string]uint8 - Created chan Model - Updated chan Model - Deleted chan *datastore.Key -} - -func NewSubscription() *Subscription { - subLock.Lock() - defer subLock.Unlock() - sub := &Subscription{ - typeAndLocation: make(map[string]bool), - exactObject: make(map[string]bool), - children: make(map[string]uint8), - Created: make(chan Model, 128), - Updated: make(chan Model, 128), - Deleted: make(chan *datastore.Key, 128), - } - subscriptions = append(subscriptions, sub) - return sub -} - -func (sub *Subscription) Subscribe(subKey string) { - subLock.Lock() - defer subLock.Unlock() - - namespaces := strings.Split(subKey, "/")[1:] - lastSpace := "" - if len(namespaces) != 0 { - lastSpace = namespaces[len(namespaces)-1] - } - - switch { - case lastSpace == "": - // save key without leading "/" - // save with depth 255 to get all - sub.children[strings.Trim(subKey, "/")] = 0xFF - case strings.HasPrefix(lastSpace, "*"): - // save key without leading or trailing "/" or "*" - // save full wanted depth - this makes comparison easier - sub.children[strings.Trim(subKey, "/*")] = uint8(len(lastSpace) + len(namespaces) - 1) - case strings.Contains(lastSpace, ":"): - sub.exactObject[subKey] = true - default: - sub.typeAndLocation[subKey] = true - } -} - -func (sub *Subscription) Unsubscribe(subKey string) { - subLock.Lock() - defer subLock.Unlock() - - namespaces := strings.Split(subKey, "/")[1:] - lastSpace := "" - if len(namespaces) != 0 { - lastSpace = namespaces[len(namespaces)-1] - } - - switch { - case lastSpace == "": - delete(sub.children, strings.Trim(subKey, "/")) - case strings.HasPrefix(lastSpace, "*"): - delete(sub.children, strings.Trim(subKey, "/*")) - case strings.Contains(lastSpace, ":"): - delete(sub.exactObject, subKey) - default: - delete(sub.typeAndLocation, subKey) - } -} - -func (sub *Subscription) Destroy() { - subLock.Lock() - defer subLock.Unlock() - - for k, v := range subscriptions { - if v.Created == sub.Created { - defer func() { - subscriptions = append(subscriptions[:k], subscriptions[k+1:]...) - }() - close(sub.Created) - close(sub.Updated) - close(sub.Deleted) - return - } - } -} - -func (sub *Subscription) Subscriptions() *[]string { - subStrings := make([]string, 0) - for subString := range sub.exactObject { - subStrings = append(subStrings, subString) - } - for subString := range sub.typeAndLocation { - subStrings = append(subStrings, subString) - } - for subString, depth := range sub.children { - if depth == 0xFF { - subStrings = append(subStrings, fmt.Sprintf("/%s/", subString)) - } else { - subStrings = append(subStrings, fmt.Sprintf("/%s/%s", subString, strings.Repeat("*", int(depth)-len(strings.Split(subString, "/"))))) - } - } - return &subStrings -} - -func (sub *Subscription) String() string { - return fmt.Sprintf("", strings.Join(*sub.Subscriptions(), " ")) -} - -func (sub *Subscription) send(key *datastore.Key, model Model, created bool) { - if model == nil { - sub.Deleted <- key - } else if created { - sub.Created <- model - } else { - sub.Updated <- model - } -} - -func process(key *datastore.Key, model Model, created bool) { - subLock.Lock() - defer subLock.Unlock() - - stringRep := key.String() - // "/Comedy/MontyPython/Actor:JohnCleese" - typeAndLocation := key.Path().String() - // "/Comedy/MontyPython/Actor" - namespaces := key.Namespaces() - // ["Comedy", "MontyPython", "Actor:JohnCleese"] - depth := uint8(len(namespaces)) - // 3 - -subscriptionLoop: - for _, sub := range subscriptions { - if _, ok := sub.exactObject[stringRep]; ok { - sub.send(key, model, created) - continue subscriptionLoop - } - if _, ok := sub.typeAndLocation[typeAndLocation]; ok { - sub.send(key, model, created) - continue subscriptionLoop - } - for i := 0; i < len(namespaces); i++ { - if subscribedDepth, ok := sub.children[strings.Join(namespaces[:i], "/")]; ok { - if subscribedDepth >= depth { - sub.send(key, model, created) - continue subscriptionLoop - } - } - } - } - -} - -func init() { - subscriptionModule = modules.Register("Database:Subscriptions", 128) - subscriptions = make([]*Subscription, 0) - subLock = sync.Mutex{} - - databaseUpdate = make(chan Model, 32) - databaseCreate = make(chan Model, 32) - databaseDelete = make(chan *datastore.Key, 32) - - workIsWaiting = make(chan *struct{}, 0) - workIsWaitingFlag = abool.NewBool(false) - forceProcessing = make(chan *struct{}, 0) - - go run() -} - -func run() { - for { - select { - case <-subscriptionModule.Stop: - subscriptionModule.StopComplete() - return - case <-workIsWaiting: - work() - } - } -} - -func work() { - defer workIsWaitingFlag.UnSet() - - // wait - select { - case <-taskmanager.StartMediumPriorityMicroTask(): - defer taskmanager.EndMicroTask() - case <-forceProcessing: - } - - // work - for { - select { - case model := <-databaseCreate: - process(model.GetKey(), model, true) - case model := <-databaseUpdate: - process(model.GetKey(), model, false) - case key := <-databaseDelete: - process(key, nil, false) - default: - return - } - } -} - -func handleCreateSubscriptions(model Model) { - select { - case databaseCreate <- model: - default: - forceProcessing <- nil - databaseCreate <- model - } - if workIsWaitingFlag.SetToIf(false, true) { - workIsWaiting <- nil - } -} - -func handleUpdateSubscriptions(model Model) { - select { - case databaseUpdate <- model: - default: - forceProcessing <- nil - databaseUpdate <- model - } - if workIsWaitingFlag.SetToIf(false, true) { - workIsWaiting <- nil - } -} - -func handleDeleteSubscriptions(key *datastore.Key) { - select { - case databaseDelete <- key: - default: - forceProcessing <- nil - databaseDelete <- key - } - if workIsWaitingFlag.SetToIf(false, true) { - workIsWaiting <- nil - } -} +// import ( +// "fmt" +// "strings" +// "sync" +// +// "github.com/Safing/portbase/database/record" +// "github.com/Safing/portbase/modules" +// "github.com/Safing/portbase/taskmanager" +// +// "github.com/tevino/abool" +// ) +// +// var subscriptionModule *modules.Module +// var subscriptions []*Subscription +// var subLock sync.Mutex +// +// var databaseUpdate chan Model +// var databaseCreate chan Model +// var databaseDelete chan string +// +// var workIsWaiting chan *struct{} +// var workIsWaitingFlag *abool.AtomicBool +// var forceProcessing chan *struct{} +// +// type Subscription struct { +// typeAndLocation map[string]bool +// exactObject map[string]bool +// children map[string]uint8 +// Created chan record.Record +// Updated chan record.Record +// Deleted chan string +// } +// +// func NewSubscription() *Subscription { +// subLock.Lock() +// defer subLock.Unlock() +// sub := &Subscription{ +// typeAndLocation: make(map[string]bool), +// exactObject: make(map[string]bool), +// children: make(map[string]uint8), +// Created: make(chan record.Record, 128), +// Updated: make(chan record.Record, 128), +// Deleted: make(chan string, 128), +// } +// subscriptions = append(subscriptions, sub) +// return sub +// } +// +// func (sub *Subscription) Subscribe(subKey string) { +// subLock.Lock() +// defer subLock.Unlock() +// +// namespaces := strings.Split(subKey, "/")[1:] +// lastSpace := "" +// if len(namespaces) != 0 { +// lastSpace = namespaces[len(namespaces)-1] +// } +// +// switch { +// case lastSpace == "": +// // save key without leading "/" +// // save with depth 255 to get all +// sub.children[strings.Trim(subKey, "/")] = 0xFF +// case strings.HasPrefix(lastSpace, "*"): +// // save key without leading or trailing "/" or "*" +// // save full wanted depth - this makes comparison easier +// sub.children[strings.Trim(subKey, "/*")] = uint8(len(lastSpace) + len(namespaces) - 1) +// case strings.Contains(lastSpace, ":"): +// sub.exactObject[subKey] = true +// default: +// sub.typeAndLocation[subKey] = true +// } +// } +// +// func (sub *Subscription) Unsubscribe(subKey string) { +// subLock.Lock() +// defer subLock.Unlock() +// +// namespaces := strings.Split(subKey, "/")[1:] +// lastSpace := "" +// if len(namespaces) != 0 { +// lastSpace = namespaces[len(namespaces)-1] +// } +// +// switch { +// case lastSpace == "": +// delete(sub.children, strings.Trim(subKey, "/")) +// case strings.HasPrefix(lastSpace, "*"): +// delete(sub.children, strings.Trim(subKey, "/*")) +// case strings.Contains(lastSpace, ":"): +// delete(sub.exactObject, subKey) +// default: +// delete(sub.typeAndLocation, subKey) +// } +// } +// +// func (sub *Subscription) Destroy() { +// subLock.Lock() +// defer subLock.Unlock() +// +// for k, v := range subscriptions { +// if v.Created == sub.Created { +// defer func() { +// subscriptions = append(subscriptions[:k], subscriptions[k+1:]...) +// }() +// close(sub.Created) +// close(sub.Updated) +// close(sub.Deleted) +// return +// } +// } +// } +// +// func (sub *Subscription) Subscriptions() *[]string { +// subStrings := make([]string, 0) +// for subString := range sub.exactObject { +// subStrings = append(subStrings, subString) +// } +// for subString := range sub.typeAndLocation { +// subStrings = append(subStrings, subString) +// } +// for subString, depth := range sub.children { +// if depth == 0xFF { +// subStrings = append(subStrings, fmt.Sprintf("/%s/", subString)) +// } else { +// subStrings = append(subStrings, fmt.Sprintf("/%s/%s", subString, strings.Repeat("*", int(depth)-len(strings.Split(subString, "/"))))) +// } +// } +// return &subStrings +// } +// +// func (sub *Subscription) String() string { +// return fmt.Sprintf("", strings.Join(*sub.Subscriptions(), " ")) +// } +// +// func (sub *Subscription) send(key string, rec record.Record, created bool) { +// if rec == nil { +// sub.Deleted <- key +// } else if created { +// sub.Created <- rec +// } else { +// sub.Updated <- rec +// } +// } +// +// func process(key string, rec record.Record, created bool) { +// subLock.Lock() +// defer subLock.Unlock() +// +// stringRep := key.String() +// // "/Comedy/MontyPython/Actor:JohnCleese" +// typeAndLocation := key.Path().String() +// // "/Comedy/MontyPython/Actor" +// namespaces := key.Namespaces() +// // ["Comedy", "MontyPython", "Actor:JohnCleese"] +// depth := uint8(len(namespaces)) +// // 3 +// +// subscriptionLoop: +// for _, sub := range subscriptions { +// if _, ok := sub.exactObject[stringRep]; ok { +// sub.send(key, rec, created) +// continue subscriptionLoop +// } +// if _, ok := sub.typeAndLocation[typeAndLocation]; ok { +// sub.send(key, rec, created) +// continue subscriptionLoop +// } +// for i := 0; i < len(namespaces); i++ { +// if subscribedDepth, ok := sub.children[strings.Join(namespaces[:i], "/")]; ok { +// if subscribedDepth >= depth { +// sub.send(key, rec, created) +// continue subscriptionLoop +// } +// } +// } +// } +// +// } +// +// func init() { +// subscriptionModule = modules.Register("Database:Subscriptions", 128) +// subscriptions = make([]*Subscription, 0) +// subLock = sync.Mutex{} +// +// databaseUpdate = make(chan Model, 32) +// databaseCreate = make(chan Model, 32) +// databaseDelete = make(chan string, 32) +// +// workIsWaiting = make(chan *struct{}, 0) +// workIsWaitingFlag = abool.NewBool(false) +// forceProcessing = make(chan *struct{}, 0) +// +// go run() +// } +// +// func run() { +// for { +// select { +// case <-subscriptionModule.Stop: +// subscriptionModule.StopComplete() +// return +// case <-workIsWaiting: +// work() +// } +// } +// } +// +// func work() { +// defer workIsWaitingFlag.UnSet() +// +// // wait +// select { +// case <-taskmanager.StartMediumPriorityMicroTask(): +// defer taskmanager.EndMicroTask() +// case <-forceProcessing: +// } +// +// // work +// for { +// select { +// case rec := <-databaseCreate: +// process(rec.GetKey(), rec, true) +// case rec := <-databaseUpdate: +// process(rec.GetKey(), rec, false) +// case key := <-databaseDelete: +// process(key, nil, false) +// default: +// return +// } +// } +// } +// +// func handleCreateSubscriptions(rec record.Record) { +// select { +// case databaseCreate <- rec: +// default: +// forceProcessing <- nil +// databaseCreate <- rec +// } +// if workIsWaitingFlag.SetToIf(false, true) { +// workIsWaiting <- nil +// } +// } +// +// func handleUpdateSubscriptions(rec record.Record) { +// select { +// case databaseUpdate <- rec: +// default: +// forceProcessing <- nil +// databaseUpdate <- rec +// } +// if workIsWaitingFlag.SetToIf(false, true) { +// workIsWaiting <- nil +// } +// } +// +// func handleDeleteSubscriptions(key string) { +// select { +// case databaseDelete <- key: +// default: +// forceProcessing <- nil +// databaseDelete <- key +// } +// if workIsWaitingFlag.SetToIf(false, true) { +// workIsWaiting <- nil +// } +// } diff --git a/database/subscriptions_test.go b/database/subscriptions_test.go index 817a9df..5ab06d3 100644 --- a/database/subscriptions_test.go +++ b/database/subscriptions_test.go @@ -2,102 +2,102 @@ package database -import ( - "strconv" - "strings" - "sync" - "testing" -) - -var subTestWg sync.WaitGroup - -func waitForSubs(t *testing.T, sub *Subscription, highest int) { - defer subTestWg.Done() - expecting := 1 - var subbedModel Model -forLoop: - for { - select { - case subbedModel = <-sub.Created: - case subbedModel = <-sub.Updated: - } - t.Logf("got model from subscription: %s", subbedModel.GetKey().String()) - if !strings.HasPrefix(subbedModel.GetKey().Name(), "sub") { - // not a model that we use for testing, other tests might be interfering - continue forLoop - } - number, err := strconv.Atoi(strings.TrimPrefix(subbedModel.GetKey().Name(), "sub")) - if err != nil || number != expecting { - t.Errorf("test subscription: got unexpected model %s, expected sub%d", subbedModel.GetKey().String(), expecting) - continue forLoop - } - if number == highest { - return - } - expecting++ - } -} - -func TestSubscriptions(t *testing.T) { - - // create subscription - sub := NewSubscription() - - // FIRST TEST - - subTestWg.Add(1) - go waitForSubs(t, sub, 3) - sub.Subscribe("/Tests/") - t.Log(sub.String()) - - (&(TestingModel{})).CreateInNamespace("", "sub1") - (&(TestingModel{})).CreateInNamespace("A", "sub2") - (&(TestingModel{})).CreateInNamespace("A/B/C/D/E", "sub3") - - subTestWg.Wait() - - // SECOND TEST - - subTestWg.Add(1) - go waitForSubs(t, sub, 3) - sub.Unsubscribe("/Tests/") - sub.Subscribe("/Tests/A/****") - t.Log(sub.String()) - - (&(TestingModel{})).CreateInNamespace("", "subX") - (&(TestingModel{})).CreateInNamespace("A", "sub1") - (&(TestingModel{})).CreateInNamespace("A/B/C/D", "sub2") - (&(TestingModel{})).CreateInNamespace("A/B/C/D/E", "subX") - (&(TestingModel{})).CreateInNamespace("A", "sub3") - - subTestWg.Wait() - - // THIRD TEST - - subTestWg.Add(1) - go waitForSubs(t, sub, 3) - sub.Unsubscribe("/Tests/A/****") - sub.Subscribe("/Tests/TestingModel:sub1") - sub.Subscribe("/Tests/TestingModel:sub1/TestingModel") - t.Log(sub.String()) - - (&(TestingModel{})).CreateInNamespace("", "sub1") - (&(TestingModel{})).CreateInNamespace("", "subX") - (&(TestingModel{})).CreateInNamespace("TestingModel:sub1", "sub2") - (&(TestingModel{})).CreateInNamespace("TestingModel:sub1/A", "subX") - (&(TestingModel{})).CreateInNamespace("TestingModel:sub1", "sub3") - - subTestWg.Wait() - - // FINAL STUFF - - model := &TestingModel{} - model.CreateInNamespace("Invalid", "subX") - model.Save() - - sub.Destroy() - - // time.Sleep(1 * time.Second) - // pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) - -} +// import ( +// "strconv" +// "strings" +// "sync" +// "testing" +// ) +// +// var subTestWg sync.WaitGroup +// +// func waitForSubs(t *testing.T, sub *Subscription, highest int) { +// defer subTestWg.Done() +// expecting := 1 +// var subbedModel Model +// forLoop: +// for { +// select { +// case subbedModel = <-sub.Created: +// case subbedModel = <-sub.Updated: +// } +// t.Logf("got model from subscription: %s", subbedModel.GetKey().String()) +// if !strings.HasPrefix(subbedModel.GetKey().Name(), "sub") { +// // not a model that we use for testing, other tests might be interfering +// continue forLoop +// } +// number, err := strconv.Atoi(strings.TrimPrefix(subbedModel.GetKey().Name(), "sub")) +// if err != nil || number != expecting { +// t.Errorf("test subscription: got unexpected model %s, expected sub%d", subbedModel.GetKey().String(), expecting) +// continue forLoop +// } +// if number == highest { +// return +// } +// expecting++ +// } +// } +// +// func TestSubscriptions(t *testing.T) { +// +// // create subscription +// sub := NewSubscription() +// +// // FIRST TEST +// +// subTestWg.Add(1) +// go waitForSubs(t, sub, 3) +// sub.Subscribe("/Tests/") +// t.Log(sub.String()) +// +// (&(TestingModel{})).CreateInNamespace("", "sub1") +// (&(TestingModel{})).CreateInNamespace("A", "sub2") +// (&(TestingModel{})).CreateInNamespace("A/B/C/D/E", "sub3") +// +// subTestWg.Wait() +// +// // SECOND TEST +// +// subTestWg.Add(1) +// go waitForSubs(t, sub, 3) +// sub.Unsubscribe("/Tests/") +// sub.Subscribe("/Tests/A/****") +// t.Log(sub.String()) +// +// (&(TestingModel{})).CreateInNamespace("", "subX") +// (&(TestingModel{})).CreateInNamespace("A", "sub1") +// (&(TestingModel{})).CreateInNamespace("A/B/C/D", "sub2") +// (&(TestingModel{})).CreateInNamespace("A/B/C/D/E", "subX") +// (&(TestingModel{})).CreateInNamespace("A", "sub3") +// +// subTestWg.Wait() +// +// // THIRD TEST +// +// subTestWg.Add(1) +// go waitForSubs(t, sub, 3) +// sub.Unsubscribe("/Tests/A/****") +// sub.Subscribe("/Tests/TestingModel:sub1") +// sub.Subscribe("/Tests/TestingModel:sub1/TestingModel") +// t.Log(sub.String()) +// +// (&(TestingModel{})).CreateInNamespace("", "sub1") +// (&(TestingModel{})).CreateInNamespace("", "subX") +// (&(TestingModel{})).CreateInNamespace("TestingModel:sub1", "sub2") +// (&(TestingModel{})).CreateInNamespace("TestingModel:sub1/A", "subX") +// (&(TestingModel{})).CreateInNamespace("TestingModel:sub1", "sub3") +// +// subTestWg.Wait() +// +// // FINAL STUFF +// +// model := &TestingModel{} +// model.CreateInNamespace("Invalid", "subX") +// model.Save() +// +// sub.Destroy() +// +// // time.Sleep(1 * time.Second) +// // pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) +// +// } diff --git a/database/wrapper.go b/database/wrapper.go deleted file mode 100644 index 74d3089..0000000 --- a/database/wrapper.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package database - -import ( - "github.com/ipfs/go-datastore" - - "github.com/Safing/safing-core/database/dbutils" -) - -func NewWrapper(key *datastore.Key, data []byte) (*dbutils.Wrapper, error) { - return dbutils.NewWrapper(key, data) -} - -func DumpModel(uncertain interface{}, storageType uint8) ([]byte, error) { - return dbutils.DumpModel(uncertain, storageType) -} diff --git a/database/wrapper_test.go b/database/wrapper_test.go deleted file mode 100644 index 27ce5f2..0000000 --- a/database/wrapper_test.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package database - -import ( - "testing" - - "github.com/Safing/safing-core/formats/dsd" -) - -func TestWrapper(t *testing.T) { - - // create Model - new := &TestingModel{ - Name: "a", - Value: "b", - } - newTwo := &TestingModel{ - Name: "c", - Value: "d", - } - - // dump - bytes, err := DumpModel(new, dsd.JSON) - if err != nil { - panic(err) - } - bytesTwo, err := DumpModel(newTwo, dsd.JSON) - if err != nil { - panic(err) - } - - // wrap - wrapped, err := NewWrapper(nil, bytes) - if err != nil { - panic(err) - } - wrappedTwo, err := NewWrapper(nil, bytesTwo) - if err != nil { - panic(err) - } - - // model definition for unwrapping - var model *TestingModel - - // unwrap - myModel, ok := SilentEnsureModel(wrapped, model).(*TestingModel) - if !ok { - panic("received model does not match expected model") - } - if myModel.Name != "a" || myModel.Value != "b" { - panic("model value mismatch") - } - - // verbose unwrap - genericModel, err := EnsureModel(wrappedTwo, model) - if err != nil { - panic(err) - } - myModelTwo, ok := genericModel.(*TestingModel) - if !ok { - panic("received model does not match expected model") - } - if myModelTwo.Name != "c" || myModelTwo.Value != "d" { - panic("model value mismatch") - } - -}