mirror of
https://github.com/safing/portbase
synced 2025-09-01 18:19:57 +00:00
commit
ff1610b731
12 changed files with 260 additions and 82 deletions
|
@ -137,6 +137,32 @@ func (c *Controller) Put(r record.Record) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// PutMany stores many records in the database.
|
||||
func (c *Controller) PutMany() (chan<- record.Record, <-chan error) {
|
||||
c.writeLock.RLock()
|
||||
defer c.writeLock.RUnlock()
|
||||
|
||||
if shuttingDown.IsSet() {
|
||||
errs := make(chan error, 1)
|
||||
errs <- ErrShuttingDown
|
||||
return make(chan record.Record), errs
|
||||
}
|
||||
|
||||
if c.ReadOnly() {
|
||||
errs := make(chan error, 1)
|
||||
errs <- ErrReadOnly
|
||||
return make(chan record.Record), errs
|
||||
}
|
||||
|
||||
if batcher, ok := c.storage.(storage.Batcher); ok {
|
||||
return batcher.PutMany()
|
||||
}
|
||||
|
||||
errs := make(chan error, 1)
|
||||
errs <- ErrNotImplemented
|
||||
return make(chan record.Record), errs
|
||||
}
|
||||
|
||||
// Query executes the given query on the database.
|
||||
func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
|
||||
c.readLock.RLock()
|
||||
|
|
|
@ -10,10 +10,13 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/safing/portbase/database/record"
|
||||
|
||||
q "github.com/safing/portbase/database/query"
|
||||
_ "github.com/safing/portbase/database/storage/badger"
|
||||
_ "github.com/safing/portbase/database/storage/bbolt"
|
||||
_ "github.com/safing/portbase/database/storage/fstree"
|
||||
_ "github.com/safing/portbase/database/storage/hashmap"
|
||||
)
|
||||
|
||||
func makeKey(dbName, key string) string {
|
||||
|
@ -39,7 +42,10 @@ func testDatabase(t *testing.T, storageType string) {
|
|||
}
|
||||
|
||||
// interface
|
||||
db := NewInterface(nil)
|
||||
db := NewInterface(&Options{
|
||||
Local: true,
|
||||
Internal: true,
|
||||
})
|
||||
|
||||
// sub
|
||||
sub, err := db.Subscribe(q.New(dbName).MustBeValid())
|
||||
|
@ -107,6 +113,18 @@ func testDatabase(t *testing.T, storageType string) {
|
|||
t.Fatalf("expected two records, got %d", cnt)
|
||||
}
|
||||
|
||||
switch storageType {
|
||||
case "bbolt", "hashmap":
|
||||
batchPut := db.PutMany(dbName)
|
||||
records := []record.Record{A, B, C, nil} // nil is to signify finish
|
||||
for _, r := range records {
|
||||
err = batchPut(r)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = hook.Cancel()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -142,6 +160,7 @@ func TestDatabaseSystem(t *testing.T) {
|
|||
testDatabase(t, "badger")
|
||||
testDatabase(t, "bbolt")
|
||||
testDatabase(t, "fstree")
|
||||
testDatabase(t, "hashmap")
|
||||
|
||||
err = MaintainRecordStates()
|
||||
if err != nil {
|
||||
|
|
|
@ -10,4 +10,5 @@ var (
|
|||
ErrPermissionDenied = errors.New("access to database record denied")
|
||||
ErrReadOnly = errors.New("database is read only")
|
||||
ErrShuttingDown = errors.New("database system is shutting down")
|
||||
ErrNotImplemented = errors.New("not implemented by this storage")
|
||||
)
|
||||
|
|
|
@ -5,6 +5,8 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/tevino/abool"
|
||||
|
||||
"github.com/bluele/gcache"
|
||||
|
||||
"github.com/safing/portbase/database/accessor"
|
||||
|
@ -170,10 +172,19 @@ func (i *Interface) InsertValue(key string, attribute string, value interface{})
|
|||
}
|
||||
|
||||
// Put saves a record to the database.
|
||||
func (i *Interface) Put(r record.Record) error {
|
||||
_, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
||||
if err != nil && err != ErrNotFound {
|
||||
return err
|
||||
func (i *Interface) Put(r record.Record) (err error) {
|
||||
// get record or only database
|
||||
var db *Controller
|
||||
if !i.options.Internal || !i.options.Local {
|
||||
_, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
||||
if err != nil && err != ErrNotFound {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
db, err = getController(r.DatabaseKey())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
r.Lock()
|
||||
|
@ -186,24 +197,122 @@ func (i *Interface) Put(r record.Record) error {
|
|||
}
|
||||
|
||||
// PutNew saves a record to the database as a new record (ie. with new timestamps).
|
||||
func (i *Interface) PutNew(r record.Record) error {
|
||||
_, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
||||
if err != nil && err != ErrNotFound {
|
||||
return err
|
||||
func (i *Interface) PutNew(r record.Record) (err error) {
|
||||
// get record or only database
|
||||
var db *Controller
|
||||
if !i.options.Internal || !i.options.Local {
|
||||
_, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
||||
if err != nil && err != ErrNotFound {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
db, err = getController(r.DatabaseKey())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
if r.Meta() == nil {
|
||||
r.CreateMeta()
|
||||
if r.Meta() != nil {
|
||||
r.Meta().Reset()
|
||||
}
|
||||
r.Meta().Reset()
|
||||
i.options.Apply(r)
|
||||
i.updateCache(r)
|
||||
return db.Put(r)
|
||||
}
|
||||
|
||||
// PutMany stores many records in the database. Warning: This is nearly a direct database access and omits many things:
|
||||
// - Record locking
|
||||
// - Hooks
|
||||
// - Subscriptions
|
||||
// - Caching
|
||||
func (i *Interface) PutMany(dbName string) (put func(record.Record) error) {
|
||||
interfaceBatch := make(chan record.Record, 100)
|
||||
|
||||
// permission check
|
||||
if !i.options.Internal || !i.options.Local {
|
||||
return func(r record.Record) error {
|
||||
return ErrPermissionDenied
|
||||
}
|
||||
}
|
||||
|
||||
// get database
|
||||
db, err := getController(dbName)
|
||||
if err != nil {
|
||||
return func(r record.Record) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// start database access
|
||||
dbBatch, errs := db.PutMany()
|
||||
finished := abool.New()
|
||||
var internalErr error
|
||||
|
||||
// interface options proxy
|
||||
go func() {
|
||||
defer close(dbBatch) // signify that we are finished
|
||||
for {
|
||||
select {
|
||||
case r := <-interfaceBatch:
|
||||
// finished?
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
// apply options
|
||||
i.options.Apply(r)
|
||||
// pass along
|
||||
dbBatch <- r
|
||||
case <-time.After(1 * time.Second):
|
||||
// bail out
|
||||
internalErr = errors.New("timeout: putmany unused for too long")
|
||||
finished.Set()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return func(r record.Record) error {
|
||||
// finished?
|
||||
if finished.IsSet() {
|
||||
// check for internal error
|
||||
if internalErr != nil {
|
||||
return internalErr
|
||||
}
|
||||
// check for previous error
|
||||
select {
|
||||
case err := <-errs:
|
||||
return err
|
||||
default:
|
||||
return errors.New("batch is closed")
|
||||
}
|
||||
}
|
||||
|
||||
// finish?
|
||||
if r == nil {
|
||||
finished.Set()
|
||||
interfaceBatch <- nil // signify that we are finished
|
||||
// do not close, as this fn could be called again with nil.
|
||||
return <-errs
|
||||
}
|
||||
|
||||
// check record scope
|
||||
if r.DatabaseName() != dbName {
|
||||
return errors.New("record out of database scope")
|
||||
}
|
||||
|
||||
// submit
|
||||
select {
|
||||
case interfaceBatch <- r:
|
||||
return nil
|
||||
case err := <-errs:
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SetAbsoluteExpiry sets an absolute record expiry.
|
||||
func (i *Interface) SetAbsoluteExpiry(key string, time int64) error {
|
||||
r, db, err := i.getRecord(getDBFromKey, key, true, true)
|
||||
|
|
|
@ -1,16 +1,10 @@
|
|||
package record
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type TestRecord struct {
|
||||
Base
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func (tm *TestRecord) Lock() {
|
||||
tm.lock.Lock()
|
||||
}
|
||||
|
||||
func (tm *TestRecord) Unlock() {
|
||||
tm.lock.Unlock()
|
||||
sync.Mutex
|
||||
}
|
||||
|
|
|
@ -37,27 +37,19 @@ func NewRawWrapper(database, key string, data []byte) (*Wrapper, error) {
|
|||
offset += n
|
||||
|
||||
newMeta := &Meta{}
|
||||
if len(metaSection) == 34 && metaSection[4] == 0 {
|
||||
// TODO: remove in 2020
|
||||
// backward compatibility:
|
||||
// format would byte shift and populate metaSection[4] with value > 0 (would naturally populate >0 at 07.02.2106 07:28:15)
|
||||
// this must be gencode without format
|
||||
_, err = newMeta.GenCodeUnmarshal(metaSection)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal meta section: %s", err)
|
||||
}
|
||||
} else {
|
||||
_, err = dsd.Load(metaSection, newMeta)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal meta section: %s", err)
|
||||
}
|
||||
_, err = dsd.Load(metaSection, newMeta)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal meta section: %s", err)
|
||||
}
|
||||
|
||||
format, n, err := varint.Unpack8(data[offset:])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get dsd format: %s", err)
|
||||
var format uint8 = dsd.NONE
|
||||
if !newMeta.IsDeleted() {
|
||||
format, n, err = varint.Unpack8(data[offset:])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get dsd format: %s", err)
|
||||
}
|
||||
offset += n
|
||||
}
|
||||
offset += n
|
||||
|
||||
return &Wrapper{
|
||||
Base{
|
||||
|
|
|
@ -2,10 +2,7 @@ package record
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/safing/portbase/container"
|
||||
)
|
||||
|
||||
func TestWrapper(t *testing.T) {
|
||||
|
@ -54,43 +51,4 @@ func TestWrapper(t *testing.T) {
|
|||
if !bytes.Equal(testData, wrapper2.Data) {
|
||||
t.Error("marshal mismatch")
|
||||
}
|
||||
|
||||
// test new format
|
||||
oldRaw, err := oldWrapperMarshalRecord(wrapper, wrapper)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wrapper3, err := NewRawWrapper("test", "a", oldRaw)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(testData, wrapper3.Data) {
|
||||
t.Error("marshal mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
func oldWrapperMarshalRecord(w *Wrapper, r Record) ([]byte, error) {
|
||||
if w.Meta() == nil {
|
||||
return nil, errors.New("missing meta")
|
||||
}
|
||||
|
||||
// version
|
||||
c := container.New([]byte{1})
|
||||
|
||||
// meta
|
||||
metaSection, err := w.meta.GenCodeMarshal(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.AppendAsBlock(metaSection)
|
||||
|
||||
// data
|
||||
dataSection, err := w.Marshal(r, JSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Append(dataSection)
|
||||
|
||||
return c.CompileData(), nil
|
||||
}
|
||||
|
|
|
@ -105,6 +105,35 @@ func (b *BBolt) Put(r record.Record) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// PutMany stores many records in the database.
|
||||
func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) {
|
||||
batch := make(chan record.Record, 100)
|
||||
errs := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
err := b.db.Batch(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket(bucketName)
|
||||
for r := range batch {
|
||||
// marshal
|
||||
data, txErr := r.MarshalRecord(r)
|
||||
if txErr != nil {
|
||||
return txErr
|
||||
}
|
||||
|
||||
// put
|
||||
txErr = bucket.Put([]byte(r.DatabaseKey()), data)
|
||||
if txErr != nil {
|
||||
return txErr
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
errs <- err
|
||||
}()
|
||||
|
||||
return batch, errs
|
||||
}
|
||||
|
||||
// Delete deletes a record from the database.
|
||||
func (b *BBolt) Delete(key string) error {
|
||||
err := b.db.Update(func(tx *bbolt.Tx) error {
|
||||
|
|
|
@ -52,6 +52,27 @@ func (hm *HashMap) Put(r record.Record) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// PutMany stores many records in the database.
|
||||
func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) {
|
||||
hm.dbLock.Lock()
|
||||
defer hm.dbLock.Unlock()
|
||||
// we could lock for every record, but we want to have the same behaviour
|
||||
// as the other storage backends, especially for testing.
|
||||
|
||||
batch := make(chan record.Record, 100)
|
||||
errs := make(chan error, 1)
|
||||
|
||||
// start handler
|
||||
go func() {
|
||||
for r := range batch {
|
||||
hm.db[r.DatabaseKey()] = r
|
||||
}
|
||||
errs <- nil
|
||||
}()
|
||||
|
||||
return batch, errs
|
||||
}
|
||||
|
||||
// Delete deletes a record from the database.
|
||||
func (hm *HashMap) Delete(key string) error {
|
||||
hm.dbLock.Lock()
|
||||
|
|
|
@ -25,6 +25,14 @@ func (i *InjectBase) Put(m record.Record) error {
|
|||
return errNotImplemented
|
||||
}
|
||||
|
||||
// PutMany stores many records in the database.
|
||||
func (i *InjectBase) PutMany() (batch chan record.Record, err chan error) {
|
||||
batch = make(chan record.Record)
|
||||
err = make(chan error, 1)
|
||||
err <- errNotImplemented
|
||||
return
|
||||
}
|
||||
|
||||
// Delete deletes a record from the database.
|
||||
func (i *InjectBase) Delete(key string) error {
|
||||
return errNotImplemented
|
||||
|
|
|
@ -19,3 +19,8 @@ type Interface interface {
|
|||
MaintainThorough() error
|
||||
Shutdown() error
|
||||
}
|
||||
|
||||
// Batcher defines the database storage API for backends that support batch operations.
|
||||
type Batcher interface {
|
||||
PutMany() (batch chan<- record.Record, errs <-chan error)
|
||||
}
|
||||
|
|
|
@ -40,6 +40,22 @@ func (s *Sinkhole) Put(m record.Record) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// PutMany stores many records in the database.
|
||||
func (s *Sinkhole) PutMany() (chan<- record.Record, <-chan error) {
|
||||
batch := make(chan record.Record, 100)
|
||||
errs := make(chan error, 1)
|
||||
|
||||
// start handler
|
||||
go func() {
|
||||
for range batch {
|
||||
// nom, nom, nom
|
||||
}
|
||||
errs <- nil
|
||||
}()
|
||||
|
||||
return batch, errs
|
||||
}
|
||||
|
||||
// Delete deletes a record from the database.
|
||||
func (s *Sinkhole) Delete(key string) error {
|
||||
return nil
|
||||
|
|
Loading…
Add table
Reference in a new issue