mirror of
https://github.com/safing/portbase
synced 2025-09-01 01:59:48 +00:00
Clean up API code, use update and new message types for sending subscription events
This commit is contained in:
parent
d16cb1ebf2
commit
e0535421e4
3 changed files with 17 additions and 18 deletions
|
@ -101,6 +101,7 @@ func authMiddleware(next http.Handler) http.Handler {
|
|||
Name: cookieName,
|
||||
Value: tokenString,
|
||||
HttpOnly: true,
|
||||
MaxAge: int(cookieTTL.Seconds()),
|
||||
})
|
||||
|
||||
// serve
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
|
||||
"github.com/safing/portbase/log"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/tevino/abool"
|
||||
)
|
||||
|
||||
|
@ -36,7 +35,6 @@ type Client struct {
|
|||
operations map[string]*Operation
|
||||
nextOpID uint64
|
||||
|
||||
wsConn *websocket.Conn
|
||||
lastError string
|
||||
}
|
||||
|
||||
|
@ -73,12 +71,12 @@ func (c *Client) Connect() error {
|
|||
func (c *Client) StayConnected() {
|
||||
log.Infof("client: connecting to Portmaster at %s", c.server)
|
||||
|
||||
c.Connect()
|
||||
_ = c.Connect()
|
||||
for {
|
||||
select {
|
||||
case <-time.After(backOffTimer):
|
||||
log.Infof("client: reconnecting...")
|
||||
c.Connect()
|
||||
_ = c.Connect()
|
||||
case <-c.shutdownSignal:
|
||||
return
|
||||
}
|
||||
|
|
|
@ -191,7 +191,7 @@ func (api *DatabaseAPI) writer() {
|
|||
select {
|
||||
// prioritize direct writes
|
||||
case data = <-api.sendQueue:
|
||||
if data == nil || len(data) == 0 {
|
||||
if len(data) == 0 {
|
||||
api.shutdown()
|
||||
return
|
||||
}
|
||||
|
@ -242,8 +242,9 @@ func (api *DatabaseAPI) handleGet(opID []byte, key string) {
|
|||
r, err := api.db.Get(key)
|
||||
if err == nil {
|
||||
data, err = r.Marshal(r, record.JSON)
|
||||
} else {
|
||||
api.send(opID, dbMsgTypeError, err.Error(), nil)
|
||||
}
|
||||
if err == nil {
|
||||
api.send(opID, dbMsgTypeError, err.Error(), nil) //nolint:nilness // FIXME: possibly false positive (golangci-lint govet/nilness)
|
||||
return
|
||||
}
|
||||
api.send(opID, dbMsgTypeOk, r.Key(), data)
|
||||
|
@ -357,7 +358,7 @@ func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
|
|||
select {
|
||||
case <-api.shutdownSignal:
|
||||
// cancel sub and return
|
||||
sub.Cancel()
|
||||
_ = sub.Cancel()
|
||||
return
|
||||
case r := <-sub.Feed:
|
||||
// process sub feed
|
||||
|
@ -373,17 +374,19 @@ func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
|
|||
// TODO: use upd, new and delete msgTypes
|
||||
r.Lock()
|
||||
isDeleted := r.Meta().IsDeleted()
|
||||
new := r.Meta().Created == r.Meta().Modified
|
||||
r.Unlock()
|
||||
if isDeleted {
|
||||
switch {
|
||||
case isDeleted:
|
||||
api.send(opID, dbMsgTypeDel, r.Key(), nil)
|
||||
} else {
|
||||
case new:
|
||||
api.send(opID, dbMsgTypeNew, r.Key(), data)
|
||||
default:
|
||||
api.send(opID, dbMsgTypeUpd, r.Key(), data)
|
||||
}
|
||||
} else {
|
||||
} else if sub.Err != nil {
|
||||
// sub feed ended
|
||||
if sub.Err != nil {
|
||||
api.send(opID, dbMsgTypeError, sub.Err.Error(), nil)
|
||||
}
|
||||
api.send(opID, dbMsgTypeError, sub.Err.Error(), nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -489,10 +492,7 @@ func (api *DatabaseAPI) handleInsert(opID []byte, key string, data []byte) {
|
|||
return false
|
||||
}
|
||||
insertError = acc.Set(key.String(), value.Value())
|
||||
if insertError != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
return insertError == nil
|
||||
})
|
||||
|
||||
if insertError != nil {
|
||||
|
|
Loading…
Add table
Reference in a new issue