Fix netquery connection ID generation

This commit is contained in:
Daniel 2023-08-04 21:49:57 +02:00
parent aa0b42dd01
commit d0f4392b9d
4 changed files with 55 additions and 41 deletions

View file

@ -628,9 +628,9 @@ func bandwidthUpdateHandler(ctx context.Context) error {
return nil
case bwUpdate := <-interception.BandwidthUpdates:
if bwUpdate != nil {
updateBandwidth(ctx, bwUpdate)
// DEBUG:
// log.Debugf("filter: bandwidth update: %s", bwUpdate)
updateBandwidth(ctx, bwUpdate)
} else {
return errors.New("received nil bandwidth update from interception")
}
@ -653,6 +653,8 @@ func updateBandwidth(ctx context.Context, bwUpdate *packet.BandwidthUpdate) {
// Do not wait for connections that are locked.
// TODO: Use atomic operations for updating bandwidth stats.
if !conn.TryLock() {
// DEBUG:
// log.Warningf("filter: failed to lock connection for bandwidth update: %s", conn)
return
}
defer conn.Unlock()
@ -675,7 +677,7 @@ func updateBandwidth(ctx context.Context, bwUpdate *packet.BandwidthUpdate) {
if err := netquery.DefaultModule.Store.UpdateBandwidth(
ctx,
conn.HistoryEnabled,
conn.Process().GetID(),
conn.Process().GetKey(),
conn.ID,
conn.BytesReceived,
conn.BytesSent,

View file

@ -2,8 +2,6 @@ package netquery
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
@ -395,12 +393,8 @@ func (db *Database) MarkAllHistoryConnectionsEnded(ctx context.Context) error {
// UpdateBandwidth updates bandwidth data for the connection and optionally also writes
// the bandwidth data to the history database.
func (db *Database) UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, bytesReceived uint64, bytesSent uint64) error {
data := connID + "-" + processKey
hash := sha256.Sum256([]byte(data))
dbConnID := hex.EncodeToString(hash[:])
params := map[string]any{
":id": dbConnID,
":id": makeNqIDFromParts(processKey, connID),
}
parts := []string{}
@ -481,6 +475,12 @@ func (db *Database) Save(ctx context.Context, conn Conn, enableHistory bool) err
// and save some CPU cycles for the user
dbNames := []DatabaseName{LiveDatabase}
// TODO: Should we only add ended connection to the history database to save
// a couple INSERTs per connection?
// This means we need to write the current live DB to the history DB on
// shutdown in order to be able to pick the back up after a restart.
// Save to history DB if enabled.
if enableHistory {
dbNames = append(dbNames, HistoryDatabase)
}

View file

@ -100,24 +100,30 @@ func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connect
if !ok {
return
}
func() {
conn.Lock()
defer conn.Unlock()
if !conn.DataIsComplete() {
continue
return
}
model, err := convertConnection(conn)
if err != nil {
log.Errorf("netquery: failed to convert connection %s to sqlite model: %s", conn.ID, err)
continue
return
}
// DEBUG:
// log.Tracef("netquery: updating connection %s", conn.ID)
if err := mng.store.Save(ctx, *model, conn.HistoryEnabled); err != nil {
// Save to netquery database.
// Do not include internal connections in history.
if err := mng.store.Save(ctx, *model, conn.HistoryEnabled && !conn.Internal); err != nil {
log.Errorf("netquery: failed to save connection %s in sqlite database: %s", conn.ID, err)
continue
return
}
// we clone the record metadata from the connection
@ -129,6 +135,7 @@ func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connect
if err := mng.pushConnUpdate(ctx, *cloned, *model); err != nil {
log.Errorf("netquery: failed to push update for conn %s via database system: %s", conn.ID, err)
}
}()
}
}
}
@ -155,18 +162,16 @@ func (mng *Manager) pushConnUpdate(_ context.Context, meta record.Meta, conn Con
}
// convertConnection converts conn to the local representation used
// to persist the information in SQLite. convertConnection attempts
// to lock conn and may thus block for some time.
// to persist the information in SQLite.
// The caller must hold the lock to the given network.Connection.
func convertConnection(conn *network.Connection) (*Conn, error) {
conn.Lock()
defer conn.Unlock()
direction := "outbound"
if conn.Inbound {
direction = "inbound"
}
c := Conn{
ID: genConnID(conn),
ID: makeNqIDFromConn(conn),
External: conn.External,
IPVersion: conn.IPVersion,
IPProtocol: conn.IPProtocol,
@ -265,6 +270,13 @@ func convertConnection(conn *network.Connection) (*Conn, error) {
return &c, nil
}
func genConnID(conn *network.Connection) string {
return conn.ID + "-" + conn.Process().GetID()
// makeNqIDFromConn creates a netquery connection ID from the network connection.
func makeNqIDFromConn(conn *network.Connection) string {
return makeNqIDFromParts(conn.Process().GetKey(), conn.ID)
}
// makeNqIDFromParts creates a netquery connection ID from the given network
// connection ID and the process key.
func makeNqIDFromParts(processKey string, netConnID string) string {
return processKey + "-" + netConnID
}

View file

@ -314,10 +314,10 @@ func loadProcess(ctx context.Context, key string, pInfo *processInfo.Process) (*
return process, nil
}
// GetID returns the key that is used internally to identify the process.
// The ID consists of the PID and the start time of the process as reported by
// GetKey returns the key that is used internally to identify the process.
// The key consists of the PID and the start time of the process as reported by
// the system.
func (p *Process) GetID() string {
func (p *Process) GetKey() string {
return p.processKey
}