diff --git a/database/controller.go b/database/controller.go index 918112b..a9f9d48 100644 --- a/database/controller.go +++ b/database/controller.go @@ -138,25 +138,29 @@ func (c *Controller) Put(r record.Record) (err error) { } // PutMany stores many records in the database. -func (c *Controller) PutMany() (batch chan record.Record, err chan error) { +func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { c.writeLock.RLock() defer c.writeLock.RUnlock() if shuttingDown.IsSet() { - batch = make(chan record.Record) - err = make(chan error, 1) - err <- ErrShuttingDown - return + errs := make(chan error, 1) + errs <- ErrShuttingDown + return make(chan record.Record), errs } if c.ReadOnly() { - batch = make(chan record.Record) - err = make(chan error, 1) - err <- ErrReadOnly - return + errs := make(chan error, 1) + errs <- ErrReadOnly + return make(chan record.Record), errs } - return c.storage.PutMany() + if batcher, ok := c.storage.(storage.Batcher); ok { + return batcher.PutMany() + } + + errs := make(chan error, 1) + errs <- ErrNotImplemented + return make(chan record.Record), errs } // Query executes the given query on the database. diff --git a/database/errors.go b/database/errors.go index cae280d..9d22b53 100644 --- a/database/errors.go +++ b/database/errors.go @@ -10,4 +10,5 @@ var ( ErrPermissionDenied = errors.New("access to database record denied") ErrReadOnly = errors.New("database is read only") ErrShuttingDown = errors.New("database system is shutting down") + ErrNotImplemented = errors.New("not implemented by this storage") ) diff --git a/database/interface.go b/database/interface.go index 87beccd..a939960 100644 --- a/database/interface.go +++ b/database/interface.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/tevino/abool" + "github.com/bluele/gcache" "github.com/safing/portbase/database/accessor" @@ -245,16 +247,18 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) { } // start database access - dbBatch, errCh := db.PutMany() + dbBatch, errs := db.PutMany() + finished := abool.New() + var internalErr error // interface options proxy go func() { + defer close(dbBatch) // signify that we are finished for { select { case r := <-interfaceBatch: // finished? if r == nil { - close(dbBatch) // signify that we are finished return } // apply options @@ -263,27 +267,47 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) { dbBatch <- r case <-time.After(1 * time.Second): // bail out - errCh <- errors.New("timeout: putmany unused for too long") - close(dbBatch) // signify that we are finished + internalErr = errors.New("timeout: putmany unused for too long") + finished.Set() return } } }() return func(r record.Record) error { - if r == nil { - interfaceBatch <- nil // signify that we are finished - // do not close, as this fn could be called again with nil. - return <-errCh + // finished? + if finished.IsSet() { + // check for internal error + if internalErr != nil { + return internalErr + } + // check for previous error + select { + case err := <-errs: + return err + default: + return errors.New("batch is closed") + } } + // finish? + if r == nil { + finished.Set() + interfaceBatch <- nil // signify that we are finished + // do not close, as this fn could be called again with nil. + return <-errs + } + + // check record scope if r.DatabaseName() != dbName { return errors.New("record out of database scope") } + + // submit select { case interfaceBatch <- r: return nil - case err := <-errCh: + case err := <-errs: return err } } diff --git a/database/storage/badger/badger.go b/database/storage/badger/badger.go index 8f2c9f3..8f473cd 100644 --- a/database/storage/badger/badger.go +++ b/database/storage/badger/badger.go @@ -94,15 +94,6 @@ func (b *Badger) Put(r record.Record) error { return err } -// PutMany stores many records in the database. -func (b *Badger) PutMany() (batch chan record.Record, err chan error) { - // TODO: implement PutMany - batch = make(chan record.Record) - err = make(chan error, 1) - err <- errors.New("not implemented") - return -} - // Delete deletes a record from the database. func (b *Badger) Delete(key string) error { return b.db.Update(func(txn *badger.Txn) error { diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index 832c157..bb049a9 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -106,20 +106,14 @@ func (b *BBolt) Put(r record.Record) error { } // PutMany stores many records in the database. -func (b *BBolt) PutMany() (batch chan record.Record, errCh chan error) { - batch = make(chan record.Record, 100) - errCh = make(chan error, 1) +func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) { + batch := make(chan record.Record, 100) + errs := make(chan error, 1) go func() { err := b.db.Batch(func(tx *bbolt.Tx) error { bucket := tx.Bucket(bucketName) - for { - r := <-batch - // finished? - if r == nil { - return nil - } - + for r := range batch { // marshal data, txErr := r.MarshalRecord(r) if txErr != nil { @@ -132,11 +126,12 @@ func (b *BBolt) PutMany() (batch chan record.Record, errCh chan error) { return txErr } } + return nil }) - errCh <- err + errs <- err }() - return batch, errCh + return batch, errs } // Delete deletes a record from the database. diff --git a/database/storage/fstree/fstree.go b/database/storage/fstree/fstree.go index 654695f..fa160de 100644 --- a/database/storage/fstree/fstree.go +++ b/database/storage/fstree/fstree.go @@ -131,15 +131,6 @@ func (fst *FSTree) Put(r record.Record) error { return nil } -// PutMany stores many records in the database. -func (fst *FSTree) PutMany() (batch chan record.Record, err chan error) { - // TODO: implement PutMany - batch = make(chan record.Record) - err = make(chan error, 1) - err <- errors.New("not implemented") - return -} - // Delete deletes a record from the database. func (fst *FSTree) Delete(key string) error { dstPath, err := fst.buildFilePath(key, true) diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index 8e1103a..d9fa3df 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -53,30 +53,24 @@ func (hm *HashMap) Put(r record.Record) error { } // PutMany stores many records in the database. -func (hm *HashMap) PutMany() (batch chan record.Record, err chan error) { +func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) { hm.dbLock.Lock() defer hm.dbLock.Unlock() // we could lock for every record, but we want to have the same behaviour // as the other storage backends, especially for testing. - batch = make(chan record.Record, 100) - err = make(chan error, 1) + batch := make(chan record.Record, 100) + errs := make(chan error, 1) // start handler go func() { - for { - r := <-batch - // finished? - if r == nil { - err <- nil - return - } - // put + for r := range batch { hm.db[r.DatabaseKey()] = r } + errs <- nil }() - return + return batch, errs } // Delete deletes a record from the database. diff --git a/database/storage/interface.go b/database/storage/interface.go index 3f0642a..484f852 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -10,7 +10,6 @@ import ( type Interface interface { Get(key string) (record.Record, error) Put(m record.Record) error - PutMany() (batch chan record.Record, err chan error) Delete(key string) error Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) @@ -20,3 +19,8 @@ type Interface interface { MaintainThorough() error Shutdown() error } + +// Batcher defines the database storage API for backends that support batch operations. +type Batcher interface { + PutMany() (batch chan<- record.Record, errs <-chan error) +} diff --git a/database/storage/sinkhole/sinkhole.go b/database/storage/sinkhole/sinkhole.go index 14bbfb0..e26eaa5 100644 --- a/database/storage/sinkhole/sinkhole.go +++ b/database/storage/sinkhole/sinkhole.go @@ -41,23 +41,19 @@ func (s *Sinkhole) Put(m record.Record) error { } // PutMany stores many records in the database. -func (s *Sinkhole) PutMany() (batch chan record.Record, err chan error) { - batch = make(chan record.Record, 100) - err = make(chan error, 1) +func (s *Sinkhole) PutMany() (chan<- record.Record, <-chan error) { + batch := make(chan record.Record, 100) + errs := make(chan error, 1) // start handler go func() { - for { - r := <-batch - // finished? - if r == nil { - err <- nil - return - } + for range batch { + // nom, nom, nom } + errs <- nil }() - return + return batch, errs } // Delete deletes a record from the database.