mirror of
https://github.com/safing/portbase
synced 2025-09-01 10:09:50 +00:00
Add cancel command to database api
This commit is contained in:
parent
85e985c493
commit
7e1125fc65
1 changed files with 48 additions and 1 deletions
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/tevino/abool"
|
||||
|
@ -43,7 +44,9 @@ func init() {
|
|||
type DatabaseAPI struct {
|
||||
conn *websocket.Conn
|
||||
sendQueue chan []byte
|
||||
subs map[string]*database.Subscription
|
||||
|
||||
subsLock sync.Mutex
|
||||
subs map[string]*database.Subscription
|
||||
|
||||
shutdownSignal chan struct{}
|
||||
shuttingDown *abool.AtomicBool
|
||||
|
@ -141,6 +144,15 @@ func (api *DatabaseAPI) handler() {
|
|||
}
|
||||
|
||||
parts := bytes.SplitN(msg, []byte("|"), 3)
|
||||
|
||||
// Handle special command "cancel"
|
||||
if len(parts) == 2 && string(parts[1]) != "cancel" {
|
||||
// 125|cancel
|
||||
// 127|cancel
|
||||
go api.handleCancel(parts[0])
|
||||
continue
|
||||
}
|
||||
|
||||
if len(parts) != 3 {
|
||||
api.send(nil, dbMsgTypeError, "bad request: malformed message", nil)
|
||||
continue
|
||||
|
@ -340,6 +352,7 @@ func (api *DatabaseAPI) handleSub(opID []byte, queryText string) {
|
|||
// 125|new|<key>|<data>
|
||||
// 125|delete|<key>
|
||||
// 125|warning|<message> // error with single record, operation continues
|
||||
// 125|cancel
|
||||
var err error
|
||||
|
||||
q, err := query.ParseQuery(queryText)
|
||||
|
@ -362,10 +375,23 @@ func (api *DatabaseAPI) registerSub(opID []byte, q *query.Query) (sub *database.
|
|||
api.send(opID, dbMsgTypeError, err.Error(), nil)
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Save subscription.
|
||||
api.subsLock.Lock()
|
||||
defer api.subsLock.Unlock()
|
||||
api.subs[string(opID)] = sub
|
||||
|
||||
return sub, true
|
||||
}
|
||||
|
||||
func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
|
||||
// Remove subscription after it ended.
|
||||
defer func() {
|
||||
api.subsLock.Lock()
|
||||
defer api.subsLock.Unlock()
|
||||
delete(api.subs, string(opID))
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-api.shutdownSignal:
|
||||
|
@ -414,6 +440,7 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) {
|
|||
// 127|new|<key>|<data>
|
||||
// 127|delete|<key>
|
||||
// 127|warning|<message> // error with single record, operation continues
|
||||
// 127|cancel
|
||||
|
||||
var err error
|
||||
|
||||
|
@ -434,6 +461,26 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) {
|
|||
api.processSub(opID, sub)
|
||||
}
|
||||
|
||||
func (api *DatabaseAPI) handleCancel(opID []byte) {
|
||||
api.subsLock.Lock()
|
||||
defer api.subsLock.Unlock()
|
||||
|
||||
// Get subscription from api.
|
||||
sub, ok := api.subs[string(opID)]
|
||||
if !ok {
|
||||
api.send(opID, dbMsgTypeError, "could not find subscription", nil)
|
||||
return
|
||||
}
|
||||
|
||||
// End subscription.
|
||||
err := sub.Cancel()
|
||||
if err != nil {
|
||||
api.send(opID, dbMsgTypeError, fmt.Sprintf("failed to cancel subscription: %s", err), nil)
|
||||
}
|
||||
|
||||
// Subscription handler will end the communication with a done message.
|
||||
}
|
||||
|
||||
func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create bool) {
|
||||
// 128|create|<key>|<data>
|
||||
// 128|success
|
||||
|
|
Loading…
Add table
Reference in a new issue