mirror of
https://github.com/safing/portmaster
synced 2025-04-21 19:39:09 +00:00
* Move portbase into monorepo * Add new simple module mgr * [WIP] Switch to new simple module mgr * Add StateMgr and more worker variants * [WIP] Switch more modules * [WIP] Switch more modules * [WIP] swtich more modules * [WIP] switch all SPN modules * [WIP] switch all service modules * [WIP] Convert all workers to the new module system * [WIP] add new task system to module manager * [WIP] Add second take for scheduling workers * [WIP] Add FIXME for bugs in new scheduler * [WIP] Add minor improvements to scheduler * [WIP] Add new worker scheduler * [WIP] Fix more bug related to new module system * [WIP] Fix start handing of the new module system * [WIP] Improve startup process * [WIP] Fix minor issues * [WIP] Fix missing subsystem in settings * [WIP] Initialize managers in constructor * [WIP] Move module event initialization to constrictors * [WIP] Fix setting for enabling and disabling the SPN module * [WIP] Move API registeration into module construction * [WIP] Update states mgr for all modules * [WIP] Add CmdLine operation support * Add state helper methods to module group and instance * Add notification and module status handling to status package * Fix starting issues * Remove pilot widget and update security lock to new status data * Remove debug logs * Improve http server shutdown * Add workaround for cleanly shutting down firewall+netquery * Improve logging * Add syncing states with notifications for new module system * Improve starting, stopping, shutdown; resolve FIXMEs/TODOs * [WIP] Fix most unit tests * Review new module system and fix minor issues * Push shutdown and restart events again via API * Set sleep mode via interface * Update example/template module * [WIP] Fix spn/cabin unit test * Remove deprecated UI elements * Make log output more similar for the logging transition phase * Switch spn hub and observer cmds to new module system * Fix log sources * Make worker mgr less error prone * Fix tests and minor issues * Fix observation hub * 
Improve shutdown and restart handling * Split up big connection.go source file * Move varint and dsd packages to structures repo * Improve expansion test * Fix linter warnings * Fix interception module on windows * Fix linter errors --------- Co-authored-by: Vladimir Stoilov <vladimir@safing.io>
202 lines
5 KiB
Go
202 lines
5 KiB
Go
package hub
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/safing/portmaster/base/database"
|
|
"github.com/safing/portmaster/base/database/iterator"
|
|
"github.com/safing/portmaster/base/database/query"
|
|
"github.com/safing/portmaster/base/database/record"
|
|
)
|
|
|
|
var (
|
|
db = database.NewInterface(&database.Options{
|
|
Local: true,
|
|
Internal: true,
|
|
})
|
|
|
|
getFromNavigator func(mapName, hubID string) *Hub
|
|
)
|
|
|
|
// MakeHubDBKey builds the database key under which the Hub with the given
// ID is stored for the given map.
// The key has the form "cache:spn/hubs/<mapName>/<hubID>".
func MakeHubDBKey(mapName, hubID string) string {
	const hubKeyFormat = "cache:spn/hubs/%s/%s"

	return fmt.Sprintf(hubKeyFormat, mapName, hubID)
}
|
|
|
|
// MakeHubMsgDBKey makes a hub msg db key.
|
|
func MakeHubMsgDBKey(mapName string, msgType MsgType, hubID string) string {
|
|
return fmt.Sprintf("cache:spn/msgs/%s/%s/%s", mapName, msgType, hubID)
|
|
}
|
|
|
|
// SetNavigatorAccess sets a shortcut function to access hubs from the navigator instead of having go through the database.
|
|
// This also reduces the number of object in RAM and better caches parsed attributes.
|
|
func SetNavigatorAccess(fn func(mapName, hubID string) *Hub) {
|
|
if getFromNavigator == nil {
|
|
getFromNavigator = fn
|
|
}
|
|
}
|
|
|
|
// GetHub get a Hub from the database - or the navigator, if configured.
|
|
func GetHub(mapName string, hubID string) (*Hub, error) {
|
|
if getFromNavigator != nil {
|
|
hub := getFromNavigator(mapName, hubID)
|
|
if hub != nil {
|
|
return hub, nil
|
|
}
|
|
}
|
|
|
|
return GetHubByKey(MakeHubDBKey(mapName, hubID))
|
|
}
|
|
|
|
// GetHubByKey returns a hub by its raw DB key.
|
|
func GetHubByKey(key string) (*Hub, error) {
|
|
r, err := db.Get(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
hub, err := EnsureHub(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return hub, nil
|
|
}
|
|
|
|
// EnsureHub makes sure a database record is a Hub.
|
|
func EnsureHub(r record.Record) (*Hub, error) {
|
|
// unwrap
|
|
if r.IsWrapped() {
|
|
// only allocate a new struct, if we need it
|
|
newHub := &Hub{}
|
|
err := record.Unwrap(r, newHub)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
newHub = prepHub(newHub)
|
|
|
|
// Fully validate when getting from database.
|
|
if err := newHub.Info.validateFormatting(); err != nil {
|
|
return nil, fmt.Errorf("announcement failed format validation: %w", err)
|
|
}
|
|
if err := newHub.Status.validateFormatting(); err != nil {
|
|
return nil, fmt.Errorf("status failed format validation: %w", err)
|
|
}
|
|
if err := newHub.Info.prepare(false); err != nil {
|
|
return nil, fmt.Errorf("failed to prepare announcement: %w", err)
|
|
}
|
|
|
|
return newHub, nil
|
|
}
|
|
|
|
// or adjust type
|
|
newHub, ok := r.(*Hub)
|
|
if !ok {
|
|
return nil, fmt.Errorf("record not of type *Hub, but %T", r)
|
|
}
|
|
newHub = prepHub(newHub)
|
|
|
|
// Prepare only when already parsed.
|
|
if err := newHub.Info.prepare(false); err != nil {
|
|
return nil, fmt.Errorf("failed to prepare announcement: %w", err)
|
|
}
|
|
|
|
// ensure status
|
|
return newHub, nil
|
|
}
|
|
|
|
func prepHub(h *Hub) *Hub {
|
|
if h.Status == nil {
|
|
h.Status = &Status{}
|
|
}
|
|
h.Measurements = getSharedMeasurements(h.ID, h.Measurements)
|
|
return h
|
|
}
|
|
|
|
// Save saves to Hub to the correct scope in the database.
|
|
func (h *Hub) Save() error {
|
|
if !h.KeyIsSet() {
|
|
h.SetKey(MakeHubDBKey(h.Map, h.ID))
|
|
}
|
|
|
|
return db.Put(h)
|
|
}
|
|
|
|
// RemoveHubAndMsgs deletes a Hub and it's saved messages from the database.
|
|
func RemoveHubAndMsgs(mapName string, hubID string) (err error) {
|
|
err = db.Delete(MakeHubDBKey(mapName, hubID))
|
|
if err != nil && !errors.Is(err, database.ErrNotFound) {
|
|
return fmt.Errorf("failed to delete main hub entry: %w", err)
|
|
}
|
|
|
|
err = db.Delete(MakeHubMsgDBKey(mapName, MsgTypeAnnouncement, hubID))
|
|
if err != nil && !errors.Is(err, database.ErrNotFound) {
|
|
return fmt.Errorf("failed to delete hub announcement data: %w", err)
|
|
}
|
|
|
|
err = db.Delete(MakeHubMsgDBKey(mapName, MsgTypeStatus, hubID))
|
|
if err != nil && !errors.Is(err, database.ErrNotFound) {
|
|
return fmt.Errorf("failed to delete hub status data: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// HubMsg stores raw Hub messages.
|
|
type HubMsg struct { //nolint:golint
|
|
record.Base
|
|
sync.Mutex
|
|
|
|
ID string
|
|
Map string
|
|
Type MsgType
|
|
Data []byte
|
|
|
|
Received int64
|
|
}
|
|
|
|
// SaveHubMsg saves a raw (and signed) message received by another Hub.
|
|
func SaveHubMsg(id string, mapName string, msgType MsgType, data []byte) error {
|
|
// create wrapper record
|
|
msg := &HubMsg{
|
|
ID: id,
|
|
Map: mapName,
|
|
Type: msgType,
|
|
Data: data,
|
|
Received: time.Now().Unix(),
|
|
}
|
|
// set key
|
|
msg.SetKey(MakeHubMsgDBKey(msg.Map, msg.Type, msg.ID))
|
|
// save
|
|
return db.PutNew(msg)
|
|
}
|
|
|
|
// QueryRawGossipMsgs queries the database for raw gossip messages.
|
|
func QueryRawGossipMsgs(mapName string, msgType MsgType) (it *iterator.Iterator, err error) {
|
|
it, err = db.Query(query.New(MakeHubMsgDBKey(mapName, msgType, "")))
|
|
return
|
|
}
|
|
|
|
// EnsureHubMsg makes sure a database record is a HubMsg.
|
|
func EnsureHubMsg(r record.Record) (*HubMsg, error) {
|
|
// unwrap
|
|
if r.IsWrapped() {
|
|
// only allocate a new struct, if we need it
|
|
newHubMsg := &HubMsg{}
|
|
err := record.Unwrap(r, newHubMsg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return newHubMsg, nil
|
|
}
|
|
|
|
// or adjust type
|
|
newHubMsg, ok := r.(*HubMsg)
|
|
if !ok {
|
|
return nil, fmt.Errorf("record not of type *Hub, but %T", r)
|
|
}
|
|
return newHubMsg, nil
|
|
}
|