Implement review

This commit is contained in:
Daniel 2020-04-10 10:42:01 +02:00
parent e0f96d5188
commit 0ee19298fa
9 changed files with 73 additions and 73 deletions

View file

@ -138,25 +138,29 @@ func (c *Controller) Put(r record.Record) (err error) {
} }
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (c *Controller) PutMany() (batch chan record.Record, err chan error) { func (c *Controller) PutMany() (chan<- record.Record, <-chan error) {
c.writeLock.RLock() c.writeLock.RLock()
defer c.writeLock.RUnlock() defer c.writeLock.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
batch = make(chan record.Record) errs := make(chan error, 1)
err = make(chan error, 1) errs <- ErrShuttingDown
err <- ErrShuttingDown return make(chan record.Record), errs
return
} }
if c.ReadOnly() { if c.ReadOnly() {
batch = make(chan record.Record) errs := make(chan error, 1)
err = make(chan error, 1) errs <- ErrReadOnly
err <- ErrReadOnly return make(chan record.Record), errs
return
} }
return c.storage.PutMany() if batcher, ok := c.storage.(storage.Batcher); ok {
return batcher.PutMany()
}
errs := make(chan error, 1)
errs <- ErrNotImplemented
return make(chan record.Record), errs
} }
// Query executes the given query on the database. // Query executes the given query on the database.

View file

@ -10,4 +10,5 @@ var (
ErrPermissionDenied = errors.New("access to database record denied") ErrPermissionDenied = errors.New("access to database record denied")
ErrReadOnly = errors.New("database is read only") ErrReadOnly = errors.New("database is read only")
ErrShuttingDown = errors.New("database system is shutting down") ErrShuttingDown = errors.New("database system is shutting down")
ErrNotImplemented = errors.New("not implemented by this storage")
) )

View file

@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/tevino/abool"
"github.com/bluele/gcache" "github.com/bluele/gcache"
"github.com/safing/portbase/database/accessor" "github.com/safing/portbase/database/accessor"
@ -245,16 +247,18 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) {
} }
// start database access // start database access
dbBatch, errCh := db.PutMany() dbBatch, errs := db.PutMany()
finished := abool.New()
var internalErr error
// interface options proxy // interface options proxy
go func() { go func() {
defer close(dbBatch) // signify that we are finished
for { for {
select { select {
case r := <-interfaceBatch: case r := <-interfaceBatch:
// finished? // finished?
if r == nil { if r == nil {
close(dbBatch) // signify that we are finished
return return
} }
// apply options // apply options
@ -263,27 +267,47 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) {
dbBatch <- r dbBatch <- r
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
// bail out // bail out
errCh <- errors.New("timeout: putmany unused for too long") internalErr = errors.New("timeout: putmany unused for too long")
close(dbBatch) // signify that we are finished finished.Set()
return return
} }
} }
}() }()
return func(r record.Record) error { return func(r record.Record) error {
if r == nil { // finished?
interfaceBatch <- nil // signify that we are finished if finished.IsSet() {
// do not close, as this fn could be called again with nil. // check for internal error
return <-errCh if internalErr != nil {
return internalErr
}
// check for previous error
select {
case err := <-errs:
return err
default:
return errors.New("batch is closed")
}
} }
// finish?
if r == nil {
finished.Set()
interfaceBatch <- nil // signify that we are finished
// do not close, as this fn could be called again with nil.
return <-errs
}
// check record scope
if r.DatabaseName() != dbName { if r.DatabaseName() != dbName {
return errors.New("record out of database scope") return errors.New("record out of database scope")
} }
// submit
select { select {
case interfaceBatch <- r: case interfaceBatch <- r:
return nil return nil
case err := <-errCh: case err := <-errs:
return err return err
} }
} }

View file

@ -94,15 +94,6 @@ func (b *Badger) Put(r record.Record) error {
return err return err
} }
// PutMany stores many records in the database.
func (b *Badger) PutMany() (batch chan record.Record, err chan error) {
// TODO: implement PutMany
batch = make(chan record.Record)
err = make(chan error, 1)
err <- errors.New("not implemented")
return
}
// Delete deletes a record from the database. // Delete deletes a record from the database.
func (b *Badger) Delete(key string) error { func (b *Badger) Delete(key string) error {
return b.db.Update(func(txn *badger.Txn) error { return b.db.Update(func(txn *badger.Txn) error {

View file

@ -106,20 +106,14 @@ func (b *BBolt) Put(r record.Record) error {
} }
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (b *BBolt) PutMany() (batch chan record.Record, errCh chan error) { func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) {
batch = make(chan record.Record, 100) batch := make(chan record.Record, 100)
errCh = make(chan error, 1) errs := make(chan error, 1)
go func() { go func() {
err := b.db.Batch(func(tx *bbolt.Tx) error { err := b.db.Batch(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName) bucket := tx.Bucket(bucketName)
for { for r := range batch {
r := <-batch
// finished?
if r == nil {
return nil
}
// marshal // marshal
data, txErr := r.MarshalRecord(r) data, txErr := r.MarshalRecord(r)
if txErr != nil { if txErr != nil {
@ -132,11 +126,12 @@ func (b *BBolt) PutMany() (batch chan record.Record, errCh chan error) {
return txErr return txErr
} }
} }
return nil
}) })
errCh <- err errs <- err
}() }()
return batch, errCh return batch, errs
} }
// Delete deletes a record from the database. // Delete deletes a record from the database.

View file

@ -131,15 +131,6 @@ func (fst *FSTree) Put(r record.Record) error {
return nil return nil
} }
// PutMany stores many records in the database.
func (fst *FSTree) PutMany() (batch chan record.Record, err chan error) {
// TODO: implement PutMany
batch = make(chan record.Record)
err = make(chan error, 1)
err <- errors.New("not implemented")
return
}
// Delete deletes a record from the database. // Delete deletes a record from the database.
func (fst *FSTree) Delete(key string) error { func (fst *FSTree) Delete(key string) error {
dstPath, err := fst.buildFilePath(key, true) dstPath, err := fst.buildFilePath(key, true)

View file

@ -53,30 +53,24 @@ func (hm *HashMap) Put(r record.Record) error {
} }
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (hm *HashMap) PutMany() (batch chan record.Record, err chan error) { func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) {
hm.dbLock.Lock() hm.dbLock.Lock()
defer hm.dbLock.Unlock() defer hm.dbLock.Unlock()
// we could lock for every record, but we want to have the same behaviour // we could lock for every record, but we want to have the same behaviour
// as the other storage backends, especially for testing. // as the other storage backends, especially for testing.
batch = make(chan record.Record, 100) batch := make(chan record.Record, 100)
err = make(chan error, 1) errs := make(chan error, 1)
// start handler // start handler
go func() { go func() {
for { for r := range batch {
r := <-batch
// finished?
if r == nil {
err <- nil
return
}
// put
hm.db[r.DatabaseKey()] = r hm.db[r.DatabaseKey()] = r
} }
errs <- nil
}() }()
return return batch, errs
} }
// Delete deletes a record from the database. // Delete deletes a record from the database.

View file

@ -10,7 +10,6 @@ import (
type Interface interface { type Interface interface {
Get(key string) (record.Record, error) Get(key string) (record.Record, error)
Put(m record.Record) error Put(m record.Record) error
PutMany() (batch chan record.Record, err chan error)
Delete(key string) error Delete(key string) error
Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error)
@ -20,3 +19,8 @@ type Interface interface {
MaintainThorough() error MaintainThorough() error
Shutdown() error Shutdown() error
} }
// Batcher defines the database storage API for backends that support batch operations.
type Batcher interface {
PutMany() (batch chan<- record.Record, errs <-chan error)
}

View file

@ -41,23 +41,19 @@ func (s *Sinkhole) Put(m record.Record) error {
} }
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (s *Sinkhole) PutMany() (batch chan record.Record, err chan error) { func (s *Sinkhole) PutMany() (chan<- record.Record, <-chan error) {
batch = make(chan record.Record, 100) batch := make(chan record.Record, 100)
err = make(chan error, 1) errs := make(chan error, 1)
// start handler // start handler
go func() { go func() {
for { for range batch {
r := <-batch // nom, nom, nom
// finished?
if r == nil {
err <- nil
return
}
} }
errs <- nil
}() }()
return return batch, errs
} }
// Delete deletes a record from the database. // Delete deletes a record from the database.