Improve ebpf connection listener

This commit is contained in:
Daniel 2023-07-20 14:01:58 +02:00
parent dc033c3933
commit 4c21c87b8a
5 changed files with 107 additions and 170 deletions

View file

@ -1,57 +0,0 @@
//go:build linux
package ebpf
import (
"fmt"
pmpacket "github.com/safing/portmaster/network/packet"
)
// packet implements the packet.Packet interface.
type infoPacket struct {
pmpacket.Base
}
// InfoOnly returns whether the packet is informational only and does not
// represent an actual packet.
func (pkt *infoPacket) InfoOnly() bool {
return true
}
// LoadPacketData does nothing on Linux, as data is always fully parsed.
func (pkt *infoPacket) LoadPacketData() error {
return fmt.Errorf("can't load data in info only packet")
}
func (pkt *infoPacket) Accept() error {
return nil
}
func (pkt *infoPacket) Block() error {
return nil
}
func (pkt *infoPacket) Drop() error {
return nil
}
func (pkt *infoPacket) PermanentAccept() error {
return pkt.Accept()
}
func (pkt *infoPacket) PermanentBlock() error {
return pkt.Block()
}
func (pkt *infoPacket) PermanentDrop() error {
return nil
}
func (pkt *infoPacket) RerouteToNameserver() error {
return nil
}
func (pkt *infoPacket) RerouteToTunnel() error {
return nil
}

View file

@ -2,131 +2,124 @@ package ebpf
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"net"
"sync/atomic"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/safing/portbase/log"
"github.com/safing/portmaster/network/packet"
)
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -cflags "-O2 -g -Wall -Werror" -type Event bpf ../programs/monitor.c
var stopper chan struct{}
var ebpfLoadingFailed atomic.Uint32
// StartEBPFWorker starts the ebpf worker.
func StartEBPFWorker(ch chan packet.Packet) {
stopper = make(chan struct{})
// ConnectionListenerWorker listens to new connections using ebpf.
func ConnectionListenerWorker(ctx context.Context, packets chan packet.Packet) error {
// Allow the current process to lock memory for eBPF resources.
if err := rlimit.RemoveMemlock(); err != nil {
return fmt.Errorf("ebpf: failed to remove ebpf memlock: %w", err)
}
// Load pre-compiled programs and maps into the kernel.
objs := bpfObjects{}
if err := loadBpfObjects(&objs, nil); err != nil {
if ebpfLoadingFailed.Add(1) >= 5 {
log.Warningf("ebpf: failed to load ebpf object 5 times, giving up with error %s", err)
return nil
}
return fmt.Errorf("ebpf: failed to load ebpf object: %w", err)
}
defer objs.Close() //nolint:errcheck
// Create a link to the tcp_connect program.
linkTCPConnect, err := link.AttachTracing(link.TracingOptions{
Program: objs.bpfPrograms.TcpConnect,
})
if err != nil {
return fmt.Errorf("ebpf: failed to attach to tcp_v4_connect: %w", err)
}
defer linkTCPConnect.Close() //nolint:errcheck
// Create a link to the udp_v4_connect program.
linkUDPV4, err := link.AttachTracing(link.TracingOptions{
Program: objs.bpfPrograms.UdpV4Connect,
})
if err != nil {
return fmt.Errorf("ebpf: failed to attach to udp_v4_connect: %w", err)
}
defer linkUDPV4.Close() //nolint:errcheck
// Create a link to the udp_v6_connect program.
linkUDPV6, err := link.AttachTracing(link.TracingOptions{
Program: objs.bpfPrograms.UdpV6Connect,
})
if err != nil {
return fmt.Errorf("ebpf: failed to attach to udp_v6_connect: %w", err)
}
defer linkUDPV6.Close() //nolint:errcheck
// Create new reader to read events.
rd, err := ringbuf.NewReader(objs.bpfMaps.PmConnectionEvents)
if err != nil {
return fmt.Errorf("ebpf: failed to open ring buffer: %w", err)
}
defer rd.Close() //nolint:errcheck
// Start watcher to close the reader when the context is canceled.
// TODO: Can we put this into a worker?
go func() {
// Allow the current process to lock memory for eBPF resources.
if err := rlimit.RemoveMemlock(); err != nil {
log.Errorf("ebpf: failed to remove ebpf memlock: %s", err)
return
}
// Load pre-compiled programs and maps into the kernel.
objs := bpfObjects{}
if err := loadBpfObjects(&objs, nil); err != nil {
log.Errorf("ebpf: failed to load ebpf object: %s", err)
return
}
defer objs.Close() //nolint:errcheck
// Create a link to the tcp_connect program.
linkTCPConnect, err := link.AttachTracing(link.TracingOptions{
Program: objs.bpfPrograms.TcpConnect,
})
if err != nil {
log.Errorf("ebpf: failed to attach to tcp_v4_connect: %s ", err)
return
}
defer linkTCPConnect.Close() //nolint:errcheck
// Create a link to the udp_v4_connect program.
linkUDPV4, err := link.AttachTracing(link.TracingOptions{
Program: objs.bpfPrograms.UdpV4Connect,
})
if err != nil {
log.Errorf("ebpf: failed to attach to udp_v4_connect: %s ", err)
return
}
defer linkUDPV4.Close() //nolint:errcheck
// Create a link to the udp_v6_connect program.
linkUDPV6, err := link.AttachTracing(link.TracingOptions{
Program: objs.bpfPrograms.UdpV6Connect,
})
if err != nil {
log.Errorf("ebpf: failed to attach to udp_v6_connect: %s ", err)
return
}
defer linkUDPV6.Close() //nolint:errcheck
rd, err := ringbuf.NewReader(objs.bpfMaps.PmConnectionEvents)
if err != nil {
log.Errorf("ebpf: failed to open ring buffer: %s", err)
return
}
defer rd.Close()
go func() {
<-stopper
if err := rd.Close(); err != nil {
log.Errorf("ebpf: failed closing ringbuf reader: %s", err)
}
}()
for {
// Read next event
record, err := rd.Read()
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
// Normal return
return
}
log.Errorf("ebpf: failed to read from ring buffer: %s", err)
continue
}
var event bpfEvent
// Parse the ringbuf event entry into a bpfEvent structure.
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.BigEndian, &event); err != nil {
log.Errorf("ebpf: failed to parse ringbuf event: %s", err)
continue
}
info := packet.Info{
Inbound: event.Direction == 1,
InTunnel: false,
Version: packet.IPVersion(event.IpVersion),
Protocol: packet.IPProtocol(event.Protocol),
SrcPort: event.Sport,
DstPort: event.Dport,
Src: convertArrayToIPv4(event.Saddr, packet.IPVersion(event.IpVersion)),
Dst: convertArrayToIPv4(event.Daddr, packet.IPVersion(event.IpVersion)),
PID: int(event.Pid),
}
if isEventValid(event) {
log.Debugf("ebpf: PID: %d conn: %s:%d -> %s:%d %s %s", info.PID, info.LocalIP(), info.LocalPort(), info.RemoteIP(), info.RemotePort(), info.Version.String(), info.Protocol.String())
p := &infoPacket{}
p.SetPacketInfo(info)
ch <- p
} else {
log.Debugf("ebpf: invalid event PID: %d conn: %s:%d -> %s:%d %s %s", info.PID, info.LocalIP(), info.LocalPort(), info.RemoteIP(), info.RemotePort(), info.Version.String(), info.Protocol.String())
}
<-ctx.Done()
if err := rd.Close(); err != nil {
log.Errorf("ebpf: failed closing ringbuf reader: %s", err)
}
}()
}
// StopEBPFWorker stops the ebpf worker.
func StopEBPFWorker() {
close(stopper)
for {
// Read next event
record, err := rd.Read()
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
// Normal return
return nil
}
log.Errorf("ebpf: failed to read from ring buffer: %s", err)
continue
}
var event bpfEvent
// Parse the ringbuf event entry into a bpfEvent structure.
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.BigEndian, &event); err != nil {
log.Errorf("ebpf: failed to parse ringbuf event: %s", err)
continue
}
pkt := packet.NewInfoPacket(packet.Info{
Inbound: event.Direction == 1,
InTunnel: false,
Version: packet.IPVersion(event.IpVersion),
Protocol: packet.IPProtocol(event.Protocol),
SrcPort: event.Sport,
DstPort: event.Dport,
Src: convertArrayToIPv4(event.Saddr, packet.IPVersion(event.IpVersion)),
Dst: convertArrayToIPv4(event.Daddr, packet.IPVersion(event.IpVersion)),
PID: int(event.Pid),
})
if isEventValid(event) {
log.Debugf("ebpf: received valid connect event: PID: %d Conn: %s", pkt.Info().PID, pkt)
packets <- pkt
} else {
log.Warningf("ebpf: received invalid connect event: PID: %d Conn: %s", pkt.Info().PID, pkt)
}
}
}
// isEventValid checks whether the given bpfEvent is valid or not.
@ -166,11 +159,11 @@ func convertArrayToIPv4(input [4]uint32, ipVersion packet.IPVersion) net.IP {
addressBuf := make([]byte, 4)
binary.LittleEndian.PutUint32(addressBuf, input[0])
return net.IP(addressBuf)
} else {
addressBuf := make([]byte, 16)
for i := 0; i < 4; i++ {
binary.LittleEndian.PutUint32(addressBuf[i*4:i*4+4], input[i])
}
return net.IP(addressBuf)
}
addressBuf := make([]byte, 16)
for i := 0; i < 4; i++ {
binary.LittleEndian.PutUint32(addressBuf[i*4:i*4+4], input[i])
}
return net.IP(addressBuf)
}

View file

@ -2,7 +2,8 @@
#ifndef __BPF_TRACING_H__
#define __BPF_TRACING_H__
#include <bpf/bpf_helpers.h>
// #include <bpf/bpf_helpers.h>
#include "bpf_helpers.h"
/* Scan the ARCH passed in from ARCH env variable (see Makefile) */
#if defined(__TARGET_ARCH_x86)