diff --git a/database/controller.go b/database/controller.go index 35e073a..e015cfb 100644 --- a/database/controller.go +++ b/database/controller.go @@ -6,8 +6,6 @@ import ( "sync" "time" - "github.com/tevino/abool" - "github.com/safing/portbase/database/iterator" "github.com/safing/portbase/database/query" "github.com/safing/portbase/database/record" @@ -19,13 +17,11 @@ type Controller struct { storage storage.Interface shadowDelete bool - hooks []*RegisteredHook - subscriptions []*Subscription + hooksLock sync.RWMutex + hooks []*RegisteredHook - exclusiveAccess sync.RWMutex - - migrating *abool.AtomicBool // TODO - hibernating *abool.AtomicBool // TODO + subscriptionLock sync.RWMutex + subscriptions []*Subscription } // newController creates a new controller for a storage. @@ -33,8 +29,6 @@ func newController(storageInt storage.Interface, shadowDelete bool) *Controller return &Controller{ storage: storageInt, shadowDelete: shadowDelete, - migrating: abool.NewBool(false), - hibernating: abool.NewBool(false), } } @@ -50,21 +44,12 @@ func (c *Controller) Injected() bool { // Get return the record with the given key. func (c *Controller) Get(key string) (record.Record, error) { - c.exclusiveAccess.RLock() - defer c.exclusiveAccess.RUnlock() - if shuttingDown.IsSet() { return nil, ErrShuttingDown } - // process hooks - for _, hook := range c.hooks { - if hook.h.UsesPreGet() && hook.q.MatchesKey(key) { - err := hook.h.PreGet(key) - if err != nil { - return nil, err - } - } + if err := c.runPreGetHooks(key); err != nil { + return nil, err } r, err := c.storage.Get(key) @@ -79,14 +64,9 @@ func (c *Controller) Get(key string) (record.Record, error) { r.Lock() defer r.Unlock() - // process hooks - for _, hook := range c.hooks { - if hook.h.UsesPostGet() && hook.q.Matches(r) { - r, err = hook.h.PostGet(r) - if err != nil { - return nil, err - } - } + r, err = c.runPostGetHooks(r) + if err != nil { + return nil, err } if !r.Meta().CheckValidity() { @@ -96,11 +76,11 @@ func (c *Controller) Get(key string) (record.Record, error) { return r, nil } -// Put saves a record in the database. +// Put saves a record in the database, executes any registered +// pre-put hooks and finally send an update to all subscribers. +// The record must be locked and secured from concurrent access +// when calling Put(). func (c *Controller) Put(r record.Record) (err error) { - c.exclusiveAccess.RLock() - defer c.exclusiveAccess.RUnlock() - if shuttingDown.IsSet() { return ErrShuttingDown } @@ -109,51 +89,35 @@ func (c *Controller) Put(r record.Record) (err error) { return ErrReadOnly } - // process hooks - for _, hook := range c.hooks { - if hook.h.UsesPrePut() && hook.q.Matches(r) { - r, err = hook.h.PrePut(r) - if err != nil { - return err - } - } + r, err = c.runPrePutHooks(r) + if err != nil { + return err } if !c.shadowDelete && r.Meta().IsDeleted() { // Immediate delete. err = c.storage.Delete(r.DatabaseKey()) - if err != nil { - return err - } } else { // Put or shadow delete. r, err = c.storage.Put(r) - if err != nil { - return err - } - if r == nil { - return errors.New("storage returned nil record after successful put operation") - } } - // process subscriptions - for _, sub := range c.subscriptions { - if r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r) { - select { - case sub.Feed <- r: - default: - } - } + if err != nil { + return err } + if r == nil { + return errors.New("storage returned nil record after successful put operation") + } + + c.notifySubscribers(r) + return nil } -// PutMany stores many records in the database. +// PutMany stores many records in the database. It does not +// process any hooks or update subscriptions. Use with care! func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { - c.exclusiveAccess.RLock() - defer c.exclusiveAccess.RUnlock() - if shuttingDown.IsSet() { errs := make(chan error, 1) errs <- ErrShuttingDown @@ -177,20 +141,15 @@ func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { // Query executes the given query on the database. func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { - c.exclusiveAccess.RLock() - if shuttingDown.IsSet() { - c.exclusiveAccess.RUnlock() return nil, ErrShuttingDown } it, err := c.storage.Query(q, local, internal) if err != nil { - c.exclusiveAccess.RUnlock() return nil, err } - go c.readUnlockerAfterQuery(it) return it, nil } @@ -199,47 +158,27 @@ func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iter // PushUpdate. func (c *Controller) PushUpdate(r record.Record) { if c != nil { - c.exclusiveAccess.RLock() - defer c.exclusiveAccess.RUnlock() - if shuttingDown.IsSet() { return } - for _, sub := range c.subscriptions { - push := r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r) - - if push { - select { - case sub.Feed <- r: - default: - } - } - } + c.notifySubscribers(r) } } func (c *Controller) addSubscription(sub *Subscription) { - c.exclusiveAccess.Lock() - defer c.exclusiveAccess.Unlock() - if shuttingDown.IsSet() { return } - c.subscriptions = append(c.subscriptions, sub) -} + c.subscriptionLock.Lock() + defer c.subscriptionLock.Unlock() -func (c *Controller) readUnlockerAfterQuery(it *iterator.Iterator) { - defer c.exclusiveAccess.RUnlock() - <-it.Done + c.subscriptions = append(c.subscriptions, sub) } // Maintain runs the Maintain method on the storage. func (c *Controller) Maintain(ctx context.Context) error { - c.exclusiveAccess.RLock() - defer c.exclusiveAccess.RUnlock() - if shuttingDown.IsSet() { return ErrShuttingDown } @@ -250,11 +189,9 @@ func (c *Controller) Maintain(ctx context.Context) error { return nil } -// MaintainThorough runs the MaintainThorough method on the storage. +// MaintainThorough runs the MaintainThorough method on the +// storage. func (c *Controller) MaintainThorough(ctx context.Context) error { - c.exclusiveAccess.RLock() - defer c.exclusiveAccess.RUnlock() - if shuttingDown.IsSet() { return ErrShuttingDown } @@ -265,11 +202,9 @@ func (c *Controller) MaintainThorough(ctx context.Context) error { return nil } -// MaintainRecordStates runs the record state lifecycle maintenance on the storage. +// MaintainRecordStates runs the record state lifecycle +// maintenance on the storage. func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { - c.exclusiveAccess.RLock() - defer c.exclusiveAccess.RUnlock() - if shuttingDown.IsSet() { return ErrShuttingDown } @@ -277,11 +212,9 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore, c.shadowDelete) } -// Purge deletes all records that match the given query. It returns the number of successful deletes and an error. +// Purge deletes all records that match the given query. +// It returns the number of successful deletes and an error. func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal bool) (int, error) { - c.exclusiveAccess.RLock() - defer c.exclusiveAccess.RUnlock() - if shuttingDown.IsSet() { return 0, ErrShuttingDown } @@ -289,13 +222,96 @@ func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal if purger, ok := c.storage.(storage.Purger); ok { return purger.Purge(ctx, q, local, internal, c.shadowDelete) } + return 0, ErrNotImplemented } // Shutdown shuts down the storage. func (c *Controller) Shutdown() error { - c.exclusiveAccess.Lock() - defer c.exclusiveAccess.Unlock() - return c.storage.Shutdown() } + +// notifySubscribers notifies all subscribers that are interested +// in r. r must be locked when calling notifySubscribers. +// Any subscriber that is not blocking on it's feed channel will +// be skipped. +func (c *Controller) notifySubscribers(r record.Record) { + c.subscriptionLock.RLock() + defer c.subscriptionLock.RUnlock() + + for _, sub := range c.subscriptions { + if r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r) { + select { + case sub.Feed <- r: + default: + } + } + } +} + +func (c *Controller) runPreGetHooks(key string) error { + c.hooksLock.RLock() + defer c.hooksLock.RUnlock() + + for _, hook := range c.hooks { + if !hook.h.UsesPreGet() { + continue + } + + if !hook.q.MatchesKey(key) { + continue + } + + if err := hook.h.PreGet(key); err != nil { + return err + } + } + + return nil +} + +func (c *Controller) runPostGetHooks(r record.Record) (record.Record, error) { + c.hooksLock.RLock() + defer c.hooksLock.RUnlock() + + var err error + for _, hook := range c.hooks { + if !hook.h.UsesPostGet() { + continue + } + + if !hook.q.Matches(r) { + continue + } + + r, err = hook.h.PostGet(r) + if err != nil { + return nil, err + } + } + + return r, nil +} + +func (c *Controller) runPrePutHooks(r record.Record) (record.Record, error) { + c.hooksLock.RLock() + defer c.hooksLock.RUnlock() + + var err error + for _, hook := range c.hooks { + if !hook.h.UsesPrePut() { + continue + } + + if !hook.q.Matches(r) { + continue + } + + r, err = hook.h.PrePut(r) + if err != nil { + return nil, err + } + } + + return r, nil +} diff --git a/database/database.go b/database/database.go index 048f35f..9b77f04 100644 --- a/database/database.go +++ b/database/database.go @@ -1,7 +1,6 @@ package database import ( - "errors" "time" ) @@ -16,11 +15,6 @@ type Database struct { LastLoaded time.Time } -// MigrateTo migrates the database to another storage type. -func (db *Database) MigrateTo(newStorageType string) error { - return errors.New("not implemented yet") // TODO -} - // Loaded updates the LastLoaded timestamp. func (db *Database) Loaded() { db.LastLoaded = time.Now().Round(time.Second) diff --git a/database/hook.go b/database/hook.go index 43a1eaf..f395ef0 100644 --- a/database/hook.go +++ b/database/hook.go @@ -5,15 +5,36 @@ import ( "github.com/safing/portbase/database/record" ) -// Hook describes a hook +// Hook can be registered for a database query and +// will be executed at certain points during the life +// cycle of a database record. type Hook interface { + // UsesPreGet should return true if the hook's PreGet + // should be called prior to loading a database record + // from the underlying storage. UsesPreGet() bool + // PreGet is called before a database record is loaded from + // the underlying storage. A PreGet hookd may be used to + // implement more advanced access control on database keys. PreGet(dbKey string) error - + // UsesPostGet should returnd true if the hook's PostGet + // should be called after loading a database record from + // the underlying storage. UsesPostGet() bool + // PostGet is called after a record has been loaded form the + // underlying storage and may perform additional mutation + // or access check based on the records data. + // The passed record is already locked by the database system + // so users can safely access all data of r. PostGet(r record.Record) (record.Record, error) - + // UsesPrePut should return true if the hook's PrePut method + // should be called prior to saving a record in the database. UsesPrePut() bool + // PrePut is called prior to saving (creating or updating) a + // record in the database storage. It may be used to perform + // extended validation or mutations on the record. + // The passed record is already locked by the database system + // so users can safely access all data of r. PrePut(r record.Record) (record.Record, error) } @@ -23,7 +44,8 @@ type RegisteredHook struct { h Hook } -// RegisterHook registers a hook for records matching the given query in the database. +// RegisterHook registers a hook for records matching the given +// query in the database. func RegisterHook(q *query.Query, hook Hook) (*RegisteredHook, error) { _, err := q.Check() if err != nil { @@ -35,26 +57,29 @@ func RegisterHook(q *query.Query, hook Hook) (*RegisteredHook, error) { return nil, err } - c.exclusiveAccess.Lock() - defer c.exclusiveAccess.Unlock() - rh := &RegisteredHook{ q: q, h: hook, } + + c.hooksLock.Lock() + defer c.hooksLock.Unlock() c.hooks = append(c.hooks, rh) + return rh, nil } -// Cancel unhooks the hook. +// Cancel unregisteres the hook from the database. Once +// Cancel returned the hook's methods will not be called +// anymore for updates that matched the registered query. func (h *RegisteredHook) Cancel() error { c, err := getController(h.q.DatabaseName()) if err != nil { return err } - c.exclusiveAccess.Lock() - defer c.exclusiveAccess.Unlock() + c.hooksLock.Lock() + defer c.hooksLock.Unlock() for key, hook := range c.hooks { if hook.q == h.q { diff --git a/database/subscription.go b/database/subscription.go index 923b7f8..803ba00 100644 --- a/database/subscription.go +++ b/database/subscription.go @@ -10,7 +10,6 @@ type Subscription struct { q *query.Query local bool internal bool - canceled bool Feed chan record.Record } @@ -22,18 +21,13 @@ func (s *Subscription) Cancel() error { return err } - c.exclusiveAccess.Lock() - defer c.exclusiveAccess.Unlock() - - if s.canceled { - return nil - } - s.canceled = true - close(s.Feed) + c.subscriptionLock.Lock() + defer c.subscriptionLock.Unlock() for key, sub := range c.subscriptions { if sub.q == s.q { c.subscriptions = append(c.subscriptions[:key], c.subscriptions[key+1:]...) + close(s.Feed) // this close is guarded by the controllers subscriptionLock. return nil } }