Merge pull request #126 from safing/feature/patch-set-1

Improvements for notifications and events
This commit is contained in:
Daniel 2021-05-13 17:01:46 +02:00 committed by GitHub
commit 45a589c574
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 1027 additions and 206 deletions

171
api/api_bridge.go Normal file
View file

@ -0,0 +1,171 @@
package api
import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"path"
"strings"
"sync"
"github.com/safing/portbase/database"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/database/storage"
)
const (
endpointBridgeRemoteAddress = "websocket-bridge"
apiDatabaseName = "api"
)
func registerEndpointBridgeDB() error {
if _, err := database.Register(&database.Database{
Name: apiDatabaseName,
Description: "API Bridge",
StorageType: "injected",
}); err != nil {
return err
}
_, err := database.InjectDatabase("api", &endpointBridgeStorage{})
return err
}
type endpointBridgeStorage struct {
storage.InjectBase
}
type EndpointBridgeRequest struct {
record.Base
sync.Mutex
Method string
Path string
Query map[string]string
Data []byte
MimeType string
}
type EndpointBridgeResponse struct {
record.Base
sync.Mutex
MimeType string
Body string
}
// Get returns a database record.
func (ebs *endpointBridgeStorage) Get(key string) (record.Record, error) {
if key == "" {
return nil, database.ErrNotFound
}
return callAPI(&EndpointBridgeRequest{
Method: http.MethodGet,
Path: key,
})
}
// Get returns the metadata of a database record.
func (ebs *endpointBridgeStorage) GetMeta(key string) (*record.Meta, error) {
// This interface is an API, always return a fresh copy.
m := &record.Meta{}
m.Update()
return m, nil
}
// Put stores a record in the database.
func (ebs *endpointBridgeStorage) Put(r record.Record) (record.Record, error) {
if r.DatabaseKey() == "" {
return nil, database.ErrNotFound
}
// Prepare data.
var ebr *EndpointBridgeRequest
if r.IsWrapped() {
// Only allocate a new struct, if we need it.
ebr = &EndpointBridgeRequest{}
err := record.Unwrap(r, ebr)
if err != nil {
return nil, err
}
} else {
var ok bool
ebr, ok = r.(*EndpointBridgeRequest)
if !ok {
return nil, fmt.Errorf("record not of type *EndpointBridgeRequest, but %T", r)
}
}
// Override path with key to mitigate sneaky stuff.
ebr.Path = r.DatabaseKey()
return callAPI(ebr)
}
// ReadOnly returns whether the database is read only.
func (ebs *endpointBridgeStorage) ReadOnly() bool {
return false
}
func callAPI(ebr *EndpointBridgeRequest) (record.Record, error) {
// Add API prefix to path.
requestURL := path.Join(apiV1Path, ebr.Path)
// Check if path is correct. (Defense in depth)
if !strings.HasPrefix(requestURL, apiV1Path) {
return nil, fmt.Errorf("bridged request for %q violates scope", ebr.Path)
}
// Apply default Method.
if ebr.Method == "" {
if len(ebr.Data) > 0 {
ebr.Method = http.MethodPost
} else {
ebr.Method = http.MethodGet
}
}
// Build URL.
u, err := url.ParseRequestURI(requestURL)
if err != nil {
return nil, fmt.Errorf("failed to build bridged request url: %w", err)
}
// Build query values.
if ebr.Query != nil && len(ebr.Query) > 0 {
query := url.Values{}
for k, v := range ebr.Query {
query.Set(k, v)
}
u.RawQuery = query.Encode()
}
// Create request and response objects.
r := httptest.NewRequest(ebr.Method, u.String(), bytes.NewBuffer(ebr.Data))
r.RemoteAddr = endpointBridgeRemoteAddress
if ebr.MimeType != "" {
r.Header.Set("Content-Type", ebr.MimeType)
}
w := httptest.NewRecorder()
// Let the API handle the request.
server.Handler.ServeHTTP(w, r)
switch w.Code {
case 200:
// Everything okay, continue.
case 500:
// A Go error was returned internally.
// We can safely return this as an error.
return nil, fmt.Errorf("bridged api call failed: %s", w.Body.String())
default:
return nil, fmt.Errorf("bridged api call returned unexpected error code %d", w.Code)
}
response := &EndpointBridgeResponse{
MimeType: w.Result().Header.Get("Content-Type"),
Body: w.Body.String(),
}
response.SetKey(apiDatabaseName + ":" + ebr.Path)
response.UpdateMeta()
return response, nil
}

View file

@ -250,6 +250,14 @@ func checkAuth(w http.ResponseWriter, r *http.Request, authRequired bool) (token
}, false }, false
} }
// Database Bridge Access.
if r.RemoteAddr == endpointBridgeRemoteAddress {
return &AuthToken{
Read: dbCompatibilityPermission,
Write: dbCompatibilityPermission,
}, false
}
// Check for valid API key. // Check for valid API key.
token = checkAPIKey(r) token = checkAPIKey(r)
if token != nil { if token != nil {

View file

@ -4,7 +4,6 @@ import (
"flag" "flag"
"github.com/safing/portbase/config" "github.com/safing/portbase/config"
"github.com/safing/portbase/log"
) )
// Config Keys. // Config Keys.
@ -24,13 +23,12 @@ var (
) )
func init() { func init() {
flag.StringVar(&listenAddressFlag, "api-address", "", "override api listen address") flag.StringVar(
} &listenAddressFlag,
"api-address",
func logFlagOverrides() { "",
if listenAddressFlag != "" { "set api listen address; configuration is stronger",
log.Warning("api: api/listenAddress default config is being overridden by -api-address flag") )
}
} }
func getDefaultListenAddress() string { func getDefaultListenAddress() string {

View file

@ -2,6 +2,7 @@ package api
import ( import (
"bytes" "bytes"
"context"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -39,6 +40,7 @@ const (
var ( var (
dbAPISeperatorBytes = []byte(dbAPISeperator) dbAPISeperatorBytes = []byte(dbAPISeperator)
dbCompatibilityPermission = PermitAdmin
) )
func init() { func init() {
@ -46,8 +48,8 @@ func init() {
startDatabaseAPI, startDatabaseAPI,
// Default to admin read/write permissions until the database gets support // Default to admin read/write permissions until the database gets support
// for api permissions. // for api permissions.
PermitAdmin, dbCompatibilityPermission,
PermitAdmin, dbCompatibilityPermission,
)) ))
} }
@ -96,13 +98,13 @@ func startDatabaseAPI(w http.ResponseWriter, r *http.Request) {
db: database.NewInterface(nil), db: database.NewInterface(nil),
} }
go new.handler() module.StartWorker("database api handler", new.handler)
go new.writer() module.StartWorker("database api writer", new.writer)
log.Tracer(r.Context()).Infof("api request: init websocket %s %s", r.RemoteAddr, r.RequestURI) log.Tracer(r.Context()).Infof("api request: init websocket %s %s", r.RemoteAddr, r.RequestURI)
} }
func (api *DatabaseAPI) handler() { func (api *DatabaseAPI) handler(context.Context) error {
// 123|get|<key> // 123|get|<key>
// 123|ok|<key>|<data> // 123|ok|<key>|<data>
@ -146,19 +148,7 @@ func (api *DatabaseAPI) handler() {
_, msg, err := api.conn.ReadMessage() _, msg, err := api.conn.ReadMessage()
if err != nil { if err != nil {
if !api.shuttingDown.IsSet() { return api.shutdown(err)
api.shutdown()
if websocket.IsCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
) {
log.Infof("api: websocket connection to %s closed", api.conn.RemoteAddr())
} else {
log.Warningf("api: websocket read error from %s: %s", api.conn.RemoteAddr(), err)
}
}
return
} }
parts := bytes.SplitN(msg, []byte("|"), 3) parts := bytes.SplitN(msg, []byte("|"), 3)
@ -218,29 +208,39 @@ func (api *DatabaseAPI) handler() {
} }
} }
func (api *DatabaseAPI) writer() { func (api *DatabaseAPI) writer(ctx context.Context) error {
var data []byte var data []byte
var err error var err error
for { for {
data = nil
select { select {
// prioritize direct writes // prioritize direct writes
case data = <-api.sendQueue: case data = <-api.sendQueue:
if len(data) == 0 { if len(data) == 0 {
api.shutdown() return api.shutdown(nil)
return
} }
case <-ctx.Done():
return api.shutdown(nil)
case <-api.shutdownSignal: case <-api.shutdownSignal:
return return api.shutdown(nil)
} }
// log.Tracef("api: sending %s", string(*msg)) // log.Tracef("api: sending %s", string(*msg))
err = api.conn.WriteMessage(websocket.BinaryMessage, data) err = api.conn.WriteMessage(websocket.BinaryMessage, data)
if err != nil { if err != nil {
if !api.shuttingDown.IsSet() { return api.shutdown(err)
api.shutdown() }
}
}
func (api *DatabaseAPI) shutdown(err error) error {
// Check if we are the first to shut down.
if !api.shuttingDown.SetToIf(false, true) {
return nil
}
// Check the given error.
if err != nil {
if websocket.IsCloseError(err, if websocket.IsCloseError(err,
websocket.CloseNormalClosure, websocket.CloseNormalClosure,
websocket.CloseGoingAway, websocket.CloseGoingAway,
@ -248,13 +248,14 @@ func (api *DatabaseAPI) writer() {
) { ) {
log.Infof("api: websocket connection to %s closed", api.conn.RemoteAddr()) log.Infof("api: websocket connection to %s closed", api.conn.RemoteAddr())
} else { } else {
log.Warningf("api: websocket write error to %s: %s", api.conn.RemoteAddr(), err) log.Warningf("api: websocket connection error with %s: %s", api.conn.RemoteAddr(), err)
} }
} }
return
}
} // Trigger shutdown.
close(api.shutdownSignal)
api.conn.Close()
return nil
} }
func (api *DatabaseAPI) send(opID []byte, msgType string, msgOrKey string, data []byte) { func (api *DatabaseAPI) send(opID []byte, msgType string, msgOrKey string, data []byte) {
@ -622,13 +623,6 @@ func (api *DatabaseAPI) handleDelete(opID []byte, key string) {
api.send(opID, dbMsgTypeSuccess, emptyString, nil) api.send(opID, dbMsgTypeSuccess, emptyString, nil)
} }
func (api *DatabaseAPI) shutdown() {
if api.shuttingDown.SetToIf(false, true) {
close(api.shutdownSignal)
api.conn.Close()
}
}
// marsharlRecords locks and marshals the given record, additionally adding // marsharlRecords locks and marshals the given record, additionally adding
// metadata and returning it as json. // metadata and returning it as json.
func marshalRecord(r record.Record, withDSDIdentifier bool) ([]byte, error) { func marshalRecord(r record.Record, withDSDIdentifier bool) ([]byte, error) {

View file

@ -11,6 +11,8 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/gorilla/mux"
"github.com/safing/portbase/database/record" "github.com/safing/portbase/database/record"
"github.com/safing/portbase/log" "github.com/safing/portbase/log"
) )
@ -84,6 +86,7 @@ func init() {
var ( var (
endpoints = make(map[string]*Endpoint) endpoints = make(map[string]*Endpoint)
endpointsMux = mux.NewRouter()
endpointsLock sync.RWMutex endpointsLock sync.RWMutex
// ErrInvalidEndpoint is returned when an invalid endpoint is registered. // ErrInvalidEndpoint is returned when an invalid endpoint is registered.
@ -106,16 +109,26 @@ func getAPIContext(r *http.Request) (apiEndpoint *Endpoint, apiRequest *Request)
return apiEndpoint, apiRequest return apiEndpoint, apiRequest
} }
// If not, get the action from the registry.
endpointPath, ok := apiRequest.URLVars["endpointPath"]
if !ok {
return nil, apiRequest
}
endpointsLock.RLock() endpointsLock.RLock()
defer endpointsLock.RUnlock() defer endpointsLock.RUnlock()
apiEndpoint, ok = endpoints[endpointPath] // Get handler for request.
// Gorilla does not support handling this on our own very well.
// See github.com/gorilla/mux.ServeHTTP for reference.
var match mux.RouteMatch
var handler http.Handler
if endpointsMux.Match(r, &match) {
handler = match.Handler
apiRequest.Route = match.Route
// Add/Override variables instead of replacing.
for k, v := range match.Vars {
apiRequest.URLVars[k] = v
}
} else {
return nil, apiRequest
}
apiEndpoint, ok = handler.(*Endpoint)
if ok { if ok {
// Cache for next operation. // Cache for next operation.
apiRequest.HandlerCache = apiEndpoint apiRequest.HandlerCache = apiEndpoint
@ -139,6 +152,7 @@ func RegisterEndpoint(e Endpoint) error {
} }
endpoints[e.Path] = &e endpoints[e.Path] = &e
endpointsMux.Handle(apiV1Path+e.Path, &e)
return nil return nil
} }
@ -243,6 +257,17 @@ func (eh *endpointHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
apiEndpoint.ServeHTTP(w, r)
}
// ServeHTTP handles the http request.
func (e *Endpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
_, apiRequest := getAPIContext(r)
if apiRequest == nil {
http.NotFound(w, r)
return
}
switch r.Method { switch r.Method {
case http.MethodHead: case http.MethodHead:
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
@ -260,7 +285,7 @@ func (eh *endpointHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
return return
default: default:
http.Error(w, "Unsupported method for the actions API.", http.StatusMethodNotAllowed) http.Error(w, "unsupported method for the actions API", http.StatusMethodNotAllowed)
return return
} }
@ -269,47 +294,47 @@ func (eh *endpointHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error var err error
switch { switch {
case apiEndpoint.ActionFunc != nil: case e.ActionFunc != nil:
var msg string var msg string
msg, err = apiEndpoint.ActionFunc(apiRequest) msg, err = e.ActionFunc(apiRequest)
if err == nil { if err == nil {
responseData = []byte(msg) responseData = []byte(msg)
} }
case apiEndpoint.DataFunc != nil: case e.DataFunc != nil:
responseData, err = apiEndpoint.DataFunc(apiRequest) responseData, err = e.DataFunc(apiRequest)
case apiEndpoint.StructFunc != nil: case e.StructFunc != nil:
var v interface{} var v interface{}
v, err = apiEndpoint.StructFunc(apiRequest) v, err = e.StructFunc(apiRequest)
if err == nil && v != nil { if err == nil && v != nil {
responseData, err = json.Marshal(v) responseData, err = json.Marshal(v)
} }
case apiEndpoint.RecordFunc != nil: case e.RecordFunc != nil:
var rec record.Record var rec record.Record
rec, err = apiEndpoint.RecordFunc(apiRequest) rec, err = e.RecordFunc(apiRequest)
if err == nil && r != nil { if err == nil && r != nil {
responseData, err = marshalRecord(rec, false) responseData, err = marshalRecord(rec, false)
} }
case apiEndpoint.HandlerFunc != nil: case e.HandlerFunc != nil:
apiEndpoint.HandlerFunc(w, r) e.HandlerFunc(w, r)
return return
default: default:
http.Error(w, "Internal server error: Missing handler.", http.StatusInternalServerError) http.Error(w, "missing handler", http.StatusInternalServerError)
return return
} }
// Check for handler error. // Check for handler error.
if err != nil { if err != nil {
http.Error(w, "Internal server error: "+err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
// Write response. // Write response.
w.Header().Set("Content-Type", apiEndpoint.MimeType+"; charset=utf-8") w.Header().Set("Content-Type", e.MimeType+"; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(len(responseData))) w.Header().Set("Content-Length", strconv.Itoa(len(responseData)))
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
_, err = w.Write(responseData) _, err = w.Write(responseData)
@ -321,14 +346,14 @@ func (eh *endpointHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func readBody(w http.ResponseWriter, r *http.Request) (inputData []byte, ok bool) { func readBody(w http.ResponseWriter, r *http.Request) (inputData []byte, ok bool) {
// Check for too long content in order to prevent death. // Check for too long content in order to prevent death.
if r.ContentLength > 20000000 { // 20MB if r.ContentLength > 20000000 { // 20MB
http.Error(w, "Too much input data.", http.StatusRequestEntityTooLarge) http.Error(w, "too much input data", http.StatusRequestEntityTooLarge)
return nil, false return nil, false
} }
// Read and close body. // Read and close body.
inputData, err := ioutil.ReadAll(r.Body) inputData, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
http.Error(w, "Failed to read body: "+err.Error(), http.StatusInternalServerError) http.Error(w, "failed to read body"+err.Error(), http.StatusInternalServerError)
return nil, false return nil, false
} }
return inputData, true return inputData, true

View file

@ -51,6 +51,7 @@ func registerMetaEndpoints() error {
if err := RegisterEndpoint(Endpoint{ if err := RegisterEndpoint(Endpoint{
Path: "auth/reset", Path: "auth/reset",
Read: PermitAnyone, Read: PermitAnyone,
Write: PermitAnyone,
HandlerFunc: authReset, HandlerFunc: authReset,
Name: "Reset Authenticated Session", Name: "Reset Authenticated Session",
Description: "Resets authentication status internally and in the browser.", Description: "Resets authentication status internally and in the browser.",

36
api/endpoints_modules.go Normal file
View file

@ -0,0 +1,36 @@
package api
import (
"errors"
"fmt"
)
func registerModulesEndpoints() error {
if err := RegisterEndpoint(Endpoint{
Path: "modules/{moduleName:.+}/trigger/{eventName:.+}",
Write: PermitSelf,
ActionFunc: triggerEvent,
Name: "Export Configuration Options",
Description: "Returns a list of all registered configuration options and their metadata. This does not include the current active or default settings.",
}); err != nil {
return err
}
return nil
}
func triggerEvent(ar *Request) (msg string, err error) {
// Get parameters.
moduleName := ar.URLVars["moduleName"]
eventName := ar.URLVars["eventName"]
if moduleName == "" || eventName == "" {
return "", errors.New("invalid parameters")
}
// Inject event.
if err := module.InjectEvent("api event injection", moduleName, eventName, nil); err != nil {
return "", fmt.Errorf("failed to inject event: %w", err)
}
return "event successfully injected", nil
}

View file

@ -50,11 +50,14 @@ func prep() error {
return err return err
} }
if err := registerModulesEndpoints(); err != nil {
return err
}
return registerMetaEndpoints() return registerMetaEndpoints()
} }
func start() error { func start() error {
logFlagOverrides()
go Serve() go Serve()
_ = updateAPIKeys(module.Ctx, nil) _ = updateAPIKeys(module.Ctx, nil)
@ -68,7 +71,7 @@ func start() error {
module.NewTask("clean api sessions", cleanSessions).Repeat(5 * time.Minute) module.NewTask("clean api sessions", cleanSessions).Repeat(5 * time.Minute)
} }
return nil return registerEndpointBridgeDB()
} }
func stop() error { func stop() error {

View file

@ -18,7 +18,7 @@ var (
) )
func init() { func init() {
flag.BoolVar(&defaultDevMode, "devmode", false, "enable development mode") flag.BoolVar(&defaultDevMode, "devmode", false, "enable development mode; configuration is stronger")
} }
func registerBasicOptions() error { func registerBasicOptions() error {

View file

@ -32,7 +32,7 @@ func SetDataRoot(root *utils.DirStructure) {
func init() { func init() {
module = modules.Register("config", prep, start, nil, "database") module = modules.Register("config", prep, start, nil, "database")
module.RegisterEvent(configChangeEvent) module.RegisterEvent(configChangeEvent, true)
flag.BoolVar(&exportConfig, "export-config-options", false, "export configuration registry and exit") flag.BoolVar(&exportConfig, "export-config-options", false, "export configuration registry and exit")
} }

View file

@ -42,7 +42,7 @@ func (c *Controller) Injected() bool {
return c.storage.Injected() return c.storage.Injected()
} }
// Get return the record with the given key. // Get returns the record with the given key.
func (c *Controller) Get(key string) (record.Record, error) { func (c *Controller) Get(key string) (record.Record, error) {
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return nil, ErrShuttingDown return nil, ErrShuttingDown
@ -55,7 +55,7 @@ func (c *Controller) Get(key string) (record.Record, error) {
r, err := c.storage.Get(key) r, err := c.storage.Get(key)
if err != nil { if err != nil {
// replace not found error // replace not found error
if err == storage.ErrNotFound { if errors.Is(err, storage.ErrNotFound) {
return nil, ErrNotFound return nil, ErrNotFound
} }
return nil, err return nil, err
@ -76,6 +76,42 @@ func (c *Controller) Get(key string) (record.Record, error) {
return r, nil return r, nil
} }
// Get returns the metadata of the record with the given key.
func (c *Controller) GetMeta(key string) (*record.Meta, error) {
if shuttingDown.IsSet() {
return nil, ErrShuttingDown
}
var m *record.Meta
var err error
if metaDB, ok := c.storage.(storage.MetaHandler); ok {
m, err = metaDB.GetMeta(key)
if err != nil {
// replace not found error
if errors.Is(err, storage.ErrNotFound) {
return nil, ErrNotFound
}
return nil, err
}
} else {
r, err := c.storage.Get(key)
if err != nil {
// replace not found error
if errors.Is(err, storage.ErrNotFound) {
return nil, ErrNotFound
}
return nil, err
}
m = r.Meta()
}
if !m.CheckValidity() {
return nil, ErrNotFound
}
return m, nil
}
// Put saves a record in the database, executes any registered // Put saves a record in the database, executes any registered
// pre-put hooks and finally send an update to all subscribers. // pre-put hooks and finally send an update to all subscribers.
// The record must be locked and secured from concurrent access // The record must be locked and secured from concurrent access

View file

@ -201,6 +201,40 @@ func (i *Interface) getRecord(dbName string, dbKey string, mustBeWriteable bool)
return r, db, nil return r, db, nil
} }
func (i *Interface) getMeta(dbName string, dbKey string, mustBeWriteable bool) (m *record.Meta, db *Controller, err error) {
if dbName == "" {
dbName, dbKey = record.ParseKey(dbKey)
}
db, err = getController(dbName)
if err != nil {
return nil, nil, err
}
if mustBeWriteable && db.ReadOnly() {
return nil, db, ErrReadOnly
}
r := i.checkCache(dbName + ":" + dbKey)
if r != nil {
if !i.options.hasAccessPermission(r) {
return nil, db, ErrPermissionDenied
}
return r.Meta(), db, nil
}
m, err = db.GetMeta(dbKey)
if err != nil {
return nil, db, err
}
if !m.CheckPermission(i.options.Local, i.options.Internal) {
return nil, db, ErrPermissionDenied
}
return m, db, nil
}
// InsertValue inserts a value into a record. // InsertValue inserts a value into a record.
func (i *Interface) InsertValue(key string, attribute string, value interface{}) error { func (i *Interface) InsertValue(key string, attribute string, value interface{}) error {
r, db, err := i.getRecord(getDBFromKey, key, true) r, db, err := i.getRecord(getDBFromKey, key, true)
@ -236,7 +270,7 @@ func (i *Interface) Put(r record.Record) (err error) {
// get record or only database // get record or only database
var db *Controller var db *Controller
if !i.options.HasAllPermissions() { if !i.options.HasAllPermissions() {
_, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true) _, db, err = i.getMeta(r.DatabaseName(), r.DatabaseKey(), true)
if err != nil && err != ErrNotFound { if err != nil && err != ErrNotFound {
return err return err
} }
@ -247,7 +281,7 @@ func (i *Interface) Put(r record.Record) (err error) {
} }
} }
// Check if database is read only before we add to the cache. // Check if database is read only.
if db.ReadOnly() { if db.ReadOnly() {
return ErrReadOnly return ErrReadOnly
} }
@ -274,7 +308,7 @@ func (i *Interface) PutNew(r record.Record) (err error) {
// get record or only database // get record or only database
var db *Controller var db *Controller
if !i.options.HasAllPermissions() { if !i.options.HasAllPermissions() {
_, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true) _, db, err = i.getMeta(r.DatabaseName(), r.DatabaseKey(), true)
if err != nil && err != ErrNotFound { if err != nil && err != ErrNotFound {
return err return err
} }
@ -285,6 +319,11 @@ func (i *Interface) PutNew(r record.Record) (err error) {
} }
} }
// Check if database is read only.
if db.ReadOnly() {
return ErrReadOnly
}
r.Lock() r.Lock()
if r.Meta() != nil { if r.Meta() != nil {
r.Meta().Reset() r.Meta().Reset()
@ -328,6 +367,13 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) {
} }
} }
// Check if database is read only.
if db.ReadOnly() {
return func(r record.Record) error {
return ErrReadOnly
}
}
// start database access // start database access
dbBatch, errs := db.PutMany() dbBatch, errs := db.PutMany()
finished := abool.New() finished := abool.New()
@ -462,6 +508,11 @@ func (i *Interface) Delete(key string) error {
return err return err
} }
// Check if database is read only.
if db.ReadOnly() {
return ErrReadOnly
}
i.options.Apply(r) i.options.Apply(r)
r.Meta().Delete() r.Meta().Delete()
return db.Put(r) return db.Put(r)
@ -495,6 +546,11 @@ func (i *Interface) Purge(ctx context.Context, q *query.Query) (int, error) {
return 0, err return 0, err
} }
// Check if database is read only before we add to the cache.
if db.ReadOnly() {
return 0, ErrReadOnly
}
return db.Purge(ctx, q, i.options.Local, i.options.Internal) return db.Purge(ctx, q, i.options.Local, i.options.Internal)
} }

View file

@ -22,7 +22,7 @@ Please note that some feeders may have other special characters. It is advised t
## Operators ## Operators
| Name | Textual | Req. Type | Internal Type | Compared with | | Name | Textual | Req. Type | Internal Type | Compared with |
|---|---|---|---| |-------------------------|--------------------|-----------|---------------|---------------------------|
| Equals | `==` | int | int64 | `==` | | Equals | `==` | int | int64 | `==` |
| GreaterThan | `>` | int | int64 | `>` | | GreaterThan | `>` | int | int64 | `>` |
| GreaterThanOrEqual | `>=` | int | int64 | `>=` | | GreaterThanOrEqual | `>=` | int | int64 | `>=` |
@ -38,11 +38,11 @@ Please note that some feeders may have other special characters. It is advised t
| StartsWith | `startswith`, `sw` | string | string | `strings.HasPrefix()` | | StartsWith | `startswith`, `sw` | string | string | `strings.HasPrefix()` |
| EndsWith | `endswith`, `ew` | string | string | `strings.HasSuffix()` | | EndsWith | `endswith`, `ew` | string | string | `strings.HasSuffix()` |
| In | `in` | string | string | for loop with `==` | | In | `in` | string | string | for loop with `==` |
| Matches | `matches`, `re` | string | int64 | `regexp.Regexp.Matches()` | | Matches | `matches`, `re` | string | string | `regexp.Regexp.Matches()` |
| Is | `is` | bool* | bool | `==` | | Is | `is` | bool* | bool | `==` |
| Exists | `exists`, `ex` | any | n/a | n/a | | Exists | `exists`, `ex` | any | n/a | n/a |
\*accepts strings: 1, t, T, TRUE, true, True, 0, f, F, FALSE \*accepts strings: 1, t, T, true, True, TRUE, 0, f, F, false, False, FALSE
## Escaping ## Escaping

View file

@ -25,7 +25,7 @@ var (
registry = make(map[string]*Database) registry = make(map[string]*Database)
registryLock sync.Mutex registryLock sync.Mutex
nameConstraint = regexp.MustCompile("^[A-Za-z0-9_-]{4,}$") nameConstraint = regexp.MustCompile("^[A-Za-z0-9_-]{3,}$")
) )
// Register registers a new database. // Register registers a new database.
@ -56,7 +56,7 @@ func Register(new *Database) (*Database, error) {
} else { } else {
// register new database // register new database
if !nameConstraint.MatchString(new.Name) { if !nameConstraint.MatchString(new.Name) {
return nil, errors.New("database name must only contain alphanumeric and `_-` characters and must be at least 4 characters long") return nil, errors.New("database name must only contain alphanumeric and `_-` characters and must be at least 3 characters long")
} }
now := time.Now().Round(time.Second) now := time.Now().Round(time.Second)

View file

@ -82,6 +82,18 @@ func (b *Badger) Get(key string) (record.Record, error) {
return m, nil return m, nil
} }
// GetMeta returns the metadata of a database record.
func (b *Badger) GetMeta(key string) (*record.Meta, error) {
// TODO: Replace with more performant variant.
r, err := b.Get(key)
if err != nil {
return nil, err
}
return r.Meta(), nil
}
// Put stores a record in the database. // Put stores a record in the database.
func (b *Badger) Put(r record.Record) (record.Record, error) { func (b *Badger) Put(r record.Record) (record.Record, error) {
data, err := r.MarshalRecord(r) data, err := r.MarshalRecord(r)

View file

@ -96,6 +96,18 @@ func (b *BBolt) Get(key string) (record.Record, error) {
return r, nil return r, nil
} }
// GetMeta returns the metadata of a database record.
func (b *BBolt) GetMeta(key string) (*record.Meta, error) {
// TODO: Replace with more performant variant.
r, err := b.Get(key)
if err != nil {
return nil, err
}
return r.Meta(), nil
}
// Put stores a record in the database. // Put stores a record in the database.
func (b *BBolt) Put(r record.Record) (record.Record, error) { func (b *BBolt) Put(r record.Record) (record.Record, error) {
data, err := r.MarshalRecord(r) data, err := r.MarshalRecord(r)

View file

@ -104,6 +104,18 @@ func (fst *FSTree) Get(key string) (record.Record, error) {
return r, nil return r, nil
} }
// GetMeta returns the metadata of a database record.
func (fst *FSTree) GetMeta(key string) (*record.Meta, error) {
// TODO: Replace with more performant variant.
r, err := fst.Get(key)
if err != nil {
return nil, err
}
return r.Meta(), nil
}
// Put stores a record in the database. // Put stores a record in the database.
func (fst *FSTree) Put(r record.Record) (record.Record, error) { func (fst *FSTree) Put(r record.Record) (record.Record, error) {
dstPath, err := fst.buildFilePath(r.DatabaseKey(), true) dstPath, err := fst.buildFilePath(r.DatabaseKey(), true)

View file

@ -44,6 +44,18 @@ func (hm *HashMap) Get(key string) (record.Record, error) {
return r, nil return r, nil
} }
// GetMeta returns the metadata of a database record.
func (hm *HashMap) GetMeta(key string) (*record.Meta, error) {
// TODO: Replace with more performant variant.
r, err := hm.Get(key)
if err != nil {
return nil, err
}
return r.Meta(), nil
}
// Put stores a record in the database. // Put stores a record in the database.
func (hm *HashMap) Put(r record.Record) (record.Record, error) { func (hm *HashMap) Put(r record.Record) (record.Record, error) {
hm.dbLock.Lock() hm.dbLock.Lock()

View file

@ -11,7 +11,8 @@ import (
) )
var ( var (
errNotImplemented = errors.New("not implemented") // ErrNotImplemented is returned when a function is not implemented by a storage.
ErrNotImplemented = errors.New("not implemented")
) )
// InjectBase is a dummy base structure to reduce boilerplate code for injected storage interfaces. // InjectBase is a dummy base structure to reduce boilerplate code for injected storage interfaces.
@ -22,22 +23,27 @@ var _ Interface = &InjectBase{}
// Get returns a database record. // Get returns a database record.
func (i *InjectBase) Get(key string) (record.Record, error) { func (i *InjectBase) Get(key string) (record.Record, error) {
return nil, errNotImplemented return nil, ErrNotImplemented
}
// Get returns a database record.
func (i *InjectBase) GetMeta(key string) (*record.Meta, error) {
return nil, ErrNotImplemented
} }
// Put stores a record in the database. // Put stores a record in the database.
func (i *InjectBase) Put(m record.Record) (record.Record, error) { func (i *InjectBase) Put(m record.Record) (record.Record, error) {
return nil, errNotImplemented return nil, ErrNotImplemented
} }
// Delete deletes a record from the database. // Delete deletes a record from the database.
func (i *InjectBase) Delete(key string) error { func (i *InjectBase) Delete(key string) error {
return errNotImplemented return ErrNotImplemented
} }
// Query returns a an iterator for the supplied query. // Query returns a an iterator for the supplied query.
func (i *InjectBase) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { func (i *InjectBase) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
return nil, errNotImplemented return nil, ErrNotImplemented
} }
// ReadOnly returns whether the database is read only. // ReadOnly returns whether the database is read only.

View file

@ -26,6 +26,11 @@ type Interface interface {
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error
} }
// Maintainer defines the database storage API for backends that support optimized fetching of only the metadata.
type MetaHandler interface {
GetMeta(key string) (*record.Meta, error)
}
// Maintainer defines the database storage API for backends that require regular maintenance. // Maintainer defines the database storage API for backends that require regular maintenance.
type Maintainer interface { type Maintainer interface {
Maintain(ctx context.Context) error Maintain(ctx context.Context) error

View file

@ -44,6 +44,11 @@ func (s *Sinkhole) Get(key string) (record.Record, error) {
return nil, storage.ErrNotFound return nil, storage.ErrNotFound
} }
// GetMeta returns the metadata of a database record.
func (s *Sinkhole) GetMeta(key string) (*record.Meta, error) {
return nil, storage.ErrNotFound
}
// Put stores a record in the database. // Put stores a record in the database.
func (s *Sinkhole) Put(r record.Record) (record.Record, error) { func (s *Sinkhole) Put(r record.Record) (record.Record, error) {
return r, nil return r, nil

View file

@ -21,8 +21,8 @@ var (
) )
func init() { func init() {
flag.StringVar(&pushFlag, "push-metrics", "", "Set default URL to push prometheus metrics to.") flag.StringVar(&pushFlag, "push-metrics", "", "set default URL to push prometheus metrics to")
flag.StringVar(&instanceFlag, "metrics-instance", "", "Set the default global instance label.") flag.StringVar(&instanceFlag, "metrics-instance", "", "set the default global instance label")
} }
func prepConfig() error { func prepConfig() error {

View file

@ -6,8 +6,18 @@ import (
"fmt" "fmt"
"github.com/safing/portbase/log" "github.com/safing/portbase/log"
"github.com/tevino/abool"
) )
type eventHooks struct {
// hooks holds all registed hooks for the event.
hooks []*eventHook
// internal signifies that the event and it's data may not be exposed and may
// only be propagated internally.
internal bool
}
type eventHookFn func(context.Context, interface{}) error type eventHookFn func(context.Context, interface{}) error
type eventHook struct { type eventHook struct {
@ -27,17 +37,26 @@ func (m *Module) processEventTrigger(event string, data interface{}) {
m.eventHooksLock.RLock() m.eventHooksLock.RLock()
defer m.eventHooksLock.RUnlock() defer m.eventHooksLock.RUnlock()
hooks, ok := m.eventHooks[event] eventHooks, ok := m.eventHooks[event]
if !ok { if !ok {
log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, event) log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, event)
return return
} }
for _, hook := range hooks { for _, hook := range eventHooks.hooks {
if hook.hookingModule.OnlineSoon() { if hook.hookingModule.OnlineSoon() {
go m.runEventHook(hook, event, data) go m.runEventHook(hook, event, data)
} }
} }
// Call subscription function, if set.
if eventSubscriptionFuncReady.IsSet() {
m.StartWorker("event subscription", func(context.Context) error {
// Only use data in worker that won't change anymore.
eventSubscriptionFunc(m.Name, event, eventHooks.internal, data)
return nil
})
}
} }
// InjectEvent triggers an event from a foreign module and executes all hook functions registered to that event. // InjectEvent triggers an event from a foreign module and executes all hook functions registered to that event.
@ -63,12 +82,21 @@ func (m *Module) InjectEvent(sourceEventName, targetModuleName, targetEventName
return fmt.Errorf(`module "%s" has no event named "%s"`, targetModuleName, targetEventName) return fmt.Errorf(`module "%s" has no event named "%s"`, targetModuleName, targetEventName)
} }
for _, hook := range targetHooks { for _, hook := range targetHooks.hooks {
if hook.hookingModule.OnlineSoon() { if hook.hookingModule.OnlineSoon() {
go m.runEventHook(hook, sourceEventName, data) go m.runEventHook(hook, sourceEventName, data)
} }
} }
// Call subscription function, if set.
if eventSubscriptionFuncReady.IsSet() {
m.StartWorker("event subscription", func(context.Context) error {
// Only use data in worker that won't change anymore.
eventSubscriptionFunc(targetModule.Name, targetEventName, targetHooks.internal, data)
return nil
})
}
return nil return nil
} }
@ -111,13 +139,20 @@ func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) {
} }
// RegisterEvent registers a new event to allow for registering hooks. // RegisterEvent registers a new event to allow for registering hooks.
func (m *Module) RegisterEvent(event string) { // The expose argument controls whether these events and the attached data may
// be received by external components via APIs. If not exposed, the database
// record that carries the event and it's data will be marked as secret and as
// a crown jewel. Enforcement is left to the database layer.
func (m *Module) RegisterEvent(event string, expose bool) {
m.eventHooksLock.Lock() m.eventHooksLock.Lock()
defer m.eventHooksLock.Unlock() defer m.eventHooksLock.Unlock()
_, ok := m.eventHooks[event] _, ok := m.eventHooks[event]
if !ok { if !ok {
m.eventHooks[event] = make([]*eventHook, 0, 1) m.eventHooks[event] = &eventHooks{
hooks: make([]*eventHook, 0, 1),
internal: !expose,
}
} }
} }
@ -138,16 +173,34 @@ func (m *Module) RegisterEventHook(module string, event string, description stri
// get target event // get target event
eventModule.eventHooksLock.Lock() eventModule.eventHooksLock.Lock()
defer eventModule.eventHooksLock.Unlock() defer eventModule.eventHooksLock.Unlock()
hooks, ok := eventModule.eventHooks[event] eventHooks, ok := eventModule.eventHooks[event]
if !ok { if !ok {
return fmt.Errorf(`event "%s/%s" does not exist`, eventModule.Name, event) return fmt.Errorf(`event "%s/%s" does not exist`, eventModule.Name, event)
} }
// add hook // add hook
eventModule.eventHooks[event] = append(hooks, &eventHook{ eventHooks.hooks = append(eventHooks.hooks, &eventHook{
description: description, description: description,
hookingModule: m, hookingModule: m,
hookFn: fn, hookFn: fn,
}) })
return nil return nil
} }
// Subscribe to all events
var (
eventSubscriptionFunc func(moduleName, eventName string, internal bool, data interface{})
eventSubscriptionFuncEnabled = abool.NewBool(false)
eventSubscriptionFuncReady = abool.NewBool(false)
)
// SetEventSubscriptionFunc
func SetEventSubscriptionFunc(fn func(moduleName, eventName string, internal bool, data interface{})) bool {
if eventSubscriptionFuncEnabled.SetToIf(false, true) {
eventSubscriptionFunc = fn
eventSubscriptionFuncReady.Set()
return true
}
return false
}

View file

@ -40,6 +40,7 @@ type Module struct { //nolint:maligned // not worth the effort
// failure status // failure status
failureStatus uint8 failureStatus uint8
failureID string failureID string
failureTitle string
failureMsg string failureMsg string
// lifecycle callback functions // lifecycle callback functions
@ -62,7 +63,7 @@ type Module struct { //nolint:maligned // not worth the effort
waitGroup sync.WaitGroup waitGroup sync.WaitGroup
// events // events
eventHooks map[string][]*eventHook eventHooks map[string]*eventHooks
eventHooksLock sync.RWMutex eventHooksLock sync.RWMutex
// dependency mgmt // dependency mgmt
@ -127,8 +128,9 @@ func (m *Module) prep(reports chan *report) {
// set status // set status
if err != nil { if err != nil {
m.Error( m.Error(
"module-failed-prep", fmt.Sprintf("%s:prep-failed", m.Name),
fmt.Sprintf("failed to prep module: %s", err.Error()), fmt.Sprintf("Preparing module %s failed", m.Name),
fmt.Sprintf("Failed to prep module: %s", err.Error()),
) )
} else { } else {
m.Lock() m.Lock()
@ -183,8 +185,9 @@ func (m *Module) start(reports chan *report) {
// set status // set status
if err != nil { if err != nil {
m.Error( m.Error(
"module-failed-start", fmt.Sprintf("%s:start-failed", m.Name),
fmt.Sprintf("failed to start module: %s", err.Error()), fmt.Sprintf("Starting module %s failed", m.Name),
fmt.Sprintf("Failed to start module: %s", err.Error()),
) )
} else { } else {
m.Lock() m.Lock()
@ -270,8 +273,9 @@ func (m *Module) stopAllTasks(reports chan *report) {
// set status // set status
if err != nil { if err != nil {
m.Error( m.Error(
"module-failed-stop", fmt.Sprintf("%s:stop-failed", m.Name),
fmt.Sprintf("failed to stop module: %s", err.Error()), fmt.Sprintf("Stopping module %s failed", m.Name),
fmt.Sprintf("Failed to stop module: %s", err.Error()),
) )
} }
@ -328,7 +332,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
taskCnt: &taskCnt, taskCnt: &taskCnt,
microTaskCnt: &microTaskCnt, microTaskCnt: &microTaskCnt,
waitGroup: sync.WaitGroup{}, waitGroup: sync.WaitGroup{},
eventHooks: make(map[string][]*eventHook), eventHooks: make(map[string]*eventHooks),
depNames: dependencies, depNames: dependencies,
} }

View file

@ -1,5 +1,11 @@
package modules package modules
import (
"context"
"github.com/tevino/abool"
)
// Module Status Values // Module Status Values
const ( const (
StatusDead uint8 = 0 // not prepared, not started StatusDead uint8 = 0 // not prepared, not started
@ -25,6 +31,23 @@ const (
statusNothingToDo statusNothingToDo
) )
var (
failureUpdateNotifyFunc func(moduleFailure uint8, id, title, msg string)
failureUpdateNotifyFuncEnabled = abool.NewBool(false)
failureUpdateNotifyFuncReady = abool.NewBool(false)
)
// SetFailureUpdateNotifyFunc sets a function that is called on every change
// of a module's failure status.
func SetFailureUpdateNotifyFunc(fn func(moduleFailure uint8, id, title, msg string)) bool {
if failureUpdateNotifyFuncEnabled.SetToIf(false, true) {
failureUpdateNotifyFunc = fn
failureUpdateNotifyFuncReady.Set()
return true
}
return false
}
// Online returns whether the module is online. // Online returns whether the module is online.
func (m *Module) Online() bool { func (m *Module) Online() bool {
return m.Status() == StatusOnline return m.Status() == StatusOnline
@ -56,40 +79,80 @@ func (m *Module) FailureStatus() (failureStatus uint8, failureID, failureMsg str
return m.failureStatus, m.failureID, m.failureMsg return m.failureStatus, m.failureID, m.failureMsg
} }
// Hint sets failure status to hint. This is a somewhat special failure status, as the module is believed to be working correctly, but there is an important module specific information to convey. The supplied failureID is for improved automatic handling within connected systems, the failureMsg is for humans. // Hint sets failure status to hint. This is a somewhat special failure status,
func (m *Module) Hint(failureID, failureMsg string) { // as the module is believed to be working correctly, but there is an important
// module specific information to convey. The supplied failureID is for
// improved automatic handling within connected systems, the failureMsg is for
// humans.
// The given ID must be unique for the given title and message. A call to
// Hint(), Warning() or Error() with the same ID as the existing one will be
// ignored.
func (m *Module) Hint(id, title, msg string) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
m.failureStatus = FailureHint m.setFailure(FailureHint, id, title, msg)
m.failureID = failureID
m.failureMsg = failureMsg
m.notifyOfChange()
} }
// Warning sets failure status to warning. The supplied failureID is for improved automatic handling within connected systems, the failureMsg is for humans. // Warning sets failure status to warning. The supplied failureID is for
func (m *Module) Warning(failureID, failureMsg string) { // improved automatic handling within connected systems, the failureMsg is for
// humans.
// The given ID must be unique for the given title and message. A call to
// Hint(), Warning() or Error() with the same ID as the existing one will be
// ignored.
func (m *Module) Warning(id, title, msg string) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
m.failureStatus = FailureWarning m.setFailure(FailureWarning, id, title, msg)
m.failureID = failureID
m.failureMsg = failureMsg
m.notifyOfChange()
} }
// Error sets failure status to error. The supplied failureID is for improved automatic handling within connected systems, the failureMsg is for humans. // Error sets failure status to error. The supplied failureID is for improved
func (m *Module) Error(failureID, failureMsg string) { // automatic handling within connected systems, the failureMsg is for humans.
// The given ID must be unique for the given title and message. A call to
// Hint(), Warning() or Error() with the same ID as the existing one will be
// ignored.
func (m *Module) Error(id, title, msg string) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
m.failureStatus = FailureError m.setFailure(FailureError, id, title, msg)
m.failureID = failureID }
m.failureMsg = failureMsg
func (m *Module) setFailure(status uint8, id, title, msg string) {
// Ignore calls with the same ID.
if id == m.failureID {
return
}
// Copy data for failure status update worker.
resolveFailureID := m.failureID
// Set new failure status.
m.failureStatus = status
m.failureID = id
m.failureTitle = title
m.failureMsg = msg
// Notify of module change.
m.notifyOfChange() m.notifyOfChange()
// Propagate failure status.
if failureUpdateNotifyFuncReady.IsSet() {
m.newTask("failure status updater", func(context.Context, *Task) error {
// Only use data in worker that won't change anymore.
// Resolve previous failure state if available.
if resolveFailureID != "" {
failureUpdateNotifyFunc(FailureNone, resolveFailureID, "", "")
}
// Notify of new failure state.
failureUpdateNotifyFunc(status, id, title, msg)
return nil
}).QueuePrioritized()
}
} }
// Resolve removes the failure state from the module if the given failureID matches the current failure ID. If the given failureID is an empty string, Resolve removes any failure state. // Resolve removes the failure state from the module if the given failureID matches the current failure ID. If the given failureID is an empty string, Resolve removes any failure state.
@ -97,13 +160,33 @@ func (m *Module) Resolve(failureID string) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
if failureID == "" || failureID == m.failureID { // Check if resolving is necessary.
m.failureStatus = FailureNone if failureID != "" && failureID != m.failureID {
m.failureID = "" // Return immediately if not resolving any (`""`) or if the failure ID
m.failureMsg = "" // does not match.
return
} }
// Copy data for failure status update worker.
resolveFailureID := m.failureID
// Set failure status on module.
m.failureStatus = FailureNone
m.failureID = ""
m.failureTitle = ""
m.failureMsg = ""
// Notify of module change.
m.notifyOfChange() m.notifyOfChange()
// Propagate failure status.
if failureUpdateNotifyFuncReady.IsSet() {
m.newTask("failure status updater", func(context.Context, *Task) error {
// Only use data in worker that won't change anymore.
failureUpdateNotifyFunc(FailureNone, resolveFailureID, "", "")
return nil
}).QueuePrioritized()
}
} }
// readyToPrep returns whether all dependencies are ready for this module to prep. // readyToPrep returns whether all dependencies are ready for this module to prep.

View file

@ -76,7 +76,8 @@ func prep() error {
if err != nil { if err != nil {
module.Error( module.Error(
"modulemgmt-failed", "modulemgmt-failed",
fmt.Sprintf("The subsystem framework failed to start or stop one or more modules.\nError: %s\nCheck logs for more information.", err), "A Module failed to start",
fmt.Sprintf("The subsystem framework failed to start or stop one or more modules.\nError: %s\nCheck logs for more information or try to restart.", err),
) )
return nil return nil
} }

View file

@ -77,7 +77,7 @@ func TestSubsystems(t *testing.T) {
// test // test
// let module fail // let module fail
feature1.Error("test-fail", "Testing Fail") feature1.Error("test-fail", "Test Fail", "Testing Fail")
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
if sub1.FailureStatus != modules.FailureError { if sub1.FailureStatus != modules.FailureError {
t.Fatal("error did not propagate") t.Fatal("error did not propagate")

View file

@ -61,7 +61,10 @@ const (
defaultMaxDelay = 1 * time.Minute defaultMaxDelay = 1 * time.Minute
) )
// NewTask creates a new task with a descriptive name (non-unique), a optional deadline, and the task function to be executed. You must call one of Queue, Prioritize, StartASAP, Schedule or Repeat in order to have the Task executed. // NewTask creates a new task with a descriptive name (non-unique), a optional
// deadline, and the task function to be executed. You must call one of Queue,
// QueuePrioritized, StartASAP, Schedule or Repeat in order to have the Task
// executed.
func (m *Module) NewTask(name string, fn func(context.Context, *Task) error) *Task { func (m *Module) NewTask(name string, fn func(context.Context, *Task) error) *Task {
if m == nil { if m == nil {
log.Errorf(`modules: cannot create task "%s" with nil module`, name) log.Errorf(`modules: cannot create task "%s" with nil module`, name)
@ -75,6 +78,10 @@ func (m *Module) NewTask(name string, fn func(context.Context, *Task) error) *Ta
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
return m.newTask(name, fn)
}
func (m *Module) newTask(name string, fn func(context.Context, *Task) error) *Task {
if m.Ctx == nil || !m.OnlineSoon() { if m.Ctx == nil || !m.OnlineSoon() {
log.Errorf(`modules: tasks should only be started when the module is online or starting`) log.Errorf(`modules: tasks should only be started when the module is online or starting`)
return &Task{ return &Task{
@ -144,8 +151,8 @@ func (t *Task) Queue() *Task {
return t return t
} }
// Prioritize puts the task in the prioritized queue. // QueuePrioritized queues the Task for execution in the prioritized queue.
func (t *Task) Prioritize() *Task { func (t *Task) QueuePrioritized() *Task {
t.lock.Lock() t.lock.Lock()
defer t.lock.Unlock() defer t.lock.Unlock()

View file

@ -0,0 +1,107 @@
package notifications
import (
"github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
)
// AttachToModule attaches the notification to a module and changes to the
// notification will be reflected on the module failure status.
func (n *Notification) AttachToModule(m *modules.Module) {
if m == nil {
log.Warningf("notifications: invalid usage: cannot attach %s to nil module", n.EventID)
return
}
n.lock.Lock()
defer n.lock.Unlock()
if n.State != Active {
log.Warningf("notifications: cannot attach module to inactive notification %s", n.EventID)
return
}
if n.belongsTo != nil {
log.Warningf("notifications: cannot override attached module for notification %s", n.EventID)
return
}
// Attach module.
n.belongsTo = m
// Set module failure status.
switch n.Type { //nolint:exhaustive
case Info:
m.Hint(n.EventID, n.Title, n.Message)
case Warning:
m.Warning(n.EventID, n.Title, n.Message)
case Error:
m.Error(n.EventID, n.Title, n.Message)
default:
log.Warningf("notifications: incompatible type for attaching to module in notification %s", n.EventID)
m.Error(n.EventID, n.Title, n.Message+" [incompatible notification type]")
}
}
// resolveModuleFailure removes the notification from the module failure status.
func (n *Notification) resolveModuleFailure() {
if n.belongsTo != nil {
// Resolve failure in attached module.
n.belongsTo.Resolve(n.EventID)
// Reset attachment in order to mitigate duplicate failure resolving.
// Re-attachment is prevented by the state check when attaching.
n.belongsTo = nil
}
}
func init() {
modules.SetFailureUpdateNotifyFunc(mirrorModuleStatus)
}
func mirrorModuleStatus(moduleFailure uint8, id, title, msg string) {
// Ignore "resolve all" requests.
if id == "" {
return
}
// Get notification from storage.
n, ok := getNotification(id)
if ok {
// The notification already exists.
// Check if we should delete it.
if moduleFailure == modules.FailureNone {
n.Delete()
}
return
}
// A notification for the given ID does not yet exists, create it.
n = &Notification{
EventID: id,
Title: title,
Message: msg,
AvailableActions: []*Action{
{
Text: "Get Help",
Type: ActionTypeOpenURL,
Payload: "https://safing.io/support/",
},
},
}
switch moduleFailure {
case modules.FailureHint:
n.Type = Info
n.AvailableActions = nil
case modules.FailureWarning:
n.Type = Warning
n.ShowOnSystem = true
case modules.FailureError:
n.Type = Error
n.ShowOnSystem = true
}
Notify(n)
}

View file

@ -8,6 +8,7 @@ import (
"github.com/safing/portbase/database/record" "github.com/safing/portbase/database/record"
"github.com/safing/portbase/log" "github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
"github.com/safing/portbase/utils" "github.com/safing/portbase/utils"
) )
@ -19,6 +20,7 @@ const (
Info Type = 0 Info Type = 0
Warning Type = 1 Warning Type = 1
Prompt Type = 2 Prompt Type = 2
Error Type = 3
) )
// State describes the state of a notification. // State describes the state of a notification.
@ -70,6 +72,10 @@ type Notification struct {
// of the notification is available. Note that the message should already // of the notification is available. Note that the message should already
// have any paramerized values replaced. // have any paramerized values replaced.
Message string Message string
// ShowOnSystem specifies if the notification should be also shown on the
// operating system. Notifications shown on the operating system level are
// more focus-intrusive and should only be used for important notifications.
ShowOnSystem bool
// EventData contains an additional payload for the notification. This payload // EventData contains an additional payload for the notification. This payload
// may contain contextual data and may be used by a localization framework // may contain contextual data and may be used by a localization framework
// to populate the notification message template. // to populate the notification message template.
@ -91,6 +97,10 @@ type Notification struct {
// based on the user selection. // based on the user selection.
SelectedActionID string SelectedActionID string
// belongsTo holds the module this notification belongs to. The notification
// lifecycle will be mirrored to the module's failure status.
belongsTo *modules.Module
lock sync.Mutex lock sync.Mutex
actionFunction NotificationActionFn // call function to process action actionFunction NotificationActionFn // call function to process action
actionTrigger chan string // and/or send to a channel actionTrigger chan string // and/or send to a channel
@ -99,8 +109,59 @@ type Notification struct {
// Action describes an action that can be taken for a notification. // Action describes an action that can be taken for a notification.
type Action struct { type Action struct {
// ID specifies a unique ID for the action. If an action is selected, the ID
// is written to SelectedActionID and the notification is saved.
// If the action type is not ActionTypeNone, the ID may be empty, signifying
// that this action is merely additional and selecting it does not dismiss the
// notification.
ID string ID string
// Text on the button.
Text string Text string
// Type specifies the action type. Implementing interfaces should only
// display action types they can handle.
Type ActionType
// Payload holds additional data for special action types.
Payload interface{}
}
// ActionType defines a specific type of action.
type ActionType string
// Action Types.
const (
ActionTypeNone = "" // Report selected ID back to backend.
ActionTypeOpenURL = "open-url" // Open external URL
ActionTypeOpenPage = "open-page" // Payload: Page ID
ActionTypeOpenSetting = "open-setting" // Payload: See struct definition below.
ActionTypeOpenProfile = "open-profile" // Payload: Scoped Profile ID
ActionTypeInjectEvent = "inject-event" // Payload: Event ID
ActionTypeWebhook = "call-webhook" // Payload: See struct definition below.
)
// ActionTypeOpenSettingPayload defines the payload for the OpenSetting Action Type.
type ActionTypeOpenSettingPayload struct {
// Key is the key of the setting.
Key string
// Profile is the scoped ID of the profile.
// Leaving this empty opens the global settings.
Profile string
}
// ActionTypeWebhookPayload defines the payload for the WebhookPayload Action Type.
type ActionTypeWebhookPayload struct {
// HTTP Method to use. Defaults to "GET", or "POST" if a Payload is supplied.
Method string
// URL to call.
// If the URL is relative, prepend the current API endpoint base path.
// If the URL is absolute, send request to the Portmaster.
URL string
// Payload holds arbitrary payload data.
Payload interface{}
// ResultAction defines what should be done with successfully returned data.
// Must one of:
// - `ignore`: do nothing (default)
// - `display`: the result is a human readable message, display it in a success message.
ResultAction string
} }
// Get returns the notification identifed by the given id or nil if it doesn't exist. // Get returns the notification identifed by the given id or nil if it doesn't exist.
@ -114,38 +175,84 @@ func Get(id string) *Notification {
return nil return nil
} }
// NotifyInfo is a helper method for quickly showing a info // Delete deletes the notification with the given id.
// notification. The notification is already shown. If id is func Delete(id string) {
// an empty string a new UUIDv4 will be generated. // Delete notification in defer to enable deferred unlocking.
func NotifyInfo(id, msg string, actions ...Action) *Notification { var n *Notification
return notify(Info, id, msg, actions...) var ok bool
defer func() {
if ok {
n.Delete()
}
}()
notsLock.Lock()
defer notsLock.Unlock()
n, ok = nots[id]
} }
// NotifyWarn is a helper method for quickly showing a warning // NotifyInfo is a helper method for quickly showing an info notification.
// notification. The notification is already shown. If id is // The notification will be activated immediately.
// an empty string a new UUIDv4 will be generated. // If the provided id is empty, an id will derived from msg.
func NotifyWarn(id, msg string, actions ...Action) *Notification { // ShowOnSystem is disabled.
return notify(Warning, id, msg, actions...) // If no actions are defined, a default "OK" (ID:"ack") action will be added.
func NotifyInfo(id, title, msg string, actions ...Action) *Notification {
return notify(Info, id, title, msg, false, actions...)
} }
// NotifyPrompt is a helper method for quickly showing a prompt // NotifyWarn is a helper method for quickly showing a warning notification
// notification. The notification is already shown. If id is // The notification will be activated immediately.
// an empty string a new UUIDv4 will be generated. // If the provided id is empty, an id will derived from msg.
func NotifyPrompt(id, msg string, actions ...Action) *Notification { // ShowOnSystem is enabled.
return notify(Prompt, id, msg, actions...) // If no actions are defined, a default "OK" (ID:"ack") action will be added.
func NotifyWarn(id, title, msg string, actions ...Action) *Notification {
return notify(Warning, id, title, msg, true, actions...)
} }
func notify(nType Type, id, msg string, actions ...Action) *Notification { // NotifyError is a helper method for quickly showing an error notification.
acts := make([]*Action, len(actions)) // The notification will be activated immediately.
for idx := range actions { // If the provided id is empty, an id will derived from msg.
a := actions[idx] // ShowOnSystem is enabled.
acts[idx] = &a // If no actions are defined, a default "OK" (ID:"ack") action will be added.
func NotifyError(id, title, msg string, actions ...Action) *Notification {
return notify(Error, id, title, msg, true, actions...)
}
// NotifyPrompt is a helper method for quickly showing a prompt notification.
// The notification will be activated immediately.
// If the provided id is empty, an id will derived from msg.
// ShowOnSystem is disabled.
// If no actions are defined, a default "OK" (ID:"ack") action will be added.
func NotifyPrompt(id, title, msg string, actions ...Action) *Notification {
return notify(Prompt, id, title, msg, false, actions...)
}
func notify(nType Type, id, title, msg string, showOnSystem bool, actions ...Action) *Notification {
// Process actions.
var acts []*Action
if len(actions) == 0 {
// Create ack action if there are no defined actions.
acts = []*Action{
{
ID: "ack",
Text: "OK",
},
}
} else {
// Reference given actions for notification.
acts = make([]*Action, len(actions))
for index := range actions {
a := actions[index]
acts[index] = &a
}
} }
return Notify(&Notification{ return Notify(&Notification{
EventID: id, EventID: id,
Type: nType, Type: nType,
Title: title,
Message: msg, Message: msg,
ShowOnSystem: showOnSystem,
AvailableActions: acts, AvailableActions: acts,
}) })
} }
@ -185,15 +292,9 @@ func (n *Notification) save(pushUpdate bool) {
n.lock.Lock() n.lock.Lock()
defer n.lock.Unlock() defer n.lock.Unlock()
// Move Title to Message, as that is the required field.
if n.Message == "" {
n.Message = n.Title
n.Title = ""
}
// Check if required data is present. // Check if required data is present.
if n.Message == "" { if n.Title == "" && n.Message == "" {
log.Warning("notifications: ignoring notification without Message") log.Warning("notifications: ignoring notification without Title or Message")
return return
} }
@ -213,16 +314,6 @@ func (n *Notification) save(pushUpdate bool) {
n.GUID = utils.RandomUUID(n.EventID).String() n.GUID = utils.RandomUUID(n.EventID).String()
} }
// Make ack notification if there are no defined actions.
if len(n.AvailableActions) == 0 {
n.AvailableActions = []*Action{
{
ID: "ack",
Text: "OK",
},
}
}
// Make sure we always have a notification state assigned. // Make sure we always have a notification state assigned.
if n.State == "" { if n.State == "" {
n.State = Active n.State = Active
@ -293,9 +384,8 @@ func (n *Notification) Update(expires int64) {
} }
// Delete (prematurely) cancels and deletes a notification. // Delete (prematurely) cancels and deletes a notification.
func (n *Notification) Delete() error { func (n *Notification) Delete() {
n.delete(true) n.delete(true)
return nil
} }
// delete deletes the notification from the internal storage. It locks the // delete deletes the notification from the internal storage. It locks the
@ -332,6 +422,8 @@ func (n *Notification) delete(pushUpdate bool) {
if pushUpdate { if pushUpdate {
dbController.PushUpdate(n) dbController.PushUpdate(n)
} }
n.resolveModuleFailure()
} }
// Expired notifies the caller when the notification has expired. // Expired notifies the caller when the notification has expired.
@ -384,6 +476,7 @@ func (n *Notification) selectAndExecuteAction(id string) {
if executed { if executed {
n.State = Executed n.State = Executed
n.resolveModuleFailure()
} }
} }

View file

@ -1,6 +1,8 @@
package runtime package runtime
import ( import (
"fmt"
"github.com/safing/portbase/database" "github.com/safing/portbase/database"
"github.com/safing/portbase/modules" "github.com/safing/portbase/modules"
) )
@ -30,6 +32,10 @@ func startModule() error {
return err return err
} }
if err := startModulesIntegration(); err != nil {
return fmt.Errorf("failed to start modules integration: %w", err)
}
return nil return nil
} }

View file

@ -0,0 +1,72 @@
package runtime
import (
"fmt"
"sync"
"github.com/safing/portbase/database"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
)
var (
modulesIntegrationUpdatePusher func(...record.Record)
)
func startModulesIntegration() (err error) {
modulesIntegrationUpdatePusher, err = Register("modules/", &ModulesIntegration{})
if err != nil {
return err
}
if !modules.SetEventSubscriptionFunc(pushModuleEvent) {
log.Warningf("runtime: failed to register the modules event subscription function")
}
return nil
}
type ModulesIntegration struct{}
// Set is called when the value is set from outside.
// If the runtime value is considered read-only ErrReadOnly
// should be returned. It is guaranteed that the key of
// the record passed to Set is prefixed with the key used
// to register the value provider.
func (mi *ModulesIntegration) Set(record.Record) (record.Record, error) {
return nil, ErrReadOnly
}
// Get should return one or more records that match keyOrPrefix.
// keyOrPrefix is guaranteed to be at least the prefix used to
// register the ValueProvider.
func (mi *ModulesIntegration) Get(keyOrPrefix string) ([]record.Record, error) {
return nil, database.ErrNotFound
}
type eventData struct { //nolint:unused // This is a cascading false positive.
record.Base
sync.Mutex
Data interface{}
}
func pushModuleEvent(moduleName, eventName string, internal bool, data interface{}) { //nolint:unused // This is a false positive, the function is provided to modules.SetEventSubscriptionFunc().
// Create event record and set key.
eventRecord := &eventData{
Data: data,
}
eventRecord.SetKey(fmt.Sprintf(
"runtime:modules/%s/event/%s",
moduleName,
eventName,
))
eventRecord.UpdateMeta()
if internal {
eventRecord.Meta().MakeSecret()
eventRecord.Meta().MakeCrownJewel()
}
// Push event to database subscriptions.
modulesIntegrationUpdatePusher(eventRecord)
}

View file

@ -39,7 +39,7 @@ func init() {
) )
// register events that other modules can subscribe to // register events that other modules can subscribe to
module.RegisterEvent(eventStateUpdate) module.RegisterEvent(eventStateUpdate, true)
} }
func prep() error { func prep() error {

5
test
View file

@ -175,13 +175,16 @@ echo "running tests for ${platformInfo//$'\n'/ }:"
# run vet/test on packages # run vet/test on packages
for package in $packages; do for package in $packages; do
package=${package#github.com/safing/portbase}
package=${package#/}
package=$PWD/$package
echo "" echo ""
echo $package echo $package
if [[ $testonly -eq 0 ]]; then if [[ $testonly -eq 0 ]]; then
checkformat $package checkformat $package
run golint -set_exit_status -min_confidence 1.0 $package run golint -set_exit_status -min_confidence 1.0 $package
run go vet $package run go vet $package
run golangci-lint run $GOPATH/src/$package run golangci-lint run $package
fi fi
run go test -cover $fullTestFlags $package run go test -cover $fullTestFlags $package
done done

View file

@ -36,14 +36,14 @@ func TestVersionSelection(t *testing.T) {
registry.Beta = true registry.Beta = true
registry.DevMode = true registry.DevMode = true
res.selectVersion() res.selectVersion()
if res.SelectedVersion.VersionNumber != "0" { if res.SelectedVersion.VersionNumber != "0.0.0" {
t.Errorf("selected version should be 0, not %s", res.SelectedVersion.VersionNumber) t.Errorf("selected version should be 0.0.0, not %s", res.SelectedVersion.VersionNumber)
} }
registry.DevMode = false registry.DevMode = false
res.selectVersion() res.selectVersion()
if res.SelectedVersion.VersionNumber != "1.2.4b" { if res.SelectedVersion.VersionNumber != "1.2.4-b" {
t.Errorf("selected version should be 1.2.4b, not %s", res.SelectedVersion.VersionNumber) t.Errorf("selected version should be 1.2.4-b, not %s", res.SelectedVersion.VersionNumber)
} }
registry.Beta = false registry.Beta = false