mirror of
https://github.com/safing/portmaster
synced 2025-09-01 18:19:12 +00:00
Persist bandwidth data in netquery DBs when enabled
This commit is contained in:
parent
b7fd1fc76a
commit
5dcb6b268f
6 changed files with 69 additions and 45 deletions
|
@ -14,7 +14,7 @@ import (
|
|||
var module *modules.Module
|
||||
|
||||
func init() {
|
||||
module = modules.Register("filter", prep, start, stop, "core", "interception", "intel")
|
||||
module = modules.Register("filter", prep, start, stop, "core", "interception", "intel", "netquery")
|
||||
subsystems.Register(
|
||||
"filter",
|
||||
"Privacy Filter",
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/safing/portmaster/firewall/inspection"
|
||||
"github.com/safing/portmaster/firewall/interception"
|
||||
"github.com/safing/portmaster/netenv"
|
||||
"github.com/safing/portmaster/netquery"
|
||||
"github.com/safing/portmaster/network"
|
||||
"github.com/safing/portmaster/network/netutils"
|
||||
"github.com/safing/portmaster/network/packet"
|
||||
|
@ -616,7 +617,7 @@ func bandwidthUpdateHandler(ctx context.Context) error {
|
|||
return nil
|
||||
case bwUpdate := <-interception.BandwidthUpdates:
|
||||
if bwUpdate != nil {
|
||||
updateBandwidth(bwUpdate)
|
||||
updateBandwidth(ctx, bwUpdate)
|
||||
// DEBUG:
|
||||
// log.Debugf("filter: bandwidth update: %s", bwUpdate)
|
||||
} else {
|
||||
|
@ -626,7 +627,7 @@ func bandwidthUpdateHandler(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
func updateBandwidth(bwUpdate *packet.BandwidthUpdate) {
|
||||
func updateBandwidth(ctx context.Context, bwUpdate *packet.BandwidthUpdate) {
|
||||
// Check if update makes sense.
|
||||
if bwUpdate.RecvBytes == 0 && bwUpdate.SentBytes == 0 {
|
||||
return
|
||||
|
@ -657,7 +658,18 @@ func updateBandwidth(bwUpdate *packet.BandwidthUpdate) {
|
|||
log.Warningf("filter: unsupported bandwidth update method: %d", bwUpdate.Method)
|
||||
}
|
||||
|
||||
// TODO: Send update.
|
||||
if netquery.DefaultModule != nil && conn.BandwidthEnabled {
|
||||
if err := netquery.DefaultModule.Store.UpdateBandwidth(
|
||||
ctx,
|
||||
conn.HistoryEnabled,
|
||||
conn.Process().GetID(),
|
||||
conn.ID,
|
||||
&conn.RecvBytes,
|
||||
&conn.SentBytes,
|
||||
); err != nil {
|
||||
log.Errorf("firewall: failed to persist bandwidth data: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func statLogger(ctx context.Context) error {
|
||||
|
|
|
@ -101,8 +101,8 @@ type (
|
|||
Allowed *bool `sqlite:"allowed"`
|
||||
ProfileRevision int `sqlite:"profile_revision"`
|
||||
ExitNode *string `sqlite:"exit_node"`
|
||||
BWIncoming uint64 `sqlite:"bw_incoming,default=0"`
|
||||
BWOutgoing uint64 `sqlite:"bw_outgoing,default=0"`
|
||||
BytesReceived uint64 `sqlite:"bytes_received,default=0"`
|
||||
BytesSent uint64 `sqlite:"bytes_sent,default=0"`
|
||||
|
||||
// TODO(ppacher): support "NOT" in search query to get rid of the following helper fields
|
||||
Active bool `sqlite:"active"` // could use "ended IS NOT NULL" or "ended IS NULL"
|
||||
|
@ -400,13 +400,13 @@ func (db *Database) UpdateBandwidth(ctx context.Context, enableHistory bool, pro
|
|||
|
||||
parts := []string{}
|
||||
if incoming != nil {
|
||||
parts = append(parts, "bw_incoming = :bw_incoming")
|
||||
params[":bw_incoming"] = *incoming
|
||||
parts = append(parts, "bytes_received = :bytes_received")
|
||||
params[":bytes_received"] = *incoming
|
||||
}
|
||||
|
||||
if outgoing != nil {
|
||||
parts = append(parts, "bw_outgoing = :bw_outgoing")
|
||||
params[":bw_outgoing"] = *outgoing
|
||||
parts = append(parts, "bytes_sent = :bytes_sent")
|
||||
params[":bytes_sent"] = *outgoing
|
||||
}
|
||||
|
||||
updateSet := strings.Join(parts, ", ")
|
||||
|
@ -438,11 +438,11 @@ func (db *Database) UpdateBandwidth(ctx context.Context, enableHistory bool, pro
|
|||
// connection pool.
|
||||
func (db *Database) Save(ctx context.Context, conn Conn, enableHistory bool) error {
|
||||
// convert the connection to a param map where each key is already translated
|
||||
// to the sql column name. We also skip bw_incoming and bw_outgoing since those
|
||||
// to the sql column name. We also skip bytes_received and bytes_sent since those
|
||||
// will be updated independenly from the connection object.
|
||||
connMap, err := orm.ToParamMap(ctx, conn, "", orm.DefaultEncodeConfig, []string{
|
||||
"bw_incoming",
|
||||
"bw_outgoing",
|
||||
"bytes_received",
|
||||
"bytes_sent",
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to encode connection for SQL: %w", err)
|
||||
|
|
|
@ -13,8 +13,6 @@ import (
|
|||
"github.com/safing/portbase/log"
|
||||
"github.com/safing/portbase/runtime"
|
||||
"github.com/safing/portmaster/network"
|
||||
"github.com/safing/spn/access"
|
||||
"github.com/safing/spn/access/account"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -117,20 +115,7 @@ func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connect
|
|||
|
||||
log.Tracef("netquery: updating connection %s", conn.ID)
|
||||
|
||||
// check if we should persist the connection in the history database.
|
||||
// Also make sure the current SPN User/subscription allows use of the history.
|
||||
historyEnabled := conn.Process().Profile().HistoryEnabled()
|
||||
if historyEnabled {
|
||||
user, err := access.GetUser()
|
||||
if err != nil {
|
||||
// there was an error so disable history
|
||||
historyEnabled = false
|
||||
} else if !user.MayUse(account.FeatureHistory) {
|
||||
historyEnabled = false
|
||||
}
|
||||
}
|
||||
|
||||
if err := mng.store.Save(ctx, *model, historyEnabled); err != nil {
|
||||
if err := mng.store.Save(ctx, *model, conn.HistoryEnabled); err != nil {
|
||||
log.Errorf("netquery: failed to save connection %s in sqlite database: %s", conn.ID, err)
|
||||
|
||||
continue
|
||||
|
|
|
@ -19,22 +19,26 @@ import (
|
|||
"github.com/safing/portmaster/network"
|
||||
)
|
||||
|
||||
var DefaultModule *module
|
||||
|
||||
type module struct {
|
||||
*modules.Module
|
||||
|
||||
db *database.Interface
|
||||
sqlStore *Database
|
||||
mng *Manager
|
||||
feed chan *network.Connection
|
||||
Store *Database
|
||||
|
||||
db *database.Interface
|
||||
mng *Manager
|
||||
feed chan *network.Connection
|
||||
}
|
||||
|
||||
func init() {
|
||||
m := new(module)
|
||||
m.Module = modules.Register(
|
||||
DefaultModule = new(module)
|
||||
|
||||
DefaultModule.Module = modules.Register(
|
||||
"netquery",
|
||||
m.prepare,
|
||||
m.start,
|
||||
m.stop,
|
||||
DefaultModule.prepare,
|
||||
DefaultModule.start,
|
||||
DefaultModule.stop,
|
||||
"api",
|
||||
"network",
|
||||
"database",
|
||||
|
@ -44,7 +48,7 @@ func init() {
|
|||
"history",
|
||||
"Network History",
|
||||
"Keep Network History Data",
|
||||
m.Module,
|
||||
DefaultModule.Module,
|
||||
"config:history/",
|
||||
nil,
|
||||
)
|
||||
|
@ -58,12 +62,12 @@ func (m *module) prepare() error {
|
|||
Internal: true,
|
||||
})
|
||||
|
||||
m.sqlStore, err = NewInMemory()
|
||||
m.Store, err = NewInMemory()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create in-memory database: %w", err)
|
||||
}
|
||||
|
||||
m.mng, err = NewManager(m.sqlStore, "netquery/data/", runtime.DefaultRegistry)
|
||||
m.mng, err = NewManager(m.Store, "netquery/data/", runtime.DefaultRegistry)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create manager: %w", err)
|
||||
}
|
||||
|
@ -71,12 +75,12 @@ func (m *module) prepare() error {
|
|||
m.feed = make(chan *network.Connection, 1000)
|
||||
|
||||
queryHander := &QueryHandler{
|
||||
Database: m.sqlStore,
|
||||
Database: m.Store,
|
||||
IsDevMode: config.Concurrent.GetAsBool(config.CfgDevModeKey, false),
|
||||
}
|
||||
|
||||
chartHandler := &ChartHandler{
|
||||
Database: m.sqlStore,
|
||||
Database: m.Store,
|
||||
}
|
||||
|
||||
if err := api.RegisterEndpoint(api.Endpoint{
|
||||
|
@ -204,7 +208,7 @@ func (m *module) start() error {
|
|||
return nil
|
||||
case <-time.After(10 * time.Second):
|
||||
threshold := time.Now().Add(-network.DeleteConnsAfterEndedThreshold)
|
||||
count, err := m.sqlStore.Cleanup(ctx, threshold)
|
||||
count, err := m.Store.Cleanup(ctx, threshold)
|
||||
if err != nil {
|
||||
log.Errorf("netquery: failed to count number of rows in memory: %s", err)
|
||||
} else {
|
||||
|
@ -218,7 +222,7 @@ func (m *module) start() error {
|
|||
// the runtime database.
|
||||
// Only expose in development mode.
|
||||
if config.GetAsBool(config.CfgDevModeKey, false)() {
|
||||
_, err := NewRuntimeQueryRunner(m.sqlStore, "netquery/query/", runtime.DefaultRegistry)
|
||||
_, err := NewRuntimeQueryRunner(m.Store, "netquery/query/", runtime.DefaultRegistry)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set up runtime SQL query runner: %w", err)
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@ import (
|
|||
"github.com/safing/portmaster/process"
|
||||
_ "github.com/safing/portmaster/process/tags"
|
||||
"github.com/safing/portmaster/resolver"
|
||||
"github.com/safing/spn/access"
|
||||
"github.com/safing/spn/access/account"
|
||||
"github.com/safing/spn/navigator"
|
||||
)
|
||||
|
||||
|
@ -218,6 +220,13 @@ type Connection struct { //nolint:maligned // TODO: fix alignment
|
|||
// addedToMetrics signifies if the connection has already been counted in
|
||||
// the metrics.
|
||||
addedToMetrics bool
|
||||
|
||||
// HistoryEnabled is set to true when the connection should be persisted
|
||||
// in the history database.
|
||||
HistoryEnabled bool
|
||||
// BanwidthEnabled is set to true if connection bandwidth data should be persisted
|
||||
// in netquery.
|
||||
BandwidthEnabled bool
|
||||
}
|
||||
|
||||
// Reason holds information justifying a verdict, as well as additional
|
||||
|
@ -420,7 +429,21 @@ func (conn *Connection) GatherConnectionInfo(pkt packet.Packet) (err error) {
|
|||
// Inherit internal status of profile.
|
||||
if localProfile := conn.process.Profile().LocalProfile(); localProfile != nil {
|
||||
conn.Internal = localProfile.Internal
|
||||
|
||||
// check if we should persist the connection in the history database.
|
||||
// Also make sure the current SPN User/subscription allows use of the history.
|
||||
user, err := access.GetUser()
|
||||
if err == nil {
|
||||
if user.MayUse(account.FeatureHistory) {
|
||||
conn.HistoryEnabled = localProfile.HistoryEnabled()
|
||||
}
|
||||
|
||||
if user.MayUse(account.FeatureBWVis) {
|
||||
conn.BandwidthEnabled = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
conn.process = nil
|
||||
if pkt.InfoOnly() {
|
||||
|
|
Loading…
Add table
Reference in a new issue