From ec858165779999559dec2999cc207aa4221125a9 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 20 Jul 2023 13:43:55 +0200 Subject: [PATCH] Move interception module and better integrate workers --- firewall/filter.go | 91 +++++++++++- firewall/interception.go | 139 +++++++----------- firewall/interception/interception.go | 24 ++- firewall/interception/interception_default.go | 4 +- firewall/interception/interception_linux.go | 35 +++-- firewall/interception/interception_windows.go | 18 ++- firewall/interception/nfqueue_linux.go | 21 +-- 7 files changed, 210 insertions(+), 122 deletions(-) diff --git a/firewall/filter.go b/firewall/filter.go index d1b75f2e..b85d324e 100644 --- a/firewall/filter.go +++ b/firewall/filter.go @@ -1,21 +1,27 @@ package firewall import ( + "context" + "github.com/safing/portbase/config" + "github.com/safing/portbase/log" "github.com/safing/portbase/modules" "github.com/safing/portbase/modules/subsystems" _ "github.com/safing/portmaster/core" + "github.com/safing/portmaster/network" ) -var filterModule *modules.Module +// FIXME: rename to "module" + +var module *modules.Module func init() { - filterModule = modules.Register("filter", nil, nil, nil, "core", "interception", "intel") + module = modules.Register("filter", prep, start, stop, "core", "interception", "intel") subsystems.Register( "filter", "Privacy Filter", "DNS and Network Filter", - filterModule, + module, "config:filter/", &config.Option{ Name: "Privacy Filter Module", @@ -31,3 +37,82 @@ func init() { }, ) } + +const ( + configChangeEvent = "config change" + profileConfigChangeEvent = "profile config change" + onSPNConnectEvent = "spn connect" +) + +func prep() error { + network.SetDefaultFirewallHandler(verdictHandler) + + // Reset connections every time configuration changes + // this will be triggered on spn enable/disable + err := module.RegisterEventHook( + "config", + configChangeEvent, + "reset connection verdicts", + func(ctx context.Context, _ interface{}) error { + resetAllConnectionVerdicts() + return nil + }, + ) + if err != nil { + log.Errorf("interception: failed registering event hook: %s", err) + } + + // Reset connections every time profile changes + err = module.RegisterEventHook( + "profiles", + profileConfigChangeEvent, + "reset connection verdicts", + func(ctx context.Context, _ interface{}) error { + resetAllConnectionVerdicts() + return nil + }, + ) + if err != nil { + log.Errorf("failed registering event hook: %s", err) + } + + // Reset connections when spn is connected + // connect and disconnecting is triggered on config change event but connecting takеs more time + err = module.RegisterEventHook( + "captain", + onSPNConnectEvent, + "reset connection verdicts", + func(ctx context.Context, _ interface{}) error { + resetAllConnectionVerdicts() + return nil + }, + ) + if err != nil { + log.Errorf("failed registering event hook: %s", err) + } + + if err := registerConfig(); err != nil { + return err + } + + return prepAPIAuth() +} + +func start() error { + getConfig() + startAPIAuth() + + module.StartServiceWorker("packet handler", 0, packetHandler) + module.StartServiceWorker("bandwidth update handler", 0, bandwidthUpdateHandler) + + // Start stat logger if logging is set to trace. + if log.GetLogLevel() == log.TraceLevel { + module.StartServiceWorker("stat logger", 0, statLogger) + } + + return nil +} + +func stop() error { + return nil +} diff --git a/firewall/interception.go b/firewall/interception.go index 5c6c5db8..6ef4bda9 100644 --- a/firewall/interception.go +++ b/firewall/interception.go @@ -13,7 +13,6 @@ import ( "github.com/tevino/abool" "github.com/safing/portbase/log" - "github.com/safing/portbase/modules" "github.com/safing/portmaster/compat" _ "github.com/safing/portmaster/core/base" "github.com/safing/portmaster/firewall/inspection" @@ -25,9 +24,9 @@ import ( "github.com/safing/portmaster/network/reference" ) -var ( - interceptionModule *modules.Module +// FIXME: rename to "packet_handler" +var ( nameserverIPMatcher func(ip net.IP) bool nameserverIPMatcherSet = abool.New() nameserverIPMatcherReady = abool.New() @@ -43,71 +42,6 @@ var ( ownPID = os.Getpid() ) -const ( - configChangeEvent = "config change" - profileConfigChangeEvent = "profile config change" - onSPNConnectEvent = "spn connect" -) - -func init() { - // TODO: Move interception module to own package (dir). - interceptionModule = modules.Register("interception", interceptionPrep, interceptionStart, interceptionStop, "base", "updates", "network", "notifications", "profiles") - - network.SetDefaultFirewallHandler(verdictHandler) -} - -func interceptionPrep() error { - // Reset connections every time configuration changes - // this will be triggered on spn enable/disable - err := interceptionModule.RegisterEventHook( - "config", - configChangeEvent, - "reset connection verdicts", - func(ctx context.Context, _ interface{}) error { - resetAllConnectionVerdicts() - return nil - }, - ) - if err != nil { - log.Errorf("interception: failed registering event hook: %s", err) - } - - // Reset connections every time profile changes - err = interceptionModule.RegisterEventHook( - "profiles", - profileConfigChangeEvent, - "reset connection verdicts", - func(ctx context.Context, _ interface{}) error { - resetAllConnectionVerdicts() - return nil - }, - ) - if err != nil { - log.Errorf("failed registering event hook: %s", err) - } - - // Reset connections when spn is connected - // connect and disconnecting is triggered on config change event but connecting takеs more time - err = interceptionModule.RegisterEventHook( - "captain", - onSPNConnectEvent, - "reset connection verdicts", - func(ctx context.Context, _ interface{}) error { - resetAllConnectionVerdicts() - return nil - }, - ) - if err != nil { - log.Errorf("failed registering event hook: %s", err) - } - - if err := registerConfig(); err != nil { - return err - } - - return prepAPIAuth() -} - func resetAllConnectionVerdicts() { // Resetting will force all the connection to be evaluated by the firewall again // this will set new verdicts if configuration was update or spn has been disabled or enabled. @@ -169,24 +103,6 @@ func resetAllConnectionVerdicts() { tracer.Submit() } -func interceptionStart() error { - getConfig() - startAPIAuth() - - interceptionModule.StartServiceWorker("packet handler", 0, packetHandler) - - // Start stat logger if logging is set to trace. - if log.GetLogLevel() == log.TraceLevel { - interceptionModule.StartServiceWorker("stat logger", 0, statLogger) - } - - return interception.Start() -} - -func interceptionStop() error { - return interception.Stop() -} - // SetNameserverIPMatcher sets a function that is used to match the internal // nameserver IP(s). Can only bet set once. func SetNameserverIPMatcher(fn func(ip net.IP) bool) error { @@ -695,6 +611,57 @@ func packetHandler(ctx context.Context) error { } } +func bandwidthUpdateHandler(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case bwUpdate := <-interception.BandwidthUpdates: + if bwUpdate != nil { + updateBandwidth(bwUpdate) + // DEBUG: + // log.Debugf("filter: bandwidth update: %s", bwUpdate) + } else { + return errors.New("received nil bandwidth update from interception") + } + } + } +} + +func updateBandwidth(bwUpdate *packet.BandwidthUpdate) { + // Check if update makes sense. + if bwUpdate.RecvBytes == 0 && bwUpdate.SentBytes == 0 { + return + } + + // Get connection. + conn, ok := network.GetConnection(bwUpdate.ConnID) + if !ok { + return + } + + // Do not wait for connections that are locked. + // TODO: Use atomic operations for updating bandwidth stats. + if !conn.TryLock() { + return + } + defer conn.Unlock() + + // Update stats according to method. + switch bwUpdate.Method { + case packet.Absolute: + conn.RecvBytes = bwUpdate.RecvBytes + conn.SentBytes = bwUpdate.SentBytes + case packet.Additive: + conn.RecvBytes += bwUpdate.RecvBytes + conn.SentBytes += bwUpdate.SentBytes + default: + log.Warningf("filter: unsupported bandwidth update method: %d", bwUpdate.Method) + } + + // TODO: Send update. +} + func statLogger(ctx context.Context) error { for { select { diff --git a/firewall/interception/interception.go b/firewall/interception/interception.go index f3cbd5c5..8f80188e 100644 --- a/firewall/interception/interception.go +++ b/firewall/interception/interception.go @@ -4,22 +4,36 @@ import ( "flag" "github.com/safing/portbase/log" + "github.com/safing/portbase/modules" "github.com/safing/portmaster/network/packet" ) +// FIXME: rename to "module" + var ( - // Packets channel for feeding the firewall. + module *modules.Module + + // Packets is a stream of interception network packest. Packets = make(chan packet.Packet, 1000) + // BandwidthUpdates is a stream of bandwidth usage update for connections. + BandwidthUpdates = make(chan *packet.BandwidthUpdate, 1000) + disableInterception bool ) func init() { flag.BoolVar(&disableInterception, "disable-interception", false, "disable packet interception; this breaks a lot of functionality") + + module = modules.Register("interception", prep, start, stop, "base", "updates", "network", "notifications", "profiles") +} + +func prep() error { + return nil } // Start starts the interception. -func Start() error { +func start() error { if disableInterception { log.Warning("interception: packet interception is disabled via flag - this breaks a lot of functionality") return nil @@ -36,16 +50,16 @@ func Start() error { }() } - return start(inputPackets) + return startInterception(inputPackets) } // Stop starts the interception. -func Stop() error { +func stop() error { if disableInterception { return nil } close(metrics.done) - return stop() + return stopInterception() } diff --git a/firewall/interception/interception_default.go b/firewall/interception/interception_default.go index 94f5c51f..222a041c 100644 --- a/firewall/interception/interception_default.go +++ b/firewall/interception/interception_default.go @@ -9,13 +9,13 @@ import ( ) // start starts the interception. -func start(_ chan packet.Packet) error { +func startInterception(_ chan packet.Packet) error { log.Critical("interception: this platform has no support for packet interception - a lot of functionality will be broken") return nil } // stop starts the interception. -func stop() error { +func stopInterception() error { return nil } diff --git a/firewall/interception/interception_linux.go b/firewall/interception/interception_linux.go index b657c080..128f6649 100644 --- a/firewall/interception/interception_linux.go +++ b/firewall/interception/interception_linux.go @@ -1,7 +1,10 @@ package interception import ( - // bandwidth "github.com/safing/portmaster/firewall/interception/ebpf/bandwidth" + "context" + "time" + + bandwidth "github.com/safing/portmaster/firewall/interception/ebpf/bandwidth" conn_listener "github.com/safing/portmaster/firewall/interception/ebpf/connection_listener" "github.com/safing/portmaster/firewall/interception/nfq" "github.com/safing/portmaster/network" @@ -9,20 +12,28 @@ import ( ) // start starts the interception. -func start(ch chan packet.Packet) error { - // Start ebpf new connection listener - conn_listener.StartEBPFWorker(ch) - // Start ebpf bandwidth listener - // bandwidth.SetupBandwidthInterface() - return StartNfqueueInterception(ch) +func startInterception(packets chan packet.Packet) error { + // Start packet interception via nfqueue. + err := StartNfqueueInterception(packets) + if err != nil { + return err + } + + // Start ebpf new connection listener. + module.StartServiceWorker("ebpf connection listener", 0, func(ctx context.Context) error { + return conn_listener.ConnectionListenerWorker(ctx, packets) + }) + + // Start ebpf bandwidth stats monitor. + module.StartServiceWorker("ebpf bandwidth stats monitor", 0, func(ctx context.Context) error { + return bandwidth.BandwidthStatsWorker(ctx, 1*time.Second, BandwidthUpdates) + }) + + return nil } // stop starts the interception. -func stop() error { - // Stop ebpf connection listener - conn_listener.StopEBPFWorker() - // Stop ebpf bandwidth listener - // bandwidth.ShutdownBandwithInterface() +func stopInterception() error { return StopNfqueueInterception() } diff --git a/firewall/interception/interception_windows.go b/firewall/interception/interception_windows.go index 95974545..069f5c01 100644 --- a/firewall/interception/interception_windows.go +++ b/firewall/interception/interception_windows.go @@ -1,7 +1,9 @@ package interception import ( + "context" "fmt" + "time" "github.com/safing/portmaster/firewall/interception/windowskext" "github.com/safing/portmaster/network" @@ -10,7 +12,7 @@ import ( ) // start starts the interception. -func start(ch chan packet.Packet) error { +func startInterception(packets chan packet.Packet) error { kextFile, err := updates.GetPlatformFile("kext/portmaster-kext.sys") if err != nil { return fmt.Errorf("interception: could not get kext sys: %s", err) @@ -26,16 +28,22 @@ func start(ch chan packet.Packet) error { return fmt.Errorf("interception: could not start windows kext: %s", err) } - go windowskext.Handler(ch) + // Start packet handler. + module.StartServiceWorker("kext packet handler", 0, func(ctx context.Context) error { + windowskext.Handler(ctx, packets) + return nil + }) - // Example worker for the bandwidth data stats. Not ment for production. - // windowskext.StartBandwidthWorker() + // Start bandwidth stats monitor. + module.StartServiceWorker("kext bandwidth stats monitor", 0, func(ctx context.Context) error { + return windowskext.BandwidthStatsWorker(ctx, 1*time.Second, BandwidthUpdates) + }) return nil } // stop starts the interception. -func stop() error { +func stopInterception() error { return windowskext.Stop() } diff --git a/firewall/interception/nfqueue_linux.go b/firewall/interception/nfqueue_linux.go index 2ba1ab8f..2e632813 100644 --- a/firewall/interception/nfqueue_linux.go +++ b/firewall/interception/nfqueue_linux.go @@ -1,6 +1,7 @@ package interception import ( + "context" "flag" "fmt" "sort" @@ -257,30 +258,30 @@ func StartNfqueueInterception(packets chan<- packet.Packet) (err error) { err = activateNfqueueFirewall() if err != nil { - _ = Stop() + _ = StopNfqueueInterception() return fmt.Errorf("could not initialize nfqueue: %w", err) } out4Queue, err = nfq.New(17040, false) if err != nil { - _ = Stop() + _ = StopNfqueueInterception() return fmt.Errorf("nfqueue(IPv4, out): %w", err) } in4Queue, err = nfq.New(17140, false) if err != nil { - _ = Stop() + _ = StopNfqueueInterception() return fmt.Errorf("nfqueue(IPv4, in): %w", err) } if netenv.IPv6Enabled() { out6Queue, err = nfq.New(17060, true) if err != nil { - _ = Stop() + _ = StopNfqueueInterception() return fmt.Errorf("nfqueue(IPv6, out): %w", err) } in6Queue, err = nfq.New(17160, true) if err != nil { - _ = Stop() + _ = StopNfqueueInterception() return fmt.Errorf("nfqueue(IPv6, in): %w", err) } } else { @@ -289,7 +290,9 @@ func StartNfqueueInterception(packets chan<- packet.Packet) (err error) { in6Queue = &disabledNfQueue{} } - go handleInterception(packets) + module.StartServiceWorker("nfqueue packet handler", 0, func(_ context.Context) error { + return handleInterception(packets) + }) return nil } @@ -318,12 +321,12 @@ func StopNfqueueInterception() error { return nil } -func handleInterception(packets chan<- packet.Packet) { +func handleInterception(packets chan<- packet.Packet) error { for { var pkt packet.Packet select { case <-shutdownSignal: - return + return nil case pkt = <-out4Queue.PacketChannel(): pkt.SetOutbound() case pkt = <-in4Queue.PacketChannel(): @@ -337,7 +340,7 @@ func handleInterception(packets chan<- packet.Packet) { select { case packets <- pkt: case <-shutdownSignal: - return + return nil } } }