safing-portmaster/cmds/observation-hub/observe.go
Daniel Hååvi 80664d1a27
Restructure modules ()
* Move portbase into monorepo

* Add new simple module mgr

* [WIP] Switch to new simple module mgr

* Add StateMgr and more worker variants

* [WIP] Switch more modules

* [WIP] Switch more modules

* [WIP] swtich more modules

* [WIP] switch all SPN modules

* [WIP] switch all service modules

* [WIP] Convert all workers to the new module system

* [WIP] add new task system to module manager

* [WIP] Add second take for scheduling workers

* [WIP] Add FIXME for bugs in new scheduler

* [WIP] Add minor improvements to scheduler

* [WIP] Add new worker scheduler

* [WIP] Fix more bug related to new module system

* [WIP] Fix start handing of the new module system

* [WIP] Improve startup process

* [WIP] Fix minor issues

* [WIP] Fix missing subsystem in settings

* [WIP] Initialize managers in constructor

* [WIP] Move module event initialization to constrictors

* [WIP] Fix setting for enabling and disabling the SPN module

* [WIP] Move API registeration into module construction

* [WIP] Update states mgr for all modules

* [WIP] Add CmdLine operation support

* Add state helper methods to module group and instance

* Add notification and module status handling to status package

* Fix starting issues

* Remove pilot widget and update security lock to new status data

* Remove debug logs

* Improve http server shutdown

* Add workaround for cleanly shutting down firewall+netquery

* Improve logging

* Add syncing states with notifications for new module system

* Improve starting, stopping, shutdown; resolve FIXMEs/TODOs

* [WIP] Fix most unit tests

* Review new module system and fix minor issues

* Push shutdown and restart events again via API

* Set sleep mode via interface

* Update example/template module

* [WIP] Fix spn/cabin unit test

* Remove deprecated UI elements

* Make log output more similar for the logging transition phase

* Switch spn hub and observer cmds to new module system

* Fix log sources

* Make worker mgr less error prone

* Fix tests and minor issues

* Fix observation hub

* Improve shutdown and restart handling

* Split up big connection.go source file

* Move varint and dsd packages to structures repo

* Improve expansion test

* Fix linter warnings

* Fix interception module on windows

* Fix linter errors

---------

Co-authored-by: Vladimir Stoilov <vladimir@safing.io>
2024-08-09 18:15:48 +03:00

461 lines
11 KiB
Go

package main
import (
"errors"
"flag"
"fmt"
"path"
"strings"
"sync/atomic"
"time"
diff "github.com/r3labs/diff/v3"
"golang.org/x/exp/slices"
"github.com/safing/portmaster/base/database"
"github.com/safing/portmaster/base/database/query"
"github.com/safing/portmaster/base/log"
"github.com/safing/portmaster/service/mgr"
"github.com/safing/portmaster/spn/captain"
"github.com/safing/portmaster/spn/navigator"
)
// Observer is the network observer module.
type Observer struct {
mgr *mgr.Manager
instance instance
}
// Manager returns the module manager.
func (o *Observer) Manager() *mgr.Manager {
return o.mgr
}
// Start starts the module.
func (o *Observer) Start() error {
return startObserver()
}
// Stop stops the module.
func (o *Observer) Stop() error {
return nil
}
var (
observerModule *Observer
shimLoaded atomic.Bool
db = database.NewInterface(&database.Options{
Local: true,
Internal: true,
})
reportAllChanges bool
errNoChanges = errors.New("no changes")
reportingDelayFlag string
reportingDelay = 5 * time.Minute
reportingMaxDelay = reportingDelay * 3
)
func init() {
flag.BoolVar(&reportAllChanges, "report-all-changes", false, "report all changes, no just interesting ones")
flag.StringVar(&reportingDelayFlag, "reporting-delay", "10m", "delay reports to summarize changes")
}
func prepObserver() error {
if reportingDelayFlag != "" {
duration, err := time.ParseDuration(reportingDelayFlag)
if err != nil {
return fmt.Errorf("failed to parse reporting-delay: %w", err)
}
reportingDelay = duration
}
reportingMaxDelay = reportingDelay * 3
return nil
}
func startObserver() error {
observerModule.mgr.Go("observer", observerWorker)
return nil
}
type observedPin struct {
previous *navigator.PinExport
latest *navigator.PinExport
firstUpdate time.Time
lastUpdate time.Time
updateReported bool
}
type observedChange struct {
Title string
Summary string
UpdatedPin *navigator.PinExport
UpdateTime time.Time
SPNStatus *captain.SPNStatus
}
func observerWorker(ctx *mgr.WorkerCtx) error {
log.Info("observer: starting")
defer log.Info("observer: stopped")
// Subscribe to SPN status.
statusSub, err := db.Subscribe(query.New("runtime:spn/status"))
if err != nil {
return fmt.Errorf("failed to subscribe to spn status: %w", err)
}
defer statusSub.Cancel() //nolint:errcheck
// Get latest status.
latestStatus := captain.GetSPNStatus()
// Step 1: Wait for SPN to connect, if needed.
if latestStatus.Status != captain.StatusConnected {
log.Info("observer: waiting for SPN to connect")
waitForConnect:
for {
select {
case r := <-statusSub.Feed:
if r == nil {
return errors.New("status feed ended")
}
statusUpdate, ok := r.(*captain.SPNStatus)
switch {
case !ok:
log.Warningf("observer: received invalid SPN status: %s", r)
case statusUpdate.Status == captain.StatusFailed:
log.Warningf("observer: SPN failed to connect")
case statusUpdate.Status == captain.StatusConnected:
break waitForConnect
}
case <-ctx.Done():
return nil
}
}
}
// Wait for one second for the navigator to settle things.
log.Info("observer: connected to network, waiting for navigator")
time.Sleep(1 * time.Second)
// Step 2: Get current state.
mapQuery := query.New("map:main/")
q, err := db.Query(mapQuery)
if err != nil {
return fmt.Errorf("failed to start map query: %w", err)
}
defer q.Cancel()
// Put all current pins in a map.
observedPins := make(map[string]*observedPin)
initialQuery:
for {
select {
case r := <-q.Next:
// Check if we are done.
if r == nil {
break initialQuery
}
// Add all pins to seen pins.
if pin, ok := r.(*navigator.PinExport); ok {
observedPins[pin.ID] = &observedPin{
previous: pin,
latest: pin,
updateReported: true,
}
} else {
log.Warningf("observer: received invalid pin export: %s", r)
}
case <-ctx.Done():
return nil
}
}
if q.Err() != nil {
return fmt.Errorf("failed to finish map query: %w", q.Err())
}
// Step 3: Monitor for changes.
sub, err := db.Subscribe(mapQuery)
if err != nil {
return fmt.Errorf("failed to start map sub: %w", err)
}
defer sub.Cancel() //nolint:errcheck
// Start ticker for checking for changes.
reportChangesTicker := time.NewTicker(10 * time.Second)
defer reportChangesTicker.Stop()
log.Info("observer: listening for hub changes")
for {
select {
case <-ctx.Done():
return nil
case r := <-statusSub.Feed:
// Keep SPN connection status up to date.
if r == nil {
return errors.New("status feed ended")
}
if statusUpdate, ok := r.(*captain.SPNStatus); ok {
latestStatus = statusUpdate
log.Infof("observer: SPN status is now %s", statusUpdate.Status)
} else {
log.Warningf("observer: received invalid pin export: %s", r)
}
case r := <-sub.Feed:
// Save all observed pins.
switch {
case r == nil:
return errors.New("pin feed ended")
case r.Meta().IsDeleted():
delete(observedPins, path.Base(r.DatabaseKey()))
default:
if pin, ok := r.(*navigator.PinExport); ok {
existingObservedPin, ok := observedPins[pin.ID]
if ok {
// Update previously observed Hub.
existingObservedPin.latest = pin
if existingObservedPin.updateReported {
existingObservedPin.firstUpdate = time.Now()
}
existingObservedPin.lastUpdate = time.Now()
existingObservedPin.updateReported = false
} else {
// Add new Hub.
observedPins[pin.ID] = &observedPin{
latest: pin,
firstUpdate: time.Now(),
lastUpdate: time.Now(),
updateReported: false,
}
}
} else {
log.Warningf("observer: received invalid pin export: %s", r)
}
}
case <-reportChangesTicker.C:
// Report changed pins.
for _, observedPin := range observedPins {
// Check if context was canceled.
select {
case <-ctx.Done():
return nil
default:
}
switch {
case observedPin.updateReported:
// Change already reported.
case time.Since(observedPin.lastUpdate) < reportingDelay &&
time.Since(observedPin.firstUpdate) < reportingMaxDelay:
// Only report changes if older than the configured delay.
// Up to a maximum delay.
default:
// Format and report.
title, changes, err := formatPinChanges(observedPin.previous, observedPin.latest)
if err != nil {
if errors.Is(err, errNoChanges) {
log.Debugf("observer: no reportable changes found for %s", observedPin.latest.HumanName())
} else {
log.Warningf("observer: failed to format pin changes: %s", err)
}
} else {
// Report changes.
reportChanges(&observedChange{
Title: title,
Summary: changes,
UpdatedPin: observedPin.latest,
UpdateTime: observedPin.lastUpdate,
SPNStatus: latestStatus,
})
}
// Update observed pin.
observedPin.previous = observedPin.latest
observedPin.updateReported = true
}
}
}
}
}
func reportChanges(change *observedChange) {
// Log changes.
log.Infof("observer:\n%s\n%s", change.Title, change.Summary)
// Report via Apprise.
err := reportToApprise(change)
if err != nil {
log.Warningf("observer: failed to report changes to apprise: %s", err)
}
}
var (
ignoreChangesIn = []string{
"ConnectedTo",
"HopDistance",
"Info.entryPolicy", // Alternatively, ignore "Info.Entry"
"Info.exitPolicy", // Alternatively, ignore "Info.Exit"
"Info.parsedTransports",
"Info.Timestamp",
"SessionActive",
"Status.Keys",
"Status.Lanes",
"Status.Load",
"Status.Timestamp",
}
ignoreStates = []string{
"IsHomeHub",
"Failing",
}
)
func ignoreChange(path string) bool {
if reportAllChanges {
return false
}
for _, pathPrefix := range ignoreChangesIn {
if strings.HasPrefix(path, pathPrefix) {
return true
}
}
return false
}
func formatPinChanges(from, to *navigator.PinExport) (title, changes string, err error) {
// Return immediately if pin is new.
if from == nil {
return fmt.Sprintf("New Hub: %s", makeHubName(to.Name, to.ID)), "", nil
}
// Find notable changes.
changelog, err := diff.Diff(from, to)
if err != nil {
return "", "", fmt.Errorf("failed to diff: %w", err)
}
if len(changelog) > 0 {
// Build changelog message.
changes := make([]string, 0, len(changelog))
for _, change := range changelog {
// Create path to changed field.
fullPath := strings.Join(change.Path, ".")
// Check if this path should be ignored.
if ignoreChange(fullPath) {
continue
}
// Add to reportable changes.
changeMsg := formatChange(change, fullPath)
if changeMsg != "" {
changes = append(changes, changeMsg)
}
}
// Log the changes, if there are any left.
if len(changes) > 0 {
return fmt.Sprintf("Hub Changed: %s", makeHubName(to.Name, to.ID)),
strings.Join(changes, "\n"),
nil
}
}
return "", "", errNoChanges
}
func formatChange(change diff.Change, fullPath string) string {
switch {
case strings.HasPrefix(fullPath, "States"):
switch change.Type {
case diff.CREATE:
return formatState(fmt.Sprintf("%v", change.To), true)
case diff.UPDATE:
a := formatState(fmt.Sprintf("%v", change.To), true)
b := formatState(fmt.Sprintf("%v", change.From), false)
switch {
case a != "" && b != "":
return a + "\n" + b
case a != "":
return a
case b != "":
return b
}
case diff.DELETE:
return formatState(fmt.Sprintf("%v", change.From), false)
}
default:
switch change.Type {
case diff.CREATE:
return fmt.Sprintf("%s added %v", fullPath, change.To)
case diff.UPDATE:
return fmt.Sprintf("%s changed from %v to %v", fullPath, change.From, change.To)
case diff.DELETE:
return fmt.Sprintf("%s removed %v", fullPath, change.From)
}
}
return ""
}
func formatState(name string, isSet bool) string {
// Check if state should be ignored.
if !reportAllChanges && slices.Contains[[]string, string](ignoreStates, name) {
return ""
}
if isSet {
return fmt.Sprintf("State is %v", name)
}
return fmt.Sprintf("State is NOT %v", name)
}
func makeHubName(name, id string) string {
shortenedID := id[len(id)-8:len(id)-4] +
"-" +
id[len(id)-4:]
// Be more careful, as the Hub name is user input.
switch {
case name == "":
return shortenedID
case len(name) > 16:
return fmt.Sprintf("%s (%s)", name[:16], shortenedID)
default:
return fmt.Sprintf("%s (%s)", name, shortenedID)
}
}
// New returns a new Observer module.
func New(instance instance) (*Observer, error) {
if !shimLoaded.CompareAndSwap(false, true) {
return nil, errors.New("only one instance allowed")
}
m := mgr.New("observer")
observerModule = &Observer{
mgr: m,
instance: instance,
}
if err := prepObserver(); err != nil {
return nil, err
}
return observerModule, nil
}
type instance interface{}