package netquery

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"time"

	"github.com/safing/portbase/database/record"
	"github.com/safing/portbase/formats/dsd"
	"github.com/safing/portbase/log"
	"github.com/safing/portbase/runtime"
	"github.com/safing/portmaster/network"
)

type (
	// ConnectionStore describes the interface that is used by Manager
	// to save new or updated connection objects.
	// It is implemented by the *Database type of this package.
	ConnectionStore interface {
		// Save is called to persist the new or updated connection. If required,
		// it's up to the implementation to figure out whether the operation is an
		// insert or an update.
		// The ID of Conn is unique and can be trusted to never collide with other
		// connections of the same device.
		Save(context.Context, Conn) error
	}

	// Manager handles new and updated network.Connections from a feed and persists
	// them in a connection store.
	// Manager also registers itself as a runtime database and pushes updates to
	// connections using the local format.
	// Users should use this update feed rather than the deprecated "network:" database.
	Manager struct {
		store      ConnectionStore
		push       runtime.PushFunc
		runtimeReg *runtime.Registry
		pushPrefix string
	}
)

// NewManager returns a new connection manager that persists all newly created or
// updated connections to store.
func NewManager(store ConnectionStore, pushPrefix string, reg *runtime.Registry) (*Manager, error) {
	mng := &Manager{
		store:      store,
		runtimeReg: reg,
		pushPrefix: pushPrefix,
	}

	push, err := reg.Register(pushPrefix, runtime.SimpleValueGetterFunc(mng.runtimeGet))
	if err != nil {
		return nil, err
	}
	mng.push = push

	return mng, nil
}

func (mng *Manager) runtimeGet(keyOrPrefix string) ([]record.Record, error) {
	// TODO(ppacher):
	// We don't yet support querying using the runtime database here ...
	// consider exposing connections from the database, at least by ID.
	//
	// NOTE(ppacher):
	// For debugging purposes, use RuntimeQueryRunner to execute plain
	// SQL queries against the database using portbase/database/runtime.
	return nil, nil
}

// HandleFeed starts reading new and updated connections from feed and persists them
// in the configured ConnectionStore. HandleFeed blocks until either ctx is cancelled
// or feed is closed.
// Any errors encountered when processing new or updated connections are logged but
// otherwise ignored.
// HandleFeed handles and persists updates sequentially, one after another. Depending
// on the system load, the user might want to use a buffered channel for feed.
func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connection) {
	for {
		select {
		case <-ctx.Done():
			return

		case conn, ok := <-feed:
			if !ok {
				return
			}

			model, err := convertConnection(conn)
			if err != nil {
				log.Errorf("netquery: failed to convert connection %s to sqlite model: %s", conn.ID, err)
				continue
			}

			log.Tracef("netquery: updating connection %s", conn.ID)

			if err := mng.store.Save(ctx, *model); err != nil {
				log.Errorf("netquery: failed to save connection %s in sqlite database: %s", conn.ID, err)
				continue
			}

			// We clone the record metadata from the connection
			// into the new model so the portbase/database layer
			// can handle NEW/UPDATE correctly.
			cloned := conn.Meta().Duplicate()

			// push an update for the connection
			if err := mng.pushConnUpdate(ctx, *cloned, *model); err != nil {
				log.Errorf("netquery: failed to push update for conn %s via database system: %s", conn.ID, err)
			}
		}
	}
}

func (mng *Manager) pushConnUpdate(_ context.Context, meta record.Meta, conn Conn) error {
	blob, err := json.Marshal(conn)
	if err != nil {
		return fmt.Errorf("failed to marshal connection: %w", err)
	}

	key := fmt.Sprintf("%s:%s%s", mng.runtimeReg.DatabaseName(), mng.pushPrefix, conn.ID)
	wrapper, err := record.NewWrapper(
		key,
		&meta,
		dsd.JSON,
		blob,
	)
	if err != nil {
		return fmt.Errorf("failed to create record wrapper: %w", err)
	}

	mng.push(wrapper)

	return nil
}

// convertConnection converts conn to the local representation used
// to persist the information in SQLite. convertConnection attempts
// to lock conn and may thus block for some time.
func convertConnection(conn *network.Connection) (*Conn, error) {
	conn.Lock()
	defer conn.Unlock()

	direction := "outbound"
	if conn.Inbound {
		direction = "inbound"
	}

	c := Conn{
		ID:              genConnID(conn),
		External:        conn.External,
		IPVersion:       conn.IPVersion,
		IPProtocol:      conn.IPProtocol,
		LocalIP:         conn.LocalIP.String(),
		LocalPort:       conn.LocalPort,
		Verdict:         conn.Verdict,
		Started:         time.Unix(conn.Started, 0),
		Tunneled:        conn.Tunneled,
		Encrypted:       conn.Encrypted,
		Internal:        conn.Internal,
		Direction:       direction,
		Type:            ConnectionTypeToString[conn.Type],
		ProfileID:       conn.ProcessContext.Source + "/" + conn.ProcessContext.Profile,
		Path:            conn.ProcessContext.BinaryPath,
		ProfileRevision: int(conn.ProfileRevisionCounter),
		ProfileName:     conn.ProcessContext.ProfileName,
	}

	switch conn.Type {
	case network.DNSRequest:
		c.Type = "dns"
	case network.IPConnection:
		c.Type = "ip"
	case network.Undefined:
		c.Type = ""
	}

	switch conn.Verdict {
	case network.VerdictAccept, network.VerdictRerouteToNameserver, network.VerdictRerouteToTunnel:
		accepted := true
		c.Allowed = &accepted
	case network.VerdictBlock, network.VerdictDrop:
		allowed := false
		c.Allowed = &allowed
	case network.VerdictUndecided, network.VerdictUndeterminable, network.VerdictFailed:
		c.Allowed = nil
	}

	if conn.Ended > 0 {
		ended := time.Unix(conn.Ended, 0)
		c.Ended = &ended
		c.Active = false
	} else {
		c.Active = true
	}

	extraData := map[string]interface{}{
		"pid": conn.ProcessContext.PID,
	}

	if conn.TunnelContext != nil {
		extraData["tunnel"] = conn.TunnelContext
		exitNode := conn.TunnelContext.GetExitNodeID()
		c.ExitNode = &exitNode
		c.SPNUsed = true
	}

	if conn.DNSContext != nil {
		extraData["dns"] = conn.DNSContext
	}

	// TODO(ppacher): enable when TLS inspection is merged
	// if conn.TLSContext != nil {
	// 	extraData["tls"] = conn.TLSContext
	// }

	if conn.Entity != nil {
		extraData["cname"] = conn.Entity.CNAME
		extraData["blockedByLists"] = conn.Entity.BlockedByLists
		extraData["blockedEntities"] = conn.Entity.BlockedEntities
		extraData["reason"] = conn.Reason

		c.RemoteIP = conn.Entity.IP.String()
		c.RemotePort = conn.Entity.Port
		c.Domain = conn.Entity.Domain
		c.Country = conn.Entity.Country
		c.ASN = conn.Entity.ASN
		c.ASOwner = conn.Entity.ASOrg
		c.Scope = conn.Entity.IPScope

		if conn.Entity.Coordinates != nil {
			c.Latitude = conn.Entity.Coordinates.Latitude
			c.Longitude = conn.Entity.Coordinates.Longitude
		}
	}

	// pre-compute the JSON blob for the extra data column
	// and assign it.
	extraDataBlob, err := json.Marshal(extraData)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal extra data: %w", err)
	}
	c.ExtraData = extraDataBlob

	return &c, nil
}

// genConnID computes a stable connection ID by hashing the network
// connection ID together with the connection's start time.
func genConnID(conn *network.Connection) string {
	data := conn.ID + "-" + time.Unix(conn.Started, 0).String()
	hash := sha256.Sum256([]byte(data))

	return hex.EncodeToString(hash[:])
}
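
// Usage sketch (illustrative only; the identifiers store, reg, ctx and the
// buffer size are assumptions, not part of this file): a caller typically
// creates the Manager and runs HandleFeed in its own goroutine, using a
// buffered feed channel so bursts of connection updates do not block the
// producer, as suggested by the HandleFeed doc comment above.
//
//	feed := make(chan *network.Connection, 1024)
//
//	mng, err := NewManager(store, "connections/", reg)
//	if err != nil {
//		return err
//	}
//
//	go mng.HandleFeed(ctx, feed)
//
// Producers then send new or updated *network.Connection values on feed and
// close the channel (or cancel ctx) to stop the manager.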