Add metadata to records sent via the API

This commit is contained in:
Daniel 2020-10-30 15:11:16 +01:00
parent 9c3b240825
commit cfbe4d9204

View file

@ -7,6 +7,11 @@ import (
"net/http"
"sync"
"github.com/tidwall/sjson"
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/formats/varint"
"github.com/gorilla/websocket"
"github.com/tevino/abool"
"github.com/tidwall/gjson"
@ -45,6 +50,9 @@ type DatabaseAPI struct {
conn *websocket.Conn
sendQueue chan []byte
queriesLock sync.Mutex
queries map[string]*iterator.Iterator
subsLock sync.Mutex
subs map[string]*database.Subscription
@ -75,6 +83,7 @@ func startDatabaseAPI(w http.ResponseWriter, r *http.Request) {
new := &DatabaseAPI{
conn: wsConn,
sendQueue: make(chan []byte, 100),
queries: make(map[string]*iterator.Iterator),
subs: make(map[string]*database.Subscription),
shutdownSignal: make(chan struct{}),
shuttingDown: abool.NewBool(false),
@ -97,11 +106,13 @@ func (api *DatabaseAPI) handler() {
// 124|done
// 124|error|<message>
// 124|warning|<message> // error with single record, operation continues
// 124|cancel
// 125|sub|<query>
// 125|upd|<key>|<data>
// 125|new|<key>|<data>
// 127|del|<key>
// 125|warning|<message> // error with single record, operation continues
// 125|cancel
// 127|qsub|<query>
// 127|ok|<key>|<data>
// 127|done
@ -110,6 +121,7 @@ func (api *DatabaseAPI) handler() {
// 127|new|<key>|<data>
// 127|del|<key>
// 127|warning|<message> // error with single record, operation continues
// 127|cancel
// 128|create|<key>|<data>
// 128|success
@ -147,6 +159,7 @@ func (api *DatabaseAPI) handler() {
// Handle special command "cancel"
if len(parts) == 2 && string(parts[1]) == "cancel" {
// 124|cancel
// 125|cancel
// 127|cancel
go api.handleCancel(parts[0])
@ -265,7 +278,7 @@ func (api *DatabaseAPI) handleGet(opID []byte, key string) {
r, err := api.db.Get(key)
if err == nil {
data, err = r.Marshal(r, record.JSON)
data, err = marshalRecord(r)
}
if err != nil {
api.send(opID, dbMsgTypeError, err.Error(), nil)
@ -281,6 +294,7 @@ func (api *DatabaseAPI) handleQuery(opID []byte, queryText string) {
// 124|warning|<message>
// 124|error|<message>
// 124|warning|<message> // error with single record, operation continues
// 124|cancel
var err error
@ -300,19 +314,17 @@ func (api *DatabaseAPI) processQuery(opID []byte, q *query.Query) (ok bool) {
return false
}
for r := range it.Next {
r.Lock()
data, err := r.Marshal(r, record.JSON)
r.Unlock()
if err != nil {
api.send(opID, dbMsgTypeWarning, err.Error(), nil)
}
api.send(opID, dbMsgTypeOk, r.Key(), data)
}
if it.Err() != nil {
api.send(opID, dbMsgTypeError, it.Err().Error(), nil)
return false
}
// Save query iterator.
api.queriesLock.Lock()
api.queries[string(opID)] = it
api.queriesLock.Unlock()
// Remove query iterator after it ended.
defer func() {
api.queriesLock.Lock()
defer api.queriesLock.Unlock()
delete(api.queries, string(opID))
}()
for {
select {
@ -324,9 +336,7 @@ func (api *DatabaseAPI) processQuery(opID []byte, q *query.Query) (ok bool) {
// process query feed
if r != nil {
// process record
r.Lock()
data, err := r.Marshal(r, record.JSON)
r.Unlock()
data, err := marshalRecord(r)
if err != nil {
api.send(opID, dbMsgTypeWarning, err.Error(), nil)
}
@ -376,15 +386,15 @@ func (api *DatabaseAPI) registerSub(opID []byte, q *query.Query) (sub *database.
return nil, false
}
// Save subscription.
api.subsLock.Lock()
defer api.subsLock.Unlock()
api.subs[string(opID)] = sub
return sub, true
}
func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
// Save subscription.
api.subsLock.Lock()
api.subs[string(opID)] = sub
api.subsLock.Unlock()
// Remove subscription after it ended.
defer func() {
api.subsLock.Lock()
@ -402,9 +412,7 @@ func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
// process sub feed
if r != nil {
// process record
r.Lock()
data, err := r.Marshal(r, record.JSON)
r.Unlock()
data, err := marshalRecord(r)
if err != nil {
api.send(opID, dbMsgTypeWarning, err.Error(), nil)
continue
@ -462,6 +470,28 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) {
}
func (api *DatabaseAPI) handleCancel(opID []byte) {
api.cancelQuery(opID)
api.cancelSub(opID)
}
func (api *DatabaseAPI) cancelQuery(opID []byte) {
api.queriesLock.Lock()
defer api.queriesLock.Unlock()
// Get subscription from api.
it, ok := api.queries[string(opID)]
if !ok {
// Fail silently as quries end by themselves when finished.
return
}
// End query.
it.Cancel()
// The query handler will end the communication with a done message.
}
func (api *DatabaseAPI) cancelSub(opID []byte) {
api.subsLock.Lock()
defer api.subsLock.Unlock()
@ -478,7 +508,7 @@ func (api *DatabaseAPI) handleCancel(opID []byte) {
api.send(opID, dbMsgTypeError, fmt.Sprintf("failed to cancel subscription: %s", err), nil)
}
// Subscription handler will end the communication with a done message.
// The subscription handler will end the communication with a done message.
}
func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create bool) {
@ -592,3 +622,39 @@ func (api *DatabaseAPI) shutdown() {
api.conn.Close()
}
}
// marsharlRecords locks and marshals the given record, additionally adding
// metadata and returning it as json.
func marshalRecord(r record.Record) ([]byte, error) {
r.Lock()
defer r.Unlock()
// Pour record into JSON.
jsonData, err := r.Marshal(r, record.JSON)
if err != nil {
return nil, err
}
// Remove JSON identifier for manual editing.
jsonData = bytes.TrimPrefix(jsonData, varint.Pack8(record.JSON))
// Add metadata.
jsonData, err = sjson.SetBytes(jsonData, "_meta", r.Meta())
if err != nil {
return nil, err
}
// Add database key.
jsonData, err = sjson.SetBytes(jsonData, "_meta.Key", r.Key())
if err != nil {
return nil, err
}
// Add JSON identifier again.
formatID := varint.Pack8(record.JSON)
finalData := make([]byte, 0, len(formatID)+len(jsonData))
finalData = append(finalData, formatID...)
finalData = append(finalData, jsonData...)
return finalData, nil
}