Move interception module and better integrate workers

This commit is contained in:
Daniel 2023-07-20 13:43:55 +02:00
parent 41c5266315
commit ec85816577
7 changed files with 210 additions and 122 deletions

View file

@ -1,21 +1,27 @@
package firewall
import (
"context"
"github.com/safing/portbase/config"
"github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
"github.com/safing/portbase/modules/subsystems"
_ "github.com/safing/portmaster/core"
"github.com/safing/portmaster/network"
)
var filterModule *modules.Module
// FIXME: rename to "module"
var module *modules.Module
func init() {
filterModule = modules.Register("filter", nil, nil, nil, "core", "interception", "intel")
module = modules.Register("filter", prep, start, stop, "core", "interception", "intel")
subsystems.Register(
"filter",
"Privacy Filter",
"DNS and Network Filter",
filterModule,
module,
"config:filter/",
&config.Option{
Name: "Privacy Filter Module",
@ -31,3 +37,82 @@ func init() {
},
)
}
const (
configChangeEvent = "config change"
profileConfigChangeEvent = "profile config change"
onSPNConnectEvent = "spn connect"
)
func prep() error {
network.SetDefaultFirewallHandler(verdictHandler)
// Reset connections every time configuration changes
// this will be triggered on spn enable/disable
err := module.RegisterEventHook(
"config",
configChangeEvent,
"reset connection verdicts",
func(ctx context.Context, _ interface{}) error {
resetAllConnectionVerdicts()
return nil
},
)
if err != nil {
log.Errorf("interception: failed registering event hook: %s", err)
}
// Reset connections every time profile changes
err = module.RegisterEventHook(
"profiles",
profileConfigChangeEvent,
"reset connection verdicts",
func(ctx context.Context, _ interface{}) error {
resetAllConnectionVerdicts()
return nil
},
)
if err != nil {
log.Errorf("failed registering event hook: %s", err)
}
// Reset connections when spn is connected
// connect and disconnecting is triggered on config change event but connecting takеs more time
err = module.RegisterEventHook(
"captain",
onSPNConnectEvent,
"reset connection verdicts",
func(ctx context.Context, _ interface{}) error {
resetAllConnectionVerdicts()
return nil
},
)
if err != nil {
log.Errorf("failed registering event hook: %s", err)
}
if err := registerConfig(); err != nil {
return err
}
return prepAPIAuth()
}
func start() error {
getConfig()
startAPIAuth()
module.StartServiceWorker("packet handler", 0, packetHandler)
module.StartServiceWorker("bandwidth update handler", 0, bandwidthUpdateHandler)
// Start stat logger if logging is set to trace.
if log.GetLogLevel() == log.TraceLevel {
module.StartServiceWorker("stat logger", 0, statLogger)
}
return nil
}
func stop() error {
return nil
}

View file

@ -13,7 +13,6 @@ import (
"github.com/tevino/abool"
"github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
"github.com/safing/portmaster/compat"
_ "github.com/safing/portmaster/core/base"
"github.com/safing/portmaster/firewall/inspection"
@ -25,9 +24,9 @@ import (
"github.com/safing/portmaster/network/reference"
)
var (
interceptionModule *modules.Module
// FIXME: rename to "packet_handler"
var (
nameserverIPMatcher func(ip net.IP) bool
nameserverIPMatcherSet = abool.New()
nameserverIPMatcherReady = abool.New()
@ -43,71 +42,6 @@ var (
ownPID = os.Getpid()
)
const (
configChangeEvent = "config change"
profileConfigChangeEvent = "profile config change"
onSPNConnectEvent = "spn connect"
)
func init() {
// TODO: Move interception module to own package (dir).
interceptionModule = modules.Register("interception", interceptionPrep, interceptionStart, interceptionStop, "base", "updates", "network", "notifications", "profiles")
network.SetDefaultFirewallHandler(verdictHandler)
}
func interceptionPrep() error {
// Reset connections every time configuration changes
// this will be triggered on spn enable/disable
err := interceptionModule.RegisterEventHook(
"config",
configChangeEvent,
"reset connection verdicts",
func(ctx context.Context, _ interface{}) error {
resetAllConnectionVerdicts()
return nil
},
)
if err != nil {
log.Errorf("interception: failed registering event hook: %s", err)
}
// Reset connections every time profile changes
err = interceptionModule.RegisterEventHook(
"profiles",
profileConfigChangeEvent,
"reset connection verdicts",
func(ctx context.Context, _ interface{}) error {
resetAllConnectionVerdicts()
return nil
},
)
if err != nil {
log.Errorf("failed registering event hook: %s", err)
}
// Reset connections when spn is connected
// connect and disconnecting is triggered on config change event but connecting takеs more time
err = interceptionModule.RegisterEventHook(
"captain",
onSPNConnectEvent,
"reset connection verdicts",
func(ctx context.Context, _ interface{}) error {
resetAllConnectionVerdicts()
return nil
},
)
if err != nil {
log.Errorf("failed registering event hook: %s", err)
}
if err := registerConfig(); err != nil {
return err
}
return prepAPIAuth()
}
func resetAllConnectionVerdicts() {
// Resetting will force all the connection to be evaluated by the firewall again
// this will set new verdicts if configuration was update or spn has been disabled or enabled.
@ -169,24 +103,6 @@ func resetAllConnectionVerdicts() {
tracer.Submit()
}
func interceptionStart() error {
getConfig()
startAPIAuth()
interceptionModule.StartServiceWorker("packet handler", 0, packetHandler)
// Start stat logger if logging is set to trace.
if log.GetLogLevel() == log.TraceLevel {
interceptionModule.StartServiceWorker("stat logger", 0, statLogger)
}
return interception.Start()
}
func interceptionStop() error {
return interception.Stop()
}
// SetNameserverIPMatcher sets a function that is used to match the internal
// nameserver IP(s). Can only bet set once.
func SetNameserverIPMatcher(fn func(ip net.IP) bool) error {
@ -695,6 +611,57 @@ func packetHandler(ctx context.Context) error {
}
}
func bandwidthUpdateHandler(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case bwUpdate := <-interception.BandwidthUpdates:
if bwUpdate != nil {
updateBandwidth(bwUpdate)
// DEBUG:
// log.Debugf("filter: bandwidth update: %s", bwUpdate)
} else {
return errors.New("received nil bandwidth update from interception")
}
}
}
}
func updateBandwidth(bwUpdate *packet.BandwidthUpdate) {
// Check if update makes sense.
if bwUpdate.RecvBytes == 0 && bwUpdate.SentBytes == 0 {
return
}
// Get connection.
conn, ok := network.GetConnection(bwUpdate.ConnID)
if !ok {
return
}
// Do not wait for connections that are locked.
// TODO: Use atomic operations for updating bandwidth stats.
if !conn.TryLock() {
return
}
defer conn.Unlock()
// Update stats according to method.
switch bwUpdate.Method {
case packet.Absolute:
conn.RecvBytes = bwUpdate.RecvBytes
conn.SentBytes = bwUpdate.SentBytes
case packet.Additive:
conn.RecvBytes += bwUpdate.RecvBytes
conn.SentBytes += bwUpdate.SentBytes
default:
log.Warningf("filter: unsupported bandwidth update method: %d", bwUpdate.Method)
}
// TODO: Send update.
}
func statLogger(ctx context.Context) error {
for {
select {

View file

@ -4,22 +4,36 @@ import (
"flag"
"github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
"github.com/safing/portmaster/network/packet"
)
// FIXME: rename to "module"
var (
// Packets channel for feeding the firewall.
module *modules.Module
// Packets is a stream of interception network packest.
Packets = make(chan packet.Packet, 1000)
// BandwidthUpdates is a stream of bandwidth usage update for connections.
BandwidthUpdates = make(chan *packet.BandwidthUpdate, 1000)
disableInterception bool
)
func init() {
flag.BoolVar(&disableInterception, "disable-interception", false, "disable packet interception; this breaks a lot of functionality")
module = modules.Register("interception", prep, start, stop, "base", "updates", "network", "notifications", "profiles")
}
func prep() error {
return nil
}
// Start starts the interception.
func Start() error {
func start() error {
if disableInterception {
log.Warning("interception: packet interception is disabled via flag - this breaks a lot of functionality")
return nil
@ -36,16 +50,16 @@ func Start() error {
}()
}
return start(inputPackets)
return startInterception(inputPackets)
}
// Stop starts the interception.
func Stop() error {
func stop() error {
if disableInterception {
return nil
}
close(metrics.done)
return stop()
return stopInterception()
}

View file

@ -9,13 +9,13 @@ import (
)
// start starts the interception.
func start(_ chan packet.Packet) error {
func startInterception(_ chan packet.Packet) error {
log.Critical("interception: this platform has no support for packet interception - a lot of functionality will be broken")
return nil
}
// stop starts the interception.
func stop() error {
func stopInterception() error {
return nil
}

View file

@ -1,7 +1,10 @@
package interception
import (
// bandwidth "github.com/safing/portmaster/firewall/interception/ebpf/bandwidth"
"context"
"time"
bandwidth "github.com/safing/portmaster/firewall/interception/ebpf/bandwidth"
conn_listener "github.com/safing/portmaster/firewall/interception/ebpf/connection_listener"
"github.com/safing/portmaster/firewall/interception/nfq"
"github.com/safing/portmaster/network"
@ -9,20 +12,28 @@ import (
)
// start starts the interception.
func start(ch chan packet.Packet) error {
// Start ebpf new connection listener
conn_listener.StartEBPFWorker(ch)
// Start ebpf bandwidth listener
// bandwidth.SetupBandwidthInterface()
return StartNfqueueInterception(ch)
func startInterception(packets chan packet.Packet) error {
// Start packet interception via nfqueue.
err := StartNfqueueInterception(packets)
if err != nil {
return err
}
// Start ebpf new connection listener.
module.StartServiceWorker("ebpf connection listener", 0, func(ctx context.Context) error {
return conn_listener.ConnectionListenerWorker(ctx, packets)
})
// Start ebpf bandwidth stats monitor.
module.StartServiceWorker("ebpf bandwidth stats monitor", 0, func(ctx context.Context) error {
return bandwidth.BandwidthStatsWorker(ctx, 1*time.Second, BandwidthUpdates)
})
return nil
}
// stop starts the interception.
func stop() error {
// Stop ebpf connection listener
conn_listener.StopEBPFWorker()
// Stop ebpf bandwidth listener
// bandwidth.ShutdownBandwithInterface()
func stopInterception() error {
return StopNfqueueInterception()
}

View file

@ -1,7 +1,9 @@
package interception
import (
"context"
"fmt"
"time"
"github.com/safing/portmaster/firewall/interception/windowskext"
"github.com/safing/portmaster/network"
@ -10,7 +12,7 @@ import (
)
// start starts the interception.
func start(ch chan packet.Packet) error {
func startInterception(packets chan packet.Packet) error {
kextFile, err := updates.GetPlatformFile("kext/portmaster-kext.sys")
if err != nil {
return fmt.Errorf("interception: could not get kext sys: %s", err)
@ -26,16 +28,22 @@ func start(ch chan packet.Packet) error {
return fmt.Errorf("interception: could not start windows kext: %s", err)
}
go windowskext.Handler(ch)
// Start packet handler.
module.StartServiceWorker("kext packet handler", 0, func(ctx context.Context) error {
windowskext.Handler(ctx, packets)
return nil
})
// Example worker for the bandwidth data stats. Not ment for production.
// windowskext.StartBandwidthWorker()
// Start bandwidth stats monitor.
module.StartServiceWorker("kext bandwidth stats monitor", 0, func(ctx context.Context) error {
return windowskext.BandwidthStatsWorker(ctx, 1*time.Second, BandwidthUpdates)
})
return nil
}
// stop starts the interception.
func stop() error {
func stopInterception() error {
return windowskext.Stop()
}

View file

@ -1,6 +1,7 @@
package interception
import (
"context"
"flag"
"fmt"
"sort"
@ -257,30 +258,30 @@ func StartNfqueueInterception(packets chan<- packet.Packet) (err error) {
err = activateNfqueueFirewall()
if err != nil {
_ = Stop()
_ = StopNfqueueInterception()
return fmt.Errorf("could not initialize nfqueue: %w", err)
}
out4Queue, err = nfq.New(17040, false)
if err != nil {
_ = Stop()
_ = StopNfqueueInterception()
return fmt.Errorf("nfqueue(IPv4, out): %w", err)
}
in4Queue, err = nfq.New(17140, false)
if err != nil {
_ = Stop()
_ = StopNfqueueInterception()
return fmt.Errorf("nfqueue(IPv4, in): %w", err)
}
if netenv.IPv6Enabled() {
out6Queue, err = nfq.New(17060, true)
if err != nil {
_ = Stop()
_ = StopNfqueueInterception()
return fmt.Errorf("nfqueue(IPv6, out): %w", err)
}
in6Queue, err = nfq.New(17160, true)
if err != nil {
_ = Stop()
_ = StopNfqueueInterception()
return fmt.Errorf("nfqueue(IPv6, in): %w", err)
}
} else {
@ -289,7 +290,9 @@ func StartNfqueueInterception(packets chan<- packet.Packet) (err error) {
in6Queue = &disabledNfQueue{}
}
go handleInterception(packets)
module.StartServiceWorker("nfqueue packet handler", 0, func(_ context.Context) error {
return handleInterception(packets)
})
return nil
}
@ -318,12 +321,12 @@ func StopNfqueueInterception() error {
return nil
}
func handleInterception(packets chan<- packet.Packet) {
func handleInterception(packets chan<- packet.Packet) error {
for {
var pkt packet.Packet
select {
case <-shutdownSignal:
return
return nil
case pkt = <-out4Queue.PacketChannel():
pkt.SetOutbound()
case pkt = <-in4Queue.PacketChannel():
@ -337,7 +340,7 @@ func handleInterception(packets chan<- packet.Packet) {
select {
case packets <- pkt:
case <-shutdownSignal:
return
return nil
}
}
}