From 47ccd8bc4c20056abf04e87f7567f40dac5e5695 Mon Sep 17 00:00:00 2001 From: Simone Rodigari <32323373+SRodi@users.noreply.github.com> Date: Fri, 13 Dec 2024 17:08:39 +0000 Subject: [PATCH] feat(ct-metrics): BPF implementation (#1102) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Description BPF implementation for connection tracking metrics. This is the data-plane work mentioned in this comment https://github.com/microsoft/retina/pull/1057#issuecomment-2519691748 Summary - feature flag enableConntrackMetrics - counters incremented within IFDEF in BPF - counters: packets forward/reply + bytes forward/reply - conntrack metadata includes metrics and is added to packets struct - add/update unit tests for conntrack_linux and packetparser_linux ## Related Issue #806 ## Checklist - [x] I have read the [contributing documentation](https://retina.sh/docs/contributing). - [x] I signed and signed-off the commits (`git commit -S -s ...`). See [this documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification) on signing commits. - [x] I have correctly attributed the author(s) of the code. - [x] I have tested the changes locally. - [x] I have followed the project's style guidelines. - [x] I have updated the documentation, if necessary. - [x] I have added tests, if applicable. ## Screenshots (if applicable) or Testing Completed Please add any relevant screenshots or GIFs to showcase the changes made. 1. `enableConntrackMetrics=false` ```sh # bpftool map dump id 994 -j | jq -r .[0] { "key": [ ... ], "value": [ ... ], "formatted": { "key": { ... }, "value": { ... "conntrack_metadata": { "bytes_forward_count": 0, "bytes_reply_count": 0, "packets_forward_count": 0, "packets_reply_count": 0 } } } } ``` 2. `enableConntrackMetrics=true` ```sh # bpftool map dump id 1019 -j | jq -r .[0] { "key": [ ... ], "value": [ ... ], "formatted": { "key": { ... }, "value": { ..., "conntrack_metadata": { "bytes_forward_count": 13440, "bytes_reply_count": 56335, "packets_forward_count": 56, "packets_reply_count": 43 } } } } ``` At userland level I provisionally added a debug statement, just for this test, in `packetparser_linux.go` (without IP and proto translation) ```sh ❯ k logs -n kube-system retina-agent-chvdh | head -n 10 | grep metadata Defaulted container "retina" out of: retina, init-retina (init) ts=2024-12-13T10:37:08.881Z level=debug caller=packetparser/packetparser_linux.go:577 msg="Conntrack metadata" SrcIp=788657162 DstIp=2499867658 SrcPort=19117 DstPort=23313 Proto=6 PacketsForwardCount=73 PacketsReplyCount=83 BytesForwardCount=16068 BytesReplyCount=6936 ts=2024-12-13T10:37:08.881Z level=debug caller=packetparser/packetparser_linux.go:577 msg="Conntrack metadata" SrcIp=788657162 DstIp=2499867658 SrcPort=19117 DstPort=23313 Proto=6 PacketsForwardCount=73 PacketsReplyCount=82 BytesForwardCount=16068 BytesReplyCount=6870 ``` ## Additional Notes Add any additional notes or context about the pull request here. --- Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more information on how to contribute to this project. --- .../retina/templates/agent/configmap.yaml | 1 + .../helm/retina/templates/configmap.yaml | 1 + .../controller/helm/retina/values.yaml | 1 + pkg/config/config.go | 1 + pkg/plugin/conntrack/_cprog/conntrack.c | 74 +++++++++++++-- pkg/plugin/conntrack/_cprog/dynamic.h | 2 + pkg/plugin/conntrack/conntrack_bpfel_arm64.go | 6 ++ pkg/plugin/conntrack/conntrack_bpfel_x86.go | 6 ++ pkg/plugin/conntrack/conntrack_linux.go | 25 +++++ pkg/plugin/conntrack/conntrack_linux_test.go | 92 +++++++++++++++++++ pkg/plugin/conntrack/types_linux.go | 5 +- pkg/plugin/packetparser/_cprog/packetparser.c | 6 ++ .../packetparser/packetparser_bpfel_arm64.go | 24 +++-- .../packetparser/packetparser_bpfel_x86.go | 24 +++-- pkg/plugin/packetparser/packetparser_linux.go | 44 ++++++++- .../packetparser/packetparser_linux_test.go | 81 +++++++++++----- 16 files changed, 345 insertions(+), 48 deletions(-) create mode 100644 pkg/plugin/conntrack/_cprog/dynamic.h create mode 100644 pkg/plugin/conntrack/conntrack_linux_test.go diff --git a/deploy/hubble/manifests/controller/helm/retina/templates/agent/configmap.yaml b/deploy/hubble/manifests/controller/helm/retina/templates/agent/configmap.yaml index 0e80cefed2..704cf5a9c5 100644 --- a/deploy/hubble/manifests/controller/helm/retina/templates/agent/configmap.yaml +++ b/deploy/hubble/manifests/controller/helm/retina/templates/agent/configmap.yaml @@ -108,6 +108,7 @@ data: metricsIntervalDuration: {{ .Values.metricsIntervalDuration }} enableTelemetry: {{ .Values.enableTelemetry }} enablePodLevel: {{ .Values.enablePodLevel }} + enableConntrackMetrics: {{ .Values.enableConntrackMetrics }} remoteContext: {{ .Values.remoteContext }} enableAnnotations: {{ .Values.enableAnnotations }} bypassLookupIPOfInterest: {{ .Values.bypassLookupIPOfInterest }} diff --git a/deploy/legacy/manifests/controller/helm/retina/templates/configmap.yaml b/deploy/legacy/manifests/controller/helm/retina/templates/configmap.yaml index 6218bc62b3..a53b8537a6 100644 --- a/deploy/legacy/manifests/controller/helm/retina/templates/configmap.yaml +++ b/deploy/legacy/manifests/controller/helm/retina/templates/configmap.yaml @@ -19,6 +19,7 @@ data: metricsIntervalDuration: {{ .Values.metricsIntervalDuration }} enableTelemetry: {{ .Values.enableTelemetry }} enablePodLevel: {{ .Values.enablePodLevel }} + enableConntrackMetrics: {{ .Values.enableConntrackMetrics }} remoteContext: {{ .Values.remoteContext }} enableAnnotations: {{ .Values.enableAnnotations }} bypassLookupIPOfInterest: {{ .Values.bypassLookupIPOfInterest }} diff --git a/deploy/legacy/manifests/controller/helm/retina/values.yaml b/deploy/legacy/manifests/controller/helm/retina/values.yaml index f95da03629..7dee8c519e 100644 --- a/deploy/legacy/manifests/controller/helm/retina/values.yaml +++ b/deploy/legacy/manifests/controller/helm/retina/values.yaml @@ -35,6 +35,7 @@ image: # Overrides the image tag whose default is the chart appVersion. tag: "v0.0.2" +enableConntrackMetrics: false enablePodLevel: false remoteContext: false enableAnnotations: false diff --git a/pkg/config/config.go b/pkg/config/config.go index 8ee4798ea2..5a5098456e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -61,6 +61,7 @@ type Config struct { EnableTelemetry bool `yaml:"enableTelemetry"` EnableRetinaEndpoint bool `yaml:"enableRetinaEndpoint"` EnablePodLevel bool `yaml:"enablePodLevel"` + EnableConntrackMetrics bool `yaml:"enableConntrackMetrics"` RemoteContext bool `yaml:"remoteContext"` EnableAnnotations bool `yaml:"enableAnnotations"` BypassLookupIPOfInterest bool `yaml:"bypassLookupIPOfInterest"` diff --git a/pkg/plugin/conntrack/_cprog/conntrack.c b/pkg/plugin/conntrack/_cprog/conntrack.c index c078ff78ab..9d415a1eb5 100644 --- a/pkg/plugin/conntrack/_cprog/conntrack.c +++ b/pkg/plugin/conntrack/_cprog/conntrack.c @@ -7,6 +7,7 @@ #include "compiler.h" #include "bpf_helpers.h" #include "conntrack.h" +#include "dynamic.h" struct tcpmetadata { __u32 seq; // TCP sequence number @@ -15,6 +16,20 @@ struct tcpmetadata { __u32 tsecr; // TCP timestamp echo reply }; +struct conntrackmetadata { + /* + bytes_*_count indicates the number of bytes sent and received in the forward and reply direction. + These values will be based on the conntrack entry. + */ + __u64 bytes_forward_count; + __u64 bytes_reply_count; + /* + packets_*_count indicates the number of packets sent and received in the forward and reply direction. + These values will be based on the conntrack entry. + */ + __u32 packets_forward_count; + __u32 packets_reply_count; +}; struct packet { @@ -30,6 +45,7 @@ struct packet __u8 proto; __u8 flags; // For TCP packets, this is the TCP flags. For UDP packets, this is will always be 1 for conntrack purposes. bool is_reply; + struct conntrackmetadata conntrack_metadata; }; @@ -68,6 +84,7 @@ struct ct_entry { * before retina deployment and the SYN packet was not captured. */ bool is_direction_unknown; + struct conntrackmetadata conntrack_metadata; }; struct { @@ -110,11 +127,11 @@ static __always_inline __u8 _ct_get_traffic_direction(__u8 observation_point) { /** * Create a new TCP connection. + * @arg *p pointer to the packet to be processed. * @arg key The key to be used to create the new connection. - * @arg flags The flags of the packet. * @arg observation_point The point in the network stack where the packet is observed. */ -static __always_inline bool _ct_create_new_tcp_connection(struct ct_v4_key key, __u8 flags, __u8 observation_point) { +static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, struct ct_v4_key key, __u8 observation_point) { struct ct_entry new_value; __builtin_memset(&new_value, 0, sizeof(struct ct_entry)); __u64 now = bpf_mono_now(); @@ -123,9 +140,20 @@ static __always_inline bool _ct_create_new_tcp_connection(struct ct_v4_key key, return false; } new_value.eviction_time = now + CT_SYN_TIMEOUT; - new_value.flags_seen_tx_dir = flags; + new_value.flags_seen_tx_dir = p->flags; new_value.is_direction_unknown = false; new_value.traffic_direction = _ct_get_traffic_direction(observation_point); + + #ifdef ENABLE_CONNTRACK_METRICS + new_value.conntrack_metadata.packets_forward_count = 1; + new_value.conntrack_metadata.bytes_forward_count = p->bytes; + // Update initial conntrack metadata for the connection. + __builtin_memcpy(&p->conntrack_metadata, &new_value.conntrack_metadata, sizeof(struct conntrackmetadata)); + #endif // ENABLE_CONNTRACK_METRICS + + // Update packet + p->is_reply = false; + p->traffic_direction = new_value.traffic_direction; bpf_map_update_elem(&retina_conntrack, &key, &new_value, BPF_ANY); return true; } @@ -148,10 +176,17 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c new_value.flags_seen_tx_dir = p->flags; new_value.last_report_tx_dir = now; new_value.traffic_direction = _ct_get_traffic_direction(observation_point); - bpf_map_update_elem(&retina_conntrack, &key, &new_value, BPF_ANY); + #ifdef ENABLE_CONNTRACK_METRICS + new_value.conntrack_metadata.packets_forward_count = 1; + new_value.conntrack_metadata.bytes_forward_count = p->bytes; + // Update packet's conntrack metadata. + __builtin_memcpy(&p->conntrack_metadata, &new_value.conntrack_metadata, sizeof(struct conntrackmetadata));; + #endif // ENABLE_CONNTRACK_METRICS + // Update packet p->is_reply = false; p->traffic_direction = new_value.traffic_direction; + bpf_map_update_elem(&retina_conntrack, &key, &new_value, BPF_ANY); return true; } @@ -165,11 +200,8 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key key, struct ct_v4_key reverse_key, __u8 observation_point) { // Check if the packet is a SYN packet. if (p->flags & TCP_SYN) { - // Update packet accordingly. - p->is_reply = false; - p->traffic_direction = _ct_get_traffic_direction(observation_point); // Create a new connection with a timeout of CT_SYN_TIMEOUT. - return _ct_create_new_tcp_connection(key, p->flags, observation_point); + return _ct_create_new_tcp_connection(p, key, observation_point); } // The packet is not a SYN packet and the connection corresponding to this packet is not found. @@ -193,13 +225,25 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c p->is_reply = true; new_value.flags_seen_rx_dir = p->flags; new_value.last_report_rx_dir = now; + #ifdef ENABLE_CONNTRACK_METRICS + new_value.conntrack_metadata.bytes_reply_count = p->bytes; + new_value.conntrack_metadata.packets_reply_count = 1; + #endif // ENABLE_CONNTRACK_METRICS bpf_map_update_elem(&retina_conntrack, &reverse_key, &new_value, BPF_ANY); } else { // Otherwise, the packet is considered as a packet in the send direction. p->is_reply = false; new_value.flags_seen_tx_dir = p->flags; new_value.last_report_tx_dir = now; + #ifdef ENABLE_CONNTRACK_METRICS + new_value.conntrack_metadata.bytes_forward_count = p->bytes; + new_value.conntrack_metadata.packets_forward_count = 1; + #endif // ENABLE_CONNTRACK_METRICS bpf_map_update_elem(&retina_conntrack, &key, &new_value, BPF_ANY); } + #ifdef ENABLE_CONNTRACK_METRICS + // Update packet's conntrack metadata. + __builtin_memcpy(&p->conntrack_metadata, &new_value.conntrack_metadata, sizeof(struct conntrackmetadata)); + #endif // ENABLE_CONNTRACK_METRICS return true; } @@ -318,6 +362,13 @@ static __always_inline __attribute__((unused)) bool ct_process_packet(struct pac // Update the packet accordingly. p->is_reply = false; p->traffic_direction = entry->traffic_direction; + #ifdef ENABLE_CONNTRACK_METRICS + // Update packet count and bytes count on conntrack entry. + WRITE_ONCE(entry->conntrack_metadata.packets_forward_count, READ_ONCE(entry->conntrack_metadata.packets_forward_count) + 1); + WRITE_ONCE(entry->conntrack_metadata.bytes_forward_count, READ_ONCE(entry->conntrack_metadata.bytes_forward_count) + p->bytes); + // Update packet's conntract metadata. + __builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata)); + #endif // ENABLE_CONNTRACK_METRICS return _ct_should_report_packet(entry, p->flags, CT_PACKET_DIR_TX, &key); } @@ -333,6 +384,13 @@ static __always_inline __attribute__((unused)) bool ct_process_packet(struct pac // Update the packet accordingly. p->is_reply = true; p->traffic_direction = entry->traffic_direction; + #ifdef ENABLE_CONNTRACK_METRICS + // Update packet count and bytes count on conntrack entry. + WRITE_ONCE(entry->conntrack_metadata.packets_reply_count, READ_ONCE(entry->conntrack_metadata.packets_reply_count) + 1); + WRITE_ONCE(entry->conntrack_metadata.bytes_reply_count, READ_ONCE(entry->conntrack_metadata.bytes_reply_count) + p->bytes); + // Update packet's conntract metadata. + __builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata)); + #endif // ENABLE_CONNTRACK_METRICS return _ct_should_report_packet(entry, p->flags, CT_PACKET_DIR_RX, &reverse_key); } diff --git a/pkg/plugin/conntrack/_cprog/dynamic.h b/pkg/plugin/conntrack/_cprog/dynamic.h new file mode 100644 index 0000000000..80abbd931f --- /dev/null +++ b/pkg/plugin/conntrack/_cprog/dynamic.h @@ -0,0 +1,2 @@ +// Place holder header file that will be replaced by the actual header file during runtime +// DO NOT DELETE diff --git a/pkg/plugin/conntrack/conntrack_bpfel_arm64.go b/pkg/plugin/conntrack/conntrack_bpfel_arm64.go index b96504c3e0..efb93738a6 100644 --- a/pkg/plugin/conntrack/conntrack_bpfel_arm64.go +++ b/pkg/plugin/conntrack/conntrack_bpfel_arm64.go @@ -20,6 +20,12 @@ type conntrackCtEntry struct { FlagsSeenTxDir uint8 FlagsSeenRxDir uint8 IsDirectionUnknown bool + ConntrackMetadata struct { + BytesForwardCount uint64 + BytesReplyCount uint64 + PacketsForwardCount uint32 + PacketsReplyCount uint32 + } } type conntrackCtV4Key struct { diff --git a/pkg/plugin/conntrack/conntrack_bpfel_x86.go b/pkg/plugin/conntrack/conntrack_bpfel_x86.go index 9dede98e2c..baa46bcbfa 100644 --- a/pkg/plugin/conntrack/conntrack_bpfel_x86.go +++ b/pkg/plugin/conntrack/conntrack_bpfel_x86.go @@ -20,6 +20,12 @@ type conntrackCtEntry struct { FlagsSeenTxDir uint8 FlagsSeenRxDir uint8 IsDirectionUnknown bool + ConntrackMetadata struct { + BytesForwardCount uint64 + BytesReplyCount uint64 + PacketsForwardCount uint32 + PacketsReplyCount uint32 + } } type conntrackCtV4Key struct { diff --git a/pkg/plugin/conntrack/conntrack_linux.go b/pkg/plugin/conntrack/conntrack_linux.go index 0f67b427ef..dc2fb0c449 100644 --- a/pkg/plugin/conntrack/conntrack_linux.go +++ b/pkg/plugin/conntrack/conntrack_linux.go @@ -6,11 +6,15 @@ package conntrack import ( "context" + "fmt" + "path" + "runtime" "time" "github.com/cilium/ebpf" "github.com/cilium/ebpf/rlimit" "github.com/microsoft/retina/internal/ktime" + "github.com/microsoft/retina/pkg/loader" "github.com/microsoft/retina/pkg/log" plugincommon "github.com/microsoft/retina/pkg/plugin/common" _ "github.com/microsoft/retina/pkg/plugin/conntrack/_cprog" // nolint // This is needed so cprog is included when vendoring @@ -66,6 +70,27 @@ func New() (*Conntrack, error) { return ct, nil } +// Build dynamic header path +func BuildDynamicHeaderPath() string { + // Get absolute path to this file during runtime. + _, filename, _, ok := runtime.Caller(0) + if !ok { + return "" + } + currDir := path.Dir(filename) + return fmt.Sprintf("%s/%s/%s", currDir, bpfSourceDir, dynamicHeaderFileName) +} + +// Generate dynamic header file for conntrack eBPF program. +func GenerateDynamic(ctx context.Context, dynamicHeaderPath string, conntrackMetrics int) error { + st := fmt.Sprintf("#define ENABLE_CONNTRACK_METRICS %d\n", conntrackMetrics) + err := loader.WriteFile(ctx, dynamicHeaderPath, st) + if err != nil { + return errors.Wrap(err, "failed to write conntrack dynamic header") + } + return nil +} + // Run starts the Conntrack garbage collection loop. func (ct *Conntrack) Run(ctx context.Context) error { ticker := time.NewTicker(ct.gcFrequency) diff --git a/pkg/plugin/conntrack/conntrack_linux_test.go b/pkg/plugin/conntrack/conntrack_linux_test.go new file mode 100644 index 0000000000..971ddaead5 --- /dev/null +++ b/pkg/plugin/conntrack/conntrack_linux_test.go @@ -0,0 +1,92 @@ +package conntrack + +import ( + "context" + "fmt" + "os" + "path" + "runtime" + "testing" +) + +func TestBuildDynamicHeaderPath(t *testing.T) { + tests := []struct { + name string + expectedPath string + }{ + { + name: "ExpectedPath", + expectedPath: fmt.Sprintf("%s/%s/%s", path.Dir(getCurrentFilePath(t)), bpfSourceDir, dynamicHeaderFileName), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualPath := BuildDynamicHeaderPath() + if actualPath != tt.expectedPath { + t.Errorf("unexpected dynamic header path: got %q, want %q", actualPath, tt.expectedPath) + } + }) + } +} + +func TestGenerateDynamic(t *testing.T) { + tests := []struct { + name string + conntrackMetrics int + expectedContents string + }{ + { + name: "ConntrackMetricsEnabled", + conntrackMetrics: 1, + expectedContents: "#define ENABLE_CONNTRACK_METRICS 1\n", + }, + { + name: "ConntrackMetricsDisabled", + conntrackMetrics: 0, + expectedContents: "#define ENABLE_CONNTRACK_METRICS 0\n", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a temporary directory + tempDir, err := os.MkdirTemp("", "conntrack_test") + if err != nil { + t.Fatalf("failed to create temp directory: %v", err) + } + // Clean up the temporary directory after the test completes + defer os.RemoveAll(tempDir) + + // Override the dynamicHeaderPath to use the temporary directory + dynamicHeaderPath := path.Join(tempDir, dynamicHeaderFileName) + + // Call the GenerateDynamic function and check if it returns an error. + ctx := context.Background() + if err = GenerateDynamic(ctx, dynamicHeaderPath, tt.conntrackMetrics); err != nil { + t.Fatalf("failed to generate dynamic header: %v", err) + } + + // Verify that the dynamic header file was created in the expected location and contains the expected contents. + if _, err = os.Stat(dynamicHeaderPath); os.IsNotExist(err) { + t.Fatalf("dynamic header file does not exist: %v", err) + } + + actualContents, err := os.ReadFile(dynamicHeaderPath) + if err != nil { + t.Fatalf("failed to read dynamic header file: %v", err) + } + if string(actualContents) != tt.expectedContents { + t.Errorf("unexpected dynamic header file contents: got %q, want %q", string(actualContents), tt.expectedContents) + } + }) + } +} + +func getCurrentFilePath(t *testing.T) string { + _, filename, _, ok := runtime.Caller(1) + if !ok { + t.Fatal("failed to determine test file path") + } + return filename +} diff --git a/pkg/plugin/conntrack/types_linux.go b/pkg/plugin/conntrack/types_linux.go index 637fcade4a..63fa7e90a9 100644 --- a/pkg/plugin/conntrack/types_linux.go +++ b/pkg/plugin/conntrack/types_linux.go @@ -9,7 +9,10 @@ import ( ) const ( - defaultGCFrequency = 15 * time.Second + defaultGCFrequency = 15 * time.Second + bpfSourceDir = "_cprog" + bpfSourceFileName = "conntrack.c" + dynamicHeaderFileName = "dynamic.h" ) type Conntrack struct { diff --git a/pkg/plugin/packetparser/_cprog/packetparser.c b/pkg/plugin/packetparser/_cprog/packetparser.c index 0243ffadcb..40ec8fb03e 100644 --- a/pkg/plugin/packetparser/_cprog/packetparser.c +++ b/pkg/plugin/packetparser/_cprog/packetparser.c @@ -201,6 +201,12 @@ static void parse(struct __sk_buff *skb, __u8 obs) return; } + #ifdef ENABLE_CONNTRACK_METRICS + // Initialize conntrack metadata in packet struct. + struct conntrackmetadata conntrack_metadata; + __builtin_memset(&conntrack_metadata, 0, sizeof(conntrack_metadata)); + p.conntrack_metadata = conntrack_metadata; + #endif // ENABLE_CONNTRACK_METRICS // Process the packet in ct bool report __attribute__((unused)); diff --git a/pkg/plugin/packetparser/packetparser_bpfel_arm64.go b/pkg/plugin/packetparser/packetparser_bpfel_arm64.go index 30c6bb43d3..ebfa05e37f 100644 --- a/pkg/plugin/packetparser/packetparser_bpfel_arm64.go +++ b/pkg/plugin/packetparser/packetparser_bpfel_arm64.go @@ -20,6 +20,12 @@ type packetparserCtEntry struct { FlagsSeenTxDir uint8 FlagsSeenRxDir uint8 IsDirectionUnknown bool + ConntrackMetadata struct { + BytesForwardCount uint64 + BytesReplyCount uint64 + PacketsForwardCount uint32 + PacketsReplyCount uint32 + } } type packetparserCtV4Key struct { @@ -49,12 +55,18 @@ type packetparserPacket struct { Tsval uint32 Tsecr uint32 } - ObservationPoint uint8 - TrafficDirection uint8 - Proto uint8 - Flags uint8 - IsReply bool - _ [3]byte + ObservationPoint uint8 + TrafficDirection uint8 + Proto uint8 + Flags uint8 + IsReply bool + _ [3]byte + ConntrackMetadata struct { + BytesForwardCount uint64 + BytesReplyCount uint64 + PacketsForwardCount uint32 + PacketsReplyCount uint32 + } } // loadPacketparser returns the embedded CollectionSpec for packetparser. diff --git a/pkg/plugin/packetparser/packetparser_bpfel_x86.go b/pkg/plugin/packetparser/packetparser_bpfel_x86.go index 6d3cb0bfe0..36de41247b 100644 --- a/pkg/plugin/packetparser/packetparser_bpfel_x86.go +++ b/pkg/plugin/packetparser/packetparser_bpfel_x86.go @@ -20,6 +20,12 @@ type packetparserCtEntry struct { FlagsSeenTxDir uint8 FlagsSeenRxDir uint8 IsDirectionUnknown bool + ConntrackMetadata struct { + BytesForwardCount uint64 + BytesReplyCount uint64 + PacketsForwardCount uint32 + PacketsReplyCount uint32 + } } type packetparserCtV4Key struct { @@ -49,12 +55,18 @@ type packetparserPacket struct { Tsval uint32 Tsecr uint32 } - ObservationPoint uint8 - TrafficDirection uint8 - Proto uint8 - Flags uint8 - IsReply bool - _ [3]byte + ObservationPoint uint8 + TrafficDirection uint8 + Proto uint8 + Flags uint8 + IsReply bool + _ [3]byte + ConntrackMetadata struct { + BytesForwardCount uint64 + BytesReplyCount uint64 + PacketsForwardCount uint32 + PacketsReplyCount uint32 + } } // loadPacketparser returns the embedded CollectionSpec for packetparser. diff --git a/pkg/plugin/packetparser/packetparser_linux.go b/pkg/plugin/packetparser/packetparser_linux.go index 5a8dd690dd..fb54c8bb5e 100644 --- a/pkg/plugin/packetparser/packetparser_linux.go +++ b/pkg/plugin/packetparser/packetparser_linux.go @@ -32,6 +32,7 @@ import ( "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" plugincommon "github.com/microsoft/retina/pkg/plugin/common" + "github.com/microsoft/retina/pkg/plugin/conntrack" _ "github.com/microsoft/retina/pkg/plugin/lib/_amd64" // nolint _ "github.com/microsoft/retina/pkg/plugin/lib/_arm64" // nolint _ "github.com/microsoft/retina/pkg/plugin/lib/common/libbpf/_include/asm" // nolint @@ -68,23 +69,56 @@ func (p *packetParser) Name() string { return name } -func (p *packetParser) Generate(ctx context.Context) error { +func generateDynamicHeaderPath() (string, error) { // Get absolute path to this file during runtime. _, filename, _, ok := runtime.Caller(0) if !ok { - return errors.New("unable to get absolute path to this file") + return "", errors.New("unable to get absolute path to this file") } dir := path.Dir(filename) - dynamicHeaderPath := fmt.Sprintf("%s/%s/%s", dir, bpfSourceDir, dynamicHeaderFileName) + return fmt.Sprintf("%s/%s/%s", dir, bpfSourceDir, dynamicHeaderFileName), nil +} + +func (p *packetParser) Generate(ctx context.Context) error { + // Variable to store content of dynamic header. + var st string + + dynamicHeaderPath, err := generateDynamicHeaderPath() + if err != nil { + return err + } + // Check if packetparser will bypassing lookup IP of interest. bypassLookupIPOfInterest := 0 if p.cfg.BypassLookupIPOfInterest { p.l.Info("bypassing lookup IP of interest") bypassLookupIPOfInterest = 1 + st = fmt.Sprintf("#define BYPASS_LOOKUP_IP_OF_INTEREST %d\n", bypassLookupIPOfInterest) } + + conntrackMetrics := 0 + // Check if packetparser has Conntrack metrics enabled. + if p.cfg.EnableConntrackMetrics { + p.l.Info("conntrack metrics enabled") + conntrackMetrics = 1 + + // Generate dynamic header for conntrack. + ctDynamicHeaderPath := conntrack.BuildDynamicHeaderPath() + err = conntrack.GenerateDynamic(ctx, ctDynamicHeaderPath, conntrackMetrics) + if err != nil { + return errors.Wrap(err, "failed to generate dynamic header for conntrack") + } + + // Process packetparser dynamic.h conntrack metrics definition. + st += fmt.Sprintf("#define ENABLE_CONNTRACK_METRICS %d\n", conntrackMetrics) + } + + // Process packetparser data aggregation level. p.l.Info("data aggregation level", zap.String("level", p.cfg.DataAggregationLevel.String())) - st := fmt.Sprintf("#define BYPASS_LOOKUP_IP_OF_INTEREST %d\n#define DATA_AGGREGATION_LEVEL %d\n", bypassLookupIPOfInterest, p.cfg.DataAggregationLevel) - err := loader.WriteFile(ctx, dynamicHeaderPath, st) + st += fmt.Sprintf("#define DATA_AGGREGATION_LEVEL %d\n", p.cfg.DataAggregationLevel) + + // Generate dynamic header for packetparser. + err = loader.WriteFile(ctx, dynamicHeaderPath, st) if err != nil { return errors.Wrap(err, "failed to write dynamic header") } diff --git a/pkg/plugin/packetparser/packetparser_linux_test.go b/pkg/plugin/packetparser/packetparser_linux_test.go index 1a7a1bc350..aa27cfa356 100644 --- a/pkg/plugin/packetparser/packetparser_linux_test.go +++ b/pkg/plugin/packetparser/packetparser_linux_test.go @@ -37,6 +37,7 @@ var ( cfgPodLevelEnabled = &kcfg.Config{ EnablePodLevel: true, BypassLookupIPOfInterest: true, + EnableConntrackMetrics: false, } cfgPodLevelDisabled = &kcfg.Config{ EnablePodLevel: false, @@ -49,6 +50,12 @@ var ( EnablePodLevel: true, DataAggregationLevel: kcfg.High, } + cfgConntrackMetricsEnabled = &kcfg.Config{ + EnablePodLevel: true, + DataAggregationLevel: kcfg.High, + BypassLookupIPOfInterest: true, + EnableConntrackMetrics: true, + } ) func TestCleanAll(t *testing.T) { @@ -533,30 +540,60 @@ func TestPacketParseGenerate(t *testing.T) { currDir := path.Dir(filename) dynamicHeaderPath := fmt.Sprintf("%s/%s/%s", currDir, bpfSourceDir, dynamicHeaderFileName) - // Instantiate the packetParser struct with a mocked logger and context. - p := &packetParser{ - cfg: cfgPodLevelEnabled, - l: log.Logger().Named(name), - } - ctx := context.Background() - - // Call the Generate function and check if it returns an error. - if err := p.Generate(ctx); err != nil { - t.Fatalf("failed to generate PacketParser header: %v", err) - } - - // Verify that the dynamic header file was created in the expected location and contains the expected contents. - if _, err := os.Stat(dynamicHeaderPath); os.IsNotExist(err) { - t.Fatalf("dynamic header file does not exist: %v", err) + tests := []struct { + name string + cfg *kcfg.Config + expectedContents string + }{ + { + name: "PodLevelEnabled", + cfg: cfgPodLevelEnabled, + expectedContents: "#define BYPASS_LOOKUP_IP_OF_INTEREST 1\n#define DATA_AGGREGATION_LEVEL 0\n", + }, + { + name: "ConntrackMetricsEnabled", + cfg: cfgConntrackMetricsEnabled, + expectedContents: "#define BYPASS_LOOKUP_IP_OF_INTEREST 1\n#define ENABLE_CONNTRACK_METRICS 1\n#define DATA_AGGREGATION_LEVEL 1\n", + }, + { + name: "DataAggregationLevelLow", + cfg: cfgDataAggregationLevelLow, + expectedContents: "#define DATA_AGGREGATION_LEVEL 0\n", + }, + { + name: "DataAggregationLevelHigh", + cfg: cfgDataAggregationLevelHigh, + expectedContents: "#define DATA_AGGREGATION_LEVEL 1\n", + }, } - expectedContents := "#define BYPASS_LOOKUP_IP_OF_INTEREST 1\n#define DATA_AGGREGATION_LEVEL 0\n" - actualContents, err := os.ReadFile(dynamicHeaderPath) - if err != nil { - t.Fatalf("failed to read dynamic header file: %v", err) - } - if string(actualContents) != expectedContents { - t.Errorf("unexpected dynamic header file contents: got %q, want %q", string(actualContents), expectedContents) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Instantiate the packetParser struct with a mocked logger and context. + p := &packetParser{ + cfg: tt.cfg, + l: log.Logger().Named(name), + } + ctx := context.Background() + + // Call the Generate function and check if it returns an error. + if err := p.Generate(ctx); err != nil { + t.Fatalf("failed to generate PacketParser header: %v", err) + } + + // Verify that the dynamic header file was created in the expected location and contains the expected contents. + if _, err := os.Stat(dynamicHeaderPath); os.IsNotExist(err) { + t.Fatalf("dynamic header file does not exist: %v", err) + } + + actualContents, err := os.ReadFile(dynamicHeaderPath) + if err != nil { + t.Fatalf("failed to read dynamic header file: %v", err) + } + if string(actualContents) != tt.expectedContents { + t.Errorf("unexpected dynamic header file contents: got %q, want %q", string(actualContents), tt.expectedContents) + } + }) } }