diff --git a/database/controller.go b/database/controller.go index f8534dc..2c4875f 100644 --- a/database/controller.go +++ b/database/controller.go @@ -17,7 +17,8 @@ type Controller struct { storage storage.Interface writeLock sync.RWMutex readLock sync.RWMutex - migrating *abool.AtomicBool + migrating *abool.AtomicBool // TODO + hibernating *abool.AtomicBool // TODO } // newController creates a new controller for a storage. @@ -25,6 +26,7 @@ func newController(storageInt storage.Interface) (*Controller, error) { return &Controller{ storage: storageInt, migrating: abool.NewBool(false), + hibernating: abool.NewBool(false), }, nil } @@ -41,6 +43,10 @@ func (c *Controller) Get(key string) (record.Record, error) { r, err := c.storage.Get(key) if err != nil { + // replace not found error + if err == storage.ErrNotFound { + return nil, ErrNotFound + } return nil, err } @@ -60,7 +66,7 @@ func (c *Controller) Put(r record.Record) error { return ErrShuttingDown } - if c.storage.ReadOnly() { + if c.ReadOnly() { return ErrReadOnly } diff --git a/database/databases.go b/database/controllers.go similarity index 53% rename from database/databases.go rename to database/controllers.go index 086e8be..5b9d002 100644 --- a/database/databases.go +++ b/database/controllers.go @@ -4,74 +4,63 @@ import ( "errors" "sync" "fmt" - "path" - "github.com/tevino/abool" "github.com/Safing/portbase/database/storage" - "github.com/Safing/portbase/database/record" ) var ( - databases = make(map[string]*Controller) - databasesLock sync.Mutex - - shuttingDown = abool.NewBool(false) + controllers = make(map[string]*Controller) + controllersLock sync.Mutex ) -func splitKeyAndGetDatabase(key string) (db *Controller, dbKey string, err error) { - var dbName string - dbName, dbKey = record.ParseKey(key) - db, err = getDatabase(dbName) - if err != nil { - return nil, "", err - } - return -} - -func getDatabase(name string) (*Controller, error) { +func getController(name string) (*Controller, error) { if !initialized.IsSet() { return nil, errors.New("database not initialized") } - databasesLock.Lock() - defer databasesLock.Unlock() + controllersLock.Lock() + defer controllersLock.Unlock() // return database if already started - db, ok := databases[name] + controller, ok := controllers[name] if ok { - return db, nil + return controller, nil } - registryLock.Lock() - defer registryLock.Unlock() + // get db registration + registeredDB, err := getDatabase(name) + if err != nil { + return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err) + } - // check if database exists at all - registeredDB, ok := registry[name] - if !ok { - return nil, fmt.Errorf(`database "%s" not registered`, name) - } + // get location + dbLocation, err := getLocation(name, registeredDB.StorageType) + if err != nil { + return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err) + } // start database - storageInt, err := storage.StartDatabase(name, registeredDB.StorageType, path.Join(rootDir, name, registeredDB.StorageType)) + storageInt, err := storage.StartDatabase(name, registeredDB.StorageType, dbLocation) if err != nil { return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err) } - db, err = newController(storageInt) + // create controller + controller, err = newController(storageInt) if err != nil { return nil, fmt.Errorf(`could not create controller for database %s: %s`, name, err) } - databases[name] = db - return db, nil + controllers[name] = controller + return controller, nil } // InjectDatabase injects an already running database into the system. func InjectDatabase(name string, storageInt storage.Interface) error { - databasesLock.Lock() - defer databasesLock.Unlock() + controllersLock.Lock() + defer controllersLock.Unlock() - _, ok := databases[name] + _, ok := controllers[name] if ok { return errors.New(`database "%s" already loaded`) } @@ -88,11 +77,11 @@ func InjectDatabase(name string, storageInt storage.Interface) error { return fmt.Errorf(`database not of type "injected"`) } - db, err := newController(storageInt) + controller, err := newController(storageInt) if err != nil { return fmt.Errorf(`could not create controller for database %s: %s`, name, err) } - databases[name] = db + controllers[name] = controller return nil } diff --git a/database/database.go b/database/database.go new file mode 100644 index 0000000..e8e4504 --- /dev/null +++ b/database/database.go @@ -0,0 +1,32 @@ +package database + +import ( + "errors" + "time" +) + +// Database holds information about registered databases +type Database struct { + Name string + Description string + StorageType string + PrimaryAPI string + Registered time.Time + LastUpdated time.Time + LastLoaded time.Time +} + +// MigrateTo migrates the database to another storage type. +func (db *Database) MigrateTo(newStorageType string) error { + return errors.New("not implemented yet") // TODO +} + +// Loaded updates the LastLoaded timestamp. +func (db *Database) Loaded() { + db.LastLoaded = time.Now().Round(time.Second) +} + +// Updated updates the LastUpdated timestamp. +func (db *Database) Updated() { + db.LastUpdated = time.Now().Round(time.Second) +} diff --git a/database/database_test.go b/database/database_test.go index 5f878a7..bab7579 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -1,12 +1,16 @@ package database import ( + "fmt" "io/ioutil" "os" + "runtime/pprof" "sync" "testing" + "time" "github.com/Safing/portbase/database/record" + _ "github.com/Safing/portbase/database/storage/badger" ) type TestRecord struct { @@ -34,7 +38,46 @@ func (tr *TestRecord) Lock() { func (tr *TestRecord) Unlock() { } -func TestDatabase(t *testing.T) { +func makeKey(storageType, key string) string { + return fmt.Sprintf("%s:%s", storageType, key) +} + +func testDatabase(t *testing.T, storageType string) { + dbName := fmt.Sprintf("testing-%s", storageType) + _, err := Register(&Database{ + Name: dbName, + Description: fmt.Sprintf("Unit Test Database for %s", storageType), + StorageType: storageType, + PrimaryAPI: "", + }) + if err != nil { + t.Fatal(err) + } + + db := NewInterface(nil) + + new := &TestRecord{} + new.SetKey(makeKey(dbName, "A")) + err = db.Put(new) + if err != nil { + t.Fatal(err) + } + + _, err = db.Get(makeKey(dbName, "A")) + if err != nil { + t.Fatal(err) + } +} + +func TestDatabaseSystem(t *testing.T) { + + // panic after 10 seconds, to check for locks + go func() { + time.Sleep(10 * time.Second) + fmt.Println("===== TAKING TOO LONG FOR SHUTDOWN - PRINTING STACK TRACES =====") + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + os.Exit(1) + }() testDir, err := ioutil.TempDir("", "testing-") if err != nil { @@ -47,21 +90,19 @@ func TestDatabase(t *testing.T) { } defer os.RemoveAll(testDir) // clean up - err = RegisterDatabase(&RegisteredDatabase{ - Name: "testing", - Description: "Unit Test Database", - StorageType: "badger", - PrimaryAPI: "", - }) + testDatabase(t, "badger") + + err = Maintain() if err != nil { t.Fatal(err) } - db := NewInterface(nil) + err = MaintainThorough() + if err != nil { + t.Fatal(err) + } - new := &TestRecord{} - new.SetKey("testing:A") - err = db.Put(new) + err = Shutdown() if err != nil { t.Fatal(err) } diff --git a/database/interface.go b/database/interface.go index b36c166..de5910c 100644 --- a/database/interface.go +++ b/database/interface.go @@ -27,6 +27,19 @@ type Options struct { AlwaysMakeCrownjewel bool } +// Apply applies options to the record metadata. +func (o *Options) Apply(r record.Record) { + if r.Meta() == nil { + r.SetMeta(&record.Meta{}) + } + if o.AlwaysMakeSecret { + r.Meta().MakeSecret() + } + if o.AlwaysMakeCrownjewel { + r.Meta().MakeCrownJewel() + } +} + // NewInterface returns a new Interface to the database. func NewInterface(opts *Options) *Interface { if opts == nil { @@ -61,7 +74,7 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWri dbName, dbKey = record.ParseKey(dbKey) } - db, err = getDatabase(dbName) + db, err = getController(dbName) if err != nil { return nil, nil, err } @@ -72,6 +85,9 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWri r, err = db.Get(dbKey) if err != nil { + if err == ErrNotFound { + return nil, db, err + } return nil, nil, err } @@ -108,26 +124,30 @@ func (i *Interface) InsertValue(key string, attribute string, value interface{}) return fmt.Errorf("failed to set value with %s: %s", acc.Type(), err) } + i.options.Apply(r) return db.Put(r) } // Put saves a record to the database. func (i *Interface) Put(r record.Record) error { _, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) - if err != nil { + if err != nil && err != ErrNotFound { return err } + + i.options.Apply(r) return db.Put(r) } -// PutNew saves a record to the database as a new record (ie. with a new creation timestamp). +// PutNew saves a record to the database as a new record (ie. with new timestamps). func (i *Interface) PutNew(r record.Record) error { _, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) if err != nil && err != ErrNotFound { return err } - r.SetMeta(&record.Meta{}) + i.options.Apply(r) + r.Meta().Reset() return db.Put(r) } @@ -141,6 +161,7 @@ func (i *Interface) SetAbsoluteExpiry(key string, time int64) error { r.Lock() defer r.Unlock() + i.options.Apply(r) r.Meta().SetAbsoluteExpiry(time) return db.Put(r) } @@ -155,6 +176,7 @@ func (i *Interface) SetRelativateExpiry(key string, duration int64) error { r.Lock() defer r.Unlock() + i.options.Apply(r) r.Meta().SetRelativateExpiry(duration) return db.Put(r) } @@ -169,6 +191,7 @@ func (i *Interface) MakeSecret(key string) error { r.Lock() defer r.Unlock() + i.options.Apply(r) r.Meta().MakeSecret() return db.Put(r) } @@ -183,6 +206,7 @@ func (i *Interface) MakeCrownJewel(key string) error { r.Lock() defer r.Unlock() + i.options.Apply(r) r.Meta().MakeCrownJewel() return db.Put(r) } @@ -197,13 +221,14 @@ func (i *Interface) Delete(key string) error { r.Lock() defer r.Unlock() + i.options.Apply(r) r.Meta().Delete() return db.Put(r) } // Query executes the given query on the database. func (i *Interface) Query(q *query.Query) (*iterator.Iterator, error) { - db, err := getDatabase(q.DatabaseName()) + db, err := getController(q.DatabaseName()) if err != nil { return nil, err } diff --git a/database/location.go b/database/location.go index a44e415..0c095b6 100644 --- a/database/location.go +++ b/database/location.go @@ -1,42 +1,26 @@ package database import ( - "path" - "os" - "fmt" "errors" + "fmt" + "os" + "path" +) + +const ( + databasesSubDir = "databases" ) var ( rootDir string ) -// Initialize initialized the database -func Initialize(location string) error { - if initialized.SetToIf(false, true) { - rootDir = location - - err := checkRootDir() - if err != nil { - return fmt.Errorf("could not create/open database directory (%s): %s", rootDir, err) - } - - err = loadRegistry() - if err != nil { - return fmt.Errorf("could not load database registry (%s): %s", path.Join(rootDir, registryFileName), err) - } - - return nil - } - return errors.New("database already initialized") -} - -func checkRootDir() error { +func ensureDirectory(dirPath string) error { // open dir - dir, err := os.Open(rootDir) + dir, err := os.Open(dirPath) if err != nil { - if err == os.ErrNotExist { - return os.MkdirAll(rootDir, 0700) + if os.IsNotExist(err) { + return os.MkdirAll(dirPath, 0700) } return err } @@ -46,7 +30,9 @@ func checkRootDir() error { if err != nil { return err } - + if !fileInfo.IsDir() { + return errors.New("path exists and is not a directory") + } if fileInfo.Mode().Perm() != 0700 { return dir.Chmod(0700) } @@ -54,6 +40,13 @@ func checkRootDir() error { } // getLocation returns the storage location for the given name and type. -func getLocation(name, storageType string) (location string, err error) { - return path.Join(rootDir, name, storageType), nil +func getLocation(name, storageType string) (string, error) { + location := path.Join(rootDir, databasesSubDir, name, storageType) + + // check location + err := ensureDirectory(location) + if err != nil { + return "", fmt.Errorf("location (%s) invalid: %s", location, err) + } + return location, nil } diff --git a/database/main.go b/database/main.go new file mode 100644 index 0000000..7bb781d --- /dev/null +++ b/database/main.go @@ -0,0 +1,55 @@ +package database + +import ( + "errors" + "fmt" + "path" + + "github.com/tevino/abool" +) + +var ( + initialized = abool.NewBool(false) + + shuttingDown = abool.NewBool(false) + shutdownSignal = make(chan struct{}) +) + +// Initialize initialized the database +func Initialize(location string) error { + if initialized.SetToIf(false, true) { + rootDir = location + + err := ensureDirectory(rootDir) + if err != nil { + return fmt.Errorf("could not create/open database directory (%s): %s", rootDir, err) + } + + err = loadRegistry() + if err != nil { + return fmt.Errorf("could not load database registry (%s): %s", path.Join(rootDir, registryFileName), err) + } + + // start registry writer + go registryWriter() + + return nil + } + return errors.New("database already initialized") +} + +// Shutdown shuts down the whole database system. +func Shutdown() (err error) { + if shuttingDown.SetToIf(false, true) { + close(shutdownSignal) + } + + all := duplicateControllers() + for _, c := range all { + err = c.Shutdown() + if err != nil { + return + } + } + return +} diff --git a/database/maintainence.go b/database/maintainence.go index 5b5f1f5..4029a69 100644 --- a/database/maintainence.go +++ b/database/maintainence.go @@ -14,8 +14,8 @@ func Maintain() (err error) { // MaintainThorough runs the MaintainThorough method on all storages. func MaintainThorough() (err error) { - controllers := duplicateControllers() - for _, c := range controllers { + all := duplicateControllers() + for _, c := range all { err = c.MaintainThorough() if err != nil { return @@ -24,26 +24,12 @@ func MaintainThorough() (err error) { return } -// Shutdown shuts down the whole database system. -func Shutdown() (err error) { - shuttingDown.Set() +func duplicateControllers() (all []*Controller) { + controllersLock.Lock() + defer controllersLock.Unlock() - controllers := duplicateControllers() for _, c := range controllers { - err = c.Shutdown() - if err != nil { - return - } - } - return -} - -func duplicateControllers() (controllers []*Controller) { - databasesLock.Lock() - defer databasesLock.Unlock() - - for _, c := range databases { - controllers = append(controllers, c) + all = append(all, c) } return diff --git a/database/registry.go b/database/registry.go index c83a9af..1322854 100644 --- a/database/registry.go +++ b/database/registry.go @@ -3,67 +3,100 @@ package database import ( "encoding/json" "errors" + "fmt" "io/ioutil" "os" "path" "regexp" "sync" + "time" "github.com/tevino/abool" ) -// RegisteredDatabase holds information about registered databases -type RegisteredDatabase struct { - Name string - Description string - StorageType string - PrimaryAPI string -} - -// Equal returns whether this instance equals another. -func (r *RegisteredDatabase) Equal(o *RegisteredDatabase) bool { - if r.Name != o.Name || - r.Description != o.Description || - r.StorageType != o.StorageType || - r.PrimaryAPI != o.PrimaryAPI { - return false - } - return true -} - const ( registryFileName = "databases.json" ) var ( - initialized = abool.NewBool(false) + writeRegistrySoon = abool.NewBool(false) - registry map[string]*RegisteredDatabase + registry map[string]*Database registryLock sync.Mutex - nameConstraint = regexp.MustCompile("^[A-Za-z0-9_-]{5,}$") + nameConstraint = regexp.MustCompile("^[A-Za-z0-9_-]{4,}$") ) -// RegisterDatabase registers a new database. -func RegisterDatabase(new *RegisteredDatabase) error { +// Register registers a new database. +// If the database is already registered, only +// the description and the primary API will be +// updated and the effective object will be returned. +func Register(new *Database) (*Database, error) { if !initialized.IsSet() { - return errors.New("database not initialized") - } - - if !nameConstraint.MatchString(new.Name) { - return errors.New("database name must only contain alphanumeric and `_-` characters and must be at least 5 characters long") + return nil, errors.New("database not initialized") } registryLock.Lock() defer registryLock.Unlock() registeredDB, ok := registry[new.Name] - if !ok || !new.Equal(registeredDB) { + save := false + + if ok { + // update database + if registeredDB.Description != new.Description { + registeredDB.Description = new.Description + save = true + } + if registeredDB.PrimaryAPI != new.PrimaryAPI { + registeredDB.PrimaryAPI = new.PrimaryAPI + save = true + } + } else { + // register new database + if !nameConstraint.MatchString(new.Name) { + return nil, errors.New("database name must only contain alphanumeric and `_-` characters and must be at least 4 characters long") + } + + now := time.Now().Round(time.Second) + new.Registered = now + new.LastUpdated = now + new.LastLoaded = time.Time{} + registry[new.Name] = new - return saveRegistry() + save = true } - return nil + if save { + if ok { + registeredDB.Updated() + } + err := saveRegistry(false) + if err != nil { + return nil, err + } + } + + if ok { + return registeredDB, nil + } + return nil, nil +} + +func getDatabase(name string) (*Database, error) { + registryLock.Lock() + defer registryLock.Unlock() + + registeredDB, ok := registry[name] + if !ok { + return nil, fmt.Errorf(`database "%s" not registered`, name) + } + if time.Now().Add(-24 * time.Hour).After(registeredDB.LastLoaded) { + writeRegistrySoon.Set() + } + registeredDB.Loaded() + + return registeredDB, nil } func loadRegistry() error { @@ -74,15 +107,15 @@ func loadRegistry() error { filePath := path.Join(rootDir, registryFileName) data, err := ioutil.ReadFile(filePath) if err != nil { - if err == os.ErrNotExist { - registry = make(map[string]*RegisteredDatabase) + if os.IsNotExist(err) { + registry = make(map[string]*Database) return nil } return err } // parse - new := make(map[string]*RegisteredDatabase) + new := make(map[string]*Database) err = json.Unmarshal(data, new) if err != nil { return err @@ -93,12 +126,14 @@ func loadRegistry() error { return nil } -func saveRegistry() error { - registryLock.Lock() - defer registryLock.Unlock() +func saveRegistry(lock bool) error { + if lock { + registryLock.Lock() + defer registryLock.Unlock() + } // marshal - data, err := json.Marshal(registry) + data, err := json.MarshalIndent(registry, "", "\t") if err != nil { return err } @@ -107,3 +142,17 @@ func saveRegistry() error { filePath := path.Join(rootDir, registryFileName) return ioutil.WriteFile(filePath, data, 0600) } + +func registryWriter() { + for { + select { + case <-time.After(1 * time.Hour): + if writeRegistrySoon.SetToIf(true, false) { + saveRegistry(true) + } + case <-shutdownSignal: + saveRegistry(true) + return + } + } +} diff --git a/database/storage/badger/badger.go b/database/storage/badger/badger.go index af04bf4..ebe849a 100644 --- a/database/storage/badger/badger.go +++ b/database/storage/badger/badger.go @@ -116,7 +116,7 @@ func (b *Badger) Delete(key string) error { } // Query returns a an iterator for the supplied query. -func (b *Badger) Query(q *query.Query) (*iterator.Iterator, error) { +func (b *Badger) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { return nil, errors.New("query not implemented by badger") } diff --git a/database/storage/badger/badger_test.go b/database/storage/badger/badger_test.go new file mode 100644 index 0000000..cc0f2f8 --- /dev/null +++ b/database/storage/badger/badger_test.go @@ -0,0 +1,69 @@ +package badger + +import ( + "io/ioutil" + "os" + "sync" + "testing" + + "github.com/Safing/portbase/database/record" +) + +type TestRecord struct { + record.Base + lock sync.Mutex + S string + I int + I8 int8 + I16 int16 + I32 int32 + I64 int64 + UI uint + UI8 uint8 + UI16 uint16 + UI32 uint32 + UI64 uint64 + F32 float32 + F64 float64 + B bool +} + +func (tr *TestRecord) Lock() { +} + +func (tr *TestRecord) Unlock() { +} + +func TestBadger(t *testing.T) { + testDir, err := ioutil.TempDir("", "testing-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(testDir) // clean up + + db, err := NewBadger("test", testDir) + if err != nil { + t.Fatal(err) + } + + a := &TestRecord{S: "banana"} + a.SetMeta(&record.Meta{}) + a.Meta().Update() + a.SetKey("test:A") + + err = db.Put(a) + if err != nil { + t.Fatal(err) + } + + r1, err := db.Get("A") + if err != nil { + t.Fatal(err) + } + + a1 := r1.(*TestRecord) + + if a.S != a1.S { + t.Fatal("mismatch") + } +} diff --git a/database/storage/errors.go b/database/storage/errors.go index 91c7689..a280296 100644 --- a/database/storage/errors.go +++ b/database/storage/errors.go @@ -4,5 +4,5 @@ import "errors" // Errors for storages var ( - ErrNotFound = errors.New("not found") + ErrNotFound = errors.New("storage entry could not be found") ) diff --git a/database/storage/storages.go b/database/storage/storages.go index 948af83..1fa7448 100644 --- a/database/storage/storages.go +++ b/database/storage/storages.go @@ -10,7 +10,7 @@ import ( type Factory func(name, location string) (Interface, error) var ( - storages map[string]Factory + storages = make(map[string]Factory) storagesLock sync.Mutex ) @@ -38,9 +38,9 @@ func StartDatabase(name, storageType, location string) (Interface, error) { storagesLock.Lock() defer storagesLock.Unlock() - factory, ok := storages[name] + factory, ok := storages[storageType] if !ok { - return nil, fmt.Errorf("storage of this type (%s) does not exist", storageType) + return nil, fmt.Errorf("storage type %s not registered", storageType) } return factory(name, location)