diff --git a/firewall/packet_handler.go b/firewall/packet_handler.go index cf5e57bf..7c364451 100644 --- a/firewall/packet_handler.go +++ b/firewall/packet_handler.go @@ -628,9 +628,9 @@ func bandwidthUpdateHandler(ctx context.Context) error { return nil case bwUpdate := <-interception.BandwidthUpdates: if bwUpdate != nil { - updateBandwidth(ctx, bwUpdate) // DEBUG: // log.Debugf("filter: bandwidth update: %s", bwUpdate) + updateBandwidth(ctx, bwUpdate) } else { return errors.New("received nil bandwidth update from interception") } @@ -653,6 +653,8 @@ func updateBandwidth(ctx context.Context, bwUpdate *packet.BandwidthUpdate) { // Do not wait for connections that are locked. // TODO: Use atomic operations for updating bandwidth stats. if !conn.TryLock() { + // DEBUG: + // log.Warningf("filter: failed to lock connection for bandwidth update: %s", conn) return } defer conn.Unlock() @@ -675,7 +677,7 @@ func updateBandwidth(ctx context.Context, bwUpdate *packet.BandwidthUpdate) { if err := netquery.DefaultModule.Store.UpdateBandwidth( ctx, conn.HistoryEnabled, - conn.Process().GetID(), + conn.Process().GetKey(), conn.ID, conn.BytesReceived, conn.BytesSent, diff --git a/netquery/database.go b/netquery/database.go index 00e4f006..7823a700 100644 --- a/netquery/database.go +++ b/netquery/database.go @@ -2,8 +2,6 @@ package netquery import ( "context" - "crypto/sha256" - "encoding/hex" "encoding/json" "fmt" "io" @@ -395,12 +393,8 @@ func (db *Database) MarkAllHistoryConnectionsEnded(ctx context.Context) error { // UpdateBandwidth updates bandwidth data for the connection and optionally also writes // the bandwidth data to the history database. func (db *Database) UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, bytesReceived uint64, bytesSent uint64) error { - data := connID + "-" + processKey - hash := sha256.Sum256([]byte(data)) - dbConnID := hex.EncodeToString(hash[:]) - params := map[string]any{ - ":id": dbConnID, + ":id": makeNqIDFromParts(processKey, connID), } parts := []string{} @@ -481,6 +475,12 @@ func (db *Database) Save(ctx context.Context, conn Conn, enableHistory bool) err // and save some CPU cycles for the user dbNames := []DatabaseName{LiveDatabase} + // TODO: Should we only add ended connection to the history database to save + // a couple INSERTs per connection? + // This means we need to write the current live DB to the history DB on + // shutdown in order to be able to pick the back up after a restart. + + // Save to history DB if enabled. if enableHistory { dbNames = append(dbNames, HistoryDatabase) } diff --git a/netquery/manager.go b/netquery/manager.go index b5db2ca0..9116cbad 100644 --- a/netquery/manager.go +++ b/netquery/manager.go @@ -100,35 +100,42 @@ func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connect if !ok { return } - if !conn.DataIsComplete() { - continue - } - model, err := convertConnection(conn) - if err != nil { - log.Errorf("netquery: failed to convert connection %s to sqlite model: %s", conn.ID, err) + func() { + conn.Lock() + defer conn.Unlock() - continue - } + if !conn.DataIsComplete() { + return + } - // DEBUG: - // log.Tracef("netquery: updating connection %s", conn.ID) + model, err := convertConnection(conn) + if err != nil { + log.Errorf("netquery: failed to convert connection %s to sqlite model: %s", conn.ID, err) - if err := mng.store.Save(ctx, *model, conn.HistoryEnabled); err != nil { - log.Errorf("netquery: failed to save connection %s in sqlite database: %s", conn.ID, err) + return + } - continue - } + // DEBUG: + // log.Tracef("netquery: updating connection %s", conn.ID) - // we clone the record metadata from the connection - // into the new model so the portbase/database layer - // can handle NEW/UPDATE correctly. - cloned := conn.Meta().Duplicate() + // Save to netquery database. + // Do not include internal connections in history. + if err := mng.store.Save(ctx, *model, conn.HistoryEnabled && !conn.Internal); err != nil { + log.Errorf("netquery: failed to save connection %s in sqlite database: %s", conn.ID, err) + return + } - // push an update for the connection - if err := mng.pushConnUpdate(ctx, *cloned, *model); err != nil { - log.Errorf("netquery: failed to push update for conn %s via database system: %s", conn.ID, err) - } + // we clone the record metadata from the connection + // into the new model so the portbase/database layer + // can handle NEW/UPDATE correctly. + cloned := conn.Meta().Duplicate() + + // push an update for the connection + if err := mng.pushConnUpdate(ctx, *cloned, *model); err != nil { + log.Errorf("netquery: failed to push update for conn %s via database system: %s", conn.ID, err) + } + }() } } } @@ -155,18 +162,16 @@ func (mng *Manager) pushConnUpdate(_ context.Context, meta record.Meta, conn Con } // convertConnection converts conn to the local representation used -// to persist the information in SQLite. convertConnection attempts -// to lock conn and may thus block for some time. +// to persist the information in SQLite. +// The caller must hold the lock to the given network.Connection. func convertConnection(conn *network.Connection) (*Conn, error) { - conn.Lock() - defer conn.Unlock() direction := "outbound" if conn.Inbound { direction = "inbound" } c := Conn{ - ID: genConnID(conn), + ID: makeNqIDFromConn(conn), External: conn.External, IPVersion: conn.IPVersion, IPProtocol: conn.IPProtocol, @@ -265,6 +270,13 @@ func convertConnection(conn *network.Connection) (*Conn, error) { return &c, nil } -func genConnID(conn *network.Connection) string { - return conn.ID + "-" + conn.Process().GetID() +// makeNqIDFromConn creates a netquery connection ID from the network connection. +func makeNqIDFromConn(conn *network.Connection) string { + return makeNqIDFromParts(conn.Process().GetKey(), conn.ID) +} + +// makeNqIDFromParts creates a netquery connection ID from the given network +// connection ID and the process key. +func makeNqIDFromParts(processKey string, netConnID string) string { + return processKey + "-" + netConnID } diff --git a/process/process.go b/process/process.go index 733ba819..e982c901 100644 --- a/process/process.go +++ b/process/process.go @@ -314,10 +314,10 @@ func loadProcess(ctx context.Context, key string, pInfo *processInfo.Process) (* return process, nil } -// GetID returns the key that is used internally to identify the process. -// The ID consists of the PID and the start time of the process as reported by +// GetKey returns the key that is used internally to identify the process. +// The key consists of the PID and the start time of the process as reported by // the system. -func (p *Process) GetID() string { +func (p *Process) GetKey() string { return p.processKey }