Remove exclusiveAccess lock from database controller

This commit is contained in:
Patrick Pacher 2020-10-15 11:01:15 +02:00
parent 458d4e7f15
commit 7e4d441c2a
No known key found for this signature in database
GPG key ID: E8CD2DA160925A6D
4 changed files with 160 additions and 131 deletions

View file

@ -6,8 +6,6 @@ import (
"sync"
"time"
"github.com/tevino/abool"
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
@ -19,13 +17,11 @@ type Controller struct {
storage storage.Interface
shadowDelete bool
hooks []*RegisteredHook
subscriptions []*Subscription
hooksLock sync.RWMutex
hooks []*RegisteredHook
exclusiveAccess sync.RWMutex
migrating *abool.AtomicBool // TODO
hibernating *abool.AtomicBool // TODO
subscriptionLock sync.RWMutex
subscriptions []*Subscription
}
// newController creates a new controller for a storage.
@ -33,8 +29,6 @@ func newController(storageInt storage.Interface, shadowDelete bool) *Controller
return &Controller{
storage: storageInt,
shadowDelete: shadowDelete,
migrating: abool.NewBool(false),
hibernating: abool.NewBool(false),
}
}
@ -50,21 +44,12 @@ func (c *Controller) Injected() bool {
// Get return the record with the given key.
func (c *Controller) Get(key string) (record.Record, error) {
c.exclusiveAccess.RLock()
defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() {
return nil, ErrShuttingDown
}
// process hooks
for _, hook := range c.hooks {
if hook.h.UsesPreGet() && hook.q.MatchesKey(key) {
err := hook.h.PreGet(key)
if err != nil {
return nil, err
}
}
if err := c.runPreGetHooks(key); err != nil {
return nil, err
}
r, err := c.storage.Get(key)
@ -79,14 +64,9 @@ func (c *Controller) Get(key string) (record.Record, error) {
r.Lock()
defer r.Unlock()
// process hooks
for _, hook := range c.hooks {
if hook.h.UsesPostGet() && hook.q.Matches(r) {
r, err = hook.h.PostGet(r)
if err != nil {
return nil, err
}
}
r, err = c.runPostGetHooks(r)
if err != nil {
return nil, err
}
if !r.Meta().CheckValidity() {
@ -96,11 +76,11 @@ func (c *Controller) Get(key string) (record.Record, error) {
return r, nil
}
// Put saves a record in the database.
// Put saves a record in the database, executes any registered
// pre-put hooks and finally send an update to all subscribers.
// The record must be locked and secured from concurrent access
// when calling Put().
func (c *Controller) Put(r record.Record) (err error) {
c.exclusiveAccess.RLock()
defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() {
return ErrShuttingDown
}
@ -109,51 +89,35 @@ func (c *Controller) Put(r record.Record) (err error) {
return ErrReadOnly
}
// process hooks
for _, hook := range c.hooks {
if hook.h.UsesPrePut() && hook.q.Matches(r) {
r, err = hook.h.PrePut(r)
if err != nil {
return err
}
}
r, err = c.runPrePutHooks(r)
if err != nil {
return err
}
if !c.shadowDelete && r.Meta().IsDeleted() {
// Immediate delete.
err = c.storage.Delete(r.DatabaseKey())
if err != nil {
return err
}
} else {
// Put or shadow delete.
r, err = c.storage.Put(r)
if err != nil {
return err
}
if r == nil {
return errors.New("storage returned nil record after successful put operation")
}
}
// process subscriptions
for _, sub := range c.subscriptions {
if r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r) {
select {
case sub.Feed <- r:
default:
}
}
if err != nil {
return err
}
if r == nil {
return errors.New("storage returned nil record after successful put operation")
}
c.notifySubscribers(r)
return nil
}
// PutMany stores many records in the database.
// PutMany stores many records in the database. It does not
// process any hooks or update subscriptions. Use with care!
func (c *Controller) PutMany() (chan<- record.Record, <-chan error) {
c.exclusiveAccess.RLock()
defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() {
errs := make(chan error, 1)
errs <- ErrShuttingDown
@ -177,20 +141,15 @@ func (c *Controller) PutMany() (chan<- record.Record, <-chan error) {
// Query executes the given query on the database.
func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
c.exclusiveAccess.RLock()
if shuttingDown.IsSet() {
c.exclusiveAccess.RUnlock()
return nil, ErrShuttingDown
}
it, err := c.storage.Query(q, local, internal)
if err != nil {
c.exclusiveAccess.RUnlock()
return nil, err
}
go c.readUnlockerAfterQuery(it)
return it, nil
}
@ -199,47 +158,27 @@ func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iter
// PushUpdate.
func (c *Controller) PushUpdate(r record.Record) {
if c != nil {
c.exclusiveAccess.RLock()
defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() {
return
}
for _, sub := range c.subscriptions {
push := r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r)
if push {
select {
case sub.Feed <- r:
default:
}
}
}
c.notifySubscribers(r)
}
}
func (c *Controller) addSubscription(sub *Subscription) {
c.exclusiveAccess.Lock()
defer c.exclusiveAccess.Unlock()
if shuttingDown.IsSet() {
return
}
c.subscriptions = append(c.subscriptions, sub)
}
c.subscriptionLock.Lock()
defer c.subscriptionLock.Unlock()
func (c *Controller) readUnlockerAfterQuery(it *iterator.Iterator) {
defer c.exclusiveAccess.RUnlock()
<-it.Done
c.subscriptions = append(c.subscriptions, sub)
}
// Maintain runs the Maintain method on the storage.
func (c *Controller) Maintain(ctx context.Context) error {
c.exclusiveAccess.RLock()
defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() {
return ErrShuttingDown
}
@ -250,11 +189,9 @@ func (c *Controller) Maintain(ctx context.Context) error {
return nil
}
// MaintainThorough runs the MaintainThorough method on the storage.
// MaintainThorough runs the MaintainThorough method on the
// storage.
func (c *Controller) MaintainThorough(ctx context.Context) error {
c.exclusiveAccess.RLock()
defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() {
return ErrShuttingDown
}
@ -265,11 +202,9 @@ func (c *Controller) MaintainThorough(ctx context.Context) error {
return nil
}
// MaintainRecordStates runs the record state lifecycle maintenance on the storage.
// MaintainRecordStates runs the record state lifecycle
// maintenance on the storage.
func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
c.exclusiveAccess.RLock()
defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() {
return ErrShuttingDown
}
@ -277,11 +212,9 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor
return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore, c.shadowDelete)
}
// Purge deletes all records that match the given query. It returns the number of successful deletes and an error.
// Purge deletes all records that match the given query.
// It returns the number of successful deletes and an error.
func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal bool) (int, error) {
c.exclusiveAccess.RLock()
defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() {
return 0, ErrShuttingDown
}
@ -289,13 +222,96 @@ func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal
if purger, ok := c.storage.(storage.Purger); ok {
return purger.Purge(ctx, q, local, internal, c.shadowDelete)
}
return 0, ErrNotImplemented
}
// Shutdown shuts down the storage.
func (c *Controller) Shutdown() error {
c.exclusiveAccess.Lock()
defer c.exclusiveAccess.Unlock()
return c.storage.Shutdown()
}
// notifySubscribers notifies all subscribers that are interested
// in r. r must be locked when calling notifySubscribers.
// Any subscriber that is not blocking on it's feed channel will
// be skipped.
func (c *Controller) notifySubscribers(r record.Record) {
c.subscriptionLock.RLock()
defer c.subscriptionLock.RUnlock()
for _, sub := range c.subscriptions {
if r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r) {
select {
case sub.Feed <- r:
default:
}
}
}
}
func (c *Controller) runPreGetHooks(key string) error {
c.hooksLock.RLock()
defer c.hooksLock.RUnlock()
for _, hook := range c.hooks {
if !hook.h.UsesPreGet() {
continue
}
if !hook.q.MatchesKey(key) {
continue
}
if err := hook.h.PreGet(key); err != nil {
return err
}
}
return nil
}
func (c *Controller) runPostGetHooks(r record.Record) (record.Record, error) {
c.hooksLock.RLock()
defer c.hooksLock.RUnlock()
var err error
for _, hook := range c.hooks {
if !hook.h.UsesPostGet() {
continue
}
if !hook.q.Matches(r) {
continue
}
r, err = hook.h.PostGet(r)
if err != nil {
return nil, err
}
}
return r, nil
}
func (c *Controller) runPrePutHooks(r record.Record) (record.Record, error) {
c.hooksLock.RLock()
defer c.hooksLock.RUnlock()
var err error
for _, hook := range c.hooks {
if !hook.h.UsesPrePut() {
continue
}
if !hook.q.Matches(r) {
continue
}
r, err = hook.h.PrePut(r)
if err != nil {
return nil, err
}
}
return r, nil
}

View file

@ -1,7 +1,6 @@
package database
import (
"errors"
"time"
)
@ -16,11 +15,6 @@ type Database struct {
LastLoaded time.Time
}
// MigrateTo migrates the database to another storage type.
func (db *Database) MigrateTo(newStorageType string) error {
return errors.New("not implemented yet") // TODO
}
// Loaded updates the LastLoaded timestamp.
func (db *Database) Loaded() {
db.LastLoaded = time.Now().Round(time.Second)

View file

@ -5,15 +5,36 @@ import (
"github.com/safing/portbase/database/record"
)
// Hook describes a hook
// Hook can be registered for a database query and
// will be executed at certain points during the life
// cycle of a database record.
type Hook interface {
// UsesPreGet should return true if the hook's PreGet
// should be called prior to loading a database record
// from the underlying storage.
UsesPreGet() bool
// PreGet is called before a database record is loaded from
// the underlying storage. A PreGet hookd may be used to
// implement more advanced access control on database keys.
PreGet(dbKey string) error
// UsesPostGet should returnd true if the hook's PostGet
// should be called after loading a database record from
// the underlying storage.
UsesPostGet() bool
// PostGet is called after a record has been loaded form the
// underlying storage and may perform additional mutation
// or access check based on the records data.
// The passed record is already locked by the database system
// so users can safely access all data of r.
PostGet(r record.Record) (record.Record, error)
// UsesPrePut should return true if the hook's PrePut method
// should be called prior to saving a record in the database.
UsesPrePut() bool
// PrePut is called prior to saving (creating or updating) a
// record in the database storage. It may be used to perform
// extended validation or mutations on the record.
// The passed record is already locked by the database system
// so users can safely access all data of r.
PrePut(r record.Record) (record.Record, error)
}
@ -23,7 +44,8 @@ type RegisteredHook struct {
h Hook
}
// RegisterHook registers a hook for records matching the given query in the database.
// RegisterHook registers a hook for records matching the given
// query in the database.
func RegisterHook(q *query.Query, hook Hook) (*RegisteredHook, error) {
_, err := q.Check()
if err != nil {
@ -35,26 +57,29 @@ func RegisterHook(q *query.Query, hook Hook) (*RegisteredHook, error) {
return nil, err
}
c.exclusiveAccess.Lock()
defer c.exclusiveAccess.Unlock()
rh := &RegisteredHook{
q: q,
h: hook,
}
c.hooksLock.Lock()
defer c.hooksLock.Unlock()
c.hooks = append(c.hooks, rh)
return rh, nil
}
// Cancel unhooks the hook.
// Cancel unregisteres the hook from the database. Once
// Cancel returned the hook's methods will not be called
// anymore for updates that matched the registered query.
func (h *RegisteredHook) Cancel() error {
c, err := getController(h.q.DatabaseName())
if err != nil {
return err
}
c.exclusiveAccess.Lock()
defer c.exclusiveAccess.Unlock()
c.hooksLock.Lock()
defer c.hooksLock.Unlock()
for key, hook := range c.hooks {
if hook.q == h.q {

View file

@ -10,7 +10,6 @@ type Subscription struct {
q *query.Query
local bool
internal bool
canceled bool
Feed chan record.Record
}
@ -22,18 +21,13 @@ func (s *Subscription) Cancel() error {
return err
}
c.exclusiveAccess.Lock()
defer c.exclusiveAccess.Unlock()
if s.canceled {
return nil
}
s.canceled = true
close(s.Feed)
c.subscriptionLock.Lock()
defer c.subscriptionLock.Unlock()
for key, sub := range c.subscriptions {
if sub.q == s.q {
c.subscriptions = append(c.subscriptions[:key], c.subscriptions[key+1:]...)
close(s.Feed) // this close is guarded by the controllers subscriptionLock.
return nil
}
}