mirror of
https://github.com/safing/portbase
synced 2025-09-02 10:40:39 +00:00
Merge pull request #99 from safing/feature/database-api-metadata
Add metadata to records sent via the API
This commit is contained in:
commit
c8c049f5e8
1 changed files with 92 additions and 26 deletions
118
api/database.go
118
api/database.go
|
@ -7,6 +7,11 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/tidwall/sjson"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/database/iterator"
|
||||||
|
"github.com/safing/portbase/formats/varint"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/tevino/abool"
|
"github.com/tevino/abool"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
@ -45,6 +50,9 @@ type DatabaseAPI struct {
|
||||||
conn *websocket.Conn
|
conn *websocket.Conn
|
||||||
sendQueue chan []byte
|
sendQueue chan []byte
|
||||||
|
|
||||||
|
queriesLock sync.Mutex
|
||||||
|
queries map[string]*iterator.Iterator
|
||||||
|
|
||||||
subsLock sync.Mutex
|
subsLock sync.Mutex
|
||||||
subs map[string]*database.Subscription
|
subs map[string]*database.Subscription
|
||||||
|
|
||||||
|
@ -75,6 +83,7 @@ func startDatabaseAPI(w http.ResponseWriter, r *http.Request) {
|
||||||
new := &DatabaseAPI{
|
new := &DatabaseAPI{
|
||||||
conn: wsConn,
|
conn: wsConn,
|
||||||
sendQueue: make(chan []byte, 100),
|
sendQueue: make(chan []byte, 100),
|
||||||
|
queries: make(map[string]*iterator.Iterator),
|
||||||
subs: make(map[string]*database.Subscription),
|
subs: make(map[string]*database.Subscription),
|
||||||
shutdownSignal: make(chan struct{}),
|
shutdownSignal: make(chan struct{}),
|
||||||
shuttingDown: abool.NewBool(false),
|
shuttingDown: abool.NewBool(false),
|
||||||
|
@ -97,11 +106,13 @@ func (api *DatabaseAPI) handler() {
|
||||||
// 124|done
|
// 124|done
|
||||||
// 124|error|<message>
|
// 124|error|<message>
|
||||||
// 124|warning|<message> // error with single record, operation continues
|
// 124|warning|<message> // error with single record, operation continues
|
||||||
|
// 124|cancel
|
||||||
// 125|sub|<query>
|
// 125|sub|<query>
|
||||||
// 125|upd|<key>|<data>
|
// 125|upd|<key>|<data>
|
||||||
// 125|new|<key>|<data>
|
// 125|new|<key>|<data>
|
||||||
// 127|del|<key>
|
// 127|del|<key>
|
||||||
// 125|warning|<message> // error with single record, operation continues
|
// 125|warning|<message> // error with single record, operation continues
|
||||||
|
// 125|cancel
|
||||||
// 127|qsub|<query>
|
// 127|qsub|<query>
|
||||||
// 127|ok|<key>|<data>
|
// 127|ok|<key>|<data>
|
||||||
// 127|done
|
// 127|done
|
||||||
|
@ -110,6 +121,7 @@ func (api *DatabaseAPI) handler() {
|
||||||
// 127|new|<key>|<data>
|
// 127|new|<key>|<data>
|
||||||
// 127|del|<key>
|
// 127|del|<key>
|
||||||
// 127|warning|<message> // error with single record, operation continues
|
// 127|warning|<message> // error with single record, operation continues
|
||||||
|
// 127|cancel
|
||||||
|
|
||||||
// 128|create|<key>|<data>
|
// 128|create|<key>|<data>
|
||||||
// 128|success
|
// 128|success
|
||||||
|
@ -147,6 +159,7 @@ func (api *DatabaseAPI) handler() {
|
||||||
|
|
||||||
// Handle special command "cancel"
|
// Handle special command "cancel"
|
||||||
if len(parts) == 2 && string(parts[1]) == "cancel" {
|
if len(parts) == 2 && string(parts[1]) == "cancel" {
|
||||||
|
// 124|cancel
|
||||||
// 125|cancel
|
// 125|cancel
|
||||||
// 127|cancel
|
// 127|cancel
|
||||||
go api.handleCancel(parts[0])
|
go api.handleCancel(parts[0])
|
||||||
|
@ -265,7 +278,7 @@ func (api *DatabaseAPI) handleGet(opID []byte, key string) {
|
||||||
|
|
||||||
r, err := api.db.Get(key)
|
r, err := api.db.Get(key)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
data, err = r.Marshal(r, record.JSON)
|
data, err = marshalRecord(r)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
api.send(opID, dbMsgTypeError, err.Error(), nil)
|
api.send(opID, dbMsgTypeError, err.Error(), nil)
|
||||||
|
@ -281,6 +294,7 @@ func (api *DatabaseAPI) handleQuery(opID []byte, queryText string) {
|
||||||
// 124|warning|<message>
|
// 124|warning|<message>
|
||||||
// 124|error|<message>
|
// 124|error|<message>
|
||||||
// 124|warning|<message> // error with single record, operation continues
|
// 124|warning|<message> // error with single record, operation continues
|
||||||
|
// 124|cancel
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -300,19 +314,17 @@ func (api *DatabaseAPI) processQuery(opID []byte, q *query.Query) (ok bool) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
for r := range it.Next {
|
// Save query iterator.
|
||||||
r.Lock()
|
api.queriesLock.Lock()
|
||||||
data, err := r.Marshal(r, record.JSON)
|
api.queries[string(opID)] = it
|
||||||
r.Unlock()
|
api.queriesLock.Unlock()
|
||||||
if err != nil {
|
|
||||||
api.send(opID, dbMsgTypeWarning, err.Error(), nil)
|
// Remove query iterator after it ended.
|
||||||
}
|
defer func() {
|
||||||
api.send(opID, dbMsgTypeOk, r.Key(), data)
|
api.queriesLock.Lock()
|
||||||
}
|
defer api.queriesLock.Unlock()
|
||||||
if it.Err() != nil {
|
delete(api.queries, string(opID))
|
||||||
api.send(opID, dbMsgTypeError, it.Err().Error(), nil)
|
}()
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -324,9 +336,7 @@ func (api *DatabaseAPI) processQuery(opID []byte, q *query.Query) (ok bool) {
|
||||||
// process query feed
|
// process query feed
|
||||||
if r != nil {
|
if r != nil {
|
||||||
// process record
|
// process record
|
||||||
r.Lock()
|
data, err := marshalRecord(r)
|
||||||
data, err := r.Marshal(r, record.JSON)
|
|
||||||
r.Unlock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
api.send(opID, dbMsgTypeWarning, err.Error(), nil)
|
api.send(opID, dbMsgTypeWarning, err.Error(), nil)
|
||||||
}
|
}
|
||||||
|
@ -376,15 +386,15 @@ func (api *DatabaseAPI) registerSub(opID []byte, q *query.Query) (sub *database.
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save subscription.
|
|
||||||
api.subsLock.Lock()
|
|
||||||
defer api.subsLock.Unlock()
|
|
||||||
api.subs[string(opID)] = sub
|
|
||||||
|
|
||||||
return sub, true
|
return sub, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
|
func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
|
||||||
|
// Save subscription.
|
||||||
|
api.subsLock.Lock()
|
||||||
|
api.subs[string(opID)] = sub
|
||||||
|
api.subsLock.Unlock()
|
||||||
|
|
||||||
// Remove subscription after it ended.
|
// Remove subscription after it ended.
|
||||||
defer func() {
|
defer func() {
|
||||||
api.subsLock.Lock()
|
api.subsLock.Lock()
|
||||||
|
@ -402,9 +412,7 @@ func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
|
||||||
// process sub feed
|
// process sub feed
|
||||||
if r != nil {
|
if r != nil {
|
||||||
// process record
|
// process record
|
||||||
r.Lock()
|
data, err := marshalRecord(r)
|
||||||
data, err := r.Marshal(r, record.JSON)
|
|
||||||
r.Unlock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
api.send(opID, dbMsgTypeWarning, err.Error(), nil)
|
api.send(opID, dbMsgTypeWarning, err.Error(), nil)
|
||||||
continue
|
continue
|
||||||
|
@ -462,6 +470,28 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *DatabaseAPI) handleCancel(opID []byte) {
|
func (api *DatabaseAPI) handleCancel(opID []byte) {
|
||||||
|
api.cancelQuery(opID)
|
||||||
|
api.cancelSub(opID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *DatabaseAPI) cancelQuery(opID []byte) {
|
||||||
|
api.queriesLock.Lock()
|
||||||
|
defer api.queriesLock.Unlock()
|
||||||
|
|
||||||
|
// Get subscription from api.
|
||||||
|
it, ok := api.queries[string(opID)]
|
||||||
|
if !ok {
|
||||||
|
// Fail silently as quries end by themselves when finished.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// End query.
|
||||||
|
it.Cancel()
|
||||||
|
|
||||||
|
// The query handler will end the communication with a done message.
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *DatabaseAPI) cancelSub(opID []byte) {
|
||||||
api.subsLock.Lock()
|
api.subsLock.Lock()
|
||||||
defer api.subsLock.Unlock()
|
defer api.subsLock.Unlock()
|
||||||
|
|
||||||
|
@ -478,7 +508,7 @@ func (api *DatabaseAPI) handleCancel(opID []byte) {
|
||||||
api.send(opID, dbMsgTypeError, fmt.Sprintf("failed to cancel subscription: %s", err), nil)
|
api.send(opID, dbMsgTypeError, fmt.Sprintf("failed to cancel subscription: %s", err), nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscription handler will end the communication with a done message.
|
// The subscription handler will end the communication with a done message.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create bool) {
|
func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create bool) {
|
||||||
|
@ -592,3 +622,39 @@ func (api *DatabaseAPI) shutdown() {
|
||||||
api.conn.Close()
|
api.conn.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// marsharlRecords locks and marshals the given record, additionally adding
|
||||||
|
// metadata and returning it as json.
|
||||||
|
func marshalRecord(r record.Record) ([]byte, error) {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
|
// Pour record into JSON.
|
||||||
|
jsonData, err := r.Marshal(r, record.JSON)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove JSON identifier for manual editing.
|
||||||
|
jsonData = bytes.TrimPrefix(jsonData, varint.Pack8(record.JSON))
|
||||||
|
|
||||||
|
// Add metadata.
|
||||||
|
jsonData, err = sjson.SetBytes(jsonData, "_meta", r.Meta())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add database key.
|
||||||
|
jsonData, err = sjson.SetBytes(jsonData, "_meta.Key", r.Key())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add JSON identifier again.
|
||||||
|
formatID := varint.Pack8(record.JSON)
|
||||||
|
finalData := make([]byte, 0, len(formatID)+len(jsonData))
|
||||||
|
finalData = append(finalData, formatID...)
|
||||||
|
finalData = append(finalData, jsonData...)
|
||||||
|
|
||||||
|
return finalData, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue