Merge pull request #170 from safing/fix/race-conditions

Fixes race conditions due to incorrect usage of PushUpdate or PushFunc
This commit is contained in:
Patrick Pacher 2020-10-08 14:09:14 +02:00 committed by GitHub
commit 67b3d76ae5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 223 additions and 108 deletions

View file

@ -149,6 +149,8 @@ func handleRequest(ctx context.Context, w dns.ResponseWriter, request *dns.Msg)
// Get connection for this request. This identifies the process behind the request. // Get connection for this request. This identifies the process behind the request.
conn := network.NewConnectionFromDNSRequest(ctx, q.FQDN, nil, packet.IPv4, remoteAddr.IP, uint16(remoteAddr.Port)) conn := network.NewConnectionFromDNSRequest(ctx, q.FQDN, nil, packet.IPv4, remoteAddr.IP, uint16(remoteAddr.Port))
conn.Lock()
defer conn.Unlock()
// Once we decided on the connection we might need to save it to the database, // Once we decided on the connection we might need to save it to the database,
// so we defer that check for now. // so we defer that check for now.

View file

@ -12,7 +12,7 @@ import (
"github.com/safing/portmaster/process" "github.com/safing/portmaster/process"
) )
var ( const (
cleanerTickDuration = 5 * time.Second cleanerTickDuration = 5 * time.Second
deleteConnsAfterEndedThreshold = 5 * time.Minute deleteConnsAfterEndedThreshold = 5 * time.Minute
) )
@ -46,15 +46,8 @@ func cleanConnections() (activePIDs map[int]struct{}) {
nowUnix := now.Unix() nowUnix := now.Unix()
deleteOlderThan := now.Add(-deleteConnsAfterEndedThreshold).Unix() deleteOlderThan := now.Add(-deleteConnsAfterEndedThreshold).Unix()
// lock both together because we cannot fully guarantee in which map a connection lands
// of course every connection should land in the correct map, but this increases resilience
connsLock.Lock()
defer connsLock.Unlock()
dnsConnsLock.Lock()
defer dnsConnsLock.Unlock()
// network connections // network connections
for _, conn := range conns { for _, conn := range conns.clone() {
conn.Lock() conn.Lock()
// delete inactive connections // delete inactive connections
@ -70,15 +63,13 @@ func cleanConnections() (activePIDs map[int]struct{}) {
Dst: conn.Entity.IP, Dst: conn.Entity.IP,
DstPort: conn.Entity.Port, DstPort: conn.Entity.Port,
}, now) }, now)
activePIDs[conn.process.Pid] = struct{}{} activePIDs[conn.process.Pid] = struct{}{}
if !exists { if !exists {
// Step 2: mark end // Step 2: mark end
conn.Ended = nowUnix conn.Ended = nowUnix
if conn.KeyIsSet() { conn.Save()
// Be absolutely sure that we have a key set here, else conn.Save() will deadlock.
conn.Save()
}
} }
case conn.Ended < deleteOlderThan: case conn.Ended < deleteOlderThan:
// Step 3: delete // Step 3: delete
@ -90,7 +81,7 @@ func cleanConnections() (activePIDs map[int]struct{}) {
} }
// dns requests // dns requests
for _, conn := range dnsConns { for _, conn := range dnsConns.clone() {
conn.Lock() conn.Lock()
// delete old dns connections // delete old dns connections

View file

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"strconv"
"sync" "sync"
"time" "time"
@ -19,48 +18,114 @@ import (
"github.com/safing/portmaster/resolver" "github.com/safing/portmaster/resolver"
) )
// FirewallHandler defines the function signature for a firewall handle function // FirewallHandler defines the function signature for a firewall
// handle function. A firewall handler is responsible for finding
// a reasonable verdict for the connection conn. The connection is
// locked before the firewall handler is called.
type FirewallHandler func(conn *Connection, pkt packet.Packet) type FirewallHandler func(conn *Connection, pkt packet.Packet)
// Connection describes a distinct physical network connection identified by the IP/Port pair. // Connection describes a distinct physical network connection
// identified by the IP/Port pair.
type Connection struct { //nolint:maligned // TODO: fix alignment type Connection struct { //nolint:maligned // TODO: fix alignment
record.Base record.Base
sync.Mutex sync.Mutex
ID string // ID may hold unique connection id. It is only set for non-DNS
Scope string // request connections and is considered immutable after a
// connection object has been created.
ID string
// Scope defines the scope of a connection. For DNS requests, the
// scope is always set to the domain name. For direct packet
// connections the scope consists of the involved network environment
// and the packet direction. Once a connection object is created,
// Scope is considered immutable.
Scope string
// IPVersion is set to the packet IP version. It is not set (0) for
// connections created from a DNS request.
IPVersion packet.IPVersion IPVersion packet.IPVersion
Inbound bool // Inbound is set to true if the connection is incoming. Inbound is
// only set when a connection object is created and is considered
// local endpoint // immutable afterwards.
Inbound bool
// IPProtocol is set to the transport protocol used by the connection.
// Is is considered immutable once a connection object has been
// created. IPProtocol is not set for connections that have been
// created from a DNS request.
IPProtocol packet.IPProtocol IPProtocol packet.IPProtocol
LocalIP net.IP // LocalIP holds the local IP address of the connection. It is not
LocalPort uint16 // set for connections created from DNS requests. LocalIP is
process *process.Process // considered immutable once a connection object has been created.
LocalIP net.IP
// remote endpoint // LocalPort holds the local port of the connection. It is not
// set for connections created from DNS requests. LocalPort is
// considered immutable once a connection object has been created.
LocalPort uint16
// Entity describes the remote entity that the connection has been
// established to. The entity might be changed or information might
// be added to it during the livetime of a connection. Access to
// entity must be guarded by the connection lock.
Entity *intel.Entity Entity *intel.Entity
// Verdict is the final decision that has been made for a connection.
Verdict Verdict // The verdict may change so any access to it must be guarded by the
Reason string // connection lock.
Verdict Verdict
// Reason is a human readable description justifying the set verdict.
// Access to Reason must be guarded by the connection lock.
Reason string
// ReasonContext may holds additional reason-specific information and
// any access must be guarded by the connection lock.
ReasonContext interface{} ReasonContext interface{}
ReasonID string // format source[:id[:id]] // TODO // Started holds the number of seconds in UNIX epoch time at which
// the connection has been initated and first seen by the portmaster.
Started int64 // Staretd is only every set when creating a new connection object
Ended int64 // and is considered immutable afterwards.
Tunneled bool Started int64
// Ended is set to the number of seconds in UNIX epoch time at which
// the connection is considered terminated. Ended may be set at any
// time so access must be guarded by the conneciton lock.
Ended int64
// VerdictPermanent is set to true if the final verdict is permanent
// and the connection has been (or will be) handed back to the kernel.
// VerdictPermanent may be changed together with the Verdict and Reason
// properties and must be guarded using the connection lock.
VerdictPermanent bool VerdictPermanent bool
Inspecting bool // Inspecting is set to true if the connection is being inspected
Encrypted bool // TODO // by one or more of the registered inspectors. This property may
Internal bool // Portmaster internal connections are marked in order to easily filter these out in the UI // be changed during the lifetime of a connection and must be guarded
// using the connection lock.
pktQueue chan packet.Packet Inspecting bool
// Tunneled is currently unused and MUST be ignored.
Tunneled bool
// Encrypted is currently unused and MUST be ignored.
Encrypted bool
// Internal is set to true if the connection is attributed as an
// Portmaster internal connection. Internal may be set at different
// points and access to it must be guarded by the connection lock.
Internal bool
// process holds a reference to the actor process. That is, the
// process instance that initated the conneciton.
process *process.Process
// pkgQueue is used to serialize packet handling for a single
// connection and is served by the connections packetHandler.
pktQueue chan packet.Packet
// firewallHandler is the firewall handler that is called for
// each packet sent to pktQueue.
firewallHandler FirewallHandler firewallHandler FirewallHandler
// saveWhenFinished can be set to drue during the life-time of
// a connection and signals the firewallHandler that a Save()
// should be issued after processing the connection.
saveWhenFinished bool
// activeInspectors is a slice of booleans where each entry
// maps to the index of an available inspector. If the value
// is true the inspector is currently active. False indicates
// that the inspector has finished and should be skipped.
activeInspectors []bool activeInspectors []bool
inspectorData map[uint8]interface{} // inspectorData holds additional meta data for the inspectors.
// using the inspectors index as a map key.
saveWhenFinished bool inspectorData map[uint8]interface{}
// profileRevisionCounter is used to track changes to the process
// profile and required for correct re-evaluation of a connections
// verdict.
profileRevisionCounter uint64 profileRevisionCounter uint64
} }
@ -120,7 +185,10 @@ func NewConnectionFromFirstPacket(pkt packet.Packet) *Connection {
scope = IncomingLAN scope = IncomingLAN
case netutils.Global, netutils.GlobalMulticast: case netutils.Global, netutils.GlobalMulticast:
scope = IncomingInternet scope = IncomingInternet
default: // netutils.Invalid
case netutils.Invalid:
fallthrough
default:
scope = IncomingInvalid scope = IncomingInvalid
} }
entity = &intel.Entity{ entity = &intel.Entity{
@ -167,7 +235,10 @@ func NewConnectionFromFirstPacket(pkt packet.Packet) *Connection {
scope = PeerLAN scope = PeerLAN
case netutils.Global, netutils.GlobalMulticast: case netutils.Global, netutils.GlobalMulticast:
scope = PeerInternet scope = PeerInternet
default: // netutils.Invalid
case netutils.Invalid:
fallthrough
default:
scope = PeerInvalid scope = PeerInvalid
} }
@ -194,11 +265,7 @@ func NewConnectionFromFirstPacket(pkt packet.Packet) *Connection {
// GetConnection fetches a Connection from the database. // GetConnection fetches a Connection from the database.
func GetConnection(id string) (*Connection, bool) { func GetConnection(id string) (*Connection, bool) {
connsLock.RLock() return conns.get(id)
defer connsLock.RUnlock()
conn, ok := conns[id]
return conn, ok
} }
// AcceptWithContext accepts the connection. // AcceptWithContext accepts the connection.
@ -292,32 +359,24 @@ func (conn *Connection) SaveWhenFinished() {
conn.saveWhenFinished = true conn.saveWhenFinished = true
} }
// Save saves the connection in the storage and propagates the change through the database system. // Save saves the connection in the storage and propagates the change
// through the database system. Save may lock dnsConnsLock or connsLock
// in if Save() is called the first time.
// Callers must make sure to lock the connection itself before calling
// Save().
func (conn *Connection) Save() { func (conn *Connection) Save() {
conn.UpdateMeta() conn.UpdateMeta()
if !conn.KeyIsSet() { if !conn.KeyIsSet() {
// A connection without an ID has been created from
// a DNS request rather than a packet. Choose the correct
// connection store here.
if conn.ID == "" { if conn.ID == "" {
// dns request
// set key
conn.SetKey(fmt.Sprintf("network:tree/%d/%s", conn.process.Pid, conn.Scope)) conn.SetKey(fmt.Sprintf("network:tree/%d/%s", conn.process.Pid, conn.Scope))
mapKey := strconv.Itoa(conn.process.Pid) + "/" + conn.Scope dnsConns.add(conn)
// save
dnsConnsLock.Lock()
dnsConns[mapKey] = conn
dnsConnsLock.Unlock()
} else { } else {
// network connection
// set key
conn.SetKey(fmt.Sprintf("network:tree/%d/%s/%s", conn.process.Pid, conn.Scope, conn.ID)) conn.SetKey(fmt.Sprintf("network:tree/%d/%s/%s", conn.process.Pid, conn.Scope, conn.ID))
conns.add(conn)
// save
connsLock.Lock()
conns[conn.ID] = conn
connsLock.Unlock()
} }
} }
@ -325,12 +384,17 @@ func (conn *Connection) Save() {
dbController.PushUpdate(conn) dbController.PushUpdate(conn)
} }
// delete deletes a link from the storage and propagates the change. Nothing is locked - both the conns map and the connection itself require locking // delete deletes a link from the storage and propagates the change.
// delete may lock either the dnsConnsLock or connsLock. Callers
// must still make sure to lock the connection itself.
func (conn *Connection) delete() { func (conn *Connection) delete() {
// A connection without an ID has been created from
// a DNS request rather than a packet. Choose the correct
// connection store here.
if conn.ID == "" { if conn.ID == "" {
delete(dnsConns, strconv.Itoa(conn.process.Pid)+"/"+conn.Scope) dnsConns.delete(conn)
} else { } else {
delete(conns, conn.ID) conns.delete(conn)
} }
conn.Meta().Delete() conn.Meta().Delete()
@ -352,7 +416,8 @@ func (conn *Connection) UpdateAndCheck() (needsReevaluation bool) {
return return
} }
// SetFirewallHandler sets the firewall handler for this link, and starts a worker to handle the packets. // SetFirewallHandler sets the firewall handler for this link, and starts a
// worker to handle the packets.
func (conn *Connection) SetFirewallHandler(handler FirewallHandler) { func (conn *Connection) SetFirewallHandler(handler FirewallHandler) {
if conn.firewallHandler == nil { if conn.firewallHandler == nil {
conn.pktQueue = make(chan packet.Packet, 1000) conn.pktQueue = make(chan packet.Packet, 1000)
@ -388,26 +453,27 @@ func (conn *Connection) HandlePacket(pkt packet.Packet) {
// packetHandler sequentially handles queued packets // packetHandler sequentially handles queued packets
func (conn *Connection) packetHandler() { func (conn *Connection) packetHandler() {
for { for pkt := range conn.pktQueue {
pkt := <-conn.pktQueue
if pkt == nil { if pkt == nil {
return return
} }
// get handler // get handler
conn.Lock() conn.Lock()
// execute handler or verdict // execute handler or verdict
if conn.firewallHandler != nil { if conn.firewallHandler != nil {
conn.firewallHandler(conn, pkt) conn.firewallHandler(conn, pkt)
} else { } else {
defaultFirewallHandler(conn, pkt) defaultFirewallHandler(conn, pkt)
} }
conn.Unlock()
// save does not touch any changing data // save does not touch any changing data
// must not be locked, will deadlock with cleaner functions // must not be locked, will deadlock with cleaner functions
if conn.saveWhenFinished { if conn.saveWhenFinished {
conn.saveWhenFinished = false conn.saveWhenFinished = false
conn.Save() conn.Save()
} }
conn.Unlock()
// submit trace logs // submit trace logs
log.Tracer(pkt.Ctx()).Submit() log.Tracer(pkt.Ctx()).Submit()
} }

View file

@ -0,0 +1,57 @@
package network
import (
"strconv"
"sync"
)
type connectionStore struct {
rw sync.RWMutex
items map[string]*Connection
}
func newConnectionStore() *connectionStore {
return &connectionStore{
items: make(map[string]*Connection, 100),
}
}
func (cs *connectionStore) getID(conn *Connection) string {
if conn.ID != "" {
return conn.ID
}
return strconv.Itoa(conn.process.Pid) + "/" + conn.Scope
}
func (cs *connectionStore) add(conn *Connection) {
cs.rw.Lock()
defer cs.rw.Unlock()
cs.items[cs.getID(conn)] = conn
}
func (cs *connectionStore) delete(conn *Connection) {
cs.rw.Lock()
defer cs.rw.Unlock()
delete(cs.items, cs.getID(conn))
}
func (cs *connectionStore) get(id string) (*Connection, bool) {
cs.rw.RLock()
defer cs.rw.RUnlock()
conn, ok := cs.items[id]
return conn, ok
}
func (cs *connectionStore) clone() map[string]*Connection {
cs.rw.RLock()
defer cs.rw.RUnlock()
m := make(map[string]*Connection, len(cs.items))
for key, conn := range cs.items {
m[key] = conn
}
return m
}

View file

@ -3,7 +3,6 @@ package network
import ( import (
"strconv" "strconv"
"strings" "strings"
"sync"
"github.com/safing/portmaster/network/state" "github.com/safing/portmaster/network/state"
@ -16,15 +15,14 @@ import (
) )
var ( var (
dnsConns = make(map[string]*Connection) // key: <PID>/Scope
dnsConnsLock sync.RWMutex
conns = make(map[string]*Connection) // key: Connection ID
connsLock sync.RWMutex
dbController *database.Controller dbController *database.Controller
dnsConns = newConnectionStore()
conns = newConnectionStore()
) )
// StorageInterface provices a storage.Interface to the configuration manager. // StorageInterface provices a storage.Interface to the
// configuration manager.
type StorageInterface struct { type StorageInterface struct {
storage.InjectBase storage.InjectBase
} }
@ -45,18 +43,12 @@ func (s *StorageInterface) Get(key string) (record.Record, error) {
} }
} }
case 3: case 3:
dnsConnsLock.RLock() if r, ok := dnsConns.get(splitted[1] + "/" + splitted[2]); ok {
defer dnsConnsLock.RUnlock() return r, nil
conn, ok := dnsConns[splitted[1]+"/"+splitted[2]]
if ok {
return conn, nil
} }
case 4: case 4:
connsLock.RLock() if r, ok := conns.get(splitted[3]); ok {
defer connsLock.RUnlock() return r, nil
conn, ok := conns[splitted[3]]
if ok {
return conn, nil
} }
} }
case "system": case "system":
@ -97,28 +89,24 @@ func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) {
if slashes <= 2 { if slashes <= 2 {
// dns scopes only // dns scopes only
dnsConnsLock.RLock() for _, dnsConn := range dnsConns.clone() {
for _, dnsConn := range dnsConns {
dnsConn.Lock() dnsConn.Lock()
if q.Matches(dnsConn) { if q.Matches(dnsConn) {
it.Next <- dnsConn it.Next <- dnsConn
} }
dnsConn.Unlock() dnsConn.Unlock()
} }
dnsConnsLock.RUnlock()
} }
if slashes <= 3 { if slashes <= 3 {
// connections // connections
connsLock.RLock() for _, conn := range conns.clone() {
for _, conn := range conns {
conn.Lock() conn.Lock()
if q.Matches(conn) { if q.Matches(conn) {
it.Next <- conn it.Next <- conn
} }
conn.Unlock() conn.Unlock()
} }
connsLock.RUnlock()
} }
it.Finish(nil) it.Finish(nil)

View file

@ -17,14 +17,16 @@ var (
openDNSRequests = make(map[string]*Connection) // key: <pid>/fqdn openDNSRequests = make(map[string]*Connection) // key: <pid>/fqdn
openDNSRequestsLock sync.Mutex openDNSRequestsLock sync.Mutex
// scope prefix
unidentifiedProcessScopePrefix = strconv.Itoa(process.UnidentifiedProcessID) + "/"
)
const (
// write open dns requests every // write open dns requests every
writeOpenDNSRequestsTickDuration = 5 * time.Second writeOpenDNSRequestsTickDuration = 5 * time.Second
// duration after which DNS requests without a following connection are logged // duration after which DNS requests without a following connection are logged
openDNSRequestLimit = 3 * time.Second openDNSRequestLimit = 3 * time.Second
// scope prefix
unidentifiedProcessScopePrefix = strconv.Itoa(process.UnidentifiedProcessID) + "/"
) )
func getDNSRequestCacheKey(pid int, fqdn string) string { func getDNSRequestCacheKey(pid int, fqdn string) string {

View file

@ -61,3 +61,7 @@ func (i *BindInfo) GetUID() int { return i.UID }
// GetInode returns the Inode. // GetInode returns the Inode.
func (i *BindInfo) GetInode() int { return i.Inode } func (i *BindInfo) GetInode() int { return i.Inode }
// compile time checks
var _ Info = new(ConnectionInfo)
var _ Info = new(BindInfo)

View file

@ -3,9 +3,10 @@ package network
// Verdict describes the decision made about a connection or link. // Verdict describes the decision made about a connection or link.
type Verdict int8 type Verdict int8
// List of values a Status can have // All possible verdicts that can be applied to a network
// connection.
const ( const (
// UNDECIDED is the default status of new connections // VerdictUndecided is the default status of new connections.
VerdictUndecided Verdict = 0 VerdictUndecided Verdict = 0
VerdictUndeterminable Verdict = 1 VerdictUndeterminable Verdict = 1
VerdictAccept Verdict = 2 VerdictAccept Verdict = 2
@ -63,7 +64,7 @@ func (v Verdict) Verb() string {
} }
} }
// Packer Directions // Packet Directions
const ( const (
Inbound = true Inbound = true
Outbound = false Outbound = false

View file

@ -66,7 +66,7 @@ func (p *Process) Save() {
} }
if dbControllerFlag.IsSet() { if dbControllerFlag.IsSet() {
go dbController.PushUpdate(p) dbController.PushUpdate(p)
} }
} }
@ -83,7 +83,7 @@ func (p *Process) Delete() {
// propagate delete // propagate delete
p.Meta().Delete() p.Meta().Delete()
if dbControllerFlag.IsSet() { if dbControllerFlag.IsSet() {
go dbController.PushUpdate(p) dbController.PushUpdate(p)
} }
// TODO: maybe mark the assigned profiles as no longer needed? // TODO: maybe mark the assigned profiles as no longer needed?

View file

@ -89,5 +89,9 @@ func pushSystemStatus() {
return return
} }
pushUpdate(buildSystemStatus()) record := buildSystemStatus()
record.Lock()
defer record.Unlock()
pushUpdate(record)
} }