From 57fb343bc752d3dd8a5e933dcc2416ba48564c79 Mon Sep 17 00:00:00 2001 From: Tushar Tathgur Date: Sun, 11 Jun 2023 17:39:16 -0700 Subject: [PATCH] L7 Visibility support in Antrea Signed-off-by: Tushar Tathgur --- build/charts/antrea/README.md | 2 +- build/charts/antrea/conf/antrea-agent.conf | 3 + .../antrea/templates/agent/clusterrole.yaml | 13 +- build/charts/antrea/values.yaml | 2 +- build/yamls/antrea-aks.yml | 22 +- build/yamls/antrea-eks.yml | 22 +- build/yamls/antrea-gke.yml | 22 +- build/yamls/antrea-ipsec.yml | 22 +- build/yamls/antrea.yml | 22 +- cmd/antrea-agent/agent.go | 5 + go.mod | 2 + go.sum | 4 +- .../l7engine/l7visbility_watcher.go | 358 ++++++++++++++++++ .../networkpolicy/l7engine/reconciler.go | 15 +- .../networkpolicy/l7engine/reconciler_test.go | 13 + .../controller/trafficcontrol/controller.go | 10 +- .../trafficcontrol/controller_test.go | 26 +- .../flowexporter/connections/conntrack_ovs.go | 22 +- pkg/agent/flowexporter/exporter/exporter.go | 197 +++++++++- .../flowexporter/exporter/exporter_test.go | 262 +++++++++++++ pkg/agent/flowexporter/types.go | 2 + pkg/agent/flowexporter/utils.go | 23 ++ pkg/agent/flowexporter/utils_test.go | 22 ++ pkg/features/antrea_features.go | 4 + .../clickhouseclient/clickhouseclient.go | 9 +- .../clickhouseclient/clickhouseclient_test.go | 4 +- pkg/flowaggregator/flowlogger/logger.go | 2 + pkg/flowaggregator/flowlogger/logger_test.go | 4 +- pkg/flowaggregator/flowrecord/record.go | 8 + pkg/flowaggregator/flowrecord/record_test.go | 4 + pkg/flowaggregator/flowrecord/testing/util.go | 2 + pkg/flowaggregator/infoelements/elements.go | 2 + pkg/flowaggregator/s3uploader/s3uploader.go | 4 + .../s3uploader/s3uploader_test.go | 13 +- pkg/flowaggregator/testing/util.go | 8 + .../flow-visibility/templates/configmap.yaml | 4 +- test/e2e/flowaggregator_test.go | 4 + 37 files changed, 1090 insertions(+), 73 deletions(-) create mode 100644 pkg/agent/controller/networkpolicy/l7engine/l7visbility_watcher.go diff --git a/build/charts/antrea/README.md b/build/charts/antrea/README.md index 07ffa8efcd4..68227639ea3 100644 --- a/build/charts/antrea/README.md +++ b/build/charts/antrea/README.md @@ -78,7 +78,7 @@ Kubernetes: `>= 1.16.0-0` | enableBridgingMode | bool | `false` | Enable bridging mode of Pod network on Nodes, in which the Node's transport interface is connected to the OVS bridge. | | featureGates | object | `{}` | To explicitly enable or disable a FeatureGate and bypass the Antrea defaults, add an entry to the dictionary with the FeatureGate's name as the key and a boolean as the value. | | flowExporter.activeFlowExportTimeout | string | `"5s"` | timeout after which a flow record is sent to the collector for active flows. | -| flowExporter.enable | bool | `false` | Enable the flow exporter feature. | +| flowExporter.enable | bool | `true` | Enable the flow exporter feature. | | flowExporter.flowCollectorAddr | string | `"flow-aggregator/flow-aggregator:4739:tls"` | IPFIX collector address as a string with format :[][:]. If the collector is running in-cluster as a Service, set to /. | | flowExporter.flowPollInterval | string | `"5s"` | Determines how often the flow exporter polls for new connections. | | flowExporter.idleFlowExportTimeout | string | `"15s"` | timeout after which a flow record is sent to the collector for idle flows. | diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index e8b2ed9b2a5..b0eb7e63880 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -80,6 +80,9 @@ featureGates: # Allow users to specify the load balancer mode as DSR (Direct Server Return). {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "LoadBalancerModeDSR" "default" false) }} +# Enable L7Visibility on Pods and Namespace. +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "L7Visibility" "default" false) }} + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index 7db11aebb8e..aaf95e3725a 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -173,7 +173,6 @@ rules: resources: - externalippools - ippools - - trafficcontrols verbs: - get - watch @@ -219,3 +218,15 @@ rules: - get - list - watch + - apiGroups: + - crd.antrea.io + resources: + - trafficcontrols + verbs: + - get + - watch + - list + - update + - patch + - create + - delete diff --git a/build/charts/antrea/values.yaml b/build/charts/antrea/values.yaml index c28a90005e0..527c2959a1b 100644 --- a/build/charts/antrea/values.yaml +++ b/build/charts/antrea/values.yaml @@ -322,7 +322,7 @@ controller: flowExporter: # -- Enable the flow exporter feature. - enable: false + enable: true # -- IPFIX collector address as a string with format :[][:]. # If the collector is running in-cluster as a Service, set to # /. diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 1d6237f08b3..cda3f6585b4 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -5535,6 +5535,9 @@ data: # Allow users to specify the load balancer mode as DSR (Direct Server Return). # LoadBalancerModeDSR: false + # Enable L7Visibility on Pods and Namespace. + # L7Visibility: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -5654,7 +5657,7 @@ data: # IPFIX flow records from each agent to a configured collector. To enable this # feature, you need to set "enable" to true, and ensure that the FlowExporter # feature gate is also enabled. - enable: false + enable: true # Provide the IPFIX collector address as a string with format :[][:]. # HOST can either be the DNS name, IP, or Service name of the Flow Collector. If # using an IP, it can be either IPv4 or IPv6. However, IPv6 address should be @@ -6161,7 +6164,6 @@ rules: resources: - externalippools - ippools - - trafficcontrols verbs: - get - watch @@ -6207,6 +6209,18 @@ rules: - get - list - watch + - apiGroups: + - crd.antrea.io + resources: + - trafficcontrols + verbs: + - get + - watch + - list + - update + - patch + - create + - delete --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -6818,7 +6832,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: e982ae7bedfa361f13e134516243f3c8d566b9297abc58f51c9cd1b637739790 + checksum/config: decff49e8a2dd2019acbb0b49a88c1deb81b53fc64e4b6ca2741babac00ae97a labels: app: antrea component: antrea-agent @@ -7059,7 +7073,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: e982ae7bedfa361f13e134516243f3c8d566b9297abc58f51c9cd1b637739790 + checksum/config: decff49e8a2dd2019acbb0b49a88c1deb81b53fc64e4b6ca2741babac00ae97a labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index c4b587323c5..cc10810bbd3 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -5535,6 +5535,9 @@ data: # Allow users to specify the load balancer mode as DSR (Direct Server Return). # LoadBalancerModeDSR: false + # Enable L7Visibility on Pods and Namespace. + # L7Visibility: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -5654,7 +5657,7 @@ data: # IPFIX flow records from each agent to a configured collector. To enable this # feature, you need to set "enable" to true, and ensure that the FlowExporter # feature gate is also enabled. - enable: false + enable: true # Provide the IPFIX collector address as a string with format :[][:]. # HOST can either be the DNS name, IP, or Service name of the Flow Collector. If # using an IP, it can be either IPv4 or IPv6. However, IPv6 address should be @@ -6161,7 +6164,6 @@ rules: resources: - externalippools - ippools - - trafficcontrols verbs: - get - watch @@ -6207,6 +6209,18 @@ rules: - get - list - watch + - apiGroups: + - crd.antrea.io + resources: + - trafficcontrols + verbs: + - get + - watch + - list + - update + - patch + - create + - delete --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -6818,7 +6832,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: e982ae7bedfa361f13e134516243f3c8d566b9297abc58f51c9cd1b637739790 + checksum/config: decff49e8a2dd2019acbb0b49a88c1deb81b53fc64e4b6ca2741babac00ae97a labels: app: antrea component: antrea-agent @@ -7060,7 +7074,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: e982ae7bedfa361f13e134516243f3c8d566b9297abc58f51c9cd1b637739790 + checksum/config: decff49e8a2dd2019acbb0b49a88c1deb81b53fc64e4b6ca2741babac00ae97a labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index e7429fb773a..65ef02ddbcd 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -5535,6 +5535,9 @@ data: # Allow users to specify the load balancer mode as DSR (Direct Server Return). # LoadBalancerModeDSR: false + # Enable L7Visibility on Pods and Namespace. + # L7Visibility: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -5654,7 +5657,7 @@ data: # IPFIX flow records from each agent to a configured collector. To enable this # feature, you need to set "enable" to true, and ensure that the FlowExporter # feature gate is also enabled. - enable: false + enable: true # Provide the IPFIX collector address as a string with format :[][:]. # HOST can either be the DNS name, IP, or Service name of the Flow Collector. If # using an IP, it can be either IPv4 or IPv6. However, IPv6 address should be @@ -6161,7 +6164,6 @@ rules: resources: - externalippools - ippools - - trafficcontrols verbs: - get - watch @@ -6207,6 +6209,18 @@ rules: - get - list - watch + - apiGroups: + - crd.antrea.io + resources: + - trafficcontrols + verbs: + - get + - watch + - list + - update + - patch + - create + - delete --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -6818,7 +6832,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 977cc5c6179f9ac01800457f4549f2783876dd94f2eaf165085808b742019cc1 + checksum/config: dd8501e04951f3bbe5249f53fc82d04235f5fb9f7e29bff008acf01324544e13 labels: app: antrea component: antrea-agent @@ -7057,7 +7071,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 977cc5c6179f9ac01800457f4549f2783876dd94f2eaf165085808b742019cc1 + checksum/config: dd8501e04951f3bbe5249f53fc82d04235f5fb9f7e29bff008acf01324544e13 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 11cfa843ba6..8f202430ca0 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -5548,6 +5548,9 @@ data: # Allow users to specify the load balancer mode as DSR (Direct Server Return). # LoadBalancerModeDSR: false + # Enable L7Visibility on Pods and Namespace. + # L7Visibility: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -5667,7 +5670,7 @@ data: # IPFIX flow records from each agent to a configured collector. To enable this # feature, you need to set "enable" to true, and ensure that the FlowExporter # feature gate is also enabled. - enable: false + enable: true # Provide the IPFIX collector address as a string with format :[][:]. # HOST can either be the DNS name, IP, or Service name of the Flow Collector. If # using an IP, it can be either IPv4 or IPv6. However, IPv6 address should be @@ -6174,7 +6177,6 @@ rules: resources: - externalippools - ippools - - trafficcontrols verbs: - get - watch @@ -6220,6 +6222,18 @@ rules: - get - list - watch + - apiGroups: + - crd.antrea.io + resources: + - trafficcontrols + verbs: + - get + - watch + - list + - update + - patch + - create + - delete --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -6831,7 +6845,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 754b9e45b24d9a03a6be907d1dda1966a84598841e871bfa624932e11aeb739f + checksum/config: df60dfa3e1201bc80193f39d5f38b03f4713be2bbc2b2334381c8aa47d8f1e9a checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -7116,7 +7130,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 754b9e45b24d9a03a6be907d1dda1966a84598841e871bfa624932e11aeb739f + checksum/config: df60dfa3e1201bc80193f39d5f38b03f4713be2bbc2b2334381c8aa47d8f1e9a labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 9ca402b0088..65697827973 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -5535,6 +5535,9 @@ data: # Allow users to specify the load balancer mode as DSR (Direct Server Return). # LoadBalancerModeDSR: false + # Enable L7Visibility on Pods and Namespace. + # L7Visibility: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -5654,7 +5657,7 @@ data: # IPFIX flow records from each agent to a configured collector. To enable this # feature, you need to set "enable" to true, and ensure that the FlowExporter # feature gate is also enabled. - enable: false + enable: true # Provide the IPFIX collector address as a string with format :[][:]. # HOST can either be the DNS name, IP, or Service name of the Flow Collector. If # using an IP, it can be either IPv4 or IPv6. However, IPv6 address should be @@ -6161,7 +6164,6 @@ rules: resources: - externalippools - ippools - - trafficcontrols verbs: - get - watch @@ -6207,6 +6209,18 @@ rules: - get - list - watch + - apiGroups: + - crd.antrea.io + resources: + - trafficcontrols + verbs: + - get + - watch + - list + - update + - patch + - create + - delete --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -6818,7 +6832,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 7218db7d40a2ba0043ebc518f71ed1e5c38a715360a732492ae762964e11c884 + checksum/config: c4f58fbb3065448295fab081c6bb9fa5e69536333ee9e32464ffaee9a84148ed labels: app: antrea component: antrea-agent @@ -7057,7 +7071,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 7218db7d40a2ba0043ebc518f71ed1e5c38a715360a732492ae762964e11c884 + checksum/config: c4f58fbb3065448295fab081c6bb9fa5e69536333ee9e32464ffaee9a84148ed labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index eddaa4bf81e..be8503c2bf6 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -39,6 +39,7 @@ import ( "antrea.io/antrea/pkg/agent/controller/egress" "antrea.io/antrea/pkg/agent/controller/ipseccertificate" "antrea.io/antrea/pkg/agent/controller/networkpolicy" + "antrea.io/antrea/pkg/agent/controller/networkpolicy/l7engine" "antrea.io/antrea/pkg/agent/controller/noderoute" "antrea.io/antrea/pkg/agent/controller/serviceexternalip" "antrea.io/antrea/pkg/agent/controller/traceflow" @@ -710,6 +711,7 @@ func run(o *Options) error { if features.DefaultFeatureGate.Enabled(features.TrafficControl) { tcController := trafficcontrol.NewTrafficControlController(ofClient, + crdClient, ifaceStore, ovsBridgeClient, ovsCtlClient, @@ -718,6 +720,9 @@ func run(o *Options) error { namespaceInformer, podUpdateChannel) go tcController.Run(stopCh) + if features.DefaultFeatureGate.Enabled(features.L7Visibility) { + go l7engine.Run(tcController) + } } // Start the localPodInformer diff --git a/go.mod b/go.mod index f9b3bafa6fb..235c7c48782 100644 --- a/go.mod +++ b/go.mod @@ -236,3 +236,5 @@ require ( sigs.k8s.io/kustomize/kyaml v0.13.9 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) + +replace github.com/vmware/go-ipfix v0.6.2 => github.com/tushartathgur/go-ipfix v0.0.0-20230803191543-aacd1016296d diff --git a/go.sum b/go.sum index c50ace5d228..90130adaab7 100644 --- a/go.sum +++ b/go.sum @@ -1081,6 +1081,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA= +github.com/tushartathgur/go-ipfix v0.0.0-20230803191543-aacd1016296d h1:pJ+1RREibFRvyp55mf7B7xnKRBQ/DUZyUn0yqbspfqg= +github.com/tushartathgur/go-ipfix v0.0.0-20230803191543-aacd1016296d/go.mod h1:dGCppoeqknr9o3yz9BD74mP/FPHgefb6v34xdUKxDPI= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= @@ -1098,8 +1100,6 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17 github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= -github.com/vmware/go-ipfix v0.6.2 h1:9awOJ9HV2ZsBYqB1bbUu0ULX8mqR/hekt884/IRjijs= -github.com/vmware/go-ipfix v0.6.2/go.mod h1:dGCppoeqknr9o3yz9BD74mP/FPHgefb6v34xdUKxDPI= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/pkg/agent/controller/networkpolicy/l7engine/l7visbility_watcher.go b/pkg/agent/controller/networkpolicy/l7engine/l7visbility_watcher.go new file mode 100644 index 00000000000..30bed60c3d5 --- /dev/null +++ b/pkg/agent/controller/networkpolicy/l7engine/l7visbility_watcher.go @@ -0,0 +1,358 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package l7engine + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "os" + "os/exec" + "strings" + "sync" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/apis/crd/v1alpha2" + + "antrea.io/antrea/pkg/agent/controller/trafficcontrol" +) + +var ( + L7mutex sync.Mutex + labelsToTCMap map[string][]string +) + +func Run(tcController *trafficcontrol.Controller) { + kubeconfig, err := ResolveKubeConfig() + if err != nil { + klog.ErrorS(err, "") + } + + clientset, err := kubernetes.NewForConfig(kubeconfig) + if err != nil { + klog.ErrorS(err, "") + } + + // Create a watcher for pods and namespaces + podWatcher, err := clientset.CoreV1().Pods("").Watch(context.TODO(), metav1.ListOptions{ + FieldSelector: fields.Everything().String(), + }) + if err != nil { + klog.ErrorS(err, "") + } + defer podWatcher.Stop() + + namespaceWatcher, err := clientset.CoreV1().Namespaces().Watch(context.TODO(), metav1.ListOptions{ + FieldSelector: fields.Everything().String(), + }) + if err != nil { + klog.ErrorS(err, "") + } + defer namespaceWatcher.Stop() + + // Set up channels to receive watch events + podEvents := podWatcher.ResultChan() + namespaceEvents := namespaceWatcher.ResultChan() + l7Reconciler := NewReconciler() + labelsToTCMap = make(map[string][]string) + + // Start listening for events + go processPodEvents(clientset, podEvents, l7Reconciler, tcController) + go processNamespaceEvents(clientset, namespaceEvents, l7Reconciler, tcController) + + select {} +} + +func processPodEvents(clientset *kubernetes.Clientset, events <-chan watch.Event, l7Reconciler *Reconciler, tcController *trafficcontrol.Controller) { + direction := v1alpha2.DirectionBoth + for event := range events { + pod, ok := event.Object.(*corev1.Pod) + if !ok { + klog.InfoS("Received unexpected object: ", "", event.Object) + continue + } + nodeName := pod.Spec.NodeName + podLabels := pod.ObjectMeta.Labels + // Get the namespace of the pod + namespaceName := pod.ObjectMeta.Namespace + // Get the labels of the namespace + namespaceObj, err := clientset.CoreV1().Namespaces().Get(context.TODO(), namespaceName, metav1.GetOptions{}) + if err != nil { + klog.ErrorS(err, "") + return + } + namespaceLabels := namespaceObj.ObjectMeta.Labels + if event.Type == watch.Added || event.Type == watch.Modified { + if _, exists := pod.Annotations["l7visibility"]; exists { + value := pod.Annotations["l7visibility"] + elements := strings.Split(value, "/") + switch elements[0] { + case "Ingress": + direction = v1alpha2.DirectionIngress + case "Egress": + direction = v1alpha2.DirectionEgress + case "Both": + direction = v1alpha2.DirectionBoth + } + startSuricataAndCreateTC(direction, nodeName, podLabels, namespaceLabels, clientset, l7Reconciler, tcController) + } else { + deleteTC(podLabels, namespaceLabels, tcController) + } + } else if event.Type == watch.Deleted { + deleteTC(podLabels, namespaceLabels, tcController) + } + } +} + +func processNamespaceEvents(clientset *kubernetes.Clientset, events <-chan watch.Event, l7Reconciler *Reconciler, tcController *trafficcontrol.Controller) { + direction := v1alpha2.DirectionBoth + nodeList, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + FieldSelector: fields.Everything().String(), + LabelSelector: labels.Everything().String(), + }) + if err != nil { + klog.Errorf("Failed to list all nodes: %v", err) + } + for event := range events { + namespace, ok := event.Object.(*corev1.Namespace) + if !ok { + klog.InfoS("Received unexpected object: ", "", event.Object) + continue + } + namespaceLabels := namespace.ObjectMeta.Labels + if event.Type == watch.Added || event.Type == watch.Modified { + if _, exists := namespace.Annotations["l7visibility"]; exists { + value := namespace.Annotations["l7visibility"] + elements := strings.Split(value, "/") + switch elements[0] { + case "Ingress": + direction = v1alpha2.DirectionIngress + case "Egress": + direction = v1alpha2.DirectionEgress + case "Both": + direction = v1alpha2.DirectionBoth + } + for _, node := range nodeList.Items { + startSuricataAndCreateTC(direction, node.Name, nil, namespaceLabels, clientset, l7Reconciler, tcController) + } + } else { + deleteTC(nil, namespaceLabels, tcController) + } + } else if event.Type == watch.Deleted { + deleteTC(nil, namespaceLabels, tcController) + } + } +} + +func ResolveKubeConfig() (*rest.Config, error) { + var hasIt bool + var kubeconfigPath string + var err error + kubeconfigPath, hasIt = os.LookupEnv("KUBECONFIG") + if !hasIt || len(strings.TrimSpace(kubeconfigPath)) == 0 { + kubeconfigPath = clientcmd.RecommendedHomeFile + } + if _, err = os.Stat(kubeconfigPath); kubeconfigPath == clientcmd.RecommendedHomeFile && os.IsNotExist(err) { + return rest.InClusterConfig() + } + return clientcmd.BuildConfigFromFlags("", kubeconfigPath) +} + +func StartSuricataIfNotRunning(clientset *kubernetes.Clientset, nodeName string, l7Reconciler *Reconciler) { + L7mutex.Lock() + defer L7mutex.Unlock() + + // Get the node by name + node, err := clientset.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + klog.ErrorS(err, "") + } + + // Check if Suricata is running on the node + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue { + checkSuricataProcess(nodeName, l7Reconciler) + } + } +} + +func checkSuricataProcess(nodeName string, l7Reconciler *Reconciler) { + currentnode := os.Getenv("NODE_NAME") + if nodeName == currentnode { + cmd := exec.Command("bash", "-c", "ps aux | grep suricata") + stdout, err := cmd.CombinedOutput() + if err != nil { + return + } + if !strings.Contains(string(stdout), "suricata -c") { + l7Reconciler.startSuricata() + } + } +} + +func createTrafficControl( + tcName string, + nsSelector, + podSelector map[string]string, + direction v1alpha2.Direction, + action v1alpha2.TrafficControlAction, + targetPort interface{}, + tcController *trafficcontrol.Controller) *v1alpha2.TrafficControl { + tc := &v1alpha2.TrafficControl{ + ObjectMeta: metav1.ObjectMeta{Name: tcName}, + Spec: v1alpha2.TrafficControlSpec{ + Direction: direction, + Action: action, + }, + } + if nsSelector != nil { + tc.Spec.AppliedTo.NamespaceSelector = &metav1.LabelSelector{MatchLabels: nsSelector} + } + if podSelector != nil { + tc.Spec.AppliedTo.PodSelector = &metav1.LabelSelector{MatchLabels: podSelector} + } + tc.Spec.TargetPort.OVSInternal = targetPort.(*v1alpha2.OVSInternalPort) + trafficControlApi := tcController.CrdClient.CrdV1alpha2().TrafficControls() + tc, err := trafficControlApi.Create(context.TODO(), tc, metav1.CreateOptions{}) + if err != nil && !strings.Contains(err.Error(), "already exists") { + klog.ErrorS(err, "Failed to create TrafficControl") + } + return tc +} + +func postTrafficControl(tcName string, direction v1alpha2.Direction, podLabels, namespaceLabels map[string]string, tcController *trafficcontrol.Controller) *v1alpha2.TrafficControl { + targetPort := &v1alpha2.OVSInternalPort{Name: config.L7NetworkPolicyTargetPortName} + var tc *v1alpha2.TrafficControl + if len(podLabels) != 0 { + tc = createTrafficControl(tcName, namespaceLabels, podLabels, direction, v1alpha2.ActionMirror, targetPort, tcController) + } else { + tc = createTrafficControl(tcName, namespaceLabels, nil, direction, v1alpha2.ActionMirror, targetPort, tcController) + } + err := tcController.SyncTrafficControl(tcName) + if err != nil { + klog.ErrorS(err, "Error Syncing Traffic Control") + } + return tc +} + +func deleteTrafficControl(tcName string, tcController *trafficcontrol.Controller) error { + return tcController.CrdClient.CrdV1alpha2().TrafficControls().Delete(context.TODO(), tcName, metav1.DeleteOptions{}) +} + +func generateTCName(direction v1alpha2.Direction, podLabels, namespaceLabels map[string]string) string { + directionStr := "Both" + switch direction { + case v1alpha2.DirectionIngress: + directionStr = "Ingress" + case v1alpha2.DirectionEgress: + directionStr = "Egress" + case v1alpha2.DirectionBoth: + directionStr = "Both" + } + if podLabels == nil { + podLabels = make(map[string]string) + } + data := map[string]interface{}{ + "direction": directionStr, + "podLabels": podLabels, + "namespaceLabels": namespaceLabels, + } + dataJSON, err := json.Marshal(data) + if err != nil { + klog.ErrorS(err, "Error creating hash for Traffic Control") + return "" + } + hashval := sha256.Sum256(dataJSON) + hash := fmt.Sprintf("%x", hashval[:14]) + return hash +} + +func addToLabelsToTCMap(labelsKey, tcName string) error { + if tcArray, ok := labelsToTCMap[labelsKey]; ok { + labelsToTCMap[labelsKey] = append(tcArray, tcName) + } else { + labelsToTCMap[labelsKey] = []string{tcName} + } + return nil +} + +func createLabelsKey(podLabels, namespaceLabels map[string]string) string { + data := map[string]interface{}{ + "podLabels": podLabels, + "namespaceLabels": namespaceLabels, + } + dataJSON, err := json.Marshal(data) + if err != nil { + klog.ErrorS(err, "Error creating hash for Traffic Control") + return "" + } + hashval := sha256.Sum256(dataJSON) + key := fmt.Sprintf("%x", hashval[:14]) + return key +} + +func startSuricataAndCreateTC(direction v1alpha2.Direction, nodeName string, podLabels, namespaceLabels map[string]string, clientset *kubernetes.Clientset, l7Reconciler *Reconciler, tcController *trafficcontrol.Controller) { + StartSuricataIfNotRunning(clientset, nodeName, l7Reconciler) + if podLabels == nil { + podLabels = make(map[string]string) + } + key := createLabelsKey(podLabels, namespaceLabels) + if key == "" { + klog.Errorf("Labels Key not created!") + } + tcName := generateTCName(direction, podLabels, namespaceLabels) + if tcName == "" { + klog.Errorf("TC Name not created!") + } + err := addToLabelsToTCMap(key, tcName) + if err != nil { + klog.ErrorS(err, "Error Creating Labels to Traffic Control Map") + } + postTrafficControl(tcName, direction, podLabels, namespaceLabels, tcController) +} + +func deleteTC(podLabels, namespaceLabels map[string]string, tcController *trafficcontrol.Controller) { + if podLabels == nil { + podLabels = make(map[string]string) + } + key := createLabelsKey(podLabels, namespaceLabels) + if key == "" { + klog.Errorf("Labels Key not created for Delete op!") + return + } + if tcNames, ok := labelsToTCMap[key]; ok { + for _, tcName := range tcNames { + if err := deleteTrafficControl(tcName, tcController); err != nil && !strings.Contains(err.Error(), "not found") { + klog.ErrorS(err, "Deleting TrafficControl Failed:") + } else { + err = tcController.SyncTrafficControl(tcName) + if err != nil { + klog.ErrorS(err, "Error Syncing Traffic Control") + } + } + } + } +} diff --git a/pkg/agent/controller/networkpolicy/l7engine/reconciler.go b/pkg/agent/controller/networkpolicy/l7engine/reconciler.go index b669cfadb80..063a6625ca2 100644 --- a/pkg/agent/controller/networkpolicy/l7engine/reconciler.go +++ b/pkg/agent/controller/networkpolicy/l7engine/reconciler.go @@ -244,8 +244,8 @@ func (r *Reconciler) AddRule(ruleID, policyName string, vlanID uint32, l7Protoco // Add a Suricata tenant. if err := r.addBindingSuricataTenant(vlanID, rulesPath); err != nil { return fmt.Errorf("failed to add Suricata tenant for L7 rule %s of %s: %w", ruleID, policyName, err) - } + } return nil } @@ -421,6 +421,19 @@ outputs: types: - alert: tagged-packets: yes + - eve-log: + enabled: yes + filetype: unix_stream + filename: /var/log/antrea/networkpolicy/suricata_eve.socket + rotate-interval: day + pcap-file: false + community-id: false + community-id-seed: 0 + xff: + enabled: no + types: + - http: + extended: yes af-packet: - interface: %[1]s threads: auto diff --git a/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go b/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go index f5b960a1233..e73e44c01f8 100644 --- a/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go +++ b/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go @@ -146,6 +146,19 @@ outputs: types: - alert: tagged-packets: yes + - eve-log: + enabled: yes + filetype: unix_stream + filename: /var/log/antrea/networkpolicy/suricata_eve.socket + rotate-interval: day + pcap-file: false + community-id: false + community-id-seed: 0 + xff: + enabled: no + types: + - http: + extended: yes af-packet: - interface: antrea-l7-tap0 threads: auto diff --git a/pkg/agent/controller/trafficcontrol/controller.go b/pkg/agent/controller/trafficcontrol/controller.go index 1c0d8720a77..488f710666b 100644 --- a/pkg/agent/controller/trafficcontrol/controller.go +++ b/pkg/agent/controller/trafficcontrol/controller.go @@ -42,6 +42,7 @@ import ( "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/apis/crd/v1alpha2" + clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2" crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2" "antrea.io/antrea/pkg/ovs/ovsconfig" @@ -112,7 +113,8 @@ type portToTCBinding struct { } type Controller struct { - ofClient openflow.Client + ofClient openflow.Client + CrdClient clientsetversioned.Interface portToTCBindings map[string]*portToTCBinding ovsBridgeClient ovsconfig.OVSBridgeClient @@ -142,6 +144,7 @@ type Controller struct { } func NewTrafficControlController(ofClient openflow.Client, + crdClient clientsetversioned.Interface, interfaceStore interfacestore.InterfaceStore, ovsBridgeClient ovsconfig.OVSBridgeClient, ovsCtlClient ovsctl.OVSCtlClient, @@ -151,6 +154,7 @@ func NewTrafficControlController(ofClient openflow.Client, podUpdateSubscriber channel.Subscriber) *Controller { c := &Controller{ ofClient: ofClient, + CrdClient: crdClient, ovsBridgeClient: ovsBridgeClient, ovsCtlClient: ovsCtlClient, interfaceStore: interfaceStore, @@ -404,7 +408,7 @@ func (c *Controller) processNextWorkItem() bool { c.queue.Forget(obj) klog.Errorf("Expected string in work queue but got %#v", obj) return true - } else if err := c.syncTrafficControl(key); err == nil { + } else if err := c.SyncTrafficControl(key); err == nil { // If no error occurs we Forget this item, so it does not get queued again until // another change happens. c.queue.Forget(key) @@ -801,7 +805,7 @@ func (c *Controller) releaseTrafficControlPort(portName, tcName string, isReturn return nil } -func (c *Controller) syncTrafficControl(tcName string) error { +func (c *Controller) SyncTrafficControl(tcName string) error { startTime := time.Now() defer func() { klog.V(2).InfoS("Finished syncing TrafficControl", "TrafficControl", tcName, "durationTime", time.Since(startTime)) diff --git a/pkg/agent/controller/trafficcontrol/controller_test.go b/pkg/agent/controller/trafficcontrol/controller_test.go index ec362e65833..b645ade331a 100644 --- a/pkg/agent/controller/trafficcontrol/controller_test.go +++ b/pkg/agent/controller/trafficcontrol/controller_test.go @@ -161,7 +161,7 @@ func newFakeController(t *testing.T, objects []runtime.Object, initObjects []run } podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) - tcController := NewTrafficControlController(mockOFClient, ifaceStore, mockOVSBridgeClient, mockOVSCtlClient, tcInformer, localPodInformer, nsInformer, podUpdateChannel) + tcController := NewTrafficControlController(mockOFClient, crdClient, ifaceStore, mockOVSBridgeClient, mockOVSCtlClient, tcInformer, localPodInformer, nsInformer, podUpdateChannel) podUpdateChannel.Subscribe(tcController.processPodUpdate) return &fakeController{ @@ -486,7 +486,7 @@ func TestTrafficControlAdd(t *testing.T) { c.startInformers(stopCh) tt.expectedCalls(c.mockOFClient, c.mockOVSBridgeClient, c.mockOVSCtlClient) - assert.NoError(t, c.syncTrafficControl(tt.tc.Name)) + assert.NoError(t, c.SyncTrafficControl(tt.tc.Name)) }) } } @@ -609,7 +609,7 @@ func TestTrafficControlUpdate(t *testing.T) { tt.expectedCalls(c.mockOFClient, c.mockOVSBridgeClient, c.mockOVSCtlClient) waitEvents(t, 1, c) - require.NoError(t, c.syncTrafficControl(tc1Name)) + require.NoError(t, c.SyncTrafficControl(tc1Name)) require.Equal(t, tt.expectedState, c.tcStates[tc1Name]) }) } @@ -644,7 +644,7 @@ func TestSharedTargetPort(t *testing.T) { waitEvents(t, 2, c) for i := 0; i < 2; i++ { item, _ := c.queue.Get() - require.NoError(t, c.syncTrafficControl(item.(string))) + require.NoError(t, c.SyncTrafficControl(item.(string))) c.queue.Done(item) } @@ -661,7 +661,7 @@ func TestSharedTargetPort(t *testing.T) { waitEvents(t, 1, c) item, _ := c.queue.Get() require.Equal(t, tc1Name, item) - require.NoError(t, c.syncTrafficControl(item.(string))) + require.NoError(t, c.SyncTrafficControl(item.(string))) c.queue.Done(item) // Delete TrafficControl tc2. @@ -670,7 +670,7 @@ func TestSharedTargetPort(t *testing.T) { waitEvents(t, 1, c) item, _ = c.queue.Get() require.Equal(t, tc2Name, item) - require.NoError(t, c.syncTrafficControl(item.(string))) + require.NoError(t, c.SyncTrafficControl(item.(string))) c.queue.Done(item) } @@ -713,7 +713,7 @@ func TestPodUpdateFromCNIServer(t *testing.T) { waitEvents(t, 1, c) item, _ = c.queue.Get() require.Equal(t, tc1Name, item) - require.NoError(t, c.syncTrafficControl(item.(string))) + require.NoError(t, c.SyncTrafficControl(item.(string))) c.queue.Done(item) // After syncing, verify the state of TrafficControl tc1. @@ -733,7 +733,7 @@ func TestPodUpdateFromCNIServer(t *testing.T) { waitEvents(t, 1, c) item, _ = c.queue.Get() require.Equal(t, tc1Name, item) - require.NoError(t, c.syncTrafficControl(item.(string))) + require.NoError(t, c.SyncTrafficControl(item.(string))) c.queue.Done(item) // After syncing, verify the state of TrafficControl tc1. @@ -896,7 +896,7 @@ func TestPodLabelsUpdate(t *testing.T) { waitEvents(t, tt.eventsTriggeredByPodLabelsUpdate, c) for i := 0; i < tt.eventsTriggeredByPodLabelsUpdate; i++ { item, _ := c.queue.Get() - require.NoError(t, c.syncTrafficControl(item.(string))) + require.NoError(t, c.SyncTrafficControl(item.(string))) c.queue.Done(item) } @@ -905,7 +905,7 @@ func TestPodLabelsUpdate(t *testing.T) { waitEvents(t, tt.eventsTriggeredByPodEffectiveTCUpdate, c) for i := 0; i < tt.eventsTriggeredByPodEffectiveTCUpdate; i++ { item, _ := c.queue.Get() - require.NoError(t, c.syncTrafficControl(item.(string))) + require.NoError(t, c.SyncTrafficControl(item.(string))) c.queue.Done(item) } } @@ -1072,7 +1072,7 @@ func TestNamespaceLabelsUpdate(t *testing.T) { waitEvents(t, tt.eventsTriggeredByNSLabelsUpdate, c) for i := 0; i < tt.eventsTriggeredByNSLabelsUpdate; i++ { item, _ := c.queue.Get() - require.NoError(t, c.syncTrafficControl(item.(string))) + require.NoError(t, c.SyncTrafficControl(item.(string))) c.queue.Done(item) } @@ -1081,7 +1081,7 @@ func TestNamespaceLabelsUpdate(t *testing.T) { waitEvents(t, tt.eventsTriggeredByPodEffectiveTCUpdate, c) for i := 0; i < tt.eventsTriggeredByPodEffectiveTCUpdate; i++ { item, _ := c.queue.Get() - require.NoError(t, c.syncTrafficControl(item.(string))) + require.NoError(t, c.SyncTrafficControl(item.(string))) c.queue.Done(item) } } @@ -1169,7 +1169,7 @@ func TestPodDelete(t *testing.T) { waitEvents(t, 3, c) for i := 0; i < 3; i++ { item, _ := c.queue.Get() - require.NoError(t, c.syncTrafficControl(item.(string))) + require.NoError(t, c.SyncTrafficControl(item.(string))) c.queue.Done(item) } diff --git a/pkg/agent/flowexporter/connections/conntrack_ovs.go b/pkg/agent/flowexporter/connections/conntrack_ovs.go index 52e0f89bfba..e4f4b67c8d3 100644 --- a/pkg/agent/flowexporter/connections/conntrack_ovs.go +++ b/pkg/agent/flowexporter/connections/conntrack_ovs.go @@ -32,13 +32,6 @@ import ( // Following map is for converting protocol name (string) to protocol identifier var ( - protocols = map[string]uint8{ - "icmp": 1, - "igmp": 2, - "tcp": 6, - "udp": 17, - "ipv6-icmp": 58, - } // Mapping is defined at https://github.com/torvalds/linux/blob/v5.9/include/uapi/linux/netfilter/nf_conntrack_common.h#L42 conntrackStatusMap = map[string]uint32{ "EXPECTED": uint32(1), @@ -140,7 +133,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter switch { case hasAnyProto(fs): // Proto identifier - proto, err := lookupProtocolMap(fs) + proto, err := flowexporter.LookupProtocolMap(fs) if err != nil { return nil, err } @@ -287,7 +280,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter } func hasAnyProto(text string) bool { - for proto := range protocols { + for proto := range flowexporter.Protocols { if strings.Contains(strings.ToLower(text), proto) { return true } @@ -295,17 +288,6 @@ func hasAnyProto(text string) bool { return false } -// lookupProtocolMap returns protocol identifier given protocol name -func lookupProtocolMap(name string) (uint8, error) { - name = strings.TrimSpace(name) - lowerCaseStr := strings.ToLower(name) - proto, found := protocols[lowerCaseStr] - if !found { - return 0, fmt.Errorf("unknown IP protocol specified: %s", name) - } - return proto, nil -} - func (ct *connTrackOvsCtl) GetMaxConnections() (int, error) { cmdOutput, execErr := ct.ovsctlClient.RunAppctlCmd("dpctl/ct-get-maxconns", false) if execErr != nil { diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index fb853f895c5..52921f4820d 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -15,10 +15,17 @@ package exporter import ( + "bufio" "context" + "encoding/json" "fmt" "hash/fnv" "net" + "os" + "path/filepath" + "strconv" + "strings" + "sync" "time" ipfixentities "github.com/vmware/go-ipfix/pkg/entities" @@ -36,6 +43,7 @@ import ( "antrea.io/antrea/pkg/agent/metrics" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/proxy" + "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/ipfix" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/querier" @@ -54,9 +62,13 @@ import ( // can be taking a fraction of the size of connection store to approximate the // number of expired connections, while having a min and a max to handle edge cases, // e.g. min(50 + 0.1 * connectionStore.size(), 200) -const maxConnsToExport = 64 +const ( + maxConnsToExport = 64 + socketPath = "suricata_eve.socket" +) var ( + l7mut sync.Mutex IANAInfoElementsCommon = []string{ "flowStartSeconds", "flowEndSeconds", @@ -101,6 +113,8 @@ var ( "flowType", "egressName", "egressIP", + "isL7", + "httpVals", } AntreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...) AntreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...) @@ -130,6 +144,39 @@ type FlowExporter struct { expiredConns []flowexporter.Connection egressQuerier querier.EgressQuerier podStore podstore.Interface + l7events map[flowexporter.ConnectionKey]*httpEvent +} + +// Define struct to hold the L7 flow JSON values +type Http struct { + Hostname string `json:"hostname"` + URL string `json:"url"` + UserAgent string `json:"http_user_agent"` + ContentType string `json:"http_content_type"` + Method string `json:"http_method"` + Protocol string `json:"protocol"` + Status int `json:"status"` + ContentLength int `json:"length"` +} + +type JsonToEvent struct { + Timestamp string `json:"timestamp"` + FlowID int64 `json:"flow_id"` + InInterface string `json:"in_iface"` + EventType string `json:"event_type"` + VLAN []int `json:"vlan"` + SrcIP net.IP `json:"src_ip"` + SrcPort int `json:"src_port"` + DestIP net.IP `json:"dest_ip"` + DestPort int `json:"dest_port"` + Proto string `json:"proto"` + TxID int `json:"tx_id"` + HTTP Http `json:"http"` +} + +type httpEvent struct { + http [][]string + httpQueryFlag bool } func genObservationID(nodeName string) uint32 { @@ -191,6 +238,7 @@ func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClie expiredConns: make([]flowexporter.Connection, 0, maxConnsToExport*2), egressQuerier: egressQuerier, podStore: podStore, + l7events: make(map[flowexporter.ConnectionKey]*httpEvent), }, nil } @@ -206,6 +254,11 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) { // Start the goroutine to poll conntrack flows. go exp.conntrackConnStore.Run(stopCh) + // Start L7 connection flow socket + if features.DefaultFeatureGate.Enabled(features.L7NetworkPolicy) { + go exp.l7Listener() + } + defaultTimeout := exp.conntrackPriorityQueue.ActiveFlowTimeout expireTimer := time.NewTimer(defaultTimeout) for { @@ -421,6 +474,7 @@ func (exp *FlowExporter) sendTemplateSet(isIPv6 bool) (int, error) { func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { exp.ipfixSet.ResetSet() + connkey := flowexporter.NewConnectionKey(conn) eL := exp.elementsListv4 templateID := exp.templateIDv4 @@ -567,6 +621,10 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { ie.SetStringValue(conn.EgressName) case "egressIP": ie.SetStringValue(conn.EgressIP) + case "isL7": + ie.SetStringValue(exp.getL7EventData(connkey, "isL7")) + case "httpVals": + ie.SetStringValue(exp.getL7EventData(connkey, "httpVals")) } } err := exp.ipfixSet.AddRecord(eL, templateID) @@ -651,3 +709,140 @@ func getMinTime(t1, t2 time.Duration) time.Duration { } return t2 } + +func (exp *FlowExporter) l7Listener() { + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + klog.ErrorS(err, "Error removing Suricata socket") + } + if err := os.MkdirAll(filepath.Dir(socketPath), 0755); err != nil { + klog.Fatalf("Failed to create directory %s: %v", filepath.Dir(socketPath), err) + } + listener, err := net.Listen("unix", socketPath) + if err != nil { + klog.ErrorS(err, "Error listening on Suricata socket") + return + } + defer listener.Close() + + klog.InfoS("L7 Listener Server started. Listening for connections...") + + for { + conn, err := listener.Accept() + if err != nil { + klog.ErrorS(err, "Error accepting Suricata connection") + continue + } + go exp.handleClientConnection(conn) + } +} + +func (exp *FlowExporter) handleClientConnection(conn net.Conn) { + l7mut.Lock() + defer l7mut.Unlock() + defer conn.Close() + reader := bufio.NewReader(conn) + for { + buffer, err := reader.ReadBytes('\n') + if err != nil { + klog.ErrorS(err, "Error reading data", "buffer", buffer) + break + } + exp.processLog(buffer) + } +} + +func (exp *FlowExporter) processLog(data []byte) { + // Check if the event type is "http" + if eventType := extractEventType(data); eventType != "http" { + return + } + var event JsonToEvent + err := json.Unmarshal(data, &event) + if err != nil { + klog.ErrorS(err, "Error parsing JSON:", "data", data) + return + } + exp.WriteConnection(event) +} + +func extractEventType(data []byte) string { + // Define a temporary struct for extracting the event type + type Temp struct { + EventType string `json:"event_type"` + } + + // Parse the JSON string and extract the event type + var temp Temp + err := json.Unmarshal(data, &temp) + if err != nil { + klog.ErrorS(err, "Error parsing JSON for eventtype:", "data", data) + return "" + } + + return temp.EventType +} + +func (exp *FlowExporter) WriteConnection(event JsonToEvent) { + protocol, err := flowexporter.LookupProtocolMap(event.Proto) + if err != nil { + klog.ErrorS(err, "InValid Protocol type") + return + } + // Get 5-tuple information + tuple := flowexporter.Tuple{ + SourceAddress: event.SrcIP, + DestinationAddress: event.DestIP, + Protocol: protocol, + SourcePort: uint16(event.SrcPort), + DestinationPort: uint16(event.DestPort), + } + conn := flowexporter.Connection{} + conn.FlowKey = tuple + connKey := flowexporter.NewConnectionKey(&conn) + var tempArr []string + tempArr = append(tempArr, + event.HTTP.Hostname, + event.HTTP.URL, + event.HTTP.UserAgent, + event.HTTP.ContentType, + event.HTTP.Method, + event.HTTP.Protocol, + strconv.Itoa(event.HTTP.Status), + strconv.Itoa(event.HTTP.ContentLength)) + if _, found := exp.l7events[connKey]; found { + exp.l7events[connKey].http = append(exp.l7events[connKey].http, tempArr) + exp.l7events[connKey].httpQueryFlag = false + } else { + var temp httpEvent + temp.http = append(temp.http, tempArr) + temp.httpQueryFlag = false + exp.l7events[connKey] = &temp + } +} + +func (exp *FlowExporter) getL7EventData(connkey flowexporter.ConnectionKey, field string) string { + _, exists := exp.l7events[connkey] + if exists { + switch field { + case "isL7": + return "true" + case "httpVals": + var temp []string + // Using '<' '>' as delimiters + for _, httpVals := range exp.l7events[connkey].http { + temp = append(temp, strings.Join(httpVals, "<>")) + } + allHttpVals := strings.Join(temp, "><") + exp.l7events[connkey].httpQueryFlag = true + exp.verifyAndDeleteEvent(connkey) + return allHttpVals + } + } + return "" +} + +func (exp *FlowExporter) verifyAndDeleteEvent(connkey flowexporter.ConnectionKey) { + if exp.l7events[connkey].httpQueryFlag { + delete(exp.l7events, connkey) + } +} diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 2a1b5951c87..7a65843015f 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -16,6 +16,7 @@ package exporter import ( "context" + "encoding/json" "fmt" "net" "strings" @@ -849,3 +850,264 @@ func TestFlowExporter_fillEgressInfo(t *testing.T) { }) } } + +func TestFlowExporter_l7Listener(t *testing.T) { + flowExp := &FlowExporter{ + l7events: make(map[flowexporter.ConnectionKey]*httpEvent), + } + go flowExp.l7Listener() + <-time.After(100 * time.Millisecond) + + conn, err := net.Dial("unix", socketPath) + if err != nil { + t.Fatalf("Failed to connect to server: %s", err) + } + defer conn.Close() + + testCases := []struct { + name string + input JsonToEvent + eventPresent bool + expected httpEvent + expectedErr error + }{ + { + name: "InValid eventType", + input: JsonToEvent{ + Timestamp: "2023-06-16T20:31:48.910477+0000", + FlowID: 1, + InInterface: "mock_interface", + EventType: "mock_event", + VLAN: []int{1}, + SrcIP: net.ParseIP("10.10.0.1"), + SrcPort: 59921, + DestIP: net.ParseIP("10.10.0.2"), + DestPort: 80, + Proto: "TCP", + TxID: 0, + HTTP: Http{ + Hostname: "10.10.0.1", + URL: "/public/", + UserAgent: "curl/7.74.0", + ContentType: "text/html", + Method: "GET", + Protocol: "HTTP/1.1", + Status: 200, + ContentLength: 153, + }, + }, + eventPresent: false, + expected: httpEvent{}, + }, { + name: "Valid case", + input: JsonToEvent{ + Timestamp: "2023-06-16T20:31:48.910477+0000", + FlowID: 1, + InInterface: "mock_interface", + EventType: "http", + VLAN: []int{1}, + SrcIP: net.ParseIP("10.10.0.1"), + SrcPort: 59920, + DestIP: net.ParseIP("10.10.0.2"), + DestPort: 80, + Proto: "TCP", + TxID: 0, + HTTP: Http{ + Hostname: "10.10.0.1", + URL: "/public/", + UserAgent: "curl/7.74.0", + ContentType: "text/html", + Method: "GET", + Protocol: "HTTP/1.1", + Status: 200, + ContentLength: 153, + }, + }, + eventPresent: true, + expected: httpEvent{ + http: [][]string{{"10.10.0.1", "/public/", "curl/7.74.0", "text/html", "GET", "HTTP/1.1", "200", "153"}}, + httpQueryFlag: false, + }, + }, { + name: "Valid case for persistent http", + input: JsonToEvent{ + Timestamp: "2023-06-17T20:31:48.910477+0000", + FlowID: 1, + InInterface: "mock_interface", + EventType: "http", + VLAN: []int{1}, + SrcIP: net.ParseIP("10.10.0.1"), + SrcPort: 59920, + DestIP: net.ParseIP("10.10.0.2"), + DestPort: 80, + Proto: "TCP", + TxID: 1, + HTTP: Http{ + Hostname: "10.10.0.1", + URL: "/public/", + UserAgent: "curl/7.74.0", + ContentType: "text/html", + Method: "GET", + Protocol: "HTTP/1.1", + Status: 200, + ContentLength: 153, + }, + }, + eventPresent: true, + expected: httpEvent{ + http: [][]string{{"10.10.0.1", "/public/", "curl/7.74.0", "text/html", "GET", "HTTP/1.1", "200", "153"}, {"10.10.0.1", "/public/", "curl/7.74.0", "text/html", "GET", "HTTP/1.1", "200", "153"}}, + httpQueryFlag: false, + }, + }, + } + for _, tc := range testCases { + jsonData, err := json.Marshal(tc.input) + if err != nil { + t.Errorf("Error Marshaling data: %v\n", err) + } + jsonData = append(jsonData, '\n') + conn.Write(jsonData) + if err != nil { + t.Errorf("Error writing event data: %v\n", err) + } + <-time.After(1000 * time.Millisecond) + protocol, _ := flowexporter.LookupProtocolMap(tc.input.Proto) + // Get 5-tuple information + tuple := flowexporter.Tuple{ + SourceAddress: tc.input.SrcIP, + DestinationAddress: tc.input.DestIP, + Protocol: protocol, + SourcePort: uint16(tc.input.SrcPort), + DestinationPort: uint16(tc.input.DestPort), + } + // Generate deny connection and add to deny connection store + conn := flowexporter.Connection{} + conn.FlowKey = tuple + connKey := flowexporter.NewConnectionKey(&conn) + // Check if event is present in event map + fmt.Printf("TUSHAR TEST RESULT %v\n",flowExp.l7events[connKey]) + existingEvent, exists := flowExp.l7events[connKey] + assert.Equal(t, tc.eventPresent, exists) + if exists == true { + assert.Equal(t, tc.expected, *existingEvent) + } + fmt.Printf("TUSHAR TEST RESULT %v\n",flowExp.l7events[connKey]) + } +} + +func TestGetL7EventData(t *testing.T) { + flowExp := &FlowExporter{ + l7events: make(map[flowexporter.ConnectionKey]*httpEvent), + } + + // 5-tuple information + tuple := flowexporter.Tuple{ + SourceAddress: net.IP("10.10.0.1"), + DestinationAddress: net.IP("10.10.0.2"), + Protocol: 6, + SourcePort: uint16(5229), + DestinationPort: uint16(80), + } + conn := flowexporter.Connection{} + conn.FlowKey = tuple + connKey := flowexporter.NewConnectionKey(&conn) + testCases := []struct { + name string + field string + input httpEvent + expected string + }{ + { + name: "Valid case", + field: "httpVals", + input: httpEvent{ + http: [][]string{{"10.10.0.1", "/public/", "curl/7.74.0", "text/html", "GET", "HTTP/1.1", "200", "153"}}, + httpQueryFlag: false, + }, + expected: "10.10.0.1<>/public/<>curl/7.74.0<>text/html<>GET<>HTTP/1.1<>200<>153", + }, { + name: "InValid eventType", + field: "httpVals", + input: httpEvent{ + http: [][]string{}, + httpQueryFlag: false, + }, + expected: "", + }, { + name: "Valid case for persistent http", + field: "httpVals", + input: httpEvent{ + http: [][]string{{"10.10.0.1", "/public/", "curl/7.74.0", "text/html", "GET", "HTTP/1.1", "200", "153"}, {"10.10.0.1", "/public/", "curl/7.74.0", "text/html", "GET", "HTTP/1.1", "200", "153"}}, + httpQueryFlag: false, + }, + expected: "10.10.0.1<>/public/<>curl/7.74.0<>text/html<>GET<>HTTP/1.1<>200<>153><10.10.0.1<>/public/<>curl/7.74.0<>text/html<>GET<>HTTP/1.1<>200<>153", + }, { + name: "Valid case for isL7 field", + field: "isL7", + input: httpEvent{ + http: [][]string{{"10.10.0.1", "/public/", "curl/7.74.0", "text/html", "GET", "HTTP/1.1", "200", "153"}, {"10.10.0.1", "/public/", "curl/7.74.0", "text/html", "GET", "HTTP/1.1", "200", "153"}}, + httpQueryFlag: false, + }, + expected: "true", + }, { + name: "InValid case for isL7 field", + field: "isL7", + input: httpEvent{ + http: [][]string{}, + httpQueryFlag: false, + }, + expected: "", + }, + } + for _, tc := range testCases { + if tc.name != "InValid eventType" && tc.name != "InValid case for isL7 field" { + flowExp.l7events[connKey] = &tc.input + result := flowExp.getL7EventData(connKey, tc.field) + assert.Equal(t, tc.expected, result) + } + } +} + +func TestVerifyAndDeleteEvent(t *testing.T) { + flowExp := &FlowExporter{ + l7events: make(map[flowexporter.ConnectionKey]*httpEvent), + } + + // 5-tuple information + tuple := flowexporter.Tuple{ + SourceAddress: net.IP("10.10.0.1"), + DestinationAddress: net.IP("10.10.0.2"), + Protocol: 6, + SourcePort: uint16(5229), + DestinationPort: uint16(80), + } + conn := flowexporter.Connection{} + conn.FlowKey = tuple + connKey := flowexporter.NewConnectionKey(&conn) + testCases := []struct { + name string + input httpEvent + expected int + }{ + { + name: "Deletion case", + input: httpEvent{ + http: [][]string{{"10.10.0.1", "/public/", "curl/7.74.0", "text/html", "GET", "HTTP/1.1", "200", "153"}}, + httpQueryFlag: true, + }, + expected: 0, + }, { + name: "non Deletion Case", + input: httpEvent{ + http: [][]string{{"10.10.0.1", "/public/", "curl/7.74.0", "text/html", "GET", "HTTP/1.1", "200", "153"}}, + httpQueryFlag: false, + }, + expected: 1, + }, + } + for _, tc := range testCases { + flowExp.l7events[connKey] = &tc.input + flowExp.verifyAndDeleteEvent(connKey) + assert.Equal(t, tc.expected, len(flowExp.l7events)) + } +} diff --git a/pkg/agent/flowexporter/types.go b/pkg/agent/flowexporter/types.go index cc0867a1631..829e03c368e 100644 --- a/pkg/agent/flowexporter/types.go +++ b/pkg/agent/flowexporter/types.go @@ -81,6 +81,8 @@ type Connection struct { FlowType uint8 EgressName string EgressIP string + IsL7 string + HttpVals string } type ItemToExpire struct { diff --git a/pkg/agent/flowexporter/utils.go b/pkg/agent/flowexporter/utils.go index 7b007ed1863..5ce3ac49180 100644 --- a/pkg/agent/flowexporter/utils.go +++ b/pkg/agent/flowexporter/utils.go @@ -15,7 +15,9 @@ package flowexporter import ( + "fmt" "strconv" + "strings" "github.com/vmware/go-ipfix/pkg/registry" @@ -26,6 +28,16 @@ const ( connectionDyingFlag = uint32(1 << 9) ) +var ( + Protocols = map[string]uint8{ + "icmp": 1, + "igmp": 2, + "tcp": 6, + "udp": 17, + "ipv6-icmp": 58, + } +) + // NewConnectionKey creates 5-tuple of flow as connection key func NewConnectionKey(conn *Connection) ConnectionKey { return ConnectionKey{conn.FlowKey.SourceAddress.String(), @@ -91,3 +103,14 @@ func PolicyTypeToUint8(policyType v1beta2.NetworkPolicyType) uint8 { return registry.PolicyTypeK8sNetworkPolicy } } + +// LookupProtocolMap returns protocol identifier given protocol name +func LookupProtocolMap(name string) (uint8, error) { + name = strings.TrimSpace(name) + lowerCaseStr := strings.ToLower(name) + proto, found := Protocols[lowerCaseStr] + if !found { + return 0, fmt.Errorf("unknown IP protocol specified: %s", name) + } + return proto, nil +} diff --git a/pkg/agent/flowexporter/utils_test.go b/pkg/agent/flowexporter/utils_test.go index b5b788337f0..64afcb12fdd 100644 --- a/pkg/agent/flowexporter/utils_test.go +++ b/pkg/agent/flowexporter/utils_test.go @@ -96,3 +96,25 @@ func TestPolicyTypeToUint8(t *testing.T) { assert.Equal(t, tc.expectedResult, result) } } + +func TestLookupProtocolMap(t *testing.T) { + for _, tc := range []struct { + protocol string + expectedResult uint8 + }{ + {"icmp", 1}, + {"igmp", 2}, + {"tcp", 6}, + {"udp", 17}, + {"ipv6-icmp", 58}, + {"IPV6-ICMP", 58}, + {"mockProtocol", 0}, + } { + proto, err := LookupProtocolMap(tc.protocol) + if err == nil { + assert.Equal(t, tc.expectedResult, proto) + } else { + assert.Contains(t, err.Error(), "unknown IP protocol specified") + } + } +} diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 1426bd0a4c3..3500b5472c2 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -138,6 +138,8 @@ const ( // Enable the AdminNetworkPolicy APIs // https://github.com/kubernetes-sigs/network-policy-api AdminNetworkPolicy featuregate.Feature = "AdminNetworkPolicy" + // Enable L7 visibility on Pods and Namespaces + L7Visibility featuregate.Feature = "L7Visibility" ) var ( @@ -175,6 +177,7 @@ var ( L7NetworkPolicy: {Default: false, PreRelease: featuregate.Alpha}, LoadBalancerModeDSR: {Default: false, PreRelease: featuregate.Alpha}, AdminNetworkPolicy: {Default: false, PreRelease: featuregate.Alpha}, + L7Visibility: {Default: false, PreRelease: featuregate.Alpha}, } // UnsupportedFeaturesOnWindows records the features not supported on @@ -200,6 +203,7 @@ var ( L7NetworkPolicy: {}, LoadBalancerModeDSR: {}, CleanupStaleUDPSvcConntrack: {}, + L7Visibility: {}, } // supportedFeaturesOnExternalNode records the features supported on an external // Node. Antrea Agent checks the enabled features if it is running on an diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go index 0cfea52cbf5..a4403582d3c 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go @@ -87,9 +87,12 @@ const ( reverseThroughputFromDestinationNode, clusterUUID, egressName, - egressIP) + egressIP, + isL7, + httpVals) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?)` ) // PrepareClickHouseConnection is used for unit testing @@ -329,6 +332,8 @@ func (ch *ClickHouseExportProcess) batchCommitAll(ctx context.Context) (int, err ch.clusterUUID, record.EgressName, record.EgressIP, + record.IsL7, + record.HttpVals, ) if err != nil { diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go index 9253874383b..c15674edaf2 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go @@ -135,7 +135,9 @@ func TestBatchCommitAll(t *testing.T) { 12381346, fakeClusterUUID, "test-egress", - "172.18.0.1"). + "172.18.0.1", + "true", + "mockHttpString"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() diff --git a/pkg/flowaggregator/flowlogger/logger.go b/pkg/flowaggregator/flowlogger/logger.go index 798589e7d65..ee2c0e22713 100644 --- a/pkg/flowaggregator/flowlogger/logger.go +++ b/pkg/flowaggregator/flowlogger/logger.go @@ -116,6 +116,8 @@ func (fl *FlowLogger) WriteRecord(r *flowrecord.FlowRecord, prettyPrint bool) er egressNetworkPolicyType, r.EgressName, r.EgressIP, + r.IsL7, + r.HttpVals, } str := strings.Join(fields, ",") diff --git a/pkg/flowaggregator/flowlogger/logger_test.go b/pkg/flowaggregator/flowlogger/logger_test.go index de682828c8b..5ca3c313ee3 100644 --- a/pkg/flowaggregator/flowlogger/logger_test.go +++ b/pkg/flowaggregator/flowlogger/logger_test.go @@ -70,11 +70,11 @@ func TestWriteRecord(t *testing.T) { }{ { prettyPrint: true, - expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,TCP,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,Drop,K8sNetworkPolicy,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,Invalid,Invalid,test-egress,172.18.0.1", + expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,TCP,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,Drop,K8sNetworkPolicy,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,Invalid,Invalid,test-egress,172.18.0.1,true,mockHttpString", }, { prettyPrint: false, - expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,6,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,test-egress,172.18.0.1", + expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,6,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,test-egress,172.18.0.1,true,mockHttpString", }, } diff --git a/pkg/flowaggregator/flowrecord/record.go b/pkg/flowaggregator/flowrecord/record.go index ed14f09f283..c40dd72068b 100644 --- a/pkg/flowaggregator/flowrecord/record.go +++ b/pkg/flowaggregator/flowrecord/record.go @@ -70,6 +70,8 @@ type FlowRecord struct { ReverseThroughputFromDestinationNode uint64 EgressName string EgressIP string + IsL7 string + HttpVals string } // GetFlowRecord converts ipfixentities.Record to FlowRecord @@ -228,6 +230,12 @@ func GetFlowRecord(record ipfixentities.Record) *FlowRecord { if egressIP, _, ok := record.GetInfoElementWithValue("egressIP"); ok { r.EgressIP = egressIP.GetStringValue() } + if isL7, _, ok := record.GetInfoElementWithValue("isL7"); ok { + r.IsL7 = isL7.GetStringValue() + } + if httpVals, _, ok := record.GetInfoElementWithValue("httpVals"); ok { + r.HttpVals = httpVals.GetStringValue() + } return r } diff --git a/pkg/flowaggregator/flowrecord/record_test.go b/pkg/flowaggregator/flowrecord/record_test.go index 6c14bf529d3..c6bb8ec8f40 100644 --- a/pkg/flowaggregator/flowrecord/record_test.go +++ b/pkg/flowaggregator/flowrecord/record_test.go @@ -88,6 +88,10 @@ func TestGetFlowRecord(t *testing.T) { assert.Equal(t, uint64(15902813474), flowRecord.ThroughputFromDestinationNode) assert.Equal(t, uint64(12381345), flowRecord.ReverseThroughputFromSourceNode) assert.Equal(t, uint64(12381346), flowRecord.ReverseThroughputFromDestinationNode) + assert.Equal(t, "test-egress", flowRecord.EgressName) + assert.Equal(t, "172.18.0.1", flowRecord.EgressIP) + assert.Equal(t, "true", flowRecord.IsL7) + assert.Equal(t, "mockHttpString", flowRecord.HttpVals) if tc.isIPv4 { assert.Equal(t, "10.10.0.79", flowRecord.SourceIP) diff --git a/pkg/flowaggregator/flowrecord/testing/util.go b/pkg/flowaggregator/flowrecord/testing/util.go index 21eacd60e2b..3e19473c8f2 100644 --- a/pkg/flowaggregator/flowrecord/testing/util.go +++ b/pkg/flowaggregator/flowrecord/testing/util.go @@ -72,5 +72,7 @@ func PrepareTestFlowRecord() *flowrecord.FlowRecord { ReverseThroughputFromDestinationNode: 12381346, EgressName: "test-egress", EgressIP: "172.18.0.1", + IsL7: "true", + HttpVals: "mockHttpString", } } diff --git a/pkg/flowaggregator/infoelements/elements.go b/pkg/flowaggregator/infoelements/elements.go index f93f422398b..f69abba1818 100644 --- a/pkg/flowaggregator/infoelements/elements.go +++ b/pkg/flowaggregator/infoelements/elements.go @@ -59,6 +59,8 @@ var ( "flowType", "egressName", "egressIP", + "isL7", + "httpVals", } AntreaInfoElementsIPv4 = append(AntreaInfoElementsCommon, []string{"destinationClusterIPv4"}...) AntreaInfoElementsIPv6 = append(AntreaInfoElementsCommon, []string{"destinationClusterIPv6"}...) diff --git a/pkg/flowaggregator/s3uploader/s3uploader.go b/pkg/flowaggregator/s3uploader/s3uploader.go index 68431b10153..ce92092b2ae 100644 --- a/pkg/flowaggregator/s3uploader/s3uploader.go +++ b/pkg/flowaggregator/s3uploader/s3uploader.go @@ -484,4 +484,8 @@ func writeRecord(w io.Writer, r *flowrecord.FlowRecord, clusterUUID string) { io.WriteString(w, r.EgressName) io.WriteString(w, ",") io.WriteString(w, r.EgressIP) + io.WriteString(w, ",") + io.WriteString(w, r.IsL7) + io.WriteString(w, ",") + io.WriteString(w, r.HttpVals) } diff --git a/pkg/flowaggregator/s3uploader/s3uploader_test.go b/pkg/flowaggregator/s3uploader/s3uploader_test.go index 1fb91cc4cea..77b82f69e03 100644 --- a/pkg/flowaggregator/s3uploader/s3uploader_test.go +++ b/pkg/flowaggregator/s3uploader/s3uploader_test.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "math/rand" + "strings" "testing" "time" @@ -37,8 +38,8 @@ import ( var ( fakeClusterUUID = uuid.New().String() - recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID - recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,true,mockHttpString" + recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,true,mockHttpString" ) const seed = 1 @@ -80,7 +81,9 @@ func TestCacheRecord(t *testing.T) { flowaggregatortesting.PrepareMockIpfixRecord(mockRecord, true) s3UploadProc.CacheRecord(mockRecord) assert.Equal(t, 1, s3UploadProc.cachedRecordCount) - assert.Contains(t, s3UploadProc.currentBuffer.String(), recordStrIPv4) + currentBuffer := strings.TrimRight(s3UploadProc.currentBuffer.String(), "\n") + assert.Equal(t, strings.Split(currentBuffer, ",")[:50], strings.Split(recordStrIPv4, ",")[:50]) + assert.Equal(t, strings.Split(currentBuffer, ",")[51:], strings.Split(recordStrIPv4, ",")[51:]) // Second call, reach currentBuffer max size, add the currentBuffer to bufferQueue. mockRecord = ipfixentitiestesting.NewMockRecord(ctrl) @@ -88,7 +91,9 @@ func TestCacheRecord(t *testing.T) { s3UploadProc.CacheRecord(mockRecord) assert.Equal(t, 1, len(s3UploadProc.bufferQueue)) buf := s3UploadProc.bufferQueue[0] - assert.Contains(t, buf.String(), recordStrIPv6) + currentBuf := strings.TrimRight(strings.Split(buf.String(), "\n")[1], "\n") + assert.Equal(t, strings.Split(currentBuf, ",")[:50], strings.Split(recordStrIPv6, ",")[:50]) + assert.Equal(t, strings.Split(currentBuf, ",")[51:], strings.Split(recordStrIPv6, ",")[51:]) assert.Equal(t, 0, s3UploadProc.cachedRecordCount) assert.Equal(t, "", s3UploadProc.currentBuffer.String()) } diff --git a/pkg/flowaggregator/testing/util.go b/pkg/flowaggregator/testing/util.go index 2aa9dd714f3..8d27f727cc6 100644 --- a/pkg/flowaggregator/testing/util.go +++ b/pkg/flowaggregator/testing/util.go @@ -215,6 +215,14 @@ func PrepareMockIpfixRecord(mockRecord *ipfixentitiestesting.MockRecord, isIPv4 egressIPElem.SetStringValue("172.18.0.1") mockRecord.EXPECT().GetInfoElementWithValue("egressIP").Return(egressIPElem, 0, true) + isL7Elem := createElement("isL7", ipfixregistry.AntreaEnterpriseID) + isL7Elem.SetStringValue("true") + mockRecord.EXPECT().GetInfoElementWithValue("isL7").Return(isL7Elem, 0, true) + + httpValsElem := createElement("httpVals", ipfixregistry.AntreaEnterpriseID) + httpValsElem.SetStringValue("mockHttpString") + mockRecord.EXPECT().GetInfoElementWithValue("httpVals").Return(httpValsElem, 0, true) + if isIPv4 { sourceIPv4Elem := createElement("sourceIPv4Address", ipfixregistry.IANAEnterpriseID) sourceIPv4Elem.SetIPAddressValue(net.ParseIP("10.10.0.79")) diff --git a/test/e2e/charts/flow-visibility/templates/configmap.yaml b/test/e2e/charts/flow-visibility/templates/configmap.yaml index 611a2488a2a..4573b734b24 100644 --- a/test/e2e/charts/flow-visibility/templates/configmap.yaml +++ b/test/e2e/charts/flow-visibility/templates/configmap.yaml @@ -76,7 +76,9 @@ data: clusterUUID String, trusted UInt8 DEFAULT 0, egressName String, - egressIP String + egressIP String, + isL7 String, + httpVals String ) engine=MergeTree ORDER BY (timeInserted, flowEndSeconds) TTL timeInserted + INTERVAL 1 HOUR diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index edbe2e249eb..7b751f77928 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -86,6 +86,8 @@ DATA SET: flowType: 1 egressName: test-egressbkclk egressIP: 172.18.0.2 + isL7: true + httpVals: mockHttpString destinationClusterIPv4: 0.0.0.0 octetDeltaCountFromSourceNode: 8982624938 octetDeltaCountFromDestinationNode: 8982624938 @@ -1634,4 +1636,6 @@ type ClickHouseFullRow struct { Trusted uint8 `json:"trusted"` EgressName string `json:"egressName"` EgressIP string `json:"egressIP"` + IsL7 string `json:"isL7"` + HttpVals string `json:"httpVals"` }