mirror of
https://github.com/safing/portbase
synced 2025-09-01 01:59:48 +00:00
Complete database MVP
This commit is contained in:
parent
2c70416873
commit
ac13b73d65
12 changed files with 160 additions and 480 deletions
|
@ -54,8 +54,8 @@ func (c *Controller) Get(key string) (record.Record, error) {
|
|||
|
||||
// process hooks
|
||||
for _, hook := range c.hooks {
|
||||
if hook.q.MatchesKey(key) {
|
||||
err := hook.hook.PreGet(key)
|
||||
if hook.h.UsesPreGet() && hook.q.MatchesKey(key) {
|
||||
err := hook.h.PreGet(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -76,8 +76,8 @@ func (c *Controller) Get(key string) (record.Record, error) {
|
|||
|
||||
// process hooks
|
||||
for _, hook := range c.hooks {
|
||||
if hook.q.Matches(r) {
|
||||
r, err = hook.hook.PostGet(r)
|
||||
if hook.h.UsesPostGet() && hook.q.Matches(r) {
|
||||
r, err = hook.h.PostGet(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -106,13 +106,13 @@ func (c *Controller) Put(r record.Record) (err error) {
|
|||
|
||||
// process hooks
|
||||
for _, hook := range c.hooks {
|
||||
if hook.q.Matches(r) {
|
||||
r, err = hook.hook.PrePut(r)
|
||||
if err != nil {
|
||||
return err
|
||||
if hook.h.UsesPrePut() && hook.q.Matches(r) {
|
||||
r, err = hook.h.PrePut(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if r.Meta() == nil {
|
||||
r.SetMeta(&record.Meta{})
|
||||
|
@ -127,10 +127,13 @@ func (c *Controller) Put(r record.Record) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
// process hooks
|
||||
for _, hook := range c.hooks {
|
||||
if hook.q.Matches(r) {
|
||||
hook.hook.PostPut(r)
|
||||
// process subscriptions
|
||||
for _, sub := range c.subscriptions {
|
||||
if sub.q.Matches(r) {
|
||||
select {
|
||||
case sub.Feed <- r:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,19 @@ func testDatabase(t *testing.T, storageType string) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// hook
|
||||
hook, err := RegisterHook(q.New(dbName).MustBeValid(), &HookBase{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// sub
|
||||
sub, err := Subscribe(q.New(dbName).MustBeValid())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// interface
|
||||
db := NewInterface(nil)
|
||||
|
||||
A := NewExample(makeKey(dbName, "A"), "Herbert", 411)
|
||||
|
@ -92,6 +105,15 @@ func testDatabase(t *testing.T, storageType string) {
|
|||
t.Fatal("expected two records")
|
||||
}
|
||||
|
||||
err = hook.Cancel()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = sub.Cancel()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestDatabaseSystem(t *testing.T) {
|
||||
|
@ -99,7 +121,7 @@ func TestDatabaseSystem(t *testing.T) {
|
|||
// panic after 10 seconds, to check for locks
|
||||
go func() {
|
||||
time.Sleep(10 * time.Second)
|
||||
fmt.Println("===== TAKING TOO LONG FOR SHUTDOWN - PRINTING STACK TRACES =====")
|
||||
fmt.Println("===== TAKING TOO LONG - PRINTING STACK TRACES =====")
|
||||
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
os.Exit(1)
|
||||
}()
|
||||
|
|
|
@ -15,39 +15,37 @@ type Hook interface {
|
|||
|
||||
UsesPrePut() bool
|
||||
PrePut(r record.Record) (record.Record, error)
|
||||
|
||||
UsesPostPut() bool
|
||||
PostPut(r record.Record)
|
||||
}
|
||||
|
||||
// RegisteredHook is a registered database hook.
|
||||
type RegisteredHook struct {
|
||||
q *query.Query
|
||||
hook Hook
|
||||
q *query.Query
|
||||
h Hook
|
||||
}
|
||||
|
||||
// RegisterHook registeres a hook for records matching the given query in the database.
|
||||
func RegisterHook(q *query.Query, hook Hook) error {
|
||||
func RegisterHook(q *query.Query, hook Hook) (*RegisteredHook, error) {
|
||||
_, err := q.Check()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := getController(q.DatabaseName())
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.readLock.Lock()
|
||||
defer c.readLock.Lock()
|
||||
defer c.readLock.Unlock()
|
||||
c.writeLock.Lock()
|
||||
defer c.writeLock.Unlock()
|
||||
|
||||
c.hooks = append(c.hooks, &RegisteredHook{
|
||||
q: q,
|
||||
hook: hook,
|
||||
})
|
||||
return nil
|
||||
rh := &RegisteredHook{
|
||||
q: q,
|
||||
h: hook,
|
||||
}
|
||||
c.hooks = append(c.hooks, rh)
|
||||
return rh, nil
|
||||
}
|
||||
|
||||
// Cancel unhooks the hook.
|
||||
|
@ -58,7 +56,7 @@ func (h *RegisteredHook) Cancel() error {
|
|||
}
|
||||
|
||||
c.readLock.Lock()
|
||||
defer c.readLock.Lock()
|
||||
defer c.readLock.Unlock()
|
||||
c.writeLock.Lock()
|
||||
defer c.writeLock.Unlock()
|
||||
|
||||
|
|
|
@ -4,10 +4,25 @@ import (
|
|||
"github.com/Safing/portbase/database/record"
|
||||
)
|
||||
|
||||
// HookBase implements the Hook interface.
|
||||
// HookBase implements the Hook interface and provides dummy functions to reduce boilerplate.
|
||||
type HookBase struct {
|
||||
}
|
||||
|
||||
// UsesPreGet implements the Hook interface and returns false.
|
||||
func (b *HookBase) UsesPreGet() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// UsesPostGet implements the Hook interface and returns false.
|
||||
func (b *HookBase) UsesPostGet() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// UsesPrePut implements the Hook interface and returns false.
|
||||
func (b *HookBase) UsesPrePut() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// PreGet implements the Hook interface.
|
||||
func (b *HookBase) PreGet(dbKey string) error {
|
||||
return nil
|
||||
|
@ -22,8 +37,3 @@ func (b *HookBase) PostGet(r record.Record) (record.Record, error) {
|
|||
func (b *HookBase) PrePut(r record.Record) (record.Record, error) {
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// PostPut implements the Hook interface.
|
||||
func (b *HookBase) PostPut(r record.Record) {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,20 +0,0 @@
|
|||
package query
|
||||
|
||||
import (
|
||||
"github.com/Safing/portbase/database/accessor"
|
||||
)
|
||||
|
||||
type noCond struct {
|
||||
}
|
||||
|
||||
func (c *noCond) complies(acc accessor.Accessor) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *noCond) check() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *noCond) string() string {
|
||||
return ""
|
||||
}
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/Safing/portbase/database/accessor"
|
||||
"github.com/Safing/portbase/database/record"
|
||||
)
|
||||
|
||||
|
@ -101,14 +102,10 @@ func (q *Query) MatchesKey(dbKey string) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// Matches checks whether the query matches the supplied data object.
|
||||
func (q *Query) Matches(r record.Record) bool {
|
||||
if !strings.HasPrefix(r.DatabaseKey(), q.dbKeyPrefix) {
|
||||
return false
|
||||
}
|
||||
|
||||
// MatchesRecord checks whether the query matches the supplied database record (value only).
|
||||
func (q *Query) MatchesRecord(r record.Record) bool {
|
||||
if q.where == nil {
|
||||
return false
|
||||
return true
|
||||
}
|
||||
|
||||
acc := r.GetAccessor(r)
|
||||
|
@ -118,14 +115,33 @@ func (q *Query) Matches(r record.Record) bool {
|
|||
return q.where.complies(acc)
|
||||
}
|
||||
|
||||
// MatchesAccessor checks whether the query matches the supplied accessor (value only).
|
||||
func (q *Query) MatchesAccessor(acc accessor.Accessor) bool {
|
||||
if q.where == nil {
|
||||
return true
|
||||
}
|
||||
return q.where.complies(acc)
|
||||
}
|
||||
|
||||
// Matches checks whether the query matches the supplied database record.
|
||||
func (q *Query) Matches(r record.Record) bool {
|
||||
if q.MatchesKey(r.DatabaseKey()) {
|
||||
return true
|
||||
}
|
||||
return q.MatchesRecord(r)
|
||||
}
|
||||
|
||||
// Print returns the string representation of the query.
|
||||
func (q *Query) Print() string {
|
||||
where := q.where.string()
|
||||
if where != "" {
|
||||
if strings.HasPrefix(where, "(") {
|
||||
where = where[1 : len(where)-1]
|
||||
var where string
|
||||
if q.where != nil {
|
||||
where = q.where.string()
|
||||
if where != "" {
|
||||
if strings.HasPrefix(where, "(") {
|
||||
where = where[1 : len(where)-1]
|
||||
}
|
||||
where = fmt.Sprintf(" where %s", where)
|
||||
}
|
||||
where = fmt.Sprintf(" where %s", where)
|
||||
}
|
||||
|
||||
var orderBy string
|
||||
|
|
|
@ -3,7 +3,7 @@ package query
|
|||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/Safing/portbase/database/accessor"
|
||||
"github.com/Safing/portbase/database/record"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -46,12 +46,12 @@ var (
|
|||
}`
|
||||
)
|
||||
|
||||
func testQuery(t *testing.T, acc accessor.Accessor, shouldMatch bool, condition Condition) {
|
||||
func testQuery(t *testing.T, r record.Record, shouldMatch bool, condition Condition) {
|
||||
q := New("test:").Where(condition).MustBeValid()
|
||||
|
||||
// fmt.Printf("%s\n", q.String())
|
||||
// fmt.Printf("%s\n", q.Print())
|
||||
|
||||
matched := q.Matches(acc)
|
||||
matched := q.Matches(r)
|
||||
switch {
|
||||
case !matched && shouldMatch:
|
||||
t.Errorf("should match: %s", q.Print())
|
||||
|
@ -65,36 +65,39 @@ func TestQuery(t *testing.T) {
|
|||
// if !gjson.Valid(testJSON) {
|
||||
// t.Fatal("test json is invalid")
|
||||
// }
|
||||
f := accessor.NewJSONAccessor(&testJSON)
|
||||
r, err := record.NewWrapper("", nil, append([]byte("J"), []byte(testJSON)...))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
testQuery(t, f, true, Where("age", Equals, 100))
|
||||
testQuery(t, f, true, Where("age", GreaterThan, uint8(99)))
|
||||
testQuery(t, f, true, Where("age", GreaterThanOrEqual, 99))
|
||||
testQuery(t, f, true, Where("age", GreaterThanOrEqual, 100))
|
||||
testQuery(t, f, true, Where("age", LessThan, 101))
|
||||
testQuery(t, f, true, Where("age", LessThanOrEqual, "101"))
|
||||
testQuery(t, f, true, Where("age", LessThanOrEqual, 100))
|
||||
testQuery(t, r, true, Where("age", Equals, 100))
|
||||
testQuery(t, r, true, Where("age", GreaterThan, uint8(99)))
|
||||
testQuery(t, r, true, Where("age", GreaterThanOrEqual, 99))
|
||||
testQuery(t, r, true, Where("age", GreaterThanOrEqual, 100))
|
||||
testQuery(t, r, true, Where("age", LessThan, 101))
|
||||
testQuery(t, r, true, Where("age", LessThanOrEqual, "101"))
|
||||
testQuery(t, r, true, Where("age", LessThanOrEqual, 100))
|
||||
|
||||
testQuery(t, f, true, Where("temperature", FloatEquals, 120.413))
|
||||
testQuery(t, f, true, Where("temperature", FloatGreaterThan, 120))
|
||||
testQuery(t, f, true, Where("temperature", FloatGreaterThanOrEqual, 120))
|
||||
testQuery(t, f, true, Where("temperature", FloatGreaterThanOrEqual, 120.413))
|
||||
testQuery(t, f, true, Where("temperature", FloatLessThan, 121))
|
||||
testQuery(t, f, true, Where("temperature", FloatLessThanOrEqual, "121"))
|
||||
testQuery(t, f, true, Where("temperature", FloatLessThanOrEqual, "120.413"))
|
||||
testQuery(t, r, true, Where("temperature", FloatEquals, 120.413))
|
||||
testQuery(t, r, true, Where("temperature", FloatGreaterThan, 120))
|
||||
testQuery(t, r, true, Where("temperature", FloatGreaterThanOrEqual, 120))
|
||||
testQuery(t, r, true, Where("temperature", FloatGreaterThanOrEqual, 120.413))
|
||||
testQuery(t, r, true, Where("temperature", FloatLessThan, 121))
|
||||
testQuery(t, r, true, Where("temperature", FloatLessThanOrEqual, "121"))
|
||||
testQuery(t, r, true, Where("temperature", FloatLessThanOrEqual, "120.413"))
|
||||
|
||||
testQuery(t, f, true, Where("lastly.yay", SameAs, "final"))
|
||||
testQuery(t, f, true, Where("lastly.yay", Contains, "ina"))
|
||||
testQuery(t, f, true, Where("lastly.yay", StartsWith, "fin"))
|
||||
testQuery(t, f, true, Where("lastly.yay", EndsWith, "nal"))
|
||||
testQuery(t, f, true, Where("lastly.yay", In, "draft,final"))
|
||||
testQuery(t, f, true, Where("lastly.yay", In, "final,draft"))
|
||||
testQuery(t, r, true, Where("lastly.yay", SameAs, "final"))
|
||||
testQuery(t, r, true, Where("lastly.yay", Contains, "ina"))
|
||||
testQuery(t, r, true, Where("lastly.yay", StartsWith, "fin"))
|
||||
testQuery(t, r, true, Where("lastly.yay", EndsWith, "nal"))
|
||||
testQuery(t, r, true, Where("lastly.yay", In, "draft,final"))
|
||||
testQuery(t, r, true, Where("lastly.yay", In, "final,draft"))
|
||||
|
||||
testQuery(t, f, true, Where("happy", Is, true))
|
||||
testQuery(t, f, true, Where("happy", Is, "true"))
|
||||
testQuery(t, f, true, Where("happy", Is, "t"))
|
||||
testQuery(t, f, true, Not(Where("happy", Is, "0")))
|
||||
testQuery(t, f, true, And(
|
||||
testQuery(t, r, true, Where("happy", Is, true))
|
||||
testQuery(t, r, true, Where("happy", Is, "true"))
|
||||
testQuery(t, r, true, Where("happy", Is, "t"))
|
||||
testQuery(t, r, true, Not(Where("happy", Is, "0")))
|
||||
testQuery(t, r, true, And(
|
||||
Where("happy", Is, "1"),
|
||||
Not(Or(
|
||||
Where("happy", Is, false),
|
||||
|
@ -102,8 +105,8 @@ func TestQuery(t *testing.T) {
|
|||
)),
|
||||
))
|
||||
|
||||
testQuery(t, f, true, Where("happy", Exists, nil))
|
||||
testQuery(t, r, true, Where("happy", Exists, nil))
|
||||
|
||||
testQuery(t, f, true, Where("created", Matches, "^2014-[0-9]{2}-[0-9]{2}T"))
|
||||
testQuery(t, r, true, Where("created", Matches, "^2014-[0-9]{2}-[0-9]{2}T"))
|
||||
|
||||
}
|
||||
|
|
|
@ -138,13 +138,12 @@ func (b *Badger) queryExecutor(queryIter *iterator.Iterator, q *query.Query, loc
|
|||
continue
|
||||
}
|
||||
|
||||
acc := r.GetAccessor(r)
|
||||
if acc != nil && q.Matches(acc) {
|
||||
if q.MatchesRecord(r) {
|
||||
copiedData, err := item.ValueCopy(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
new, err := record.NewRawWrapper(b.name, string(item.Key()), copiedData)
|
||||
new, err := record.NewRawWrapper(b.name, r.DatabaseKey(), copiedData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/Safing/portbase/database/query"
|
||||
"github.com/Safing/portbase/database/record"
|
||||
)
|
||||
|
||||
|
@ -42,6 +43,7 @@ func TestBadger(t *testing.T) {
|
|||
}
|
||||
defer os.RemoveAll(testDir) // clean up
|
||||
|
||||
// start
|
||||
db, err := NewBadger("test", testDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -67,11 +69,13 @@ func TestBadger(t *testing.T) {
|
|||
a.Meta().Update()
|
||||
a.SetKey("test:A")
|
||||
|
||||
// put record
|
||||
err = db.Put(a)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// get and compare
|
||||
r1, err := db.Get("A")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -87,26 +91,46 @@ func TestBadger(t *testing.T) {
|
|||
t.Fatalf("mismatch, got %v", a1)
|
||||
}
|
||||
|
||||
// test query
|
||||
q := query.New("").MustBeValid()
|
||||
it, err := db.Query(q, true, true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cnt := 0
|
||||
for _ = range it.Next {
|
||||
cnt++
|
||||
}
|
||||
if it.Error != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if cnt != 1 {
|
||||
t.Fatalf("unexpected query result count: %d", cnt)
|
||||
}
|
||||
|
||||
// delete
|
||||
err = db.Delete("A")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// check if its gone
|
||||
_, err = db.Get("A")
|
||||
if err == nil {
|
||||
t.Fatal("should fail")
|
||||
}
|
||||
|
||||
// maintenance
|
||||
err = db.Maintain()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = db.MaintainThorough()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// shutdown
|
||||
err = db.Shutdown()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -25,7 +25,7 @@ func Subscribe(q *query.Query) (*Subscription, error) {
|
|||
}
|
||||
|
||||
c.readLock.Lock()
|
||||
defer c.readLock.Lock()
|
||||
defer c.readLock.Unlock()
|
||||
c.writeLock.Lock()
|
||||
defer c.writeLock.Unlock()
|
||||
|
||||
|
@ -45,7 +45,7 @@ func (s *Subscription) Cancel() error {
|
|||
}
|
||||
|
||||
c.readLock.Lock()
|
||||
defer c.readLock.Lock()
|
||||
defer c.readLock.Unlock()
|
||||
c.writeLock.Lock()
|
||||
defer c.writeLock.Unlock()
|
||||
|
||||
|
|
|
@ -1,272 +0,0 @@
|
|||
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
|
||||
|
||||
package database
|
||||
|
||||
// import (
|
||||
// "fmt"
|
||||
// "strings"
|
||||
// "sync"
|
||||
//
|
||||
// "github.com/Safing/portbase/database/record"
|
||||
// "github.com/Safing/portbase/modules"
|
||||
// "github.com/Safing/portbase/taskmanager"
|
||||
//
|
||||
// "github.com/tevino/abool"
|
||||
// )
|
||||
//
|
||||
// var subscriptionModule *modules.Module
|
||||
// var subscriptions []*Subscription
|
||||
// var subLock sync.Mutex
|
||||
//
|
||||
// var databaseUpdate chan Model
|
||||
// var databaseCreate chan Model
|
||||
// var databaseDelete chan string
|
||||
//
|
||||
// var workIsWaiting chan *struct{}
|
||||
// var workIsWaitingFlag *abool.AtomicBool
|
||||
// var forceProcessing chan *struct{}
|
||||
//
|
||||
// type Subscription struct {
|
||||
// typeAndLocation map[string]bool
|
||||
// exactObject map[string]bool
|
||||
// children map[string]uint8
|
||||
// Created chan record.Record
|
||||
// Updated chan record.Record
|
||||
// Deleted chan string
|
||||
// }
|
||||
//
|
||||
// func NewSubscription() *Subscription {
|
||||
// subLock.Lock()
|
||||
// defer subLock.Unlock()
|
||||
// sub := &Subscription{
|
||||
// typeAndLocation: make(map[string]bool),
|
||||
// exactObject: make(map[string]bool),
|
||||
// children: make(map[string]uint8),
|
||||
// Created: make(chan record.Record, 128),
|
||||
// Updated: make(chan record.Record, 128),
|
||||
// Deleted: make(chan string, 128),
|
||||
// }
|
||||
// subscriptions = append(subscriptions, sub)
|
||||
// return sub
|
||||
// }
|
||||
//
|
||||
// func (sub *Subscription) Subscribe(subKey string) {
|
||||
// subLock.Lock()
|
||||
// defer subLock.Unlock()
|
||||
//
|
||||
// namespaces := strings.Split(subKey, "/")[1:]
|
||||
// lastSpace := ""
|
||||
// if len(namespaces) != 0 {
|
||||
// lastSpace = namespaces[len(namespaces)-1]
|
||||
// }
|
||||
//
|
||||
// switch {
|
||||
// case lastSpace == "":
|
||||
// // save key without leading "/"
|
||||
// // save with depth 255 to get all
|
||||
// sub.children[strings.Trim(subKey, "/")] = 0xFF
|
||||
// case strings.HasPrefix(lastSpace, "*"):
|
||||
// // save key without leading or trailing "/" or "*"
|
||||
// // save full wanted depth - this makes comparison easier
|
||||
// sub.children[strings.Trim(subKey, "/*")] = uint8(len(lastSpace) + len(namespaces) - 1)
|
||||
// case strings.Contains(lastSpace, ":"):
|
||||
// sub.exactObject[subKey] = true
|
||||
// default:
|
||||
// sub.typeAndLocation[subKey] = true
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func (sub *Subscription) Unsubscribe(subKey string) {
|
||||
// subLock.Lock()
|
||||
// defer subLock.Unlock()
|
||||
//
|
||||
// namespaces := strings.Split(subKey, "/")[1:]
|
||||
// lastSpace := ""
|
||||
// if len(namespaces) != 0 {
|
||||
// lastSpace = namespaces[len(namespaces)-1]
|
||||
// }
|
||||
//
|
||||
// switch {
|
||||
// case lastSpace == "":
|
||||
// delete(sub.children, strings.Trim(subKey, "/"))
|
||||
// case strings.HasPrefix(lastSpace, "*"):
|
||||
// delete(sub.children, strings.Trim(subKey, "/*"))
|
||||
// case strings.Contains(lastSpace, ":"):
|
||||
// delete(sub.exactObject, subKey)
|
||||
// default:
|
||||
// delete(sub.typeAndLocation, subKey)
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func (sub *Subscription) Destroy() {
|
||||
// subLock.Lock()
|
||||
// defer subLock.Unlock()
|
||||
//
|
||||
// for k, v := range subscriptions {
|
||||
// if v.Created == sub.Created {
|
||||
// defer func() {
|
||||
// subscriptions = append(subscriptions[:k], subscriptions[k+1:]...)
|
||||
// }()
|
||||
// close(sub.Created)
|
||||
// close(sub.Updated)
|
||||
// close(sub.Deleted)
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func (sub *Subscription) Subscriptions() *[]string {
|
||||
// subStrings := make([]string, 0)
|
||||
// for subString := range sub.exactObject {
|
||||
// subStrings = append(subStrings, subString)
|
||||
// }
|
||||
// for subString := range sub.typeAndLocation {
|
||||
// subStrings = append(subStrings, subString)
|
||||
// }
|
||||
// for subString, depth := range sub.children {
|
||||
// if depth == 0xFF {
|
||||
// subStrings = append(subStrings, fmt.Sprintf("/%s/", subString))
|
||||
// } else {
|
||||
// subStrings = append(subStrings, fmt.Sprintf("/%s/%s", subString, strings.Repeat("*", int(depth)-len(strings.Split(subString, "/")))))
|
||||
// }
|
||||
// }
|
||||
// return &subStrings
|
||||
// }
|
||||
//
|
||||
// func (sub *Subscription) String() string {
|
||||
// return fmt.Sprintf("<Subscription [%s]>", strings.Join(*sub.Subscriptions(), " "))
|
||||
// }
|
||||
//
|
||||
// func (sub *Subscription) send(key string, rec record.Record, created bool) {
|
||||
// if rec == nil {
|
||||
// sub.Deleted <- key
|
||||
// } else if created {
|
||||
// sub.Created <- rec
|
||||
// } else {
|
||||
// sub.Updated <- rec
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func process(key string, rec record.Record, created bool) {
|
||||
// subLock.Lock()
|
||||
// defer subLock.Unlock()
|
||||
//
|
||||
// stringRep := key.String()
|
||||
// // "/Comedy/MontyPython/Actor:JohnCleese"
|
||||
// typeAndLocation := key.Path().String()
|
||||
// // "/Comedy/MontyPython/Actor"
|
||||
// namespaces := key.Namespaces()
|
||||
// // ["Comedy", "MontyPython", "Actor:JohnCleese"]
|
||||
// depth := uint8(len(namespaces))
|
||||
// // 3
|
||||
//
|
||||
// subscriptionLoop:
|
||||
// for _, sub := range subscriptions {
|
||||
// if _, ok := sub.exactObject[stringRep]; ok {
|
||||
// sub.send(key, rec, created)
|
||||
// continue subscriptionLoop
|
||||
// }
|
||||
// if _, ok := sub.typeAndLocation[typeAndLocation]; ok {
|
||||
// sub.send(key, rec, created)
|
||||
// continue subscriptionLoop
|
||||
// }
|
||||
// for i := 0; i < len(namespaces); i++ {
|
||||
// if subscribedDepth, ok := sub.children[strings.Join(namespaces[:i], "/")]; ok {
|
||||
// if subscribedDepth >= depth {
|
||||
// sub.send(key, rec, created)
|
||||
// continue subscriptionLoop
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// }
|
||||
//
|
||||
// func init() {
|
||||
// subscriptionModule = modules.Register("Database:Subscriptions", 128)
|
||||
// subscriptions = make([]*Subscription, 0)
|
||||
// subLock = sync.Mutex{}
|
||||
//
|
||||
// databaseUpdate = make(chan Model, 32)
|
||||
// databaseCreate = make(chan Model, 32)
|
||||
// databaseDelete = make(chan string, 32)
|
||||
//
|
||||
// workIsWaiting = make(chan *struct{}, 0)
|
||||
// workIsWaitingFlag = abool.NewBool(false)
|
||||
// forceProcessing = make(chan *struct{}, 0)
|
||||
//
|
||||
// go run()
|
||||
// }
|
||||
//
|
||||
// func run() {
|
||||
// for {
|
||||
// select {
|
||||
// case <-subscriptionModule.Stop:
|
||||
// subscriptionModule.StopComplete()
|
||||
// return
|
||||
// case <-workIsWaiting:
|
||||
// work()
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func work() {
|
||||
// defer workIsWaitingFlag.UnSet()
|
||||
//
|
||||
// // wait
|
||||
// select {
|
||||
// case <-taskmanager.StartMediumPriorityMicroTask():
|
||||
// defer taskmanager.EndMicroTask()
|
||||
// case <-forceProcessing:
|
||||
// }
|
||||
//
|
||||
// // work
|
||||
// for {
|
||||
// select {
|
||||
// case rec := <-databaseCreate:
|
||||
// process(rec.GetKey(), rec, true)
|
||||
// case rec := <-databaseUpdate:
|
||||
// process(rec.GetKey(), rec, false)
|
||||
// case key := <-databaseDelete:
|
||||
// process(key, nil, false)
|
||||
// default:
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func handleCreateSubscriptions(rec record.Record) {
|
||||
// select {
|
||||
// case databaseCreate <- rec:
|
||||
// default:
|
||||
// forceProcessing <- nil
|
||||
// databaseCreate <- rec
|
||||
// }
|
||||
// if workIsWaitingFlag.SetToIf(false, true) {
|
||||
// workIsWaiting <- nil
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func handleUpdateSubscriptions(rec record.Record) {
|
||||
// select {
|
||||
// case databaseUpdate <- rec:
|
||||
// default:
|
||||
// forceProcessing <- nil
|
||||
// databaseUpdate <- rec
|
||||
// }
|
||||
// if workIsWaitingFlag.SetToIf(false, true) {
|
||||
// workIsWaiting <- nil
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func handleDeleteSubscriptions(key string) {
|
||||
// select {
|
||||
// case databaseDelete <- key:
|
||||
// default:
|
||||
// forceProcessing <- nil
|
||||
// databaseDelete <- key
|
||||
// }
|
||||
// if workIsWaitingFlag.SetToIf(false, true) {
|
||||
// workIsWaiting <- nil
|
||||
// }
|
||||
// }
|
|
@ -1,103 +0,0 @@
|
|||
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
|
||||
|
||||
package database
|
||||
|
||||
// import (
|
||||
// "strconv"
|
||||
// "strings"
|
||||
// "sync"
|
||||
// "testing"
|
||||
// )
|
||||
//
|
||||
// var subTestWg sync.WaitGroup
|
||||
//
|
||||
// func waitForSubs(t *testing.T, sub *Subscription, highest int) {
|
||||
// defer subTestWg.Done()
|
||||
// expecting := 1
|
||||
// var subbedModel Model
|
||||
// forLoop:
|
||||
// for {
|
||||
// select {
|
||||
// case subbedModel = <-sub.Created:
|
||||
// case subbedModel = <-sub.Updated:
|
||||
// }
|
||||
// t.Logf("got model from subscription: %s", subbedModel.GetKey().String())
|
||||
// if !strings.HasPrefix(subbedModel.GetKey().Name(), "sub") {
|
||||
// // not a model that we use for testing, other tests might be interfering
|
||||
// continue forLoop
|
||||
// }
|
||||
// number, err := strconv.Atoi(strings.TrimPrefix(subbedModel.GetKey().Name(), "sub"))
|
||||
// if err != nil || number != expecting {
|
||||
// t.Errorf("test subscription: got unexpected model %s, expected sub%d", subbedModel.GetKey().String(), expecting)
|
||||
// continue forLoop
|
||||
// }
|
||||
// if number == highest {
|
||||
// return
|
||||
// }
|
||||
// expecting++
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func TestSubscriptions(t *testing.T) {
|
||||
//
|
||||
// // create subscription
|
||||
// sub := NewSubscription()
|
||||
//
|
||||
// // FIRST TEST
|
||||
//
|
||||
// subTestWg.Add(1)
|
||||
// go waitForSubs(t, sub, 3)
|
||||
// sub.Subscribe("/Tests/")
|
||||
// t.Log(sub.String())
|
||||
//
|
||||
// (&(TestingModel{})).CreateInNamespace("", "sub1")
|
||||
// (&(TestingModel{})).CreateInNamespace("A", "sub2")
|
||||
// (&(TestingModel{})).CreateInNamespace("A/B/C/D/E", "sub3")
|
||||
//
|
||||
// subTestWg.Wait()
|
||||
//
|
||||
// // SECOND TEST
|
||||
//
|
||||
// subTestWg.Add(1)
|
||||
// go waitForSubs(t, sub, 3)
|
||||
// sub.Unsubscribe("/Tests/")
|
||||
// sub.Subscribe("/Tests/A/****")
|
||||
// t.Log(sub.String())
|
||||
//
|
||||
// (&(TestingModel{})).CreateInNamespace("", "subX")
|
||||
// (&(TestingModel{})).CreateInNamespace("A", "sub1")
|
||||
// (&(TestingModel{})).CreateInNamespace("A/B/C/D", "sub2")
|
||||
// (&(TestingModel{})).CreateInNamespace("A/B/C/D/E", "subX")
|
||||
// (&(TestingModel{})).CreateInNamespace("A", "sub3")
|
||||
//
|
||||
// subTestWg.Wait()
|
||||
//
|
||||
// // THIRD TEST
|
||||
//
|
||||
// subTestWg.Add(1)
|
||||
// go waitForSubs(t, sub, 3)
|
||||
// sub.Unsubscribe("/Tests/A/****")
|
||||
// sub.Subscribe("/Tests/TestingModel:sub1")
|
||||
// sub.Subscribe("/Tests/TestingModel:sub1/TestingModel")
|
||||
// t.Log(sub.String())
|
||||
//
|
||||
// (&(TestingModel{})).CreateInNamespace("", "sub1")
|
||||
// (&(TestingModel{})).CreateInNamespace("", "subX")
|
||||
// (&(TestingModel{})).CreateInNamespace("TestingModel:sub1", "sub2")
|
||||
// (&(TestingModel{})).CreateInNamespace("TestingModel:sub1/A", "subX")
|
||||
// (&(TestingModel{})).CreateInNamespace("TestingModel:sub1", "sub3")
|
||||
//
|
||||
// subTestWg.Wait()
|
||||
//
|
||||
// // FINAL STUFF
|
||||
//
|
||||
// model := &TestingModel{}
|
||||
// model.CreateInNamespace("Invalid", "subX")
|
||||
// model.Save()
|
||||
//
|
||||
// sub.Destroy()
|
||||
//
|
||||
// // time.Sleep(1 * time.Second)
|
||||
// // pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
//
|
||||
// }
|
Loading…
Add table
Reference in a new issue