mirror of
https://github.com/safing/portbase
synced 2025-09-01 18:19:57 +00:00
Work on database + tests
This commit is contained in:
parent
06a34f931e
commit
4802982734
13 changed files with 395 additions and 150 deletions
|
@ -17,7 +17,8 @@ type Controller struct {
|
||||||
storage storage.Interface
|
storage storage.Interface
|
||||||
writeLock sync.RWMutex
|
writeLock sync.RWMutex
|
||||||
readLock sync.RWMutex
|
readLock sync.RWMutex
|
||||||
migrating *abool.AtomicBool
|
migrating *abool.AtomicBool // TODO
|
||||||
|
hibernating *abool.AtomicBool // TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
// newController creates a new controller for a storage.
|
// newController creates a new controller for a storage.
|
||||||
|
@ -25,6 +26,7 @@ func newController(storageInt storage.Interface) (*Controller, error) {
|
||||||
return &Controller{
|
return &Controller{
|
||||||
storage: storageInt,
|
storage: storageInt,
|
||||||
migrating: abool.NewBool(false),
|
migrating: abool.NewBool(false),
|
||||||
|
hibernating: abool.NewBool(false),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +43,10 @@ func (c *Controller) Get(key string) (record.Record, error) {
|
||||||
|
|
||||||
r, err := c.storage.Get(key)
|
r, err := c.storage.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// replace not found error
|
||||||
|
if err == storage.ErrNotFound {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +66,7 @@ func (c *Controller) Put(r record.Record) error {
|
||||||
return ErrShuttingDown
|
return ErrShuttingDown
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.storage.ReadOnly() {
|
if c.ReadOnly() {
|
||||||
return ErrReadOnly
|
return ErrReadOnly
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,74 +4,63 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path"
|
|
||||||
|
|
||||||
"github.com/tevino/abool"
|
|
||||||
"github.com/Safing/portbase/database/storage"
|
"github.com/Safing/portbase/database/storage"
|
||||||
"github.com/Safing/portbase/database/record"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
databases = make(map[string]*Controller)
|
controllers = make(map[string]*Controller)
|
||||||
databasesLock sync.Mutex
|
controllersLock sync.Mutex
|
||||||
|
|
||||||
shuttingDown = abool.NewBool(false)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func splitKeyAndGetDatabase(key string) (db *Controller, dbKey string, err error) {
|
func getController(name string) (*Controller, error) {
|
||||||
var dbName string
|
|
||||||
dbName, dbKey = record.ParseKey(key)
|
|
||||||
db, err = getDatabase(dbName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, "", err
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func getDatabase(name string) (*Controller, error) {
|
|
||||||
if !initialized.IsSet() {
|
if !initialized.IsSet() {
|
||||||
return nil, errors.New("database not initialized")
|
return nil, errors.New("database not initialized")
|
||||||
}
|
}
|
||||||
|
|
||||||
databasesLock.Lock()
|
controllersLock.Lock()
|
||||||
defer databasesLock.Unlock()
|
defer controllersLock.Unlock()
|
||||||
|
|
||||||
// return database if already started
|
// return database if already started
|
||||||
db, ok := databases[name]
|
controller, ok := controllers[name]
|
||||||
if ok {
|
if ok {
|
||||||
return db, nil
|
return controller, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
registryLock.Lock()
|
// get db registration
|
||||||
defer registryLock.Unlock()
|
registeredDB, err := getDatabase(name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err)
|
||||||
|
}
|
||||||
|
|
||||||
// check if database exists at all
|
// get location
|
||||||
registeredDB, ok := registry[name]
|
dbLocation, err := getLocation(name, registeredDB.StorageType)
|
||||||
if !ok {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(`database "%s" not registered`, name)
|
return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// start database
|
// start database
|
||||||
storageInt, err := storage.StartDatabase(name, registeredDB.StorageType, path.Join(rootDir, name, registeredDB.StorageType))
|
storageInt, err := storage.StartDatabase(name, registeredDB.StorageType, dbLocation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err)
|
return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err = newController(storageInt)
|
// create controller
|
||||||
|
controller, err = newController(storageInt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(`could not create controller for database %s: %s`, name, err)
|
return nil, fmt.Errorf(`could not create controller for database %s: %s`, name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
databases[name] = db
|
controllers[name] = controller
|
||||||
return db, nil
|
return controller, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// InjectDatabase injects an already running database into the system.
|
// InjectDatabase injects an already running database into the system.
|
||||||
func InjectDatabase(name string, storageInt storage.Interface) error {
|
func InjectDatabase(name string, storageInt storage.Interface) error {
|
||||||
databasesLock.Lock()
|
controllersLock.Lock()
|
||||||
defer databasesLock.Unlock()
|
defer controllersLock.Unlock()
|
||||||
|
|
||||||
_, ok := databases[name]
|
_, ok := controllers[name]
|
||||||
if ok {
|
if ok {
|
||||||
return errors.New(`database "%s" already loaded`)
|
return errors.New(`database "%s" already loaded`)
|
||||||
}
|
}
|
||||||
|
@ -88,11 +77,11 @@ func InjectDatabase(name string, storageInt storage.Interface) error {
|
||||||
return fmt.Errorf(`database not of type "injected"`)
|
return fmt.Errorf(`database not of type "injected"`)
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := newController(storageInt)
|
controller, err := newController(storageInt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf(`could not create controller for database %s: %s`, name, err)
|
return fmt.Errorf(`could not create controller for database %s: %s`, name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
databases[name] = db
|
controllers[name] = controller
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
32
database/database.go
Normal file
32
database/database.go
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Database holds information about registered databases
|
||||||
|
type Database struct {
|
||||||
|
Name string
|
||||||
|
Description string
|
||||||
|
StorageType string
|
||||||
|
PrimaryAPI string
|
||||||
|
Registered time.Time
|
||||||
|
LastUpdated time.Time
|
||||||
|
LastLoaded time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// MigrateTo migrates the database to another storage type.
|
||||||
|
func (db *Database) MigrateTo(newStorageType string) error {
|
||||||
|
return errors.New("not implemented yet") // TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
// Loaded updates the LastLoaded timestamp.
|
||||||
|
func (db *Database) Loaded() {
|
||||||
|
db.LastLoaded = time.Now().Round(time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updated updates the LastUpdated timestamp.
|
||||||
|
func (db *Database) Updated() {
|
||||||
|
db.LastUpdated = time.Now().Round(time.Second)
|
||||||
|
}
|
|
@ -1,12 +1,16 @@
|
||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime/pprof"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/Safing/portbase/database/record"
|
"github.com/Safing/portbase/database/record"
|
||||||
|
_ "github.com/Safing/portbase/database/storage/badger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TestRecord struct {
|
type TestRecord struct {
|
||||||
|
@ -34,7 +38,46 @@ func (tr *TestRecord) Lock() {
|
||||||
func (tr *TestRecord) Unlock() {
|
func (tr *TestRecord) Unlock() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDatabase(t *testing.T) {
|
func makeKey(storageType, key string) string {
|
||||||
|
return fmt.Sprintf("%s:%s", storageType, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testDatabase(t *testing.T, storageType string) {
|
||||||
|
dbName := fmt.Sprintf("testing-%s", storageType)
|
||||||
|
_, err := Register(&Database{
|
||||||
|
Name: dbName,
|
||||||
|
Description: fmt.Sprintf("Unit Test Database for %s", storageType),
|
||||||
|
StorageType: storageType,
|
||||||
|
PrimaryAPI: "",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
db := NewInterface(nil)
|
||||||
|
|
||||||
|
new := &TestRecord{}
|
||||||
|
new.SetKey(makeKey(dbName, "A"))
|
||||||
|
err = db.Put(new)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = db.Get(makeKey(dbName, "A"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDatabaseSystem(t *testing.T) {
|
||||||
|
|
||||||
|
// panic after 10 seconds, to check for locks
|
||||||
|
go func() {
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
fmt.Println("===== TAKING TOO LONG FOR SHUTDOWN - PRINTING STACK TRACES =====")
|
||||||
|
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||||
|
os.Exit(1)
|
||||||
|
}()
|
||||||
|
|
||||||
testDir, err := ioutil.TempDir("", "testing-")
|
testDir, err := ioutil.TempDir("", "testing-")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -47,21 +90,19 @@ func TestDatabase(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(testDir) // clean up
|
defer os.RemoveAll(testDir) // clean up
|
||||||
|
|
||||||
err = RegisterDatabase(&RegisteredDatabase{
|
testDatabase(t, "badger")
|
||||||
Name: "testing",
|
|
||||||
Description: "Unit Test Database",
|
err = Maintain()
|
||||||
StorageType: "badger",
|
|
||||||
PrimaryAPI: "",
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
db := NewInterface(nil)
|
err = MaintainThorough()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
new := &TestRecord{}
|
err = Shutdown()
|
||||||
new.SetKey("testing:A")
|
|
||||||
err = db.Put(new)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,19 @@ type Options struct {
|
||||||
AlwaysMakeCrownjewel bool
|
AlwaysMakeCrownjewel bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply applies options to the record metadata.
|
||||||
|
func (o *Options) Apply(r record.Record) {
|
||||||
|
if r.Meta() == nil {
|
||||||
|
r.SetMeta(&record.Meta{})
|
||||||
|
}
|
||||||
|
if o.AlwaysMakeSecret {
|
||||||
|
r.Meta().MakeSecret()
|
||||||
|
}
|
||||||
|
if o.AlwaysMakeCrownjewel {
|
||||||
|
r.Meta().MakeCrownJewel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewInterface returns a new Interface to the database.
|
// NewInterface returns a new Interface to the database.
|
||||||
func NewInterface(opts *Options) *Interface {
|
func NewInterface(opts *Options) *Interface {
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
|
@ -61,7 +74,7 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWri
|
||||||
dbName, dbKey = record.ParseKey(dbKey)
|
dbName, dbKey = record.ParseKey(dbKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err = getDatabase(dbName)
|
db, err = getController(dbName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -72,6 +85,9 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWri
|
||||||
|
|
||||||
r, err = db.Get(dbKey)
|
r, err = db.Get(dbKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == ErrNotFound {
|
||||||
|
return nil, db, err
|
||||||
|
}
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,26 +124,30 @@ func (i *Interface) InsertValue(key string, attribute string, value interface{})
|
||||||
return fmt.Errorf("failed to set value with %s: %s", acc.Type(), err)
|
return fmt.Errorf("failed to set value with %s: %s", acc.Type(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
i.options.Apply(r)
|
||||||
return db.Put(r)
|
return db.Put(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put saves a record to the database.
|
// Put saves a record to the database.
|
||||||
func (i *Interface) Put(r record.Record) error {
|
func (i *Interface) Put(r record.Record) error {
|
||||||
_, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
_, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
||||||
if err != nil {
|
if err != nil && err != ErrNotFound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
i.options.Apply(r)
|
||||||
return db.Put(r)
|
return db.Put(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutNew saves a record to the database as a new record (ie. with a new creation timestamp).
|
// PutNew saves a record to the database as a new record (ie. with new timestamps).
|
||||||
func (i *Interface) PutNew(r record.Record) error {
|
func (i *Interface) PutNew(r record.Record) error {
|
||||||
_, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
_, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
||||||
if err != nil && err != ErrNotFound {
|
if err != nil && err != ErrNotFound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
r.SetMeta(&record.Meta{})
|
i.options.Apply(r)
|
||||||
|
r.Meta().Reset()
|
||||||
return db.Put(r)
|
return db.Put(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,6 +161,7 @@ func (i *Interface) SetAbsoluteExpiry(key string, time int64) error {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
|
||||||
|
i.options.Apply(r)
|
||||||
r.Meta().SetAbsoluteExpiry(time)
|
r.Meta().SetAbsoluteExpiry(time)
|
||||||
return db.Put(r)
|
return db.Put(r)
|
||||||
}
|
}
|
||||||
|
@ -155,6 +176,7 @@ func (i *Interface) SetRelativateExpiry(key string, duration int64) error {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
|
||||||
|
i.options.Apply(r)
|
||||||
r.Meta().SetRelativateExpiry(duration)
|
r.Meta().SetRelativateExpiry(duration)
|
||||||
return db.Put(r)
|
return db.Put(r)
|
||||||
}
|
}
|
||||||
|
@ -169,6 +191,7 @@ func (i *Interface) MakeSecret(key string) error {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
|
||||||
|
i.options.Apply(r)
|
||||||
r.Meta().MakeSecret()
|
r.Meta().MakeSecret()
|
||||||
return db.Put(r)
|
return db.Put(r)
|
||||||
}
|
}
|
||||||
|
@ -183,6 +206,7 @@ func (i *Interface) MakeCrownJewel(key string) error {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
|
||||||
|
i.options.Apply(r)
|
||||||
r.Meta().MakeCrownJewel()
|
r.Meta().MakeCrownJewel()
|
||||||
return db.Put(r)
|
return db.Put(r)
|
||||||
}
|
}
|
||||||
|
@ -197,13 +221,14 @@ func (i *Interface) Delete(key string) error {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
|
||||||
|
i.options.Apply(r)
|
||||||
r.Meta().Delete()
|
r.Meta().Delete()
|
||||||
return db.Put(r)
|
return db.Put(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query executes the given query on the database.
|
// Query executes the given query on the database.
|
||||||
func (i *Interface) Query(q *query.Query) (*iterator.Iterator, error) {
|
func (i *Interface) Query(q *query.Query) (*iterator.Iterator, error) {
|
||||||
db, err := getDatabase(q.DatabaseName())
|
db, err := getController(q.DatabaseName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,42 +1,26 @@
|
||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"path"
|
|
||||||
"os"
|
|
||||||
"fmt"
|
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
databasesSubDir = "databases"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
rootDir string
|
rootDir string
|
||||||
)
|
)
|
||||||
|
|
||||||
// Initialize initialized the database
|
func ensureDirectory(dirPath string) error {
|
||||||
func Initialize(location string) error {
|
|
||||||
if initialized.SetToIf(false, true) {
|
|
||||||
rootDir = location
|
|
||||||
|
|
||||||
err := checkRootDir()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not create/open database directory (%s): %s", rootDir, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = loadRegistry()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not load database registry (%s): %s", path.Join(rootDir, registryFileName), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return errors.New("database already initialized")
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkRootDir() error {
|
|
||||||
// open dir
|
// open dir
|
||||||
dir, err := os.Open(rootDir)
|
dir, err := os.Open(dirPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == os.ErrNotExist {
|
if os.IsNotExist(err) {
|
||||||
return os.MkdirAll(rootDir, 0700)
|
return os.MkdirAll(dirPath, 0700)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -46,7 +30,9 @@ func checkRootDir() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if !fileInfo.IsDir() {
|
||||||
|
return errors.New("path exists and is not a directory")
|
||||||
|
}
|
||||||
if fileInfo.Mode().Perm() != 0700 {
|
if fileInfo.Mode().Perm() != 0700 {
|
||||||
return dir.Chmod(0700)
|
return dir.Chmod(0700)
|
||||||
}
|
}
|
||||||
|
@ -54,6 +40,13 @@ func checkRootDir() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLocation returns the storage location for the given name and type.
|
// getLocation returns the storage location for the given name and type.
|
||||||
func getLocation(name, storageType string) (location string, err error) {
|
func getLocation(name, storageType string) (string, error) {
|
||||||
return path.Join(rootDir, name, storageType), nil
|
location := path.Join(rootDir, databasesSubDir, name, storageType)
|
||||||
|
|
||||||
|
// check location
|
||||||
|
err := ensureDirectory(location)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("location (%s) invalid: %s", location, err)
|
||||||
|
}
|
||||||
|
return location, nil
|
||||||
}
|
}
|
||||||
|
|
55
database/main.go
Normal file
55
database/main.go
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"path"
|
||||||
|
|
||||||
|
"github.com/tevino/abool"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
initialized = abool.NewBool(false)
|
||||||
|
|
||||||
|
shuttingDown = abool.NewBool(false)
|
||||||
|
shutdownSignal = make(chan struct{})
|
||||||
|
)
|
||||||
|
|
||||||
|
// Initialize initialized the database
|
||||||
|
func Initialize(location string) error {
|
||||||
|
if initialized.SetToIf(false, true) {
|
||||||
|
rootDir = location
|
||||||
|
|
||||||
|
err := ensureDirectory(rootDir)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not create/open database directory (%s): %s", rootDir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = loadRegistry()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not load database registry (%s): %s", path.Join(rootDir, registryFileName), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// start registry writer
|
||||||
|
go registryWriter()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errors.New("database already initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown shuts down the whole database system.
|
||||||
|
func Shutdown() (err error) {
|
||||||
|
if shuttingDown.SetToIf(false, true) {
|
||||||
|
close(shutdownSignal)
|
||||||
|
}
|
||||||
|
|
||||||
|
all := duplicateControllers()
|
||||||
|
for _, c := range all {
|
||||||
|
err = c.Shutdown()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
|
@ -14,8 +14,8 @@ func Maintain() (err error) {
|
||||||
|
|
||||||
// MaintainThorough runs the MaintainThorough method on all storages.
|
// MaintainThorough runs the MaintainThorough method on all storages.
|
||||||
func MaintainThorough() (err error) {
|
func MaintainThorough() (err error) {
|
||||||
controllers := duplicateControllers()
|
all := duplicateControllers()
|
||||||
for _, c := range controllers {
|
for _, c := range all {
|
||||||
err = c.MaintainThorough()
|
err = c.MaintainThorough()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -24,26 +24,12 @@ func MaintainThorough() (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown shuts down the whole database system.
|
func duplicateControllers() (all []*Controller) {
|
||||||
func Shutdown() (err error) {
|
controllersLock.Lock()
|
||||||
shuttingDown.Set()
|
defer controllersLock.Unlock()
|
||||||
|
|
||||||
controllers := duplicateControllers()
|
|
||||||
for _, c := range controllers {
|
for _, c := range controllers {
|
||||||
err = c.Shutdown()
|
all = append(all, c)
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func duplicateControllers() (controllers []*Controller) {
|
|
||||||
databasesLock.Lock()
|
|
||||||
defer databasesLock.Unlock()
|
|
||||||
|
|
||||||
for _, c := range databases {
|
|
||||||
controllers = append(controllers, c)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
|
@ -3,67 +3,100 @@ package database
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/tevino/abool"
|
"github.com/tevino/abool"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RegisteredDatabase holds information about registered databases
|
|
||||||
type RegisteredDatabase struct {
|
|
||||||
Name string
|
|
||||||
Description string
|
|
||||||
StorageType string
|
|
||||||
PrimaryAPI string
|
|
||||||
}
|
|
||||||
|
|
||||||
// Equal returns whether this instance equals another.
|
|
||||||
func (r *RegisteredDatabase) Equal(o *RegisteredDatabase) bool {
|
|
||||||
if r.Name != o.Name ||
|
|
||||||
r.Description != o.Description ||
|
|
||||||
r.StorageType != o.StorageType ||
|
|
||||||
r.PrimaryAPI != o.PrimaryAPI {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
registryFileName = "databases.json"
|
registryFileName = "databases.json"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
initialized = abool.NewBool(false)
|
writeRegistrySoon = abool.NewBool(false)
|
||||||
|
|
||||||
registry map[string]*RegisteredDatabase
|
registry map[string]*Database
|
||||||
registryLock sync.Mutex
|
registryLock sync.Mutex
|
||||||
|
|
||||||
nameConstraint = regexp.MustCompile("^[A-Za-z0-9_-]{5,}$")
|
nameConstraint = regexp.MustCompile("^[A-Za-z0-9_-]{4,}$")
|
||||||
)
|
)
|
||||||
|
|
||||||
// RegisterDatabase registers a new database.
|
// Register registers a new database.
|
||||||
func RegisterDatabase(new *RegisteredDatabase) error {
|
// If the database is already registered, only
|
||||||
|
// the description and the primary API will be
|
||||||
|
// updated and the effective object will be returned.
|
||||||
|
func Register(new *Database) (*Database, error) {
|
||||||
if !initialized.IsSet() {
|
if !initialized.IsSet() {
|
||||||
return errors.New("database not initialized")
|
return nil, errors.New("database not initialized")
|
||||||
}
|
|
||||||
|
|
||||||
if !nameConstraint.MatchString(new.Name) {
|
|
||||||
return errors.New("database name must only contain alphanumeric and `_-` characters and must be at least 5 characters long")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
registryLock.Lock()
|
registryLock.Lock()
|
||||||
defer registryLock.Unlock()
|
defer registryLock.Unlock()
|
||||||
|
|
||||||
registeredDB, ok := registry[new.Name]
|
registeredDB, ok := registry[new.Name]
|
||||||
if !ok || !new.Equal(registeredDB) {
|
save := false
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
// update database
|
||||||
|
if registeredDB.Description != new.Description {
|
||||||
|
registeredDB.Description = new.Description
|
||||||
|
save = true
|
||||||
|
}
|
||||||
|
if registeredDB.PrimaryAPI != new.PrimaryAPI {
|
||||||
|
registeredDB.PrimaryAPI = new.PrimaryAPI
|
||||||
|
save = true
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// register new database
|
||||||
|
if !nameConstraint.MatchString(new.Name) {
|
||||||
|
return nil, errors.New("database name must only contain alphanumeric and `_-` characters and must be at least 4 characters long")
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now().Round(time.Second)
|
||||||
|
new.Registered = now
|
||||||
|
new.LastUpdated = now
|
||||||
|
new.LastLoaded = time.Time{}
|
||||||
|
|
||||||
registry[new.Name] = new
|
registry[new.Name] = new
|
||||||
return saveRegistry()
|
save = true
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
if save {
|
||||||
|
if ok {
|
||||||
|
registeredDB.Updated()
|
||||||
|
}
|
||||||
|
err := saveRegistry(false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
return registeredDB, nil
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getDatabase(name string) (*Database, error) {
|
||||||
|
registryLock.Lock()
|
||||||
|
defer registryLock.Unlock()
|
||||||
|
|
||||||
|
registeredDB, ok := registry[name]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf(`database "%s" not registered`, name)
|
||||||
|
}
|
||||||
|
if time.Now().Add(-24 * time.Hour).After(registeredDB.LastLoaded) {
|
||||||
|
writeRegistrySoon.Set()
|
||||||
|
}
|
||||||
|
registeredDB.Loaded()
|
||||||
|
|
||||||
|
return registeredDB, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadRegistry() error {
|
func loadRegistry() error {
|
||||||
|
@ -74,15 +107,15 @@ func loadRegistry() error {
|
||||||
filePath := path.Join(rootDir, registryFileName)
|
filePath := path.Join(rootDir, registryFileName)
|
||||||
data, err := ioutil.ReadFile(filePath)
|
data, err := ioutil.ReadFile(filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == os.ErrNotExist {
|
if os.IsNotExist(err) {
|
||||||
registry = make(map[string]*RegisteredDatabase)
|
registry = make(map[string]*Database)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse
|
// parse
|
||||||
new := make(map[string]*RegisteredDatabase)
|
new := make(map[string]*Database)
|
||||||
err = json.Unmarshal(data, new)
|
err = json.Unmarshal(data, new)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -93,12 +126,14 @@ func loadRegistry() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func saveRegistry() error {
|
func saveRegistry(lock bool) error {
|
||||||
registryLock.Lock()
|
if lock {
|
||||||
defer registryLock.Unlock()
|
registryLock.Lock()
|
||||||
|
defer registryLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// marshal
|
// marshal
|
||||||
data, err := json.Marshal(registry)
|
data, err := json.MarshalIndent(registry, "", "\t")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -107,3 +142,17 @@ func saveRegistry() error {
|
||||||
filePath := path.Join(rootDir, registryFileName)
|
filePath := path.Join(rootDir, registryFileName)
|
||||||
return ioutil.WriteFile(filePath, data, 0600)
|
return ioutil.WriteFile(filePath, data, 0600)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func registryWriter() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(1 * time.Hour):
|
||||||
|
if writeRegistrySoon.SetToIf(true, false) {
|
||||||
|
saveRegistry(true)
|
||||||
|
}
|
||||||
|
case <-shutdownSignal:
|
||||||
|
saveRegistry(true)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -116,7 +116,7 @@ func (b *Badger) Delete(key string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query returns a an iterator for the supplied query.
|
// Query returns a an iterator for the supplied query.
|
||||||
func (b *Badger) Query(q *query.Query) (*iterator.Iterator, error) {
|
func (b *Badger) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
|
||||||
return nil, errors.New("query not implemented by badger")
|
return nil, errors.New("query not implemented by badger")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
69
database/storage/badger/badger_test.go
Normal file
69
database/storage/badger/badger_test.go
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
package badger
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/Safing/portbase/database/record"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TestRecord struct {
|
||||||
|
record.Base
|
||||||
|
lock sync.Mutex
|
||||||
|
S string
|
||||||
|
I int
|
||||||
|
I8 int8
|
||||||
|
I16 int16
|
||||||
|
I32 int32
|
||||||
|
I64 int64
|
||||||
|
UI uint
|
||||||
|
UI8 uint8
|
||||||
|
UI16 uint16
|
||||||
|
UI32 uint32
|
||||||
|
UI64 uint64
|
||||||
|
F32 float32
|
||||||
|
F64 float64
|
||||||
|
B bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tr *TestRecord) Lock() {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tr *TestRecord) Unlock() {
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBadger(t *testing.T) {
|
||||||
|
testDir, err := ioutil.TempDir("", "testing-")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(testDir) // clean up
|
||||||
|
|
||||||
|
db, err := NewBadger("test", testDir)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a := &TestRecord{S: "banana"}
|
||||||
|
a.SetMeta(&record.Meta{})
|
||||||
|
a.Meta().Update()
|
||||||
|
a.SetKey("test:A")
|
||||||
|
|
||||||
|
err = db.Put(a)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r1, err := db.Get("A")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a1 := r1.(*TestRecord)
|
||||||
|
|
||||||
|
if a.S != a1.S {
|
||||||
|
t.Fatal("mismatch")
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,5 +4,5 @@ import "errors"
|
||||||
|
|
||||||
// Errors for storages
|
// Errors for storages
|
||||||
var (
|
var (
|
||||||
ErrNotFound = errors.New("not found")
|
ErrNotFound = errors.New("storage entry could not be found")
|
||||||
)
|
)
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
type Factory func(name, location string) (Interface, error)
|
type Factory func(name, location string) (Interface, error)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
storages map[string]Factory
|
storages = make(map[string]Factory)
|
||||||
storagesLock sync.Mutex
|
storagesLock sync.Mutex
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -38,9 +38,9 @@ func StartDatabase(name, storageType, location string) (Interface, error) {
|
||||||
storagesLock.Lock()
|
storagesLock.Lock()
|
||||||
defer storagesLock.Unlock()
|
defer storagesLock.Unlock()
|
||||||
|
|
||||||
factory, ok := storages[name]
|
factory, ok := storages[storageType]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("storage of this type (%s) does not exist", storageType)
|
return nil, fmt.Errorf("storage type %s not registered", storageType)
|
||||||
}
|
}
|
||||||
|
|
||||||
return factory(name, location)
|
return factory(name, location)
|
||||||
|
|
Loading…
Add table
Reference in a new issue