From 50a10485e1299d16f15094ee70574f4d6d8f302c Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Mon, 21 Sep 2020 15:18:54 +0200 Subject: [PATCH] Fix race condition in database package. Simplify db locking --- database/controller.go | 62 ++++++++++++++++++---------------------- database/hook.go | 12 +++----- database/subscription.go | 6 ++-- 3 files changed, 34 insertions(+), 46 deletions(-) diff --git a/database/controller.go b/database/controller.go index 4f7ae3e..6b97239 100644 --- a/database/controller.go +++ b/database/controller.go @@ -21,12 +21,7 @@ type Controller struct { hooks []*RegisteredHook subscriptions []*Subscription - writeLock sync.RWMutex - // Lock: nobody may write - // RLock: concurrent writing - readLock sync.RWMutex - // Lock: nobody may read - // RLock: concurrent reading + exclusiveAccess sync.RWMutex migrating *abool.AtomicBool // TODO hibernating *abool.AtomicBool // TODO @@ -53,8 +48,8 @@ func (c *Controller) Injected() bool { // Get return the record with the given key. func (c *Controller) Get(key string) (record.Record, error) { - c.readLock.RLock() - defer c.readLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return nil, ErrShuttingDown @@ -101,8 +96,8 @@ func (c *Controller) Get(key string) (record.Record, error) { // Put saves a record in the database. func (c *Controller) Put(r record.Record) (err error) { - c.writeLock.RLock() - defer c.writeLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return ErrShuttingDown @@ -145,8 +140,8 @@ func (c *Controller) Put(r record.Record) (err error) { // PutMany stores many records in the database. func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { - c.writeLock.RLock() - defer c.writeLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { errs := make(chan error, 1) @@ -171,16 +166,16 @@ func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { // Query executes the given query on the database. func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { - c.readLock.RLock() + c.exclusiveAccess.RLock() if shuttingDown.IsSet() { - c.readLock.RUnlock() + c.exclusiveAccess.RUnlock() return nil, ErrShuttingDown } it, err := c.storage.Query(q, local, internal) if err != nil { - c.readLock.RUnlock() + c.exclusiveAccess.RUnlock() return nil, err } @@ -191,15 +186,19 @@ func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iter // PushUpdate pushes a record update to subscribers. func (c *Controller) PushUpdate(r record.Record) { if c != nil { - c.readLock.RLock() - defer c.readLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return } for _, sub := range c.subscriptions { - if r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r) { + r.Lock() + push := r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r) + r.Unlock() + + if push { select { case sub.Feed <- r: default: @@ -210,10 +209,8 @@ func (c *Controller) PushUpdate(r record.Record) { } func (c *Controller) addSubscription(sub *Subscription) { - c.readLock.Lock() - defer c.readLock.Unlock() - c.writeLock.Lock() - defer c.writeLock.Unlock() + c.exclusiveAccess.Lock() + defer c.exclusiveAccess.Unlock() if shuttingDown.IsSet() { return @@ -223,14 +220,14 @@ func (c *Controller) addSubscription(sub *Subscription) { } func (c *Controller) readUnlockerAfterQuery(it *iterator.Iterator) { + defer c.exclusiveAccess.RUnlock() <-it.Done - c.readLock.RUnlock() } // Maintain runs the Maintain method on the storage. func (c *Controller) Maintain(ctx context.Context) error { - c.writeLock.RLock() - defer c.writeLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return nil @@ -241,8 +238,8 @@ func (c *Controller) Maintain(ctx context.Context) error { // MaintainThorough runs the MaintainThorough method on the storage. func (c *Controller) MaintainThorough(ctx context.Context) error { - c.writeLock.RLock() - defer c.writeLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return nil @@ -253,8 +250,8 @@ func (c *Controller) MaintainThorough(ctx context.Context) error { // MaintainRecordStates runs the record state lifecycle maintenance on the storage. func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { - c.writeLock.RLock() - defer c.writeLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return nil @@ -265,11 +262,8 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor // Shutdown shuts down the storage. func (c *Controller) Shutdown() error { - // acquire full locks - c.readLock.Lock() - defer c.readLock.Unlock() - c.writeLock.Lock() - defer c.writeLock.Unlock() + c.exclusiveAccess.Lock() + defer c.exclusiveAccess.Unlock() return c.storage.Shutdown() } diff --git a/database/hook.go b/database/hook.go index e6649b2..43a1eaf 100644 --- a/database/hook.go +++ b/database/hook.go @@ -35,10 +35,8 @@ func RegisterHook(q *query.Query, hook Hook) (*RegisteredHook, error) { return nil, err } - c.readLock.Lock() - defer c.readLock.Unlock() - c.writeLock.Lock() - defer c.writeLock.Unlock() + c.exclusiveAccess.Lock() + defer c.exclusiveAccess.Unlock() rh := &RegisteredHook{ q: q, @@ -55,10 +53,8 @@ func (h *RegisteredHook) Cancel() error { return err } - c.readLock.Lock() - defer c.readLock.Unlock() - c.writeLock.Lock() - defer c.writeLock.Unlock() + c.exclusiveAccess.Lock() + defer c.exclusiveAccess.Unlock() for key, hook := range c.hooks { if hook.q == h.q { diff --git a/database/subscription.go b/database/subscription.go index ffe516a..923b7f8 100644 --- a/database/subscription.go +++ b/database/subscription.go @@ -22,10 +22,8 @@ func (s *Subscription) Cancel() error { return err } - c.readLock.Lock() - defer c.readLock.Unlock() - c.writeLock.Lock() - defer c.writeLock.Unlock() + c.exclusiveAccess.Lock() + defer c.exclusiveAccess.Unlock() if s.canceled { return nil