diff --git a/database/controller.go b/database/controller.go index 51699f4..a9f9d48 100644 --- a/database/controller.go +++ b/database/controller.go @@ -137,6 +137,32 @@ func (c *Controller) Put(r record.Record) (err error) { return nil } +// PutMany stores many records in the database. +func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { + c.writeLock.RLock() + defer c.writeLock.RUnlock() + + if shuttingDown.IsSet() { + errs := make(chan error, 1) + errs <- ErrShuttingDown + return make(chan record.Record), errs + } + + if c.ReadOnly() { + errs := make(chan error, 1) + errs <- ErrReadOnly + return make(chan record.Record), errs + } + + if batcher, ok := c.storage.(storage.Batcher); ok { + return batcher.PutMany() + } + + errs := make(chan error, 1) + errs <- ErrNotImplemented + return make(chan record.Record), errs +} + // Query executes the given query on the database. func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { c.readLock.RLock() diff --git a/database/database_test.go b/database/database_test.go index f2e2920..6367648 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -10,10 +10,13 @@ import ( "testing" "time" + "github.com/safing/portbase/database/record" + q "github.com/safing/portbase/database/query" _ "github.com/safing/portbase/database/storage/badger" _ "github.com/safing/portbase/database/storage/bbolt" _ "github.com/safing/portbase/database/storage/fstree" + _ "github.com/safing/portbase/database/storage/hashmap" ) func makeKey(dbName, key string) string { @@ -39,7 +42,10 @@ func testDatabase(t *testing.T, storageType string) { } // interface - db := NewInterface(nil) + db := NewInterface(&Options{ + Local: true, + Internal: true, + }) // sub sub, err := db.Subscribe(q.New(dbName).MustBeValid()) @@ -107,6 +113,18 @@ func testDatabase(t *testing.T, storageType string) { t.Fatalf("expected two records, got %d", cnt) } + switch storageType { + case "bbolt", "hashmap": + batchPut := db.PutMany(dbName) + records := []record.Record{A, B, C, nil} // nil is to signify finish + for _, r := range records { + err = batchPut(r) + if err != nil { + t.Fatal(err) + } + } + } + err = hook.Cancel() if err != nil { t.Fatal(err) @@ -142,6 +160,7 @@ func TestDatabaseSystem(t *testing.T) { testDatabase(t, "badger") testDatabase(t, "bbolt") testDatabase(t, "fstree") + testDatabase(t, "hashmap") err = MaintainRecordStates() if err != nil { diff --git a/database/errors.go b/database/errors.go index cae280d..9d22b53 100644 --- a/database/errors.go +++ b/database/errors.go @@ -10,4 +10,5 @@ var ( ErrPermissionDenied = errors.New("access to database record denied") ErrReadOnly = errors.New("database is read only") ErrShuttingDown = errors.New("database system is shutting down") + ErrNotImplemented = errors.New("not implemented by this storage") ) diff --git a/database/interface.go b/database/interface.go index 9e9b9f7..a939960 100644 --- a/database/interface.go +++ b/database/interface.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/tevino/abool" + "github.com/bluele/gcache" "github.com/safing/portbase/database/accessor" @@ -170,10 +172,19 @@ func (i *Interface) InsertValue(key string, attribute string, value interface{}) } // Put saves a record to the database. -func (i *Interface) Put(r record.Record) error { - _, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) - if err != nil && err != ErrNotFound { - return err +func (i *Interface) Put(r record.Record) (err error) { + // get record or only database + var db *Controller + if !i.options.Internal || !i.options.Local { + _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) + if err != nil && err != ErrNotFound { + return err + } + } else { + db, err = getController(r.DatabaseKey()) + if err != nil { + return err + } } r.Lock() @@ -186,24 +197,122 @@ func (i *Interface) Put(r record.Record) error { } // PutNew saves a record to the database as a new record (ie. with new timestamps). -func (i *Interface) PutNew(r record.Record) error { - _, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) - if err != nil && err != ErrNotFound { - return err +func (i *Interface) PutNew(r record.Record) (err error) { + // get record or only database + var db *Controller + if !i.options.Internal || !i.options.Local { + _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) + if err != nil && err != ErrNotFound { + return err + } + } else { + db, err = getController(r.DatabaseKey()) + if err != nil { + return err + } } r.Lock() defer r.Unlock() - if r.Meta() == nil { - r.CreateMeta() + if r.Meta() != nil { + r.Meta().Reset() } - r.Meta().Reset() i.options.Apply(r) i.updateCache(r) return db.Put(r) } +// PutMany stores many records in the database. Warning: This is nearly a direct database access and omits many things: +// - Record locking +// - Hooks +// - Subscriptions +// - Caching +func (i *Interface) PutMany(dbName string) (put func(record.Record) error) { + interfaceBatch := make(chan record.Record, 100) + + // permission check + if !i.options.Internal || !i.options.Local { + return func(r record.Record) error { + return ErrPermissionDenied + } + } + + // get database + db, err := getController(dbName) + if err != nil { + return func(r record.Record) error { + return err + } + } + + // start database access + dbBatch, errs := db.PutMany() + finished := abool.New() + var internalErr error + + // interface options proxy + go func() { + defer close(dbBatch) // signify that we are finished + for { + select { + case r := <-interfaceBatch: + // finished? + if r == nil { + return + } + // apply options + i.options.Apply(r) + // pass along + dbBatch <- r + case <-time.After(1 * time.Second): + // bail out + internalErr = errors.New("timeout: putmany unused for too long") + finished.Set() + return + } + } + }() + + return func(r record.Record) error { + // finished? + if finished.IsSet() { + // check for internal error + if internalErr != nil { + return internalErr + } + // check for previous error + select { + case err := <-errs: + return err + default: + return errors.New("batch is closed") + } + } + + // finish? + if r == nil { + finished.Set() + interfaceBatch <- nil // signify that we are finished + // do not close, as this fn could be called again with nil. + return <-errs + } + + // check record scope + if r.DatabaseName() != dbName { + return errors.New("record out of database scope") + } + + // submit + select { + case interfaceBatch <- r: + return nil + case err := <-errs: + return err + } + } +} + // SetAbsoluteExpiry sets an absolute record expiry. func (i *Interface) SetAbsoluteExpiry(key string, time int64) error { r, db, err := i.getRecord(getDBFromKey, key, true, true) diff --git a/database/record/record_test.go b/database/record/record_test.go index f5e315d..5912d67 100644 --- a/database/record/record_test.go +++ b/database/record/record_test.go @@ -1,16 +1,10 @@ package record -import "sync" +import ( + "sync" +) type TestRecord struct { Base - lock sync.Mutex -} - -func (tm *TestRecord) Lock() { - tm.lock.Lock() -} - -func (tm *TestRecord) Unlock() { - tm.lock.Unlock() + sync.Mutex } diff --git a/database/record/wrapper.go b/database/record/wrapper.go index a456edd..b89a733 100644 --- a/database/record/wrapper.go +++ b/database/record/wrapper.go @@ -37,27 +37,19 @@ func NewRawWrapper(database, key string, data []byte) (*Wrapper, error) { offset += n newMeta := &Meta{} - if len(metaSection) == 34 && metaSection[4] == 0 { - // TODO: remove in 2020 - // backward compatibility: - // format would byte shift and populate metaSection[4] with value > 0 (would naturally populate >0 at 07.02.2106 07:28:15) - // this must be gencode without format - _, err = newMeta.GenCodeUnmarshal(metaSection) - if err != nil { - return nil, fmt.Errorf("could not unmarshal meta section: %s", err) - } - } else { - _, err = dsd.Load(metaSection, newMeta) - if err != nil { - return nil, fmt.Errorf("could not unmarshal meta section: %s", err) - } + _, err = dsd.Load(metaSection, newMeta) + if err != nil { + return nil, fmt.Errorf("could not unmarshal meta section: %s", err) } - format, n, err := varint.Unpack8(data[offset:]) - if err != nil { - return nil, fmt.Errorf("could not get dsd format: %s", err) + var format uint8 = dsd.NONE + if !newMeta.IsDeleted() { + format, n, err = varint.Unpack8(data[offset:]) + if err != nil { + return nil, fmt.Errorf("could not get dsd format: %s", err) + } + offset += n } - offset += n return &Wrapper{ Base{ diff --git a/database/record/wrapper_test.go b/database/record/wrapper_test.go index 460e6e9..de4d5ae 100644 --- a/database/record/wrapper_test.go +++ b/database/record/wrapper_test.go @@ -2,10 +2,7 @@ package record import ( "bytes" - "errors" "testing" - - "github.com/safing/portbase/container" ) func TestWrapper(t *testing.T) { @@ -54,43 +51,4 @@ func TestWrapper(t *testing.T) { if !bytes.Equal(testData, wrapper2.Data) { t.Error("marshal mismatch") } - - // test new format - oldRaw, err := oldWrapperMarshalRecord(wrapper, wrapper) - if err != nil { - t.Fatal(err) - } - - wrapper3, err := NewRawWrapper("test", "a", oldRaw) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(testData, wrapper3.Data) { - t.Error("marshal mismatch") - } -} - -func oldWrapperMarshalRecord(w *Wrapper, r Record) ([]byte, error) { - if w.Meta() == nil { - return nil, errors.New("missing meta") - } - - // version - c := container.New([]byte{1}) - - // meta - metaSection, err := w.meta.GenCodeMarshal(nil) - if err != nil { - return nil, err - } - c.AppendAsBlock(metaSection) - - // data - dataSection, err := w.Marshal(r, JSON) - if err != nil { - return nil, err - } - c.Append(dataSection) - - return c.CompileData(), nil } diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index d37bf60..bb049a9 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -105,6 +105,35 @@ func (b *BBolt) Put(r record.Record) error { return nil } +// PutMany stores many records in the database. +func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) { + batch := make(chan record.Record, 100) + errs := make(chan error, 1) + + go func() { + err := b.db.Batch(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(bucketName) + for r := range batch { + // marshal + data, txErr := r.MarshalRecord(r) + if txErr != nil { + return txErr + } + + // put + txErr = bucket.Put([]byte(r.DatabaseKey()), data) + if txErr != nil { + return txErr + } + } + return nil + }) + errs <- err + }() + + return batch, errs +} + // Delete deletes a record from the database. func (b *BBolt) Delete(key string) error { err := b.db.Update(func(tx *bbolt.Tx) error { diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index c6bb65d..d9fa3df 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -52,6 +52,27 @@ func (hm *HashMap) Put(r record.Record) error { return nil } +// PutMany stores many records in the database. +func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) { + hm.dbLock.Lock() + defer hm.dbLock.Unlock() + // we could lock for every record, but we want to have the same behaviour + // as the other storage backends, especially for testing. + + batch := make(chan record.Record, 100) + errs := make(chan error, 1) + + // start handler + go func() { + for r := range batch { + hm.db[r.DatabaseKey()] = r + } + errs <- nil + }() + + return batch, errs +} + // Delete deletes a record from the database. func (hm *HashMap) Delete(key string) error { hm.dbLock.Lock() diff --git a/database/storage/injectbase.go b/database/storage/injectbase.go index 316c8d6..2596820 100644 --- a/database/storage/injectbase.go +++ b/database/storage/injectbase.go @@ -25,6 +25,14 @@ func (i *InjectBase) Put(m record.Record) error { return errNotImplemented } +// PutMany stores many records in the database. +func (i *InjectBase) PutMany() (batch chan record.Record, err chan error) { + batch = make(chan record.Record) + err = make(chan error, 1) + err <- errNotImplemented + return +} + // Delete deletes a record from the database. func (i *InjectBase) Delete(key string) error { return errNotImplemented diff --git a/database/storage/interface.go b/database/storage/interface.go index 4a6ce1e..484f852 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -19,3 +19,8 @@ type Interface interface { MaintainThorough() error Shutdown() error } + +// Batcher defines the database storage API for backends that support batch operations. +type Batcher interface { + PutMany() (batch chan<- record.Record, errs <-chan error) +} diff --git a/database/storage/sinkhole/sinkhole.go b/database/storage/sinkhole/sinkhole.go index 9c51d29..e26eaa5 100644 --- a/database/storage/sinkhole/sinkhole.go +++ b/database/storage/sinkhole/sinkhole.go @@ -40,6 +40,22 @@ func (s *Sinkhole) Put(m record.Record) error { return nil } +// PutMany stores many records in the database. +func (s *Sinkhole) PutMany() (chan<- record.Record, <-chan error) { + batch := make(chan record.Record, 100) + errs := make(chan error, 1) + + // start handler + go func() { + for range batch { + // nom, nom, nom + } + errs <- nil + }() + + return batch, errs +} + // Delete deletes a record from the database. func (s *Sinkhole) Delete(key string) error { return nil