diff --git a/build/charts/antrea/README.md b/build/charts/antrea/README.md
index cc2f921bc1a..36c0594d717 100644
--- a/build/charts/antrea/README.md
+++ b/build/charts/antrea/README.md
@@ -44,6 +44,7 @@ Kubernetes: `>= 1.16.0-0`
 | antreaProxy.nodePortAddresses | list | `[]` | String array of values which specifies the host IPv4/IPv6 addresses for NodePort. By default, all host addresses are used. |
 | antreaProxy.proxyAll | bool | `false` | Proxy all Service traffic, for all Service types, regardless of where it comes from. |
 | antreaProxy.proxyLoadBalancerIPs | bool | `true` | When set to false, AntreaProxy no longer load-balances traffic destined to the External IPs of LoadBalancer Services. |
+| antreaProxy.serviceProxyName | string | `""` | The value of the "service.kubernetes.io/service-proxy-name" label for AntreaProxy to match. If it is set, AntreaProxy handles only the Services whose label value equals the provided value. If it is not set, AntreaProxy handles only the Services without the label, and ignores any Service that carries the label, regardless of the label's value. |
 | antreaProxy.skipServices | list | `[]` | List of Services which should be ignored by AntreaProxy. |
 | clientCAFile | string | `""` | File path of the certificate bundle for all the signers that is recognized for incoming client certificates. |
 | cni.hostBinPath | string | `"/opt/cni/bin"` | Installation path of CNI binaries on the host. |
diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf
index e94e00e6125..bc435e0ac63 100644
--- a/build/charts/antrea/conf/antrea-agent.conf
+++ b/build/charts/antrea/conf/antrea-agent.conf
@@ -342,6 +342,11 @@ antreaProxy:
   # Note that setting ProxyLoadBalancerIPs to false usually only makes sense when ProxyAll is set to true and
   # kube-proxy is removed from the cluster, otherwise kube-proxy will still load-balance this traffic.
   proxyLoadBalancerIPs: {{ .proxyLoadBalancerIPs }}
+  # The value of the "service.kubernetes.io/service-proxy-name" label for AntreaProxy to match. If it is set,
+  # AntreaProxy handles only the Services whose label value equals the provided value. If it is not set,
+  # AntreaProxy handles only the Services without the label, and ignores any Service that carries the label,
+  # regardless of the label's value.
+  serviceProxyName: {{ .serviceProxyName | quote }}
 {{- end }}
 
 # IPsec tunnel related configurations.
diff --git a/build/charts/antrea/values.yaml b/build/charts/antrea/values.yaml
index 76d01ec2568..42dff202fa5 100644
--- a/build/charts/antrea/values.yaml
+++ b/build/charts/antrea/values.yaml
@@ -135,6 +135,12 @@ antreaProxy:
   # -- When set to false, AntreaProxy no longer load-balances traffic destined
   # to the External IPs of LoadBalancer Services.
   proxyLoadBalancerIPs: true
+  # -- The value of the "service.kubernetes.io/service-proxy-name" label for
+  # AntreaProxy to match. If it is set, AntreaProxy handles only the Services
+  # whose label value equals the provided value. If it is not set, AntreaProxy
+  # handles only the Services without the label, and ignores any Service that
+  # carries the label, regardless of the label's value.
+  serviceProxyName: ""
 
 nodeIPAM:
   # -- Enable Node IPAM in Antrea
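Before the generated manifests repeat the same description, it may help to see the matching rule in isolation. The sketch below is a hypothetical, self-contained program (only the label key and the selector operators come from the patch itself; `buildSelector` is a stand-in name) showing how an empty versus non-empty `serviceProxyName` turns into a label selector, mirroring the logic added to `pkg/agent/proxy/proxier.go` further down:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

// buildSelector mirrors the rule described above: an empty serviceProxyName
// requires the label to be absent; a non-empty value requires the label to
// equal that exact value.
func buildSelector(serviceProxyName string) labels.Selector {
	const labelServiceProxyName = "service.kubernetes.io/service-proxy-name"
	var req *labels.Requirement
	if serviceProxyName == "" {
		req, _ = labels.NewRequirement(labelServiceProxyName, selection.DoesNotExist, nil)
	} else {
		req, _ = labels.NewRequirement(labelServiceProxyName, selection.DoubleEquals, []string{serviceProxyName})
	}
	return labels.NewSelector().Add(*req)
}

func main() {
	sel := buildSelector("antrea")
	// Selected: the label value matches the configured proxy name.
	fmt.Println(sel.Matches(labels.Set{"service.kubernetes.io/service-proxy-name": "antrea"})) // true
	// Ignored: no label, or a label that names another proxy.
	fmt.Println(sel.Matches(labels.Set{}))                                                    // false
	fmt.Println(sel.Matches(labels.Set{"service.kubernetes.io/service-proxy-name": "other"})) // false
}
```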
+ serviceProxyName: "" nodeIPAM: # -- Enable Node IPAM in Antrea diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index ec792ec21e6..fc476934229 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3291,6 +3291,11 @@ data: # Note that setting ProxyLoadBalancerIPs to false usually only makes sense when ProxyAll is set to true and # kube-proxy is removed from the cluser, otherwise kube-proxy will still load-balance this traffic. proxyLoadBalancerIPs: true + # The value of the "service.kubernetes.io/service-proxy-name" label for AntreaProxy to match. If it is set, + # then AntreaProxy will only handle Services with the label that equals the provided value. If it is not set, + # then AntreaProxy will only handle Services without the "service.kubernetes.io/service-proxy-name" label, + # but ignore Services with the label no matter what is the value. + serviceProxyName: "" # IPsec tunnel related configurations. ipsec: @@ -4380,7 +4385,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 994e75167f0126a535cea63fc65a5ab86361648a20bcacb06d3c588f06f6e5f6 + checksum/config: 720e2b412e83992caf5874a01e67507617e079b896796e588c92fd75c9e06ad6 labels: app: antrea component: antrea-agent @@ -4621,7 +4626,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 994e75167f0126a535cea63fc65a5ab86361648a20bcacb06d3c588f06f6e5f6 + checksum/config: 720e2b412e83992caf5874a01e67507617e079b896796e588c92fd75c9e06ad6 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index c8175867cfd..23c07f67539 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3291,6 +3291,11 @@ data: # Note that setting ProxyLoadBalancerIPs to false usually only makes sense when ProxyAll is set to true and # kube-proxy is removed from the cluser, otherwise kube-proxy will still load-balance this traffic. proxyLoadBalancerIPs: true + # The value of the "service.kubernetes.io/service-proxy-name" label for AntreaProxy to match. If it is set, + # then AntreaProxy will only handle Services with the label that equals the provided value. If it is not set, + # then AntreaProxy will only handle Services without the "service.kubernetes.io/service-proxy-name" label, + # but ignore Services with the label no matter what is the value. + serviceProxyName: "" # IPsec tunnel related configurations. 
     ipsec:
@@ -4380,7 +4385,7 @@ spec:
         kubectl.kubernetes.io/default-container: antrea-agent
         # Automatically restart Pods with a RollingUpdate if the ConfigMap changes
        # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
-        checksum/config: 994e75167f0126a535cea63fc65a5ab86361648a20bcacb06d3c588f06f6e5f6
+        checksum/config: 720e2b412e83992caf5874a01e67507617e079b896796e588c92fd75c9e06ad6
       labels:
         app: antrea
         component: antrea-agent
@@ -4622,7 +4627,7 @@ spec:
       annotations:
         # Automatically restart Pod if the ConfigMap changes
        # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
-        checksum/config: 994e75167f0126a535cea63fc65a5ab86361648a20bcacb06d3c588f06f6e5f6
+        checksum/config: 720e2b412e83992caf5874a01e67507617e079b896796e588c92fd75c9e06ad6
       labels:
         app: antrea
         component: antrea-controller
diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml
index 7812ff71dc6..ae5edb92de5 100644
--- a/build/yamls/antrea-gke.yml
+++ b/build/yamls/antrea-gke.yml
@@ -3291,6 +3291,11 @@ data:
     # Note that setting ProxyLoadBalancerIPs to false usually only makes sense when ProxyAll is set to true and
     # kube-proxy is removed from the cluster, otherwise kube-proxy will still load-balance this traffic.
     proxyLoadBalancerIPs: true
+    # The value of the "service.kubernetes.io/service-proxy-name" label for AntreaProxy to match. If it is set,
+    # AntreaProxy handles only the Services whose label value equals the provided value. If it is not set,
+    # AntreaProxy handles only the Services without the label, and ignores any Service that carries the label,
+    # regardless of the label's value.
+    serviceProxyName: ""
 
     # IPsec tunnel related configurations.
     ipsec:
@@ -4380,7 +4385,7 @@ spec:
         kubectl.kubernetes.io/default-container: antrea-agent
         # Automatically restart Pods with a RollingUpdate if the ConfigMap changes
        # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
-        checksum/config: 04761c3e699fa0f59516b557f366049b0f1acf2f390d94e6753ee017cdffcfd9
+        checksum/config: 46b91206a96d91e7e4861f20fa0e255ed660cf96e796116e562061efc09fdfcc
       labels:
         app: antrea
         component: antrea-agent
@@ -4619,7 +4624,7 @@ spec:
       annotations:
         # Automatically restart Pod if the ConfigMap changes
        # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
-        checksum/config: 04761c3e699fa0f59516b557f366049b0f1acf2f390d94e6753ee017cdffcfd9
+        checksum/config: 46b91206a96d91e7e4861f20fa0e255ed660cf96e796116e562061efc09fdfcc
       labels:
         app: antrea
         component: antrea-controller
diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml
index f8f6d4f8538..d7c498ab220 100644
--- a/build/yamls/antrea-ipsec.yml
+++ b/build/yamls/antrea-ipsec.yml
@@ -3304,6 +3304,11 @@ data:
     # Note that setting ProxyLoadBalancerIPs to false usually only makes sense when ProxyAll is set to true and
     # kube-proxy is removed from the cluster, otherwise kube-proxy will still load-balance this traffic.
     proxyLoadBalancerIPs: true
+    # The value of the "service.kubernetes.io/service-proxy-name" label for AntreaProxy to match. If it is set,
+    # AntreaProxy handles only the Services whose label value equals the provided value. If it is not set,
+    # AntreaProxy handles only the Services without the label, and ignores any Service that carries the label,
+    # regardless of the label's value.
+    serviceProxyName: ""
 
     # IPsec tunnel related configurations.
     ipsec:
@@ -4393,7 +4398,7 @@ spec:
         kubectl.kubernetes.io/default-container: antrea-agent
         # Automatically restart Pods with a RollingUpdate if the ConfigMap changes
        # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
-        checksum/config: 6b902d5d6e9a2c0e2fde41aedf349eeee38d4530330381b01849815211ad1dd8
+        checksum/config: f7e797321f4228539c43945a503637bcabf4d4eee4f3d5393ba9e69a778a0916
         checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4
       labels:
         app: antrea
@@ -4678,7 +4683,7 @@ spec:
       annotations:
         # Automatically restart Pod if the ConfigMap changes
        # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
-        checksum/config: 6b902d5d6e9a2c0e2fde41aedf349eeee38d4530330381b01849815211ad1dd8
+        checksum/config: f7e797321f4228539c43945a503637bcabf4d4eee4f3d5393ba9e69a778a0916
       labels:
         app: antrea
         component: antrea-controller
diff --git a/build/yamls/antrea-windows.yml b/build/yamls/antrea-windows.yml
index f31cae5c647..db4324aeec2 100644
--- a/build/yamls/antrea-windows.yml
+++ b/build/yamls/antrea-windows.yml
@@ -143,6 +143,11 @@ data:
     # Note that this option is experimental. If kube-proxy is removed, option kubeAPIServerOverride must be used to access
     # apiserver directly.
     #proxyAll: false
+    # The value of the "service.kubernetes.io/service-proxy-name" label for AntreaProxy to match. If it is set,
+    # AntreaProxy handles only the Services whose label value equals the provided value. If it is not set,
+    # AntreaProxy handles only the Services without the label, and ignores any Service that carries the label,
+    # regardless of the label's value.
+    serviceProxyName: ""
 
     nodePortLocal:
     # Enable NodePortLocal, a feature used to make Pods reachable using port forwarding on the host. To
@@ -172,7 +177,7 @@ kind: ConfigMap
 metadata:
   labels:
     app: antrea
-  name: antrea-windows-config-hth2gk6b96
+  name: antrea-windows-config-cmccc6hbb4
   namespace: kube-system
 ---
 apiVersion: apps/v1
@@ -260,7 +265,7 @@ spec:
         operator: Exists
       volumes:
       - configMap:
-          name: antrea-windows-config-hth2gk6b96
+          name: antrea-windows-config-cmccc6hbb4
         name: antrea-windows-config
       - configMap:
           defaultMode: 420
diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml
index 8f9e23e8511..3ee30a36c2b 100644
--- a/build/yamls/antrea.yml
+++ b/build/yamls/antrea.yml
@@ -3291,6 +3291,11 @@ data:
     # Note that setting ProxyLoadBalancerIPs to false usually only makes sense when ProxyAll is set to true and
     # kube-proxy is removed from the cluster, otherwise kube-proxy will still load-balance this traffic.
     proxyLoadBalancerIPs: true
+    # The value of the "service.kubernetes.io/service-proxy-name" label for AntreaProxy to match. If it is set,
+    # AntreaProxy handles only the Services whose label value equals the provided value. If it is not set,
+    # AntreaProxy handles only the Services without the label, and ignores any Service that carries the label,
+    # regardless of the label's value.
+    serviceProxyName: ""
 
     # IPsec tunnel related configurations.
     ipsec:
@@ -4380,7 +4385,7 @@ spec:
         kubectl.kubernetes.io/default-container: antrea-agent
         # Automatically restart Pods with a RollingUpdate if the ConfigMap changes
        # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
-        checksum/config: 1bc87d7d5b568beb91ad2b29510cb5cff3613c68e51fb82abdf545046767f679
+        checksum/config: 520e5bfad080176d9e77896e35756f8f947a1777817847cad27c764ecf6e3bec
       labels:
         app: antrea
         component: antrea-agent
@@ -4619,7 +4624,7 @@ spec:
      annotations:
         # Automatically restart Pod if the ConfigMap changes
        # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
-        checksum/config: 1bc87d7d5b568beb91ad2b29510cb5cff3613c68e51fb82abdf545046767f679
+        checksum/config: 520e5bfad080176d9e77896e35756f8f947a1777817847cad27c764ecf6e3bec
       labels:
         app: antrea
         component: antrea-controller
diff --git a/build/yamls/windows/base/conf/antrea-agent.conf b/build/yamls/windows/base/conf/antrea-agent.conf
index 1f31288ff26..0a5065d6e76 100644
--- a/build/yamls/windows/base/conf/antrea-agent.conf
+++ b/build/yamls/windows/base/conf/antrea-agent.conf
@@ -125,6 +125,11 @@ antreaProxy:
   # Note that this option is experimental. If kube-proxy is removed, option kubeAPIServerOverride must be used to access
   # apiserver directly.
   #proxyAll: false
+  # The value of the "service.kubernetes.io/service-proxy-name" label for AntreaProxy to match. If it is set,
+  # AntreaProxy handles only the Services whose label value equals the provided value. If it is not set,
+  # AntreaProxy handles only the Services without the label, and ignores any Service that carries the label,
+  # regardless of the label's value.
+  serviceProxyName: ""
 
 nodePortLocal:
   # Enable NodePortLocal, a feature used to make Pods reachable using port forwarding on the host. To
diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go
index 94deecedadf..57b7393e72b 100644
--- a/cmd/antrea-agent/agent.go
+++ b/cmd/antrea-agent/agent.go
@@ -374,42 +374,39 @@ func run(o *Options) error {
 		)
 	}
 
+	v4Enabled := networkConfig.IPv4Enabled
+	v6Enabled := networkConfig.IPv6Enabled
+
 	var groupCounters []proxytypes.GroupCounter
 	groupIDUpdates := make(chan string, 100)
 	v4GroupIDAllocator := openflow.NewGroupAllocator(false)
 	v4GroupCounter := proxytypes.NewGroupCounter(v4GroupIDAllocator, groupIDUpdates)
+	if v4Enabled {
+		groupCounters = append(groupCounters, v4GroupCounter)
+	}
 	v6GroupIDAllocator := openflow.NewGroupAllocator(true)
 	v6GroupCounter := proxytypes.NewGroupCounter(v6GroupIDAllocator, groupIDUpdates)
+	if v6Enabled {
+		groupCounters = append(groupCounters, v6GroupCounter)
+	}
 
-	v4Enabled := networkConfig.IPv4Enabled
-	v6Enabled := networkConfig.IPv6Enabled
 	var proxier proxy.Proxier
 	if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
-		proxyAll := o.config.AntreaProxy.ProxyAll
-		skipServices := o.config.AntreaProxy.SkipServices
-		proxyLoadBalancerIPs := *o.config.AntreaProxy.ProxyLoadBalancerIPs
-
-		switch {
-		case v4Enabled && v6Enabled:
-			proxier, err = proxy.NewDualStackProxier(nodeConfig.Name, k8sClient, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter, enableMulticlusterGW)
-			if err != nil {
-				return fmt.Errorf("error when creating dual-stack proxier: %v", err)
-			}
-			groupCounters = append(groupCounters, v4GroupCounter, v6GroupCounter)
-		case v4Enabled:
-			proxier, err = proxy.NewProxier(nodeConfig.Name, k8sClient, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, enableMulticlusterGW)
-			if err != nil {
-				return fmt.Errorf("error when creating v4 proxier: %v", err)
-			}
-			groupCounters = append(groupCounters, v4GroupCounter)
-		case v6Enabled:
-			proxier, err = proxy.NewProxier(nodeConfig.Name, k8sClient, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter, enableMulticlusterGW)
-			if err != nil {
-				return fmt.Errorf("error when creating v6 proxier: %v", err)
-			}
-			groupCounters = append(groupCounters, v6GroupCounter)
-		default:
-			return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled")
+		proxier, err = proxy.NewProxier(nodeConfig.Name,
+			k8sClient,
+			ofClient,
+			routeClient,
+			v4Enabled,
+			v6Enabled,
+			nodePortAddressesIPv4,
+			nodePortAddressesIPv6,
+			o.config.AntreaProxy,
+			v4GroupCounter,
+			v6GroupCounter,
+			enableMulticlusterGW,
+			informerFactory)
+		if err != nil {
+			return fmt.Errorf("error when creating proxier: %v", err)
 		}
 	}
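The agent change above replaces the per-family `switch` with a single `proxy.NewProxier` call that receives both family flags and the whole `AntreaProxyConfig`. A stripped-down, self-contained sketch of that dispatch shape (stand-in types, not Antrea's real interfaces) looks like this:

```go
package main

import (
	"errors"
	"fmt"
)

// proxier stands in for Antrea's Proxier interface.
type proxier interface{ Name() string }

type singleStack struct{ ipv6 bool }

func (p *singleStack) Name() string {
	if p.ipv6 {
		return "ipv6"
	}
	return "ipv4"
}

type dualStack struct{ v4, v6 proxier }

func (p *dualStack) Name() string { return "dual-stack" }

// newProxier mirrors the dispatch that the refactor moves out of
// cmd/antrea-agent/agent.go: the caller passes both family flags and gets
// back the appropriate flavor of proxier, or an error when neither is set.
func newProxier(v4Enabled, v6Enabled bool) (proxier, error) {
	switch {
	case v4Enabled && v6Enabled:
		return &dualStack{&singleStack{false}, &singleStack{true}}, nil
	case v4Enabled:
		return &singleStack{false}, nil
	case v6Enabled:
		return &singleStack{true}, nil
	default:
		return nil, errors.New("at least one of IPv4 or IPv6 must be enabled")
	}
}

func main() {
	p, _ := newProxier(true, true)
	fmt.Println(p.Name()) // dual-stack
}
```

Centralizing the dispatch behind one exported constructor keeps `cmd/antrea-agent/agent.go` free of per-family plumbing.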
"antrea.io/antrea/pkg/ovs/openflow" k8sutil "antrea.io/antrea/pkg/util/k8s" @@ -57,6 +60,9 @@ const ( // SessionAffinity timeout is implemented using a hard_timeout in OVS. hard_timeout is // represented by a uint16 in the OpenFlow protocol. maxSupportedAffinityTimeout = math.MaxUint16 + // labelServiceProxyName is the well-known label for service proxy name defined in + // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/2447-Make-kube-proxy-service-abstraction-optional + labelServiceProxyName = "service.kubernetes.io/service-proxy-name" ) // Proxier wraps proxy.Provider and adds extra methods. It is introduced for @@ -1095,8 +1101,9 @@ func (p *proxier) HandlePacketIn(pktIn *ofctrl.PacketIn) error { nil) } -func NewProxier( +func newProxier( hostname string, + serviceProxyName string, k8sClient clientset.Interface, informerFactory informers.SharedInformerFactory, ofClient openflow.Client, @@ -1141,10 +1148,24 @@ func NewProxier( serviceHealthServer = healthcheck.NewServiceHealthServer(hostname, nil, nodePortAddressesString) } + // TODO: The label selector nonHeadlessServiceSelector was added to pass the Kubernetes e2e test + // 'Services should implement service.kubernetes.io/headless'. You can find the test case at: + // https://github.com/kubernetes/kubernetes/blob/027ac5a426a261ba6b66a40e79e123e75e9baf5b/test/e2e/network/service.go#L2281 + // However, in AntreaProxy, headless Services are skipped by checking the ClusterIP. + nonHeadlessServiceSelector, _ := labels.NewRequirement(corev1.IsHeadlessService, selection.DoesNotExist, nil) + var serviceProxyNameSelector *labels.Requirement + if serviceProxyName == "" { + serviceProxyNameSelector, _ = labels.NewRequirement(labelServiceProxyName, selection.DoesNotExist, nil) + } else { + serviceProxyNameSelector, _ = labels.NewRequirement(labelServiceProxyName, selection.DoubleEquals, []string{serviceProxyName}) + } + serviceLabelSelector := labels.NewSelector() + serviceLabelSelector = serviceLabelSelector.Add(*serviceProxyNameSelector, *nonHeadlessServiceSelector) + p := &proxier{ serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), endpointsChanges: newEndpointsChangesTracker(hostname, endpointSliceEnabled, isIPv6), - serviceChanges: newServiceChangesTracker(recorder, ipFamily, skipServices), + serviceChanges: newServiceChangesTracker(recorder, ipFamily, serviceLabelSelector, skipServices), serviceMap: k8sproxy.ServiceMap{}, serviceInstalledMap: k8sproxy.ServiceMap{}, endpointsInstalledMap: types.EndpointsMap{}, @@ -1213,8 +1234,9 @@ func (p *metaProxierWrapper) GetServiceByIP(serviceStr string) (k8sproxy.Service return p.ipv4Proxier.GetServiceByIP(serviceStr) } -func NewDualStackProxier( +func newDualStackProxier( hostname string, + serviceProxyName string, k8sClient clientset.Interface, informerFactory informers.SharedInformerFactory, ofClient openflow.Client, @@ -1229,14 +1251,38 @@ func NewDualStackProxier( nestedServiceSupport bool) (*metaProxierWrapper, error) { // Create an IPv4 instance of the single-stack proxier. 
-	ipv4Proxier, err := NewProxier(hostname, k8sClient, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAllEnabled, skipServices, proxyLoadBalancerIPs, v4groupCounter, nestedServiceSupport)
+	ipv4Proxier, err := newProxier(hostname,
+		serviceProxyName,
+		k8sClient,
+		informerFactory,
+		ofClient,
+		false,
+		routeClient,
+		nodePortAddressesIPv4,
+		proxyAllEnabled,
+		skipServices,
+		proxyLoadBalancerIPs,
+		v4groupCounter,
+		nestedServiceSupport)
 	if err != nil {
-		return nil, fmt.Errorf("error when creating v4 proxier: %v", err)
+		return nil, fmt.Errorf("error when creating IPv4 proxier: %v", err)
 	}
 
 	// Create an IPv6 instance of the single-stack proxier.
-	ipv6Proxier, err := NewProxier(hostname, k8sClient, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAllEnabled, skipServices, proxyLoadBalancerIPs, v6groupCounter, nestedServiceSupport)
+	ipv6Proxier, err := newProxier(hostname,
+		serviceProxyName,
+		k8sClient,
+		informerFactory,
+		ofClient,
+		true,
+		routeClient,
+		nodePortAddressesIPv6,
+		proxyAllEnabled,
+		skipServices,
+		proxyLoadBalancerIPs,
+		v6groupCounter,
+		nestedServiceSupport)
 	if err != nil {
-		return nil, fmt.Errorf("error when creating v6 proxier: %v", err)
+		return nil, fmt.Errorf("error when creating IPv6 proxier: %v", err)
 	}
 	// Create a meta-proxier that dispatches calls between the two
 	// single-stack proxier instances.
@@ -1244,3 +1290,83 @@
 	return &metaProxierWrapper{ipv4Proxier, ipv6Proxier, metaProxier}, nil
 }
+
+func NewProxier(hostname string,
+	k8sClient clientset.Interface,
+	ofClient openflow.Client,
+	routeClient route.Interface,
+	v4Enabled bool,
+	v6Enabled bool,
+	nodePortAddressesIPv4 []net.IP,
+	nodePortAddressesIPv6 []net.IP,
+	proxyConfig antreaconfig.AntreaProxyConfig,
+	v4GroupCounter types.GroupCounter,
+	v6GroupCounter types.GroupCounter,
+	nestedServiceSupport bool,
+	informerFactory informers.SharedInformerFactory) (Proxier, error) {
+	proxyAllEnabled := proxyConfig.ProxyAll
+	skipServices := proxyConfig.SkipServices
+	proxyLoadBalancerIPs := *proxyConfig.ProxyLoadBalancerIPs
+	serviceProxyName := proxyConfig.ServiceProxyName
+
+	var proxier Proxier
+	var err error
+	switch {
+	case v4Enabled && v6Enabled:
+		proxier, err = newDualStackProxier(hostname,
+			serviceProxyName,
+			k8sClient,
+			informerFactory,
+			ofClient,
+			routeClient,
+			nodePortAddressesIPv4,
+			nodePortAddressesIPv6,
+			proxyAllEnabled,
+			skipServices,
+			proxyLoadBalancerIPs,
+			v4GroupCounter,
+			v6GroupCounter,
+			nestedServiceSupport)
+		if err != nil {
+			return nil, fmt.Errorf("error when creating dual-stack proxier: %v", err)
+		}
+	case v4Enabled:
+		proxier, err = newProxier(hostname,
+			serviceProxyName,
+			k8sClient,
+			informerFactory,
+			ofClient,
+			false,
+			routeClient,
+			nodePortAddressesIPv4,
+			proxyAllEnabled,
+			skipServices,
+			proxyLoadBalancerIPs,
+			v4GroupCounter,
+			nestedServiceSupport)
+		if err != nil {
+			return nil, fmt.Errorf("error when creating IPv4 proxier: %v", err)
+		}
+	case v6Enabled:
+		proxier, err = newProxier(hostname,
+			serviceProxyName,
+			k8sClient,
+			informerFactory,
+			ofClient,
+			true,
+			routeClient,
+			nodePortAddressesIPv6,
+			proxyAllEnabled,
+			skipServices,
+			proxyLoadBalancerIPs,
+			v6GroupCounter,
+			nestedServiceSupport)
+		if err != nil {
+			return nil, fmt.Errorf("error when creating IPv6 proxier: %v", err)
+		}
+	default:
+		return nil, fmt.Errorf("at least one of IPv4 or IPv6 must be enabled to create a proxier")
+	}
+
+	return proxier, nil
+}
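Putting the selector together with the existing headless check, the effective filtering added to `third_party/proxy/util/utils.go` (shown at the end of this diff) behaves roughly like the following self-contained sketch. `shouldSkipService` here is a trimmed-down stand-in: the real helper also consults the skipServices list and logs via klog.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

// shouldSkipService skips a Service when its labels do not match the
// proxy's selector, or when it is headless (ClusterIP "None" or empty).
func shouldSkipService(svc *corev1.Service, sel labels.Selector) bool {
	if !sel.Matches(labels.Set(svc.Labels)) {
		return true
	}
	return svc.Spec.ClusterIP == corev1.ClusterIPNone || svc.Spec.ClusterIP == ""
}

func main() {
	req, _ := labels.NewRequirement("service.kubernetes.io/service-proxy-name", selection.DoubleEquals, []string{"antrea"})
	sel := labels.NewSelector().Add(*req)

	matched := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"service.kubernetes.io/service-proxy-name": "antrea"}},
		Spec:       corev1.ServiceSpec{ClusterIP: "10.96.0.10"},
	}
	unlabeled := &corev1.Service{Spec: corev1.ServiceSpec{ClusterIP: "10.96.0.11"}}

	fmt.Println(shouldSkipService(matched, sel))   // false: handled by this proxier
	fmt.Println(shouldSkipService(unlabeled, sel)) // true: no matching label
}
```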
diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go
index 55b583d4caf..c9acb328267 100644
--- a/pkg/agent/proxy/proxier_test.go
+++ b/pkg/agent/proxy/proxier_test.go
@@ -75,6 +75,8 @@ var (
 	skippedClusterIP = "192.168.1.2"
 )
 
+const testServiceProxyName = "antrea"
+
 func makeSvcPortName(namespace, name, port string, protocol corev1.Protocol) k8sproxy.ServicePortName {
 	return k8sproxy.ServicePortName{
 		NamespacedName: apimachinerytypes.NamespacedName{Namespace: namespace, Name: name},
@@ -145,7 +147,8 @@ func makeTestClusterIPService(svcPortName *k8sproxy.ServicePortName,
 	protocol corev1.Protocol,
 	affinitySeconds *int32,
 	internalTrafficPolicy *corev1.ServiceInternalTrafficPolicyType,
-	nested bool) *corev1.Service {
+	nested bool,
+	labels map[string]string) *corev1.Service {
 	return makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) {
 		svc.Spec.ClusterIP = clusterIP.String()
 		svc.Spec.Ports = []corev1.ServicePort{{
@@ -172,6 +175,9 @@
 		if nested {
 			svc.Annotations = map[string]string{mccommon.AntreaMCServiceAnnotation: "true"}
 		}
+		if labels != nil {
+			svc.Labels = labels
+		}
 	})
 }
 
@@ -320,6 +326,7 @@ type proxyOptions struct {
 	proxyLoadBalancerIPs bool
 	endpointSliceEnabled bool
 	supportNestedService bool
+	serviceProxyNameSet  bool
 }
 
 type proxyOptionsFn func(*proxyOptions)
@@ -340,25 +347,35 @@ func withoutEndpointSlice(o *proxyOptions) {
 	o.endpointSliceEnabled = false
 }
 
+func withServiceProxyNameSet(o *proxyOptions) {
+	o.serviceProxyNameSet = true
+}
+
 func getMockClients(ctrl *gomock.Controller) (*ofmock.MockClient, *routemock.MockInterface) {
 	mockOFClient := ofmock.NewMockClient(ctrl)
 	mockRouteClient := routemock.NewMockInterface(ctrl)
 	return mockOFClient, mockRouteClient
 }
 
-func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodePortAddresses []net.IP, groupIDAllocator openflow.GroupAllocator, isIPv6 bool, options ...proxyOptionsFn) *proxier {
+func newFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodePortAddresses []net.IP, groupIDAllocator openflow.GroupAllocator, isIPv6 bool, options ...proxyOptionsFn) *proxier {
 	o := &proxyOptions{
 		proxyAllEnabled:      false,
 		proxyLoadBalancerIPs: true,
 		endpointSliceEnabled: true,
 		supportNestedService: false,
+		serviceProxyNameSet:  false,
 	}
 	for _, fn := range options {
 		fn(o)
 	}
+	var serviceProxyName string
+	if o.serviceProxyNameSet {
+		serviceProxyName = testServiceProxyName
+	}
 	fakeClient := fake.NewSimpleClientset()
-	p, _ := NewProxier(hostname,
+	p, _ := newProxier(hostname,
+		serviceProxyName,
 		fakeClient,
 		informers.NewSharedInformerFactory(fakeClient, 0),
 		ofClient,
@@ -392,7 +409,7 @@ func testClusterIPAdd(t *testing.T,
 		options = append(options, withoutEndpointSlice)
 	}
 	options = append(options, withSupportNestedService)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6, options...)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6, options...)
 
 	internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster
 	if nodeLocalInternal {
@@ -402,7 +419,7 @@
 	if externalIP != nil {
 		externalIPs = append(externalIPs, externalIP)
 	}
-	allSvcs := append(extraSvcs, makeTestClusterIPService(&svcPortName, svcIP, externalIPs, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicy, true))
+	allSvcs := append(extraSvcs, makeTestClusterIPService(&svcPortName, svcIP, externalIPs, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicy, true, nil))
 	makeServiceMap(fp, allSvcs...)
 
 	if !endpointSliceEnabled {
@@ -489,7 +506,7 @@ func testLoadBalancerAdd(t *testing.T,
 	if !endpointSliceEnabled {
 		options = append(options, withoutEndpointSlice)
 	}
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, options...)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, options...)
 
 	externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeCluster
 	if nodeLocalExternal {
@@ -623,7 +640,7 @@ func testNodePortAdd(t *testing.T,
 	if !endpointSliceEnabled {
 		options = append(options, withoutEndpointSlice)
 	}
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, options...)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, options...)
 
 	externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeCluster
 	if nodeLocalExternal {
@@ -850,7 +867,7 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) {
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(false)
 	nodePortAddresses := []net.IP{net.ParseIP("0.0.0.0")}
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, false, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, false, withProxyAll)
 
 	port80Str := "port80"
 	port80Int32 := int32(80)
@@ -1073,8 +1090,8 @@ func TestClusterSkipServices(t *testing.T) {
 	skippedServiceName := strings.Split(skippedServiceNN, "/")[1]
 	svc1PortName := makeSvcPortName(skippedServiceNamespace, skippedServiceName, strconv.Itoa(svc1Port), corev1.ProtocolTCP)
 	svc2PortName := makeSvcPortName("kube-system", "test", strconv.Itoa(svc2Port), corev1.ProtocolTCP)
-	svc1 := makeTestClusterIPService(&svc1PortName, svc1ClusterIP, nil, int32(svc1Port), corev1.ProtocolTCP, nil, nil, false)
-	svc2 := makeTestClusterIPService(&svc2PortName, svc2ClusterIP, nil, int32(svc2Port), corev1.ProtocolTCP, nil, nil, false)
+	svc1 := makeTestClusterIPService(&svc1PortName, svc1ClusterIP, nil, int32(svc1Port), corev1.ProtocolTCP, nil, nil, false, nil)
+	svc2 := makeTestClusterIPService(&svc2PortName, svc2ClusterIP, nil, int32(svc2Port), corev1.ProtocolTCP, nil, nil, false, nil)
 	svcs := []*corev1.Service{svc1, svc2}
 
 	epSubset := makeTestEndpointSubset(&svc1PortName, ep1IP, int32(svc1Port), corev1.ProtocolTCP, false)
@@ -1091,8 +1108,8 @@ func TestDualStackService(t *testing.T) {
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	ipv4GroupAllocator := openflow.NewGroupAllocator(false)
 	ipv6GroupAllocator := openflow.NewGroupAllocator(true)
-	fpv4 := NewFakeProxier(mockRouteClient, mockOFClient, nil, ipv4GroupAllocator, false)
-	fpv6 := NewFakeProxier(mockRouteClient, mockOFClient, nil, ipv6GroupAllocator, true)
+	fpv4 := newFakeProxier(mockRouteClient, mockOFClient, nil, ipv4GroupAllocator, false)
+	fpv6 := newFakeProxier(mockRouteClient, mockOFClient, nil, ipv6GroupAllocator, true)
 	metaProxier := k8sproxy.NewMetaProxier(fpv4, fpv6)
 
 	svc := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) {
@@ -1142,13 +1159,13 @@ func testClusterIPRemove(t *testing.T, svcIP, externalIP, epIP net.IP, isIPv6 bo
 	if !endpointSliceEnabled {
 		options = append(options, withoutEndpointSlice)
 	}
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6, options...)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6, options...)
 
 	internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster
 	if nodeLocalInternal {
 		internalTrafficPolicy = corev1.ServiceInternalTrafficPolicyLocal
 	}
-	svc := makeTestClusterIPService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicy, true)
+	svc := makeTestClusterIPService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicy, true, nil)
 	makeServiceMap(fp, svc)
 
 	var ep *corev1.Endpoints
@@ -1231,7 +1248,7 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externa
 	if !endpointSliceEnabled {
 		options = append(options, withoutEndpointSlice)
 	}
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, options...)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, options...)
 
 	svc := makeTestNodePortService(&svcPortName,
 		svcIP,
@@ -1311,7 +1328,7 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, ext
 	if !endpointSliceEnabled {
 		options = append(options, withoutEndpointSlice)
 	}
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, options...)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, options...)
 
 	externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeLocal
 	internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster
@@ -1473,10 +1490,10 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) {
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)
 
-	svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false)
-	updatedSvc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort+1), corev1.ProtocolTCP, nil, nil, false)
+	svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
+	updatedSvc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort+1), corev1.ProtocolTCP, nil, nil, false, nil)
 	makeServiceMap(fp, svc)
 	makeEndpointSliceMap(fp)
@@ -1505,7 +1522,7 @@ func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net.
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
 
 	svc := makeTestNodePortService(&svcPortName,
 		svcIP,
@@ -1565,7 +1582,7 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
 
 	internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster
 	externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeLocal
@@ -1637,13 +1654,13 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)
 
 	svcPortNameTCP := makeSvcPortName("ns", "svc-tcp", strconv.Itoa(svcPort), corev1.ProtocolTCP)
 	svcPortNameUDP := makeSvcPortName("ns", "svc-udp", strconv.Itoa(svcPort), corev1.ProtocolUDP)
 
-	svcTCP := makeTestClusterIPService(&svcPortNameTCP, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false)
-	svcUDP := makeTestClusterIPService(&svcPortNameUDP, svcIP, nil, int32(svcPort), corev1.ProtocolUDP, nil, nil, false)
+	svcTCP := makeTestClusterIPService(&svcPortNameTCP, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
+	svcUDP := makeTestClusterIPService(&svcPortNameUDP, svcIP, nil, int32(svcPort), corev1.ProtocolUDP, nil, nil, false, nil)
 	makeServiceMap(fp, svcTCP, svcUDP)
 
 	epTCP, epPortTCP := makeTestEndpointSliceEndpointAndPort(&svcPortNameTCP, epIP, int32(svcPort), corev1.ProtocolTCP, false)
@@ -1694,9 +1711,9 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)
 
-	svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false)
+	svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
 	makeServiceMap(fp, svc)
 
 	ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false)
@@ -1742,7 +1759,7 @@ func testSessionAffinity(t *testing.T, svcIP net.IP, epIP net.IP, affinitySecond
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)
 
 	svc := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) {
 		svc.Spec.Type = corev1.ServiceTypeNodePort
@@ -1805,7 +1822,7 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)
 
 	timeoutSeconds := corev1.DefaultClientIPServiceAffinitySeconds
 
@@ -1855,13 +1872,13 @@ func testServiceClusterIPUpdate(t *testing.T,
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
 
 	var svc, updatedSvc *corev1.Service
 	switch svcType {
 	case corev1.ServiceTypeClusterIP:
-		svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false)
-		updatedSvc = makeTestClusterIPService(&svcPortName, updatedSvcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false)
+		svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
+		updatedSvc = makeTestClusterIPService(&svcPortName, updatedSvcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
 	case corev1.ServiceTypeNodePort:
 		svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster)
 		updatedSvc = makeTestNodePortService(&svcPortName, updatedSvcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster)
@@ -1956,13 +1973,13 @@ func testServicePortUpdate(t *testing.T,
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
 
 	var svc, updatedSvc *corev1.Service
 	switch svcType {
 	case corev1.ServiceTypeClusterIP:
-		svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false)
-		updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort+1), corev1.ProtocolTCP, nil, nil, false)
+		svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
+		updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort+1), corev1.ProtocolTCP, nil, nil, false, nil)
 	case corev1.ServiceTypeNodePort:
 		svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster)
 		updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort+1), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster)
@@ -2058,7 +2075,7 @@ func testServiceNodePortUpdate(t *testing.T,
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
 
 	var svc, updatedSvc *corev1.Service
 	switch svcType {
@@ -2143,7 +2160,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T,
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
 
 	var svc, updatedSvc *corev1.Service
 	switch svcType {
@@ -2248,13 +2265,13 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T,
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6, withProxyAll)
 
 	internalTrafficPolicyCluster := corev1.ServiceInternalTrafficPolicyCluster
 	internalTrafficPolicyLocal := corev1.ServiceInternalTrafficPolicyLocal
-	svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicyCluster, false)
-	updatedSvc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicyLocal, false)
+	svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicyCluster, false, nil)
+	updatedSvc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicyLocal, false, nil)
 	makeServiceMap(fp, svc)
 
 	remoteEp, remoteEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep1IP, int32(svcPort), corev1.ProtocolTCP, false)
@@ -2334,7 +2351,7 @@ func testServiceIngressIPsUpdate(t *testing.T,
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
 
 	var loadBalancerIPStrs, updatedLoadBalancerIPStrs []string
 	for _, ip := range loadBalancerIPs {
@@ -2421,15 +2438,15 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T,
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
 
 	var svc, updatedSvc *corev1.Service
 	affinitySeconds := int32(10)
 	updatedAffinitySeconds := int32(100)
 	switch svcType {
 	case corev1.ServiceTypeClusterIP:
-		svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, &affinitySeconds, nil, false)
-		updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, &updatedAffinitySeconds, nil, false)
+		svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, &affinitySeconds, nil, false, nil)
+		updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, &updatedAffinitySeconds, nil, false, nil)
 	case corev1.ServiceTypeNodePort:
 		svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, &affinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster)
 		updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, &updatedAffinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster)
@@ -2520,14 +2537,14 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T,
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(isIPv6)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)
 
 	var svc, updatedSvc *corev1.Service
 	affinitySeconds := int32(100)
 	switch svcType {
 	case corev1.ServiceTypeClusterIP:
-		svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false)
-		updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, &affinitySeconds, nil, false)
+		svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
+		updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, &affinitySeconds, nil, false, nil)
 	case corev1.ServiceTypeNodePort:
 		svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster)
 		updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, &affinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster)
@@ -2616,12 +2633,12 @@ func TestServicesWithSameEndpoints(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	mockOFClient, mockRouteClient := getMockClients(ctrl)
 	groupAllocator := openflow.NewGroupAllocator(false)
-	fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, false)
+	fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, false)
 
 	svcPortName1 := makeSvcPortName("ns", "svc1", strconv.Itoa(svcPort), corev1.ProtocolTCP)
 	svcPortName2 := makeSvcPortName("ns", "svc2", strconv.Itoa(svcPort), corev1.ProtocolTCP)
-	svc1 := makeTestClusterIPService(&svcPortName1, svc1IPv4, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false)
-	svc2 := makeTestClusterIPService(&svcPortName2, svc2IPv4, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false)
+	svc1 := makeTestClusterIPService(&svcPortName1, svc1IPv4, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
+	svc2 := makeTestClusterIPService(&svcPortName2, svc2IPv4, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
 	makeServiceMap(fp, svc1, svc2)
 
 	ep1, ep1Port := makeTestEndpointSliceEndpointAndPort(&svcPortName1, ep1IPv4, int32(svcPort), corev1.ProtocolTCP, false)
@@ -2778,7 +2795,7 @@ func TestGetServiceFlowKeys(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddressesIPv4, groupAllocator, false, withProxyAll)
+			fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddressesIPv4, groupAllocator, false, withProxyAll)
 			if tc.svc != nil {
 				makeServiceMap(fp, svc)
 			}
@@ -2807,3 +2824,53 @@ func TestGetServiceFlowKeys(t *testing.T) {
 		})
 	}
 }
+
+func TestServiceLabelSelector(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	mockOFClient, mockRouteClient := getMockClients(ctrl)
+	groupAllocator := openflow.NewGroupAllocator(false)
+	svcPortName1 := makeSvcPortName("ns", "svc1", strconv.Itoa(svcPort), corev1.ProtocolTCP)
+	svcPortName2 := makeSvcPortName("ns", "svc2", strconv.Itoa(svcPort), corev1.ProtocolTCP)
+	svcPortName3 := makeSvcPortName("ns", "svc3", strconv.Itoa(svcPort), corev1.ProtocolTCP)
+	svcPortName4 := makeSvcPortName("ns", "svc4", strconv.Itoa(svcPort), corev1.ProtocolTCP)
+	svc1IP := net.ParseIP("1.1.1.1")
+	svc2IP := net.ParseIP("1.1.1.2")
+	svc3IP := net.ParseIP("1.1.1.3")
+	svc4IP := net.ParseIP("1.1.1.4")
+	svc1 := makeTestClusterIPService(&svcPortName1, svc1IP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
+	svc2 := makeTestClusterIPService(&svcPortName2, svc2IP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, map[string]string{labelServiceProxyName: testServiceProxyName})
+	svc3 := makeTestClusterIPService(&svcPortName3, svc3IP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, map[string]string{labelServiceProxyName: "other"})
+	svc4 := makeTestClusterIPService(&svcPortName4, svc4IP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, map[string]string{corev1.IsHeadlessService: ""})
+
+	// A Service with the label "service.kubernetes.io/headless" should always be ignored.
+
+	// When serviceProxyName is set, only Services labeled "service.kubernetes.io/service-proxy-name=antrea"
+	// should be processed; Services without that exact label value should be ignored.
+	t.Run("ServiceProxyName", func(t *testing.T) {
+		fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, false, withServiceProxyNameSet)
+		makeServiceMap(fp, svc1, svc2, svc3, svc4)
+		makeEndpointSliceMap(fp)
+
+		groupID := fp.groupCounter.AllocateIfNotExist(svcPortName2, false)
+		mockOFClient.EXPECT().InstallServiceGroup(groupID, false, []k8sproxy.Endpoint{}).Times(1)
+		mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svc2IP, uint16(svcPort), gomock.Any(), uint16(0), false, false).Times(1)
+		fp.syncProxyRules()
+		assert.Contains(t, fp.serviceInstalledMap, svcPortName2)
+	})
+
+	// When serviceProxyName is not set, only Services without the "service.kubernetes.io/service-proxy-name"
+	// label should be processed; Services carrying the label (regardless of its value) should be ignored.
+ t.Run("empty ServiceProxyName", func(t *testing.T) { + fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, false) + makeServiceMap(fp, svc1, svc2, svc3, svc4) + makeEndpointSliceMap(fp) + + groupID := fp.groupCounter.AllocateIfNotExist(svcPortName1, false) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, []k8sproxy.Endpoint{}).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svc1IP, uint16(svcPort), gomock.Any(), uint16(0), false, false).Times(1) + fp.syncProxyRules() + assert.Contains(t, fp.serviceInstalledMap, svcPortName1) + }) +} diff --git a/pkg/agent/proxy/service.go b/pkg/agent/proxy/service.go index 67557e73440..dc2c7094f51 100644 --- a/pkg/agent/proxy/service.go +++ b/pkg/agent/proxy/service.go @@ -18,6 +18,7 @@ import ( "sync" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/record" "antrea.io/antrea/pkg/agent/proxy/types" @@ -31,8 +32,8 @@ type serviceChangesTracker struct { initialized bool } -func newServiceChangesTracker(recorder record.EventRecorder, ipFamily v1.IPFamily, skipServices []string) *serviceChangesTracker { - return &serviceChangesTracker{tracker: k8sproxy.NewServiceChangeTracker(types.NewServiceInfo, ipFamily, recorder, nil, skipServices)} +func newServiceChangesTracker(recorder record.EventRecorder, ipFamily v1.IPFamily, serviceLabelSelector labels.Selector, skipServices []string) *serviceChangesTracker { + return &serviceChangesTracker{tracker: k8sproxy.NewServiceChangeTracker(types.NewServiceInfo, ipFamily, recorder, nil, serviceLabelSelector, skipServices)} } func (sh *serviceChangesTracker) OnServiceSynced() { diff --git a/pkg/config/agent/config.go b/pkg/config/agent/config.go index 9ead6ed7710..ccc031e07c7 100644 --- a/pkg/config/agent/config.go +++ b/pkg/config/agent/config.go @@ -218,6 +218,10 @@ type AntreaProxyConfig struct { // kube-proxy is removed from the cluser, otherwise kube-proxy will still load-balance this traffic. // Defaults to true. ProxyLoadBalancerIPs *bool `yaml:"proxyLoadBalancerIPs,omitempty"` + // The value of service.kubernetes.io/service-proxy-name label for AntreaProxy to match. If it is set, then + // AntreaProxy only handles the Service objects matching this label. The default value is empty string, which + // means that AntreaProxy will manage all Service objects without the mentioned label. 
diff --git a/third_party/proxy/service.go b/third_party/proxy/service.go
index 0c3a6d9e27f..a8548683f07 100644
--- a/third_party/proxy/service.go
+++ b/third_party/proxy/service.go
@@ -47,6 +47,7 @@ import (
 	"sync"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/tools/record"
@@ -316,6 +317,7 @@ type ServiceChangeTracker struct {
 	processServiceMapChange processServiceMapChangeFunc
 	ipFamily                v1.IPFamily
 	recorder                record.EventRecorder
+	serviceLabelSelector    labels.Selector
 	// skipServices indicates the service list for which we should skip proxying
 	// it will be initialized from antrea-agent.conf
 	skipServices sets.Set[string]
@@ -326,6 +328,7 @@ func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc,
 	ipFamily v1.IPFamily,
 	recorder record.EventRecorder,
 	processServiceMapChange processServiceMapChangeFunc,
+	serviceLabelSelector labels.Selector,
 	skipServices []string) *ServiceChangeTracker {
 	return &ServiceChangeTracker{
 		items: make(map[types.NamespacedName]*serviceChange),
@@ -333,6 +336,7 @@ func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc,
 		recorder:                recorder,
 		ipFamily:                ipFamily,
 		processServiceMapChange: processServiceMapChange,
+		serviceLabelSelector:    serviceLabelSelector,
 		skipServices:            sets.New[string](skipServices...),
 	}
 }
@@ -415,7 +419,7 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic
 		return nil
 	}
 
-	if utilproxy.ShouldSkipService(service, sct.skipServices) {
+	if utilproxy.ShouldSkipService(service, sct.skipServices, sct.serviceLabelSelector) {
 		return nil
 	}
 
diff --git a/third_party/proxy/util/utils.go b/third_party/proxy/util/utils.go
index e9ad456175f..935a3c9072d 100644
--- a/third_party/proxy/util/utils.go
+++ b/third_party/proxy/util/utils.go
@@ -45,12 +45,12 @@ import (
 	"net"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/tools/record"
-	utilnet "k8s.io/utils/net"
-
 	"k8s.io/klog/v2"
+	utilnet "k8s.io/utils/net"
 )
 
 var (
@@ -67,8 +67,12 @@ type Resolver interface {
 }
 
 // ShouldSkipService checks if a given service should skip proxying
-func ShouldSkipService(service *v1.Service, skipServices sets.Set[string]) bool {
-	// if ClusterIP is "None" or empty, skip proxying
+func ShouldSkipService(service *v1.Service, skipServices sets.Set[string], serviceLabelSelector labels.Selector) bool {
+	// Skip proxying if the Service labels do not match the serviceLabelSelector.
+	if !serviceLabelSelector.Matches(labels.Set(service.Labels)) {
+		return true
+	}
+	// If ClusterIP is "None" or empty, skip proxying.
 	if service.Spec.ClusterIP == v1.ClusterIPNone || service.Spec.ClusterIP == "" {
 		klog.V(3).Infof("Skipping service %s in namespace %s due to clusterIP = %q", service.Name, service.Namespace, service.Spec.ClusterIP)
 		return true