Skip to content

Commit

Permalink
Light refactoring of the TC attachment code (#1466)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelroquetto authored Dec 17, 2024
1 parent 5770689 commit f03e4f4
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 220 deletions.
10 changes: 10 additions & 0 deletions pkg/internal/ebpf/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ var IntegrityModeOverride = false

var ActiveNamespaces = make(map[uint32]uint32)

// These represent unique traffic control (tc) handles to be supplied as
// arguments to RegisterIngress, RegisterEgress or RegisterTC. They all start
// from the 0xb310 offset in simplistic attempt to avoid collisions with
// 3rdparty handles.
const (
NetollyTCHandle = 0xb310 + iota
HTTPTracerTCHandle
TCTracerTCHandle
)

// ProbeDesc holds the information of the instrumentation points of a given
// function/symbol
type ProbeDesc struct {
Expand Down
111 changes: 60 additions & 51 deletions pkg/internal/ebpf/common/tc_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io/fs"
"log/slog"
"time"

"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
Expand All @@ -21,10 +22,7 @@ type TCLinks struct {
IngressFilter *netlink.BpfFilter
}

func WatchAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), log *slog.Logger) {
informer := ifaces.NewWatcher(channelBufferLen)
registerer := ifaces.NewRegisterer(informer, channelBufferLen)

func StartTCMonitorLoop(ctx context.Context, registerer *ifaces.Registerer, register func(iface ifaces.Interface), log *slog.Logger) {
log.Debug("subscribing for network interface events")
ifaceEvents, err := registerer.Subscribe(ctx)
if err != nil {
Expand Down Expand Up @@ -54,10 +52,25 @@ func WatchAndRegisterTC(ctx context.Context, channelBufferLen int, register func
}()
}

func RegisterTC(iface ifaces.Interface, egressFD, ingressFD int, log *slog.Logger) *TCLinks {
links := TCLinks{}
// Convenience function
func WatchAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), log *slog.Logger) {
log.Debug("listening for new interfaces: use watching")

informer := ifaces.NewWatcher(channelBufferLen)
registerer := ifaces.NewRegisterer(informer, channelBufferLen)
StartTCMonitorLoop(ctx, registerer, register, log)
}

// Convenience function
func PollAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), period time.Duration, log *slog.Logger) {
log.Debug("listening for new interfaces: use polling", "period", period)

// Load pre-compiled programs and maps into the kernel, and rewrites the configuration
informer := ifaces.NewPoller(period, channelBufferLen)
registerer := ifaces.NewRegisterer(informer, channelBufferLen)
StartTCMonitorLoop(ctx, registerer, register, log)
}

func GetClsactQdisc(iface ifaces.Interface, log *slog.Logger) *netlink.GenericQdisc {
ipvlan, err := netlink.LinkByIndex(iface.Index)
if err != nil {
log.Error("failed to lookup ipvlan device", "index", iface.Index, "name", iface.Name, "error", err)
Expand All @@ -80,15 +93,29 @@ func RegisterTC(iface ifaces.Interface, egressFD, ingressFD int, log *slog.Logge
return nil
}
}
links.Qdisc = qdisc

egressFilter, err := registerEgress(ipvlan, egressFD)
return qdisc
}

func RegisterTC(iface ifaces.Interface, egressFD int, egressHandle uint32, egressName string,
ingressFD int, ingressHandle uint32, ingressName string, log *slog.Logger) *TCLinks {
links := TCLinks{
Qdisc: GetClsactQdisc(iface, log),
}

if links.Qdisc == nil {
return nil
}

linkIndex := links.Qdisc.QdiscAttrs.LinkIndex

egressFilter, err := RegisterEgress(linkIndex, egressFD, egressHandle, egressName)
if err != nil {
log.Error("failed to install egress filters", "error", err)
}
links.EgressFilter = egressFilter

ingressFilter, err := registerIngress(ipvlan, ingressFD)
ingressFilter, err := RegisterIngress(linkIndex, ingressFD, ingressHandle, ingressName)
if err != nil {
log.Error("failed to install ingres filters", "error", err)
}
Expand All @@ -97,62 +124,44 @@ func RegisterTC(iface ifaces.Interface, egressFD, ingressFD int, log *slog.Logge
return &links
}

func registerEgress(ipvlan netlink.Link, egressFD int) (*netlink.BpfFilter, error) {
// Fetch events on egress
egressAttrs := netlink.FilterAttrs{
LinkIndex: ipvlan.Attrs().Index,
Parent: netlink.HANDLE_MIN_EGRESS,
Handle: netlink.MakeHandle(0, 1),
Protocol: unix.ETH_P_ALL,
Priority: 1,
}
egressFilter := &netlink.BpfFilter{
FilterAttrs: egressAttrs,
Fd: egressFD,
Name: "tc/tc_http_egress",
DirectAction: true,
}
if err := netlink.FilterDel(egressFilter); err == nil {
log.Warn("egress filter already existed. Deleted it")
}
if err := netlink.FilterAdd(egressFilter); err != nil {
if errors.Is(err, fs.ErrExist) {
log.Warn("egress filter already exists. Ignoring", "error", err)
} else {
return nil, fmt.Errorf("failed to create egress filter: %w", err)
}
}
func RegisterEgress(linkIndex int, egressFD int, handle uint32, name string) (*netlink.BpfFilter, error) {
return registerFilter(linkIndex, egressFD, handle, netlink.HANDLE_MIN_EGRESS, name)
}

return egressFilter, nil
func RegisterIngress(linkIndex int, ingressFD int, handle uint32, name string) (*netlink.BpfFilter, error) {
return registerFilter(linkIndex, ingressFD, handle, netlink.HANDLE_MIN_INGRESS, name)
}

func registerIngress(ipvlan netlink.Link, ingressFD int) (*netlink.BpfFilter, error) {
func registerFilter(linkIndex int, fd int, handle uint32, parent uint32, name string) (*netlink.BpfFilter, error) {
// Fetch events on ingress
ingressAttrs := netlink.FilterAttrs{
LinkIndex: ipvlan.Attrs().Index,
Parent: netlink.HANDLE_MIN_INGRESS,
Handle: netlink.MakeHandle(0, 1),
attrs := netlink.FilterAttrs{
LinkIndex: linkIndex,
Parent: parent,
Handle: handle,
Protocol: unix.ETH_P_ALL,
Priority: 1,
}
ingressFilter := &netlink.BpfFilter{
FilterAttrs: ingressAttrs,
Fd: ingressFD,
Name: "tc/tc_http_ingress",

filter := &netlink.BpfFilter{
FilterAttrs: attrs,
Fd: fd,
Name: name,
DirectAction: true,
}
if err := netlink.FilterDel(ingressFilter); err == nil {
log.Warn("ingress filter already existed. Deleted it")

if err := netlink.FilterDel(filter); err == nil {
log.Warn("filter already existed. Deleted it", "filter", name, "iface", linkIndex)
}
if err := netlink.FilterAdd(ingressFilter); err != nil {

if err := netlink.FilterAdd(filter); err != nil {
if errors.Is(err, fs.ErrExist) {
log.Warn("ingress filter already exists. Ignoring", "error", err)
log.Warn("filter already exists. Ignoring", "error", err)
} else {
return nil, fmt.Errorf("failed to create ingress filter: %w", err)
return nil, fmt.Errorf("failed to create filter: %w", err)
}
}

return ingressFilter, nil
return filter, nil
}

// doIgnoreNoDev runs the provided syscall over the provided device and ignores the error
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/ebpf/httptracer/httptracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ func (p *Tracer) Run(ctx context.Context, _ chan<- []request.Span) {
}

func (p *Tracer) registerTC(iface ifaces.Interface) {
links := ebpfcommon.RegisterTC(iface, p.bpfObjects.BeylaTcHttpEgress.FD(), p.bpfObjects.BeylaTcHttpIngress.FD(), p.log)
links := ebpfcommon.RegisterTC(iface,
p.bpfObjects.BeylaTcHttpEgress.FD(), ebpfcommon.HTTPTracerTCHandle, "tc/tc_http_egress",
p.bpfObjects.BeylaTcHttpIngress.FD(), ebpfcommon.HTTPTracerTCHandle, "tc/tc_http_ingress",
p.log)

if links == nil {
return
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/ebpf/tctracer/tctracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ func (p *Tracer) Run(ctx context.Context, _ chan<- []request.Span) {
}

func (p *Tracer) registerTC(iface ifaces.Interface) {
links := ebpfcommon.RegisterTC(iface, p.bpfObjects.BeylaAppEgress.FD(), p.bpfObjects.BeylaAppIngress.FD(), p.log)
links := ebpfcommon.RegisterTC(iface,
p.bpfObjects.BeylaAppEgress.FD(), ebpfcommon.TCTracerTCHandle, "tc/tc_egress",
p.bpfObjects.BeylaAppIngress.FD(), ebpfcommon.TCTracerTCHandle, "tc/tc_ingress",
p.log)

if links == nil {
return
}
Expand Down
32 changes: 4 additions & 28 deletions pkg/internal/netolly/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cilium/ebpf/ringbuf"

"github.com/grafana/beyla/pkg/beyla"
ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common"
"github.com/grafana/beyla/pkg/internal/netolly/ebpf"
"github.com/grafana/beyla/pkg/internal/netolly/flow"
"github.com/grafana/beyla/pkg/internal/netolly/ifaces"
Expand Down Expand Up @@ -89,7 +90,7 @@ type Flows struct {
ctxInfo *global.ContextInfo

// input data providers
interfaces ifaces.Informer
registerer *ifaces.Registerer
filter interfaceFilter
ebpf ebpfFlowFetcher

Expand Down Expand Up @@ -193,7 +194,7 @@ func flowsAgent(
return &Flows{
ctxInfo: ctxInfo,
ebpf: fetcher,
interfaces: registerer,
registerer: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
Expand Down Expand Up @@ -259,32 +260,7 @@ func (f *Flows) Status() Status {
func (f *Flows) interfacesManager(ctx context.Context) error {
slog := alog().With("function", "interfacesManager")

slog.Debug("subscribing for network interface events")
ifaceEvents, err := f.interfaces.Subscribe(ctx)
if err != nil {
return fmt.Errorf("instantiating interfaces' informer: %w", err)
}

go func() {
for {
select {
case <-ctx.Done():
slog.Debug("stopping interfaces' listener")
return
case event := <-ifaceEvents:
slog.Debug("received event", "event", event)
switch event.Type {
case ifaces.EventAdded:
f.onInterfaceAdded(event.Interface)
case ifaces.EventDeleted:
// qdiscs, ingress and egress filters are automatically deleted so we don't need to
// specifically detach them from the ebpfFetcher
default:
slog.Warn("unknown event type", "event", event)
}
}
}
}()
ebpfcommon.StartTCMonitorLoop(ctx, f.registerer, f.onInterfaceAdded, slog)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/netolly/agent/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestFilter(t *testing.T) {
},
}},
},
interfaces: fakeInterfacesInformer{},
registerer: ifaces.NewRegisterer(fakeInterfacesInformer{}, 10),
interfaceNamer: func(_ int) string { return "fakeiface" },
}

Expand Down
Loading

0 comments on commit f03e4f4

Please sign in to comment.