diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 24eaf7df46b..5d10f890134 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -91,6 +91,10 @@ featureGates: # Enable NodeLatencyMonitor to monitor the latency between Nodes. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "NodeLatencyMonitor" "default" false) }} +# Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to +# remote BGP peers. +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "BGPPolicy" "default" false) }} + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index ef1c43e40e8..a2a74e45beb 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -177,6 +177,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -234,3 +235,13 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - list + - watch diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index dc57b20d28d..5f54dc54b5d 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,16 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - list + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5125,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 71bf05ff579aa9bea7b360669c5e2ce2830ca88dc4ab54480638ce006eaeaf11 + checksum/config: cce7d6644fb552607ebeda9bf30a5fafa871dd4382afc609500fcb493b61768c labels: app: antrea component: antrea-agent @@ -5348,7 +5363,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 71bf05ff579aa9bea7b360669c5e2ce2830ca88dc4ab54480638ce006eaeaf11 + checksum/config: cce7d6644fb552607ebeda9bf30a5fafa871dd4382afc609500fcb493b61768c labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index dbc71ccf22b..c7114acb05a 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,16 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - list + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5125,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 71bf05ff579aa9bea7b360669c5e2ce2830ca88dc4ab54480638ce006eaeaf11 + checksum/config: cce7d6644fb552607ebeda9bf30a5fafa871dd4382afc609500fcb493b61768c labels: app: antrea component: antrea-agent @@ -5349,7 +5364,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 71bf05ff579aa9bea7b360669c5e2ce2830ca88dc4ab54480638ce006eaeaf11 + checksum/config: cce7d6644fb552607ebeda9bf30a5fafa871dd4382afc609500fcb493b61768c labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 206405a4548..e152b21dd6b 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,16 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - list + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5125,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 91ff2b609519e4aaead6ab850252a49bbe674dec17f6f239c4d0fa6c7b5705f6 + checksum/config: e30c52c9fcb04d362d018e846cf72dc633c5e891e02b3ebb87fab4d7ee08e15a labels: app: antrea component: antrea-agent @@ -5346,7 +5361,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 91ff2b609519e4aaead6ab850252a49bbe674dec17f6f239c4d0fa6c7b5705f6 + checksum/config: e30c52c9fcb04d362d018e846cf72dc633c5e891e02b3ebb87fab4d7ee08e15a labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 5b828a7091c..fcbdb4d0b2f 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3820,6 +3820,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4458,6 +4462,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4515,6 +4520,16 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - list + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5123,7 +5138,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 2d75956786eb552eaba94f89dfa5c6bab570bf662b82449e9af31a57ca138750 + checksum/config: 73a49a9a8508cc8fb94eb2c770bb3589e68d9623327231943cba60a48716568a checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -5405,7 +5420,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 2d75956786eb552eaba94f89dfa5c6bab570bf662b82449e9af31a57ca138750 + checksum/config: 73a49a9a8508cc8fb94eb2c770bb3589e68d9623327231943cba60a48716568a labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index ecbb28127a8..0640d1114a8 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,16 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - list + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5125,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ebc0be79b0fc65db51609f5c9185ca8a0533e265811d14c687f577cf93497a58 + checksum/config: 20130c4a5dbfeec75182bc3053288f64c06d0350b34c86675ac88d5961c47853 labels: app: antrea component: antrea-agent @@ -5346,7 +5361,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ebc0be79b0fc65db51609f5c9185ca8a0533e265811d14c687f577cf93497a58 + checksum/config: 20130c4a5dbfeec75182bc3053288f64c06d0350b34c86675ac88d5961c47853 labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 4f7099b6ae0..5bd503571f1 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -38,6 +38,7 @@ import ( "antrea.io/antrea/pkg/agent/cniserver" "antrea.io/antrea/pkg/agent/cniserver/ipam" "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/controller/bgp" "antrea.io/antrea/pkg/agent/controller/egress" "antrea.io/antrea/pkg/agent/controller/ipseccertificate" "antrea.io/antrea/pkg/agent/controller/l7flowexporter" @@ -743,6 +744,23 @@ func run(o *Options) error { } } + if features.DefaultFeatureGate.Enabled(features.BGPPolicy) { + bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies() + bgpController, err := bgp.NewBGPPolicyController(nodeInformer, + serviceInformer, + egressInformer, + bgpPolicyInformer, + endpointSliceInformer, + o.enableEgress, + k8sClient, + nodeConfig, + networkConfig) + if err != nil { + return err + } + go bgpController.Run(ctx) + } + if features.DefaultFeatureGate.Enabled(features.TrafficControl) { tcController := trafficcontrol.NewTrafficControlController(ofClient, ifaceStore, diff --git a/docs/feature-gates.md b/docs/feature-gates.md index 08f60c32635..add4c40da8a 100644 --- a/docs/feature-gates.md +++ b/docs/feature-gates.md @@ -59,6 +59,7 @@ edit the Agent configuration in the | `EgressSeparateSubnet` | Agent | `false` | Alpha | v1.15 | N/A | N/A | No | | | `NodeNetworkPolicy` | Agent | `false` | Alpha | v1.15 | N/A | N/A | Yes | | | `L7FlowExporter` | Agent | `false` | Alpha | v1.15 | N/A | N/A | Yes | | +| `BGPPolicy` | Agent | `false` | Alpha | v2.1 | N/A | N/A | No | | ## Description and Requirements of Features @@ -435,3 +436,13 @@ Refer to this [document](network-flow-visibility.md#l7-visibility) for more info #### Requirements for this Feature - Linux Nodes only. + +### BGPPolicy + +`BGPPolicy` allows users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs (e.g., +ClusterIPs, ExternalIPs, LoadBalancerIPs), Pod IPs and Egress IPs to remote BGP peers, providing a flexible mechanism +for integrating Kubernetes clusters with external BGP-enabled networks. + +#### Requirements for this Feature + +- Linux Nodes only. diff --git a/pkg/agent/controller/bgp/controller.go b/pkg/agent/controller/bgp/controller.go new file mode 100644 index 00000000000..ed303852f1a --- /dev/null +++ b/pkg/agent/controller/bgp/controller.go @@ -0,0 +1,931 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bgp + +import ( + "context" + "encoding/json" + "fmt" + "hash/fnv" + "net" + "reflect" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + apitypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + discoveryinformers "k8s.io/client-go/informers/discovery/v1" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + discoverylisters "k8s.io/client-go/listers/discovery/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + utilnet "k8s.io/utils/net" + "k8s.io/utils/ptr" + "k8s.io/utils/strings/slices" + + "antrea.io/antrea/pkg/agent/bgp" + "antrea.io/antrea/pkg/agent/bgp/gobgp" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + "antrea.io/antrea/pkg/apis/crd/v1beta1" + crdinformersv1a1 "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" + crdinformersv1b1 "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1beta1" + crdlistersv1a1 "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + crdlistersv1b1 "antrea.io/antrea/pkg/client/listers/crd/v1beta1" + "antrea.io/antrea/pkg/util/env" +) + +const ( + controllerName = "BGPPolicyController" + // How long to wait before retrying the processing of a BGPPolicy change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Disable resyncing. + resyncPeriod time.Duration = 0 +) + +const ( + ipv4Suffix = "/32" + ipv6Suffix = "/128" +) + +const dummyKey = "dummyKey" + +type bgpPolicyState struct { + // The local BGP server. + bgpServer bgp.Interface + // The port on which the local BGP server listens. + listenPort int32 + // The AS number used by the local BGP server. + localASN int32 + // The router ID used by the local BGP server. + routerID string + // routes stores all BGP routers advertised to BGP peers. + routes sets.Set[bgp.Route] + // peerConfigs is a map that stores configurations of BGP peers. The map keys are the concatenated strings of BGP + // peer IP address and ASN (e.g., "192.168.77.100-65000", "2001::1-65000"). + peerConfigs map[string]bgp.PeerConfig +} + +type Controller struct { + nodeInformer cache.SharedIndexInformer + nodeLister corelisters.NodeLister + nodeListerSynced cache.InformerSynced + + serviceInformer cache.SharedIndexInformer + serviceLister corelisters.ServiceLister + serviceListerSynced cache.InformerSynced + + egressInformer cache.SharedIndexInformer + egressLister crdlistersv1b1.EgressLister + egressListerSynced cache.InformerSynced + + bgpPolicyInformer cache.SharedIndexInformer + bgpPolicyLister crdlistersv1a1.BGPPolicyLister + bgpPolicyListerSynced cache.InformerSynced + + endpointSliceInformer cache.SharedIndexInformer + endpointSliceLister discoverylisters.EndpointSliceLister + endpointSliceListerSynced cache.InformerSynced + + secretInformer cache.SharedIndexInformer + + bgpPolicyState *bgpPolicyState + + k8sClient kubernetes.Interface + bgpPeerPasswords map[string]string + bgpPeerPasswordsMutex sync.RWMutex + + nodeName string + enabledIPv4 bool + enabledIPv6 bool + podIPv4CIDR string + podIPv6CIDR string + nodeIPv4Addr string + + egressEnabled bool + + newBGPServerFn func(globalConfig *bgp.GlobalConfig) bgp.Interface + + queue workqueue.RateLimitingInterface +} + +func NewBGPPolicyController(nodeInformer coreinformers.NodeInformer, + serviceInformer coreinformers.ServiceInformer, + egressInformer crdinformersv1b1.EgressInformer, + bgpPolicyInformer crdinformersv1a1.BGPPolicyInformer, + endpointSliceInformer discoveryinformers.EndpointSliceInformer, + egressEnabled bool, + k8sClient kubernetes.Interface, + nodeConfig *config.NodeConfig, + networkConfig *config.NetworkConfig) (*Controller, error) { + c := &Controller{ + nodeInformer: nodeInformer.Informer(), + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + serviceInformer: serviceInformer.Informer(), + serviceLister: serviceInformer.Lister(), + serviceListerSynced: serviceInformer.Informer().HasSynced, + bgpPolicyInformer: bgpPolicyInformer.Informer(), + bgpPolicyLister: bgpPolicyInformer.Lister(), + bgpPolicyListerSynced: bgpPolicyInformer.Informer().HasSynced, + endpointSliceInformer: endpointSliceInformer.Informer(), + endpointSliceLister: endpointSliceInformer.Lister(), + endpointSliceListerSynced: endpointSliceInformer.Informer().HasSynced, + k8sClient: k8sClient, + bgpPeerPasswords: make(map[string]string), + nodeName: nodeConfig.Name, + enabledIPv4: networkConfig.IPv4Enabled, + enabledIPv6: networkConfig.IPv6Enabled, + podIPv4CIDR: nodeConfig.PodIPv4CIDR.String(), + podIPv6CIDR: nodeConfig.PodIPv6CIDR.String(), + nodeIPv4Addr: nodeConfig.NodeIPv4Addr.IP.String(), + egressEnabled: egressEnabled, + newBGPServerFn: func(globalConfig *bgp.GlobalConfig) bgp.Interface { + return gobgp.NewGoBGPServer(globalConfig) + }, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "bgpPolicy"), + } + c.bgpPolicyInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addBGPPolicy, + UpdateFunc: c.updateBGPPolicy, + DeleteFunc: c.deleteBGPPolicy, + }, + resyncPeriod, + ) + c.serviceInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addService, + UpdateFunc: c.updateService, + DeleteFunc: c.deleteService, + }, + resyncPeriod, + ) + c.endpointSliceInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addEndpointSlice, + UpdateFunc: c.updateEndpointSlice, + DeleteFunc: c.deleteEndpointSlice, + }, + resyncPeriod, + ) + if c.egressEnabled { + c.egressInformer = egressInformer.Informer() + c.egressLister = egressInformer.Lister() + c.egressListerSynced = egressInformer.Informer().HasSynced + c.egressInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addEgress, + UpdateFunc: c.updateEgress, + DeleteFunc: c.deleteEgress, + }, + resyncPeriod, + ) + } + c.nodeInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addNode, + UpdateFunc: c.updateNode, + DeleteFunc: nil, + }, + resyncPeriod, + ) + + c.secretInformer = coreinformers.NewFilteredSecretInformer(k8sClient, + env.GetAntreaNamespace(), + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", types.BGPPolicySecretName).String() + }) + c.secretInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.addSecret, + UpdateFunc: c.updateSecret, + DeleteFunc: c.deleteSecret, + }) + + return c, nil +} + +func (c *Controller) Run(ctx context.Context) { + defer c.queue.ShutDown() + + klog.InfoS("Starting", "controllerName", controllerName) + defer klog.InfoS("Shutting down", "controllerName", controllerName) + + go c.secretInformer.Run(ctx.Done()) + + cacheSyncs := []cache.InformerSynced{ + c.nodeListerSynced, + c.serviceListerSynced, + c.bgpPolicyListerSynced, + c.endpointSliceListerSynced, + c.serviceListerSynced, + c.secretInformer.HasSynced, + } + if c.egressEnabled { + cacheSyncs = append(cacheSyncs, c.egressListerSynced) + } + if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), cacheSyncs...) { + return + } + + go wait.UntilWithContext(ctx, c.worker, time.Second) + + <-ctx.Done() +} + +func (c *Controller) worker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *Controller) processNextWorkItem(ctx context.Context) bool { + _, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(dummyKey) + + if err := c.syncBGPPolicy(ctx); err == nil { + // If no error occurs we Forget this item, so it does not get queued again until another change happens. + c.queue.Forget(dummyKey) + } else { + // Put the item back on the work queue to handle any transient errors. + c.queue.AddRateLimited(dummyKey) + klog.ErrorS(err, "Syncing BGPPolicy failed, requeue") + } + return true +} + +func (c *Controller) getEffectiveBGPPolicy() *v1alpha1.BGPPolicy { + allPolicies, _ := c.bgpPolicyLister.List(labels.Everything()) + var oldestPolicy *v1alpha1.BGPPolicy + for _, policy := range allPolicies { + if c.matchesCurrentNode(policy) { + if oldestPolicy == nil || policy.CreationTimestamp.Before(&oldestPolicy.CreationTimestamp) { + oldestPolicy = policy + } + } + } + return oldestPolicy +} + +func (c *Controller) syncBGPPolicy(ctx context.Context) error { + ctx, cancel := context.WithTimeoutCause(ctx, 60*time.Second, fmt.Errorf("BGPPolicy took too long to sync")) + defer cancel() + + startTime := time.Now() + defer func() { + klog.InfoS("Finished syncing BGPPolicy", "durationTime", time.Since(startTime)) + }() + + // Get the oldest BGPPolicy applied to the current Node as the effective BGPPolicy. + effectivePolicy := c.getEffectiveBGPPolicy() + + // When the effective BGPPolicy is nil, it means that there is no available BGPPolicy. + if effectivePolicy == nil { + // If the BGPPolicy state is nil, just return. + if c.bgpPolicyState == nil { + return nil + } + + // If the BGPPolicy state is not nil, stop the BGP server and reset the state to nil, then return. + if err := c.bgpPolicyState.bgpServer.Stop(ctx); err != nil { + return err + } + c.bgpPolicyState = nil + return nil + } + + klog.V(2).InfoS("Syncing BGPPolicy", "BGPPolicy", klog.KObj(effectivePolicy)) + // Retrieve the listen port, local AS number and router ID from the effective BGPPolicy, and update them to the + // current state. + routerID, err := c.getRouterID() + if err != nil { + return err + } + listenPort := *effectivePolicy.Spec.ListenPort + localASN := effectivePolicy.Spec.LocalASN + + // If the BGPPolicy state is nil, a new BGP server should be started, initialize the BGPPolicy state to store the + // new BGP server, listen port, local ASN, and router ID. + // If the BGPPolicy is not nil, any of the listen port, local AS number, or router ID have changed, stop the current + // BGP server first and reset the BGPPolicy state to nil; then start a new BGP server and initialize the BGPPolicy + // state to store the new BGP server, listen port, local ASN, and router ID. + needUpdateBGPServer := c.bgpPolicyState == nil || + c.bgpPolicyState.listenPort != listenPort || + c.bgpPolicyState.localASN != localASN || + c.bgpPolicyState.routerID != routerID + + if needUpdateBGPServer { + if c.bgpPolicyState != nil { + // Stop the current BGP server. + if err := c.bgpPolicyState.bgpServer.Stop(ctx); err != nil { + return fmt.Errorf("failed to stop current BGP server: %w", err) + } + // Reset the BGPPolicy state to nil. + c.bgpPolicyState = nil + } + + // Create a new BGP server. + bgpServer := c.newBGPServerFn(&bgp.GlobalConfig{ + ASN: uint32(localASN), + RouterID: routerID, + ListenPort: listenPort, + }) + + // Start the new BGP server. + if err := bgpServer.Start(ctx); err != nil { + return fmt.Errorf("failed to start BGP server: %w", err) + } + + // Initialize the BGPPolicy state to store the new BGP server, listen port, local ASN, and router ID. + c.bgpPolicyState = &bgpPolicyState{ + bgpServer: bgpServer, + routerID: routerID, + listenPort: listenPort, + localASN: localASN, + routes: make(sets.Set[bgp.Route]), + peerConfigs: make(map[string]bgp.PeerConfig), + } + } + + // Reconcile BGP peers. + if err := c.reconcileBGPPeers(ctx, effectivePolicy.Spec.BGPPeers); err != nil { + return err + } + + // Reconcile BGP advertisements. + if err := c.reconcileBGPAdvertisements(ctx, effectivePolicy.Spec.Advertisements); err != nil { + return err + } + + return nil +} + +func (c *Controller) reconcileBGPPeers(ctx context.Context, bgpPeers []v1alpha1.BGPPeer) error { + curPeerConfigs := c.getPeerConfigs(bgpPeers) + prePeerConfigs := c.bgpPolicyState.peerConfigs + prePeerKeys := sets.KeySet(prePeerConfigs) + curPeerKeys := sets.KeySet(curPeerConfigs) + + peerToAddKeys := curPeerKeys.Difference(prePeerKeys) + peerToUpdateKeys := sets.New[string]() + for peerKey := range prePeerKeys.Intersection(curPeerKeys) { + prevPeerConfig := prePeerConfigs[peerKey] + curPeerConfig := curPeerConfigs[peerKey] + if !reflect.DeepEqual(prevPeerConfig, curPeerConfig) { + peerToUpdateKeys.Insert(peerKey) + } + } + peerToDeleteKeys := prePeerKeys.Difference(curPeerKeys) + + bgpServer := c.bgpPolicyState.bgpServer + for key := range peerToAddKeys { + peerConfig := curPeerConfigs[key] + if err := bgpServer.AddPeer(ctx, peerConfig); err != nil { + return err + } + c.bgpPolicyState.peerConfigs[key] = peerConfig + } + for key := range peerToUpdateKeys { + peerConfig := curPeerConfigs[key] + if err := bgpServer.UpdatePeer(ctx, peerConfig); err != nil { + return err + } + c.bgpPolicyState.peerConfigs[key] = peerConfig + } + for key := range peerToDeleteKeys { + peerConfig := prePeerConfigs[key] + if err := bgpServer.RemovePeer(ctx, peerConfig); err != nil { + return err + } + delete(c.bgpPolicyState.peerConfigs, key) + } + + return nil +} + +func (c *Controller) reconcileBGPAdvertisements(ctx context.Context, bgpAdvertisements v1alpha1.Advertisements) error { + curRoutes, err := c.getRoutes(bgpAdvertisements) + if err != nil { + return err + } + preRoutes := c.bgpPolicyState.routes + routesToAdvertise := curRoutes.Difference(preRoutes) + routesToWithdraw := preRoutes.Difference(curRoutes) + + bgpServer := c.bgpPolicyState.bgpServer + for route := range routesToAdvertise { + if err := bgpServer.AdvertiseRoutes(ctx, []bgp.Route{route}); err != nil { + return err + } + c.bgpPolicyState.routes.Insert(route) + } + for route := range routesToWithdraw { + if err := bgpServer.WithdrawRoutes(ctx, []bgp.Route{route}); err != nil { + return err + } + c.bgpPolicyState.routes.Delete(route) + } + + return nil +} + +func hashNodeNameToIP(s string) string { + h := fnv.New32a() // Create a new FNV hash + h.Write([]byte(s)) + hashValue := h.Sum32() // Get the 32-bit hash + + // Convert the hash to a 4-byte slice + ip := make(net.IP, 4) + ip[0] = byte(hashValue >> 24) + ip[1] = byte(hashValue >> 16) + ip[2] = byte(hashValue >> 8) + ip[3] = byte(hashValue) + + return ip.String() +} + +func (c *Controller) getRouterID() (string, error) { + // According to RFC 4271: + // BGP Identifier: + // This 4-octet unsigned integer indicates the BGP Identifier of + // the sender. A given BGP speaker sets the value of its BGP + // Identifier to an IP address that is assigned to that BGP + // speaker. The value of the BGP Identifier is determined upon + // startup and is the same for every local interface and BGP peer. + // + // In goBGP, only an IPv4 address can be used as the BGP Identifier (BGP router ID). + // For IPv4-only or dual-stack Kubernetes clusters, the Node's IPv4 address is used as the BGP router ID, ensuring + // uniqueness. + // For IPv6-only Kubernetes clusters without a Node IPv4 address, the router ID could be specified in the Node + // annotation `node.antrea.io/bgp-router-id`. If the annotation is not present, an IPv4 address will be generated by + // hashing the Node name and updated to the Node annotation `node.antrea.io/bgp-router-id`. + + if c.enabledIPv4 { + return c.nodeIPv4Addr, nil + } + + nodeObj, err := c.nodeLister.Get(c.nodeName) + if err != nil { + return "", fmt.Errorf("failed to get Node object: %w", err) + } + + var exists bool + var routerID string + routerID, exists = nodeObj.GetAnnotations()[types.NodeBGPRouterIDAnnotationKey] + if !exists { + routerID = hashNodeNameToIP(c.nodeName) + patch, _ := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]string{ + types.NodeBGPRouterIDAnnotationKey: routerID, + }, + }, + }) + if _, err := c.k8sClient.CoreV1().Nodes().Patch(context.TODO(), c.nodeName, apitypes.MergePatchType, patch, metav1.PatchOptions{}, "status"); err != nil { + return "", fmt.Errorf("failed to patch BGP router ID to Node annotation %s: %w", types.NodeBGPRouterIDAnnotationKey, err) + } + } else if !utilnet.IsIPv4String(routerID) { + return "", fmt.Errorf("BGP router ID should be an IPv4 address string") + } + return routerID, nil +} + +func (c *Controller) getRoutes(advertisements v1alpha1.Advertisements) (sets.Set[bgp.Route], error) { + allRoutes := sets.New[bgp.Route]() + + if advertisements.Service != nil { + c.addServiceRoutes(advertisements.Service, allRoutes) + } + if c.egressEnabled && advertisements.Egress != nil { + c.addEgressRoutes(allRoutes) + } + if advertisements.Pod != nil { + c.addPodRoutes(allRoutes) + } + + return allRoutes, nil +} + +func (c *Controller) addServiceRoutes(advertisement *v1alpha1.ServiceAdvertisement, allRoutes sets.Set[bgp.Route]) { + ipTypes := sets.New(advertisement.IPTypes...) + services, _ := c.serviceLister.List(labels.Everything()) + + var serviceIPs []string + for _, svc := range services { + internalLocal := svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyLocal + externalLocal := svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyLocal + var hasLocalEndpoints bool + if internalLocal || externalLocal { + hasLocalEndpoints = c.hasLocalEndpoints(svc) + } + if ipTypes.Has(v1alpha1.ServiceIPTypeClusterIP) { + if internalLocal && hasLocalEndpoints || !internalLocal { + for _, clusterIP := range svc.Spec.ClusterIPs { + serviceIPs = append(serviceIPs, clusterIP) + } + } + } + if ipTypes.Has(v1alpha1.ServiceIPTypeExternalIP) { + if externalLocal && hasLocalEndpoints || !externalLocal { + for _, externalIP := range svc.Spec.ExternalIPs { + serviceIPs = append(serviceIPs, externalIP) + } + } + } + if ipTypes.Has(v1alpha1.ServiceIPTypeLoadBalancerIP) && svc.Spec.Type == corev1.ServiceTypeLoadBalancer { + if externalLocal && hasLocalEndpoints || !externalLocal { + serviceIPs = append(serviceIPs, getIngressIPs(svc)...) + } + } + } + + for _, ip := range serviceIPs { + if c.enabledIPv4 && utilnet.IsIPv4String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv4Suffix}) + } + if c.enabledIPv6 && utilnet.IsIPv6String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv6Suffix}) + } + } +} + +func (c *Controller) addEgressRoutes(allRoutes sets.Set[bgp.Route]) { + egresses, _ := c.egressLister.List(labels.Everything()) + for _, eg := range egresses { + if eg.Status.EgressNode != c.nodeName { + continue + } + ip := eg.Status.EgressIP + if c.enabledIPv4 && utilnet.IsIPv4String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv4Suffix}) + } + if c.enabledIPv6 && utilnet.IsIPv6String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv6Suffix}) + } + } +} + +func (c *Controller) addPodRoutes(allRoutes sets.Set[bgp.Route]) { + if c.enabledIPv4 { + allRoutes.Insert(bgp.Route{Prefix: c.podIPv4CIDR}) + } + if c.enabledIPv6 { + allRoutes.Insert(bgp.Route{Prefix: c.podIPv6CIDR}) + } +} + +func (c *Controller) hasLocalEndpoints(svc *corev1.Service) bool { + labelSelector := labels.Set{discovery.LabelServiceName: svc.GetName()}.AsSelector() + items, _ := c.endpointSliceLister.EndpointSlices(svc.GetNamespace()).List(labelSelector) + for _, eps := range items { + for _, ep := range eps.Endpoints { + if ep.NodeName != nil && *ep.NodeName == c.nodeName { + return true + } + } + } + return false +} + +func (c *Controller) getPeerConfigs(peers []v1alpha1.BGPPeer) map[string]bgp.PeerConfig { + c.bgpPeerPasswordsMutex.RLock() + defer c.bgpPeerPasswordsMutex.RUnlock() + + peerConfigs := make(map[string]bgp.PeerConfig) + for i := range peers { + if c.enabledIPv4 && utilnet.IsIPv4String(peers[i].Address) || + c.enabledIPv6 && utilnet.IsIPv6String(peers[i].Address) { + peerKey := generateBGPPeerKey(peers[i].Address, peers[i].ASN) + + var password string + if p, exists := c.bgpPeerPasswords[peerKey]; exists { + password = p + } + + peerConfigs[peerKey] = bgp.PeerConfig{ + BGPPeer: &peers[i], + Password: password, + } + } + } + return peerConfigs +} + +func generateBGPPeerKey(address string, asn int32) string { + return fmt.Sprintf("%s-%d", address, asn) +} + +func (c *Controller) addBGPPolicy(obj interface{}) { + bgpPolicy := obj.(*v1alpha1.BGPPolicy) + if !c.matchesCurrentNode(bgpPolicy) { + return + } + klog.V(2).InfoS("Processing BGPPolicy ADD event", "BGPPolicy", klog.KObj(bgpPolicy)) + c.queue.Add(dummyKey) +} + +func (c *Controller) updateBGPPolicy(oldObj, obj interface{}) { + oldBGPPolicy := oldObj.(*v1alpha1.BGPPolicy) + policy := obj.(*v1alpha1.BGPPolicy) + if !c.matchesCurrentNode(policy) && !c.matchesCurrentNode(oldBGPPolicy) { + return + } + if policy.GetGeneration() != oldBGPPolicy.GetGeneration() { + klog.V(2).InfoS("Processing BGPPolicy UPDATE event", "BGPPolicy", klog.KObj(policy)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) deleteBGPPolicy(obj interface{}) { + bgpPolicy := obj.(*v1alpha1.BGPPolicy) + if !c.matchesCurrentNode(bgpPolicy) { + return + } + klog.V(2).InfoS("Processing BGPPolicy DELETE event", "BGPPolicy", klog.KObj(bgpPolicy)) + c.queue.Add(dummyKey) +} + +func getIngressIPs(svc *corev1.Service) []string { + var ips []string + for _, ingress := range svc.Status.LoadBalancer.Ingress { + if ingress.IP != "" { + ips = append(ips, ingress.IP) + } + } + return ips +} + +func (c *Controller) matchesCurrentNode(bgpPolicy *v1alpha1.BGPPolicy) bool { + node, _ := c.nodeLister.Get(c.nodeName) + if node == nil { + return false + } + return matchesNode(node, bgpPolicy) +} + +func matchesNode(node *corev1.Node, bgpPolicy *v1alpha1.BGPPolicy) bool { + nodeSelector, _ := metav1.LabelSelectorAsSelector(&bgpPolicy.Spec.NodeSelector) + return nodeSelector.Matches(labels.Set(node.Labels)) +} + +func matchesService(svc *corev1.Service, bgpPolicy *v1alpha1.BGPPolicy) bool { + ipTypeMap := sets.New(bgpPolicy.Spec.Advertisements.Service.IPTypes...) + if ipTypeMap.Has(v1alpha1.ServiceIPTypeClusterIP) && len(svc.Spec.ClusterIPs) != 0 || + ipTypeMap.Has(v1alpha1.ServiceIPTypeExternalIP) && len(svc.Spec.ExternalIPs) != 0 || + ipTypeMap.Has(v1alpha1.ServiceIPTypeLoadBalancerIP) && len(getIngressIPs(svc)) != 0 { + return true + } + return false +} + +func (c *Controller) hasAffectedPolicyByService(svc *corev1.Service) bool { + allPolicies, _ := c.bgpPolicyLister.List(labels.Everything()) + for _, policy := range allPolicies { + if policy.Spec.Advertisements.Service == nil || !c.matchesCurrentNode(policy) { + continue + } + if matchesService(svc, policy) { + return true + } + } + return false +} + +func (c *Controller) addService(obj interface{}) { + svc := obj.(*corev1.Service) + if c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing Service ADD event", "Service", klog.KObj(svc)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) updateService(oldObj, obj interface{}) { + oldSvc := oldObj.(*corev1.Service) + svc := obj.(*corev1.Service) + + if slices.Equal(oldSvc.Spec.ClusterIPs, svc.Spec.ClusterIPs) && + slices.Equal(oldSvc.Spec.ExternalIPs, svc.Spec.ExternalIPs) && + slices.Equal(getIngressIPs(oldSvc), getIngressIPs(svc)) && + oldSvc.Spec.ExternalTrafficPolicy == svc.Spec.ExternalTrafficPolicy && + ptr.Equal(oldSvc.Spec.InternalTrafficPolicy, svc.Spec.InternalTrafficPolicy) { + return + } + if c.hasAffectedPolicyByService(oldSvc) || c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing Service UPDATE event", "Service", klog.KObj(svc)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) deleteService(obj interface{}) { + svc := obj.(*corev1.Service) + if c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing Service DELETE event", "Service", klog.KObj(svc)) + c.queue.Add(dummyKey) + } +} + +func noLocalTrafficPolicy(svc *corev1.Service) bool { + internalTrafficCluster := svc.Spec.InternalTrafficPolicy == nil || *svc.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyCluster + if svc.Spec.Type == corev1.ServiceTypeClusterIP { + return internalTrafficCluster + } + externalTrafficCluster := svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeCluster + return internalTrafficCluster && externalTrafficCluster +} + +func (c *Controller) addEndpointSlice(obj interface{}) { + eps := obj.(*discovery.EndpointSlice) + svc, _ := c.serviceLister.Services(eps.GetNamespace()).Get(eps.GetLabels()[discovery.LabelServiceName]) + if svc == nil { + return + } + // Events of EndpointSlices for Services without a `Local` traffic policy are ignored, as the Service IPs will + // always be advertised. + if noLocalTrafficPolicy(svc) { + return + } + if c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing EndpointSlice ADD event", "EndpointSlice", klog.KObj(eps)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) updateEndpointSlice(_, obj interface{}) { + eps := obj.(*discovery.EndpointSlice) + svc, _ := c.serviceLister.Services(eps.GetNamespace()).Get(eps.GetLabels()[discovery.LabelServiceName]) + if svc == nil { + return + } + // Events of EndpointSlices for Services without a `Local` traffic policy are ignored, as the Service IPs will + // always be advertised. + if noLocalTrafficPolicy(svc) { + return + } + if c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing EndpointSlice UPDATE event", "EndpointSlice", klog.KObj(eps)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) deleteEndpointSlice(obj interface{}) { + eps := obj.(*discovery.EndpointSlice) + svc, _ := c.serviceLister.Services(eps.GetNamespace()).Get(eps.GetLabels()[discovery.LabelServiceName]) + if svc == nil { + return + } + // Events of EndpointSlices for Services without a `Local` traffic policy are ignored, as the Service IPs will + // always be advertised. + if noLocalTrafficPolicy(svc) { + return + } + if c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing EndpointSlice DELETE event", "EndpointSlice", klog.KObj(eps)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) hasAffectedPolicyByEgress() bool { + allPolicies, _ := c.bgpPolicyLister.List(labels.Everything()) + for _, policy := range allPolicies { + if !c.matchesCurrentNode(policy) { + continue + } + if policy.Spec.Advertisements.Egress != nil { + return true + } + } + return false +} + +func (c *Controller) addEgress(obj interface{}) { + eg := obj.(*v1beta1.Egress) + if eg.Status.EgressNode != c.nodeName { + return + } + if c.hasAffectedPolicyByEgress() { + klog.V(2).InfoS("Processing Egress ADD event", "Egress", klog.KObj(eg)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) updateEgress(oldObj, obj interface{}) { + oldEg := oldObj.(*v1beta1.Egress) + eg := obj.(*v1beta1.Egress) + if oldEg.Status.EgressNode != c.nodeName && eg.Status.EgressNode != c.nodeName { + return + } + if oldEg.Status.EgressIP == eg.Status.EgressIP && oldEg.Status.EgressNode == eg.Status.EgressNode { + return + } + if c.hasAffectedPolicyByEgress() { + klog.V(2).InfoS("Processing Egress UPDATE event", "Egress", klog.KObj(eg)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) deleteEgress(obj interface{}) { + eg := obj.(*v1beta1.Egress) + if eg.Status.EgressNode != c.nodeName { + return + } + if c.hasAffectedPolicyByEgress() { + klog.V(2).InfoS("Processing Egress DELETE event", "Egress", klog.KObj(eg)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) hasAffectedPolicyByNode(node *corev1.Node) bool { + allPolicies, _ := c.bgpPolicyLister.List(labels.Everything()) + for _, policy := range allPolicies { + if matchesNode(node, policy) { + return true + } + } + return false +} + +func (c *Controller) addNode(obj interface{}) { + node := obj.(*corev1.Node) + if node.GetName() != c.nodeName { + return + } + if c.hasAffectedPolicyByNode(node) { + klog.V(2).InfoS("Processing Node UPDATE event", "Node", klog.KObj(node)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) updateNode(oldObj, obj interface{}) { + oldNode := oldObj.(*corev1.Node) + node := obj.(*corev1.Node) + if node.GetName() != c.nodeName { + return + } + if reflect.DeepEqual(node.GetLabels(), oldNode.GetLabels()) && + reflect.DeepEqual(node.GetAnnotations(), oldNode.GetAnnotations()) { + return + } + if c.hasAffectedPolicyByNode(oldNode) || c.hasAffectedPolicyByNode(node) { + klog.V(2).InfoS("Processing Node UPDATE event", "Node", klog.KObj(node)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) addSecret(obj interface{}) { + secret := obj.(*corev1.Secret) + klog.V(2).InfoS("Processing Secret ADD event", "Secret", klog.KObj(secret)) + c.updateBGPPeerPasswords(secret) + c.queue.Add(dummyKey) +} + +func (c *Controller) updateSecret(_, obj interface{}) { + secret := obj.(*corev1.Secret) + klog.V(2).InfoS("Processing Secret UPDATE event", "Secret", klog.KObj(secret)) + c.updateBGPPeerPasswords(secret) + c.queue.Add(dummyKey) +} + +func (c *Controller) deleteSecret(obj interface{}) { + klog.V(2).InfoS("Processing Secret DELETE event", "Secret", klog.KObj(obj.(*corev1.Secret))) + c.updateBGPPeerPasswords(nil) + c.queue.Add(dummyKey) +} + +func (c *Controller) updateBGPPeerPasswords(secret *corev1.Secret) { + c.bgpPeerPasswordsMutex.Lock() + defer c.bgpPeerPasswordsMutex.Unlock() + + c.bgpPeerPasswords = make(map[string]string) + if secret.Data != nil { + for k, v := range secret.Data { + c.bgpPeerPasswords[k] = string(v) + } + } +} diff --git a/pkg/agent/controller/bgp/controller_test.go b/pkg/agent/controller/bgp/controller_test.go new file mode 100644 index 00000000000..fba9c586697 --- /dev/null +++ b/pkg/agent/controller/bgp/controller_test.go @@ -0,0 +1,2043 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bgp + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + utilnet "k8s.io/utils/net" + "k8s.io/utils/ptr" + + "antrea.io/antrea/pkg/agent/bgp" + bgptest "antrea.io/antrea/pkg/agent/bgp/testing" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + crdv1b1 "antrea.io/antrea/pkg/apis/crd/v1beta1" + fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/util/ip" +) + +const ( + namespaceDefault = "default" + namespaceKubeSystem = "kube-system" +) + +var ( + podIPv4CIDR = ip.MustParseCIDR("10.10.0.0/24") + podIPv6CIDR = ip.MustParseCIDR("fec0:10:10::/64") + nodeIPv4Addr = ip.MustParseCIDR("192.168.77.100/24") + + testNodeConfig = &config.NodeConfig{ + PodIPv4CIDR: podIPv4CIDR, + PodIPv6CIDR: podIPv6CIDR, + NodeIPv4Addr: nodeIPv4Addr, + Name: localNodeName, + } + + peer1ASN = int32(65531) + peer1AuthPassword = "bgp-peer1" // #nosec G101 + ipv4Peer1Addr = "192.168.77.251" + ipv6Peer1Addr = "fec0::196:168:77:251" + ipv4Peer1 = generateBGPPeer(ipv4Peer1Addr, peer1ASN, 179, 120) + ipv6Peer1 = generateBGPPeer(ipv6Peer1Addr, peer1ASN, 179, 120) + ipv4Peer1Config = generateBGPPeerConfig(&ipv4Peer1, peer1AuthPassword) + ipv6Peer1Config = generateBGPPeerConfig(&ipv6Peer1, peer1AuthPassword) + + peer2ASN = int32(65532) + peer2AuthPassword = "bgp-peer2" // #nosec G101 + ipv4Peer2Addr = "192.168.77.252" + ipv6Peer2Addr = "fec0::196:168:77:252" + ipv4Peer2 = generateBGPPeer(ipv4Peer2Addr, peer2ASN, 179, 120) + ipv6Peer2 = generateBGPPeer(ipv6Peer2Addr, peer2ASN, 179, 120) + ipv4Peer2Config = generateBGPPeerConfig(&ipv4Peer2, peer2AuthPassword) + ipv6Peer2Config = generateBGPPeerConfig(&ipv6Peer2, peer2AuthPassword) + + updatedIPv4Peer2 = generateBGPPeer(ipv4Peer2Addr, peer2ASN, 179, 60) + updatedIPv6Peer2 = generateBGPPeer(ipv6Peer2Addr, peer2ASN, 179, 60) + updatedIPv4Peer2Config = generateBGPPeerConfig(&updatedIPv4Peer2, peer2AuthPassword) + updatedIPv6Peer2Config = generateBGPPeerConfig(&updatedIPv6Peer2, peer2AuthPassword) + + peer3ASN = int32(65533) + peer3AuthPassword = "bgp-peer3" // #nosec G101 + ipv4Peer3Addr = "192.168.77.253" + ipv6Peer3Addr = "fec0::196:168:77:253" + ipv4Peer3 = generateBGPPeer(ipv4Peer3Addr, peer3ASN, 179, 120) + ipv6Peer3 = generateBGPPeer(ipv6Peer3Addr, peer3ASN, 179, 120) + ipv4Peer3Config = generateBGPPeerConfig(&ipv4Peer3, peer3AuthPassword) + ipv6Peer3Config = generateBGPPeerConfig(&ipv6Peer3, peer3AuthPassword) + + nodeLabels1 = map[string]string{"node": "control-plane"} + nodeLabels2 = map[string]string{"os": "linux"} + nodeLabels3 = map[string]string{"node": "control-plane", "os": "linux"} + nodeAnnotations1 = map[string]string{types.NodeBGPRouterIDAnnotationKey: "192.168.77.100"} + nodeAnnotations2 = map[string]string{types.NodeBGPRouterIDAnnotationKey: "10.10.0.100"} + + localNodeName = "local" + node = generateNode(localNodeName, nodeLabels1, nodeAnnotations1) + + ipv4EgressIP1 = "192.168.77.200" + ipv6EgressIP1 = "fec0::192:168:77:200" + ipv4EgressIP2 = "192.168.77.201" + ipv6EgressIP2 = "fec0::192:168:77:2001" + + ipv4Egress1 = generateEgress("eg1-4", ipv4EgressIP1, localNodeName) + ipv6Egress1 = generateEgress("eg1-6", ipv6EgressIP1, localNodeName) + ipv4Egress2 = generateEgress("eg2-4", ipv4EgressIP2, "test-remote-node") + ipv6Egress2 = generateEgress("eg2-6", ipv6EgressIP2, "test-remote-node") + + bgpPolicyName1 = "policy-1" + bgpPolicyName2 = "policy-2" + bgpPolicyName3 = "policy-3" + bgpPolicyName4 = "policy-4" + + creationTimestamp = metav1.Now() + creationTimestampAdd1s = metav1.NewTime(creationTimestamp.Add(time.Second)) + creationTimestampAdd2s = metav1.NewTime(creationTimestamp.Add(2 * time.Second)) + creationTimestampAdd3s = metav1.NewTime(creationTimestamp.Add(3 * time.Second)) + + clusterIPv4 = "10.96.10.10" + externalIPv4 = "192.168.77.100" + loadBalancerIPv4 = "192.168.77.150" + endpointIPv4 = "10.10.0.10" + clusterIPv6 = "fec0::10:96:10:10" + externalIPv6 = "fec0::192:168:77:100" + loadBalancerIPv6 = "fec0::192:168:77:150" + endpointIPv6 = "fec0::10:10:0:10" + + ipv4ClusterIPName1 = "clusterip-4" + ipv4ClusterIPName2 = "clusterip-4-local" + ipv6ClusterIPName1 = "clusterip-6" + ipv6ClusterIPName2 = "clusterip-6-local" + ipv4LoadBalancerName = "loadbalancer-4" + ipv6LoadBalancerName = "loadbalancer-6" + + endpointSliceSuffix = rand.String(5) + ipv4ClusterIP1 = generateService(ipv4ClusterIPName1, corev1.ServiceTypeClusterIP, clusterIPv4, externalIPv4, "", false, false) + ipv4ClusterIP1Eps = generateEndpointSlice(ipv4ClusterIPName1, endpointSliceSuffix, false, false, endpointIPv4) + ipv4ClusterIP2 = generateService(ipv4ClusterIPName2, corev1.ServiceTypeClusterIP, clusterIPv4, externalIPv4, "", true, true) + ipv4ClusterIP2Eps = generateEndpointSlice(ipv4ClusterIPName2, endpointSliceSuffix, false, false, endpointIPv4) + + ipv6ClusterIP1 = generateService(ipv6ClusterIPName1, corev1.ServiceTypeClusterIP, clusterIPv6, externalIPv6, "", false, false) + ipv6ClusterIP1Eps = generateEndpointSlice(ipv6ClusterIPName1, endpointSliceSuffix, false, false, endpointIPv6) + ipv6ClusterIP2 = generateService(ipv6ClusterIPName2, corev1.ServiceTypeClusterIP, clusterIPv6, externalIPv6, "", true, true) + ipv6ClusterIP2Eps = generateEndpointSlice(ipv6ClusterIPName2, endpointSliceSuffix, false, false, endpointIPv6) + + ipv4LoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, clusterIPv4, externalIPv4, loadBalancerIPv4, false, false) + ipv4LoadBalancerEps = generateEndpointSlice(ipv4LoadBalancerName, endpointSliceSuffix, false, false, endpointIPv4) + ipv6LoadBalancer = generateService(ipv6LoadBalancerName, corev1.ServiceTypeLoadBalancer, clusterIPv6, externalIPv6, loadBalancerIPv6, false, false) + ipv6LoadBalancerEps = generateEndpointSlice(ipv6LoadBalancerName, endpointSliceSuffix, false, false, endpointIPv6) + + bgpPeerPasswords = map[string]string{ + generateBGPPeerKey(ipv4Peer1Addr, peer1ASN): peer1AuthPassword, + generateBGPPeerKey(ipv6Peer1Addr, peer1ASN): peer1AuthPassword, + generateBGPPeerKey(ipv4Peer2Addr, peer2ASN): peer2AuthPassword, + generateBGPPeerKey(ipv6Peer2Addr, peer2ASN): peer2AuthPassword, + generateBGPPeerKey(ipv4Peer3Addr, peer3ASN): peer3AuthPassword, + generateBGPPeerKey(ipv6Peer3Addr, peer3ASN): peer3AuthPassword, + } +) + +type fakeController struct { + *Controller + mockController *gomock.Controller + mockBGPServer *bgptest.MockInterface + crdClient *fakeversioned.Clientset + crdInformerFactory crdinformers.SharedInformerFactory + client *fake.Clientset + informerFactory informers.SharedInformerFactory +} + +func (c *fakeController) startInformers(stopCh chan struct{}) { + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) +} + +func newFakeController(t *testing.T, objects []runtime.Object, crdObjects []runtime.Object, ipv4Enabled, ipv6Enabled bool) *fakeController { + ctrl := gomock.NewController(t) + mockBGPServer := bgptest.NewMockInterface(ctrl) + + client := fake.NewSimpleClientset(objects...) + crdClient := fakeversioned.NewSimpleClientset(crdObjects...) + + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0) + informerFactory := informers.NewSharedInformerFactory(client, 0) + + nodeInformer := informerFactory.Core().V1().Nodes() + serviceInformer := informerFactory.Core().V1().Services() + egressInformer := crdInformerFactory.Crd().V1beta1().Egresses() + endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices() + bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies() + + bgpController, _ := NewBGPPolicyController(nodeInformer, + serviceInformer, + egressInformer, + bgpPolicyInformer, + endpointSliceInformer, + true, + client, + testNodeConfig, + &config.NetworkConfig{ + IPv4Enabled: ipv4Enabled, + IPv6Enabled: ipv6Enabled, + }) + bgpController.egressEnabled = true + bgpController.newBGPServerFn = func(_ *bgp.GlobalConfig) bgp.Interface { + return mockBGPServer + } + + return &fakeController{ + Controller: bgpController, + mockController: ctrl, + mockBGPServer: mockBGPServer, + crdClient: crdClient, + crdInformerFactory: crdInformerFactory, + client: client, + informerFactory: informerFactory, + } +} + +func TestBGPPolicyAdd(t *testing.T) { + testCases := []struct { + name string + ipv4Enabled bool + ipv6Enabled bool + policiesToAdd []runtime.Object + objects []runtime.Object + crdObjects []runtime.Object + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + expectedError string + }{ + { + name: "IPv4, as effective BGPPolicy, advertise ClusterIP", + ipv4Enabled: true, + policiesToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + false, + true, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1}), + }, + objects: []runtime.Object{ + ipv4ClusterIP1, + ipv4ClusterIP1Eps, + node, + }, + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv4)}}) + }, + }, + { + name: "IPv6, as effective BGPPolicy, advertise ExternalIP", + ipv6Enabled: true, + policiesToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + true, + true, + true, + false, + []v1alpha1.BGPPeer{ipv6Peer1})}, + objects: []runtime.Object{ + ipv6ClusterIP1, + ipv6ClusterIP1Eps, + node, + }, + expectedState: generateBGPPolicyState(179, + 65000, + "192.168.77.100", + []string{ipStrToPrefix(externalIPv6)}, + []bgp.PeerConfig{ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + }, + }, + { + name: "IPv4 & IPv6, as effective BGPPolicy, advertise LoadBalancerIP", + ipv4Enabled: true, + ipv6Enabled: true, + policiesToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1})}, + objects: []runtime.Object{ + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + }, + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(loadBalancerIPv4), ipStrToPrefix(loadBalancerIPv6)}, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv6)}}) + }, + }, + { + name: "IPv4, as effective BGPPolicy, advertise EgressIP", + ipv4Enabled: true, + policiesToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + true, + true, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1})}, + objects: []runtime.Object{node}, + crdObjects: []runtime.Object{ + ipv4Egress1, + ipv4Egress2, + }, + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(ipv4EgressIP1)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(ipv4EgressIP1)}}) + }, + }, + { + name: "IPv6, as effective BGPPolicy, advertise Pod CIDR", + ipv6Enabled: true, + policiesToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + true, + true, + true, + true, + []v1alpha1.BGPPeer{ipv6Peer1})}, + objects: []runtime.Object{node}, + expectedState: generateBGPPolicyState(179, + 65000, + "192.168.77.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "IPv4 & IPv6, as effective BGPPolicy, not advertise any Service IP due to no local Endpoint", + ipv4Enabled: true, + ipv6Enabled: true, + policiesToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 1179, + 65001, + true, + true, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1})}, + objects: []runtime.Object{ + ipv4ClusterIP2, + ipv4ClusterIP2Eps, + ipv6ClusterIP2, + ipv6ClusterIP2Eps, + node, + }, + expectedState: generateBGPPolicyState(1179, + 65001, + nodeIPv4Addr.IP.String(), + nil, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + }, + }, + { + name: "IPv4, as alternative BGPPolicy", + ipv4Enabled: true, + policiesToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName2, + creationTimestamp, // As the effective BGPPolicy because the creationTimestamp is the oldest. + nodeLabels1, + 179, + 65000, + true, + false, + false, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1}), + generateBGPPolicy(bgpPolicyName1, + creationTimestampAdd1s, + nodeLabels1, + 179, + 65000, + true, + false, + false, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1})}, + objects: []runtime.Object{ipv4ClusterIP1, ipv4ClusterIP1Eps, node}, + existingState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, tt.objects, append(tt.crdObjects, tt.policiesToAdd...), tt.ipv4Enabled, tt.ipv6Enabled) + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Fake the BGPPolicy state and the passwords of BGP peers. + c.bgpPolicyState = tt.existingState + if c.bgpPolicyState != nil { + c.bgpPolicyState.bgpServer = c.mockBGPServer + } + c.bgpPeerPasswords = bgpPeerPasswords + + // Wait for the dummy event triggered by BGPPolicy add events. + waitAndGetDummyEvent(t, c) + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + if tt.expectedError != "" { + assert.EqualError(t, c.syncBGPPolicy(ctx), tt.expectedError) + } else { + assert.NoError(t, c.syncBGPPolicy(ctx)) + } + // Done with the dummy event. + doneDummyEvent(t, c) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + }) + } +} + +func TestBGPPolicyUpdate(t *testing.T) { + effectivePolicy := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }) + effectivePolicyState := generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ) + alternativePolicy := generateBGPPolicy(bgpPolicyName2, + creationTimestampAdd1s, + nodeLabels1, + 1179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }) + unrelatedPolicy := generateBGPPolicy(bgpPolicyName3, + creationTimestampAdd2s, + nodeLabels2, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }) + objects := []runtime.Object{ + ipv4ClusterIP2, + ipv4ClusterIP2Eps, + ipv6ClusterIP2, + ipv6ClusterIP2Eps, + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + } + crdObjects := []runtime.Object{ipv4Egress1, + ipv4Egress2, + ipv6Egress1, + ipv6Egress2, + effectivePolicy, + alternativePolicy, + unrelatedPolicy, + } + testCases := []struct { + name string + policyToUpdate *v1alpha1.BGPPolicy + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + expectedError string + }{ + { + name: "Effective BGPPolicy, update NodeSelector (not applied to current Node), an alternative takes effect", + policyToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels2, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer2Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + expectedState: generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + }, + { + name: "Effective BGPPolicy, update NodeSelector (not applied to current Node), failed to stop current BGP server", + policyToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels2, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(gomock.Any()).Return(fmt.Errorf("failed to stop")) + }, + expectedState: deepCopyBGPPolicyState(effectivePolicyState), + expectedError: "failed to stop current BGP server: failed to stop", + }, + { + name: "Effective BGPPolicy, update Advertisements", + policyToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + true, + false, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + ipStrToPrefix(ipv4EgressIP1), + ipStrToPrefix(ipv6EgressIP1), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(ipv4EgressIP1)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(ipv6EgressIP1)}}) + + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv4)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv6)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv6)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "Effective BGPPolicy, update LocalASN and Advertisements", + policyToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65001, + false, + true, + false, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedState: generateBGPPolicyState(179, + 65001, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + ipStrToPrefix(ipv4EgressIP1), + ipStrToPrefix(ipv6EgressIP1), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer2Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(ipv4EgressIP1)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(ipv6EgressIP1)}}) + }, + }, + { + name: "Effective BGPPolicy, update ListenPort", + policyToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 1179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedState: generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer2Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "Effective BGPPolicy, update BGPPeers", + policyToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{updatedIPv4Peer2, + updatedIPv6Peer2, + ipv4Peer3, + ipv6Peer3}), + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{updatedIPv4Peer2Config, + updatedIPv6Peer2Config, + ipv4Peer3Config, + ipv6Peer3Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer3Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer3Config) + mockBGPServer.RemovePeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.RemovePeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.UpdatePeer(gomock.Any(), updatedIPv4Peer2Config) + mockBGPServer.UpdatePeer(gomock.Any(), updatedIPv6Peer2Config) + }, + }, + { + name: "Unrelated BGPPolicy, update NodeSelector (applied to current Node)", + policyToUpdate: generateBGPPolicy(bgpPolicyName3, + creationTimestampAdd2s, + nodeLabels1, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + existingState: deepCopyBGPPolicyState(effectivePolicyState), + expectedState: deepCopyBGPPolicyState(effectivePolicyState), + }, + { + name: "Alternative BGPPolicy, update Advertisements, LocalASN, ListenPort and BGPPeers", + policyToUpdate: generateBGPPolicy(bgpPolicyName2, + creationTimestampAdd1s, + nodeLabels1, + 1179, + 65001, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1, + updatedIPv4Peer2, + ipv6Peer1, + updatedIPv6Peer2, + }), + existingState: deepCopyBGPPolicyState(effectivePolicyState), + expectedState: deepCopyBGPPolicyState(effectivePolicyState), + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, objects, crdObjects, true, true) + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Wait for the dummy event triggered by BGPPolicy add events, and mark it done directly + // since we fake the expected state. + waitAndGetDummyEvent(t, c) + doneDummyEvent(t, c) + + // Fake the BGPPolicy state the passwords of BGP peers. + c.bgpPolicyState = deepCopyBGPPolicyState(effectivePolicyState) + c.bgpPolicyState.bgpServer = c.mockBGPServer + c.bgpPeerPasswords = bgpPeerPasswords + + tt.policyToUpdate.Generation += 1 + _, err := c.crdClient.CrdV1alpha1().BGPPolicies().Update(context.TODO(), tt.policyToUpdate, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Wait for the dummy event triggered by BGPPolicy update events. + waitAndGetDummyEvent(t, c) + + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + if tt.expectedError != "" { + assert.EqualError(t, c.syncBGPPolicy(ctx), tt.expectedError) + } else { + assert.NoError(t, c.syncBGPPolicy(ctx)) + } + // Done with the dummy event. + doneDummyEvent(t, c) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + }) + } +} + +func TestBGPPolicyDelete(t *testing.T) { + policy1 := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ + ipv4Peer1, + ipv6Peer1, + }) + policy1State := generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + }, + []bgp.PeerConfig{ + ipv4Peer1Config, + ipv6Peer1Config, + }, + ) + policy2 := generateBGPPolicy(bgpPolicyName2, + creationTimestampAdd1s, + nodeLabels1, + 179, + 65000, + false, + true, + false, + false, + false, + []v1alpha1.BGPPeer{ + ipv4Peer2, + ipv6Peer2, + }) + policy2State := generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ + ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + }, + []bgp.PeerConfig{ + ipv4Peer2Config, + ipv6Peer2Config}, + ) + policy3 := generateBGPPolicy(bgpPolicyName3, + creationTimestampAdd1s, + nodeLabels1, + 1179, + 65000, + false, + true, + false, + false, + false, + []v1alpha1.BGPPeer{ + ipv4Peer2, + ipv6Peer2, + }) + policy3State := generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ + ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + }, + []bgp.PeerConfig{ + ipv4Peer2Config, + ipv6Peer2Config}, + ) + objects := []runtime.Object{ + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + } + testCases := []struct { + name string + policyToDelete string + crdObjects []runtime.Object + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + }{ + { + name: "Delete effective BGPPolicy and there is no alternative one", + policyToDelete: bgpPolicyName1, + crdObjects: []runtime.Object{policy1}, + existingState: deepCopyBGPPolicyState(policy1State), + expectedState: nil, + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(gomock.Any()) + }, + }, + { + name: "Delete effective BGPPolicy and there is an alternative one, not need to start new BGP server", + policyToDelete: bgpPolicyName1, + crdObjects: []runtime.Object{policy1, policy2}, + existingState: deepCopyBGPPolicyState(policy1State), + expectedState: deepCopyBGPPolicyState(policy2State), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.RemovePeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.RemovePeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer2Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv6)}}) + }, + }, + { + name: "Delete effective BGPPolicy and there is an alternative one, need to start new BGP server", + policyToDelete: bgpPolicyName1, + crdObjects: []runtime.Object{policy1, policy3}, + existingState: deepCopyBGPPolicyState(policy1State), + expectedState: deepCopyBGPPolicyState(policy3State), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer2Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + }, + }, + { + name: "Delete an alternative BGPPolicy", + policyToDelete: bgpPolicyName2, + crdObjects: []runtime.Object{policy1, policy2}, + existingState: deepCopyBGPPolicyState(policy1State), + expectedState: deepCopyBGPPolicyState(policy1State), + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, objects, tt.crdObjects, true, true) + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Wait for the dummy event triggered by BGPPolicy add events, and mark it done. + waitAndGetDummyEvent(t, c) + doneDummyEvent(t, c) + + // Fake the BGPPolicy state and the passwords of BGP peers. + c.bgpPolicyState = tt.existingState + c.bgpPolicyState.bgpServer = c.mockBGPServer + c.bgpPeerPasswords = bgpPeerPasswords + + err := c.crdClient.CrdV1alpha1().BGPPolicies().Delete(context.TODO(), tt.policyToDelete, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Wait for the dummy event triggered by BGPPolicy delete events. + waitAndGetDummyEvent(t, c) + + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + assert.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + }) + } +} + +func TestNodeUpdate(t *testing.T) { + policy1 := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}) + policy1State := generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{podIPv4CIDR.String(), podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}) + policy2 := generateBGPPolicy(bgpPolicyName2, + creationTimestampAdd1s, + nodeLabels2, + 1179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}) + policy2State := generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{podIPv4CIDR.String(), podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}) + policy3 := generateBGPPolicy(bgpPolicyName3, + creationTimestampAdd2s, + nodeLabels3, + 179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}) + crdObjects := []runtime.Object{ + policy1, + policy2, + policy3, + } + testCases := []struct { + name string + ipv4Enabled bool + ipv6Enabled bool + node *corev1.Node + updatedNode *corev1.Node + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + }{ + { + name: "Update labels, a BGPPolicy is added to alternatives", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels3, nodeAnnotations1), + existingState: deepCopyBGPPolicyState(policy1State), + expectedState: deepCopyBGPPolicyState(policy1State), + }, + { + name: "Update labels, a BGPPolicy is removed from alternatives", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels3, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + existingState: deepCopyBGPPolicyState(policy1State), + expectedState: deepCopyBGPPolicyState(policy1State), + }, + { + name: "Update labels, effective BGPPolicy is updated to another one", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels2, nodeAnnotations1), + existingState: deepCopyBGPPolicyState(policy1State), + expectedState: deepCopyBGPPolicyState(policy2State), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "Update labels, effective BGPPolicy is updated to empty", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nil, nodeAnnotations1), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(gomock.Any()) + }, + existingState: deepCopyBGPPolicyState(policy1State), + }, + { + name: "IPv6 only, update annotations, effective BGPPolicy router ID is updated", + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels1, nodeAnnotations2), + existingState: generateBGPPolicyState(179, + 65000, + "192.168.77.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}), + expectedState: generateBGPPolicyState(179, + 65000, + "10.10.0.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "IPv6 only, remove annotations, router ID is generated from Node name ", + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels1, nil), + existingState: generateBGPPolicyState(179, + 65000, + "192.168.77.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}), + expectedState: generateBGPPolicyState(179, + 65000, + "156.67.103.8", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, nil, crdObjects, tt.ipv4Enabled, tt.ipv6Enabled) + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + // Initializing BGPPolicy objects will not trigger a dummy event because the local Node object has not been + // initialized and synced yet. The dummy event will be trigger by adding the local Node object. + _, err := c.client.CoreV1().Nodes().Create(context.TODO(), tt.node, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for the dummy event triggered by Node add event. + waitAndGetDummyEvent(t, c) + doneDummyEvent(t, c) + + // Fake the BGPPolicy state. + c.bgpPolicyState = tt.existingState + c.bgpPolicyState.bgpServer = c.mockBGPServer + + _, err = c.client.CoreV1().Nodes().Update(context.TODO(), tt.updatedNode, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Wait for the dummy event triggered by Node update events. + waitAndGetDummyEvent(t, c) + + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + assert.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + if !tt.ipv4Enabled && tt.ipv6Enabled { + updatedNode, err := c.client.CoreV1().Nodes().Get(context.TODO(), localNodeName, metav1.GetOptions{}) + require.NoError(t, err) + require.NotNil(t, updatedNode.Annotations) + assert.Equal(t, tt.expectedState.routerID, updatedNode.Annotations[types.NodeBGPRouterIDAnnotationKey]) + } + }) + } +} + +func TestServiceLifecycle(t *testing.T) { + policy := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + true, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1}) + c := newFakeController(t, []runtime.Object{node}, []runtime.Object{policy}, true, false) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + // Wait for the dummy event triggered by BGPPolicy add events. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().Start(gomock.Any()) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer1Config) + require.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + + // Create a Service configured with both `internalTrafficPolicy` and `externalTrafficPolicy` set to `Local`. + loadBalancer := generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.100", "192.168.77.150", true, true) + _, err := c.client.CoreV1().Services(namespaceDefault).Create(context.TODO(), loadBalancer, metav1.CreateOptions{}) + require.NoError(t, err) + + // Add an EndpointSlice without Endpoint IP for the Service. This could be happened at the moment after a Service is + // just created. + endpointSlice := generateEndpointSlice(ipv4LoadBalancerName, endpointSliceSuffix, true, false, "") + _, err = c.client.DiscoveryV1().EndpointSlices(namespaceDefault).Create(context.TODO(), endpointSlice, metav1.CreateOptions{}) + require.NoError(t, err) + + // Since both `internalTrafficPolicy` and `externalTrafficPolicy` are `Local` and no local Endpoint, no Service IP + // will be advertised. + waitAndGetDummyEvent(t, c) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update the EndpointSlice with a local Endpoint IP. + endpointSlice = generateEndpointSlice(ipv4LoadBalancerName, endpointSliceSuffix, true, false, "10.10.0.2") + _, err = c.client.DiscoveryV1().EndpointSlices(namespaceDefault).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Since there is a local Endpoint IP and both `internalTrafficPolicy` and `externalTrafficPolicy` are `Local`, the + // ClusterIP, externalIP, and LoadBalancerIP will be advertised. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "10.96.10.10/32"}}) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.100/32"}}) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.150/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update externalIP and LoadBalancerIP of the Service. Additionally, update both `externalTrafficPolicy` and + // `internalTrafficPolicy` to `Cluster`. + updatedLoadBalancer := generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", false, false) + _, err = c.client.CoreV1().Services(namespaceDefault).Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + // The stale externalIP and LoadBalancerIP will be withdrawn. The new externalIP and LoadBalancerIP will be advertised. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.151/32"}}) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.101/32"}}) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.100/32"}}) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.150/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update `externalTrafficPolicy` of the Service from `Cluster` to `Local`. + updatedLoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", false, true) + _, err = c.client.CoreV1().Services(namespaceDefault).Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Update the EndpointSlice with a remote Endpoint. + endpointSlice = generateEndpointSlice(ipv4LoadBalancerName, endpointSliceSuffix, false, false, "10.10.0.3") + _, err = c.client.DiscoveryV1().EndpointSlices(namespaceDefault).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Since there is no local Endpoint and `externalTrafficPolicy` is `Local`, the externalIP and LoadBalancerIP will be + // withdrawn. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.101/32"}}) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.151/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update `internalTrafficPolicy` of the Service from `Cluster` to `Local`. + updatedLoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", true, true) + _, err = c.client.CoreV1().Services(namespaceDefault).Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Since there is no local Endpoint and `internalTrafficPolicy` is `Local`, the ClusterIP will be withdrawn. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "10.96.10.10/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update `externalTrafficPolicy` of the Service from `Local` to `Cluster`. + updatedLoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", true, false) + _, err = c.client.CoreV1().Services(namespaceDefault).Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Since `externalTrafficPolicy` is `Cluster`, the ClusterIP will be advertised even if there is no local Endpoint. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.101/32"}}) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.151/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Delete the Service. + err = c.client.CoreV1().Services(namespaceDefault).Delete(context.TODO(), updatedLoadBalancer.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Since the Service is deleted, all corresponding Service IPs will be withdrawn. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.101/32"}}) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.151/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) +} + +func TestEgressLifecycle(t *testing.T) { + policy := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + false, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1}) + c := newFakeController(t, []runtime.Object{node}, []runtime.Object{policy}, true, false) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + // Wait for the dummy event triggered by BGPPolicy add events,. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().Start(gomock.Any()) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer1Config) + require.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + + // Create an Egress. + egress := generateEgress("eg1-4", "192.168.77.200", localNodeName) + _, err := c.crdClient.CrdV1beta1().Egresses().Create(context.TODO(), egress, metav1.CreateOptions{}) + require.NoError(t, err) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.200/32"}})) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update the Egress. + updatedEgress := generateEgress("eg1-4", "192.168.77.201", localNodeName) + _, err = c.crdClient.CrdV1beta1().Egresses().Update(context.TODO(), updatedEgress, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.201/32"}})) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.200/32"}})) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update the Egress. + updatedEgress = generateEgress("eg1-4", "192.168.77.201", "remote") + _, err = c.crdClient.CrdV1beta1().Egresses().Update(context.TODO(), updatedEgress, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.201/32"}})) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update the Egress. + updatedEgress = generateEgress("eg1-4", "192.168.77.201", localNodeName) + _, err = c.crdClient.CrdV1beta1().Egresses().Update(context.TODO(), updatedEgress, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.201/32"}})) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Delete the Egress. + err = c.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), updatedEgress.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.201/32"}})) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) +} + +func TestBGPSecretUpdate(t *testing.T) { + policy := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv4Peer2, ipv4Peer3}) + c := newFakeController(t, []runtime.Object{node}, []runtime.Object{policy}, true, false) + + c.secretInformer = coreinformers.NewFilteredSecretInformer(c.client, + namespaceKubeSystem, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", types.BGPPolicySecretName).String() + }) + c.secretInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.addSecret, + UpdateFunc: c.updateSecret, + DeleteFunc: c.deleteSecret, + }) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + go c.secretInformer.Run(stopCh) + + // Create the Secret. + secret := generateSecret(bgpPeerPasswords) + _, err := c.client.CoreV1().Secrets(namespaceKubeSystem).Create(context.TODO(), secret, metav1.CreateOptions{}) + require.NoError(t, err) + + require.Eventually(t, func() bool { + c.bgpPeerPasswordsMutex.RLock() + defer c.bgpPeerPasswordsMutex.RUnlock() + if reflect.DeepEqual(c.bgpPeerPasswords, bgpPeerPasswords) { + return true + } + return false + }, 5*time.Second, 10*time.Millisecond) + + // Wait for the dummy event triggered by BGPPolicy add events. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().Start(gomock.Any()) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer3Config) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + + // Update the Secret. + updatedBGPPeerPasswords := map[string]string{ + generateBGPPeerKey(ipv4Peer1Addr, peer1ASN): "updated-" + peer1AuthPassword, + generateBGPPeerKey(ipv4Peer2Addr, peer2ASN): peer2AuthPassword, + generateBGPPeerKey(ipv4Peer3Addr, peer3ASN): "updated-" + peer3AuthPassword, + } + updatedSecret := generateSecret(updatedBGPPeerPasswords) + _, err = c.client.CoreV1().Secrets(namespaceKubeSystem).Update(context.TODO(), updatedSecret, metav1.UpdateOptions{}) + require.NoError(t, err) + require.Eventually(t, func() bool { + c.bgpPeerPasswordsMutex.RLock() + defer c.bgpPeerPasswordsMutex.RUnlock() + if reflect.DeepEqual(c.bgpPeerPasswords, updatedBGPPeerPasswords) { + return true + } + return false + }, 5*time.Second, 10*time.Millisecond) + + // Wait for the dummy event triggered by Secret update event, and mark it done. + waitAndGetDummyEvent(t, c) + updatedIPv4Peer1Config := ipv4Peer1Config + updatedIPv4Peer3Config := ipv4Peer3Config + updatedIPv4Peer1Config.Password = "updated-" + peer1AuthPassword + updatedIPv4Peer3Config.Password = "updated-" + peer3AuthPassword + mockBGPServer.EXPECT().UpdatePeer(gomock.Any(), updatedIPv4Peer1Config) + mockBGPServer.EXPECT().UpdatePeer(gomock.Any(), updatedIPv4Peer3Config) + require.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) +} + +func TestSyncBGPPolicyFailures(t *testing.T) { + policy1 := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer2}) + policy2 := generateBGPPolicy(bgpPolicyName2, + creationTimestampAdd1s, + nodeLabels1, + 1179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1}) + policy3 := generateBGPPolicy(bgpPolicyName3, + creationTimestampAdd2s, + nodeLabels1, + 1179, + 65000, + false, + true, + false, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer2}) + policy4 := generateBGPPolicy(bgpPolicyName4, + creationTimestampAdd3s, + nodeLabels1, + 1179, + 65000, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{updatedIPv4Peer2}) + objects := []runtime.Object{ + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + } + crdObjects := []runtime.Object{ + policy1, + policy2, + policy3, + policy4, + } + + c := newFakeController(t, objects, crdObjects, true, false) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Wait for the dummy event triggered by BGPPolicy ADD events. + waitAndGetDummyEvent(t, c) + + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + mockBGPServer.EXPECT().Start(gomock.Any()) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + + checkBGPPolicyState(t, generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(loadBalancerIPv4)}, + []bgp.PeerConfig{ipv4Peer2Config}), + c.bgpPolicyState) + + // Delete the effective BGPPolicy policy1, and BGPPolicy policy2 will be the effective one. + require.NoError(t, c.crdClient.CrdV1alpha1().BGPPolicies().Delete(context.TODO(), policy1.Name, metav1.DeleteOptions{})) + + waitAndGetDummyEvent(t, c) + + // The local ASN of BGPPolicy policy2 is not the same as BGPPolicy policy1, and the current BGP server should be stopped. + // Mock that failing in stopping the current BGP server. + mockBGPServer.EXPECT().Stop(gomock.Any()).Return(fmt.Errorf("failed reason")) + require.EqualError(t, c.syncBGPPolicy(ctx), "failed to stop current BGP server: failed reason") + checkBGPPolicyState(t, generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(loadBalancerIPv4)}, + []bgp.PeerConfig{ipv4Peer2Config}), + c.bgpPolicyState) + + // Mock the retry. Stop the current BGP server successfully, but fail in starting a new BGP server. + mockBGPServer.EXPECT().Stop(gomock.Any()) + mockBGPServer.EXPECT().Start(gomock.Any()).Return(fmt.Errorf("failed reason")) + require.EqualError(t, c.syncBGPPolicy(ctx), "failed to start BGP server: failed reason") + checkBGPPolicyState(t, nil, c.bgpPolicyState) + + // Mock the retry. Start BGP server successfully, but fail in adding BGP peer. + mockBGPServer.EXPECT().Start(gomock.Any()) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer1Config).Return(fmt.Errorf("failed to add BGP peer")) + require.EqualError(t, c.syncBGPPolicy(ctx), "failed to add BGP peer") + checkBGPPolicyState(t, generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{}, + []bgp.PeerConfig{}), + c.bgpPolicyState) + + // Mock the retry. Add the BGP peer successfully, but fail in advertising routes. + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}).Return(fmt.Errorf("failed to advertise routes")) + require.EqualError(t, c.syncBGPPolicy(ctx), "failed to advertise routes") + // Done with the dummy event. + doneDummyEvent(t, c) + checkBGPPolicyState(t, generateBGPPolicyState( + 1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{}, + []bgp.PeerConfig{ipv4Peer1Config}), + c.bgpPolicyState) + + // Delete the effective BGPPolicy policy2, and BGPPolicy policy3 will be the effective one. The BGP server doesn't need to + // be updated. The peers and routes will be reconciled according to the existing BGPPolicy state. + require.NoError(t, c.crdClient.CrdV1alpha1().BGPPolicies().Delete(context.TODO(), policy2.Name, metav1.DeleteOptions{})) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.EXPECT().RemovePeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + checkBGPPolicyState(t, generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(externalIPv4)}, + []bgp.PeerConfig{ipv4Peer2Config}), + c.bgpPolicyState) + + // Delete the effective BGPPolicy policy3, and BGPPolicy policy4 will be the effective one. The BGP server doesn't need to + // be updated. The peers and routes will be reconciled according to the existing BGPPolicy state. + require.NoError(t, c.crdClient.CrdV1alpha1().BGPPolicies().Delete(context.TODO(), policy3.Name, metav1.DeleteOptions{})) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().UpdatePeer(gomock.Any(), updatedIPv4Peer2Config) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + checkBGPPolicyState(t, generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(loadBalancerIPv4)}, + []bgp.PeerConfig{updatedIPv4Peer2Config}), + c.bgpPolicyState) +} + +func generateBGPPolicyState(listenPort int32, + localASN int32, + routerID string, + prefixes []string, + peerConfigs []bgp.PeerConfig) *bgpPolicyState { + routes := sets.New[bgp.Route]() + peerConfigMap := make(map[string]bgp.PeerConfig) + for _, prefix := range prefixes { + routes.Insert(bgp.Route{Prefix: prefix}) + } + for _, peerConfig := range peerConfigs { + peerKey := generateBGPPeerKey(peerConfig.Address, peerConfig.ASN) + peerConfigMap[peerKey] = peerConfig + } + return &bgpPolicyState{ + listenPort: listenPort, + localASN: localASN, + routerID: routerID, + routes: routes, + peerConfigs: peerConfigMap, + } +} + +func deepCopyBGPPolicyState(in *bgpPolicyState) *bgpPolicyState { + peerConfigMap := make(map[string]bgp.PeerConfig) + for _, peerConfig := range in.peerConfigs { + peerKey := generateBGPPeerKey(peerConfig.Address, peerConfig.ASN) + peerConfigMap[peerKey] = peerConfig + } + + return &bgpPolicyState{ + listenPort: in.listenPort, + localASN: in.localASN, + routerID: in.routerID, + routes: in.routes.Union(nil), + peerConfigs: peerConfigMap, + } +} + +func checkBGPPolicyState(t *testing.T, expected, got *bgpPolicyState) { + require.Equal(t, expected != nil, got != nil) + if expected != nil { + assert.Equal(t, expected.listenPort, got.listenPort) + assert.Equal(t, expected.localASN, got.localASN) + assert.Equal(t, expected.routerID, got.routerID) + assert.Equal(t, expected.routes, got.routes) + assert.Equal(t, expected.peerConfigs, got.peerConfigs) + } +} + +func generateBGPPolicy(name string, + creationTimestamp metav1.Time, + nodeSelector map[string]string, + listenPort int32, + localASN int32, + advertiseClusterIP bool, + advertiseExternalIP bool, + advertiseLoadBalancerIP bool, + advertiseEgressIP bool, + advertisePodCIDR bool, + externalPeers []v1alpha1.BGPPeer) *v1alpha1.BGPPolicy { + var advertisement v1alpha1.Advertisements + advertisement.Service = &v1alpha1.ServiceAdvertisement{} + if advertiseClusterIP { + advertisement.Service.IPTypes = append(advertisement.Service.IPTypes, v1alpha1.ServiceIPTypeClusterIP) + } + if advertiseExternalIP { + advertisement.Service.IPTypes = append(advertisement.Service.IPTypes, v1alpha1.ServiceIPTypeExternalIP) + } + if advertiseLoadBalancerIP { + advertisement.Service.IPTypes = append(advertisement.Service.IPTypes, v1alpha1.ServiceIPTypeLoadBalancerIP) + } + if advertiseEgressIP { + advertisement.Egress = &v1alpha1.EgressAdvertisement{} + } + + if advertisePodCIDR { + advertisement.Pod = &v1alpha1.PodAdvertisement{} + } + return &v1alpha1.BGPPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: "test-uid", + CreationTimestamp: creationTimestamp, + }, + Spec: v1alpha1.BGPPolicySpec{ + NodeSelector: metav1.LabelSelector{MatchLabels: nodeSelector}, + LocalASN: localASN, + ListenPort: &listenPort, + Advertisements: advertisement, + BGPPeers: externalPeers, + }, + } +} + +func generateService(name string, + svcType corev1.ServiceType, + clusterIP string, + externalIP string, + LoadBalancerIP string, + internalTrafficPolicyLocal bool, + externalTrafficPolicyLocal bool) *corev1.Service { + itp := corev1.ServiceInternalTrafficPolicyCluster + if internalTrafficPolicyLocal { + itp = corev1.ServiceInternalTrafficPolicyLocal + } + etp := corev1.ServiceExternalTrafficPolicyCluster + if externalTrafficPolicyLocal { + etp = corev1.ServiceExternalTrafficPolicyLocal + } + var externalIPs []string + if externalIP != "" { + externalIPs = append(externalIPs, externalIP) + } + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespaceDefault, + UID: "test-uid", + }, + Spec: corev1.ServiceSpec{ + Type: svcType, + ClusterIP: clusterIP, + Ports: []corev1.ServicePort{{ + Name: "p80", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + ClusterIPs: []string{clusterIP}, + ExternalIPs: externalIPs, + InternalTrafficPolicy: &itp, + ExternalTrafficPolicy: etp, + }, + } + if LoadBalancerIP != "" { + ingress := []corev1.LoadBalancerIngress{{IP: LoadBalancerIP}} + svc.Status.LoadBalancer.Ingress = ingress + } + return svc +} + +func generateEgress(name string, ip string, nodeName string) *crdv1b1.Egress { + return &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: "test-uid", + }, + Spec: crdv1b1.EgressSpec{ + EgressIP: ip, + }, + Status: crdv1b1.EgressStatus{ + EgressIP: ip, + EgressNode: nodeName, + }, + } +} + +func generateNode(name string, labels, annotations map[string]string) *corev1.Node { + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: "test-uid", + Labels: labels, + Annotations: annotations, + }, + } +} + +func generateEndpointSlice(svcName string, + suffix string, + isLocal bool, + isIPv6 bool, + endpointIP string) *discovery.EndpointSlice { + addrType := discovery.AddressTypeIPv4 + if isIPv6 { + addrType = discovery.AddressTypeIPv6 + } + var nodeName *string + if isLocal { + nodeName = &localNodeName + } + protocol := corev1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", svcName, suffix), + Namespace: namespaceDefault, + UID: "test-uid", + Labels: map[string]string{ + discovery.LabelServiceName: svcName, + }, + }, + AddressType: addrType, + } + if endpointIP != "" { + endpointSlice.Endpoints = []discovery.Endpoint{{ + Addresses: []string{ + endpointIP, + }, + Conditions: discovery.EndpointConditions{ + Ready: ptr.To(true), + }, + Hostname: nodeName, + NodeName: nodeName, + }} + endpointSlice.Ports = []discovery.EndpointPort{{ + Name: ptr.To("p80"), + Port: ptr.To(int32(80)), + Protocol: &protocol, + }} + } + return endpointSlice +} + +func generateBGPPeer(ip string, asn, port, gracefulRestartTimeSeconds int32) v1alpha1.BGPPeer { + return v1alpha1.BGPPeer{ + Address: ip, + Port: &port, + ASN: asn, + MultihopTTL: ptr.To(int32(1)), + GracefulRestartTimeSeconds: &gracefulRestartTimeSeconds, + } +} + +func generateBGPPeerConfig(peerConfig *v1alpha1.BGPPeer, password string) bgp.PeerConfig { + return bgp.PeerConfig{ + BGPPeer: peerConfig, + Password: password, + } +} + +func generateSecret(rawData map[string]string) *corev1.Secret { + data := make(map[string][]byte) + for k, v := range rawData { + data[k] = []byte(v) + } + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: types.BGPPolicySecretName, + Namespace: namespaceKubeSystem, + UID: "test-uid", + }, + Type: corev1.SecretTypeOpaque, + Data: data, + } +} + +func ipStrToPrefix(ipStr string) string { + if utilnet.IsIPv4String(ipStr) { + return ipStr + ipv4Suffix + } else if utilnet.IsIPv6String(ipStr) { + return ipStr + ipv6Suffix + } + return "" +} + +func waitAndGetDummyEvent(t *testing.T, c *fakeController) { + require.Eventually(t, func() bool { + return c.queue.Len() == 1 + }, 5*time.Second, 10*time.Millisecond) + c.queue.Get() +} +func doneDummyEvent(t *testing.T, c *fakeController) { + c.queue.Done(dummyKey) +} diff --git a/pkg/agent/types/annotations.go b/pkg/agent/types/annotations.go index b5b63e1a8d8..cc74150f280 100644 --- a/pkg/agent/types/annotations.go +++ b/pkg/agent/types/annotations.go @@ -27,6 +27,9 @@ const ( // NodeMaxEgressIPsAnnotationKey represents the key of maximum Egress IP number in the Annotations of the Node. NodeMaxEgressIPsAnnotationKey string = "node.antrea.io/max-egress-ips" + // NodeBGPRouterIDAnnotationKey represents the key of the Node's BGP router ID in the Annotations of the Node. + NodeBGPRouterIDAnnotationKey string = "node.antrea.io/bgp-router-id" + // ServiceExternalIPPoolAnnotationKey is the key of the Service annotation that specifies the Service's desired external IP pool. ServiceExternalIPPoolAnnotationKey string = "service.antrea.io/external-ip-pool" diff --git a/pkg/agent/types/bgppolicy.go b/pkg/agent/types/bgppolicy.go new file mode 100644 index 00000000000..8480797841a --- /dev/null +++ b/pkg/agent/types/bgppolicy.go @@ -0,0 +1,23 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +// BGPPolicySecretName is the name of the Kubernetes Secret used to store BGP peer passwords. Each entry in the Secret +// uses a key which is a concatenated string of BGP peer IP address and ASN. The value is the password for that BGP peer. +// +// Examples of keys: +// - "192.168.77.100-65000" +// - "2001:db8::1-65000" +const BGPPolicySecretName = "antrea-bgp-passwords" // #nosec G101 diff --git a/pkg/apiserver/handlers/featuregates/handler_test.go b/pkg/apiserver/handlers/featuregates/handler_test.go index adb630a1324..4475fcd2c47 100644 --- a/pkg/apiserver/handlers/featuregates/handler_test.go +++ b/pkg/apiserver/handlers/featuregates/handler_test.go @@ -55,6 +55,7 @@ func Test_getGatesResponse(t *testing.T) { {Component: "agent", Name: "AntreaIPAM", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "AntreaPolicy", Status: "Disabled", Version: "BETA"}, {Component: "agent", Name: "AntreaProxy", Status: "Enabled", Version: "GA"}, + {Component: "agent", Name: "BGPPolicy", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "CleanupStaleUDPSvcConntrack", Status: cleanupStaleUDPSvcConntrackStatus, Version: "BETA"}, {Component: "agent", Name: "Egress", Status: egressStatus, Version: "BETA"}, {Component: "agent", Name: "EgressSeparateSubnet", Status: "Disabled", Version: "ALPHA"}, diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 05cb51a9df5..e6dd9c9570e 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -163,6 +163,11 @@ const ( // alpha: v2.1 // Enable the NodeLatencyMonitor feature. NodeLatencyMonitor featuregate.Feature = "NodeLatencyMonitor" + + // alpha: v2.1 + // Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress + // IPs to remote BGP peers. + BGPPolicy featuregate.Feature = "BGPPolicy" ) var ( @@ -179,6 +184,7 @@ var ( DefaultAntreaFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ AntreaPolicy: {Default: true, PreRelease: featuregate.Beta}, AntreaProxy: {Default: true, PreRelease: featuregate.GA}, + BGPPolicy: {Default: false, PreRelease: featuregate.Alpha}, Egress: {Default: true, PreRelease: featuregate.Beta}, EndpointSlice: {Default: true, PreRelease: featuregate.GA}, TopologyAwareHints: {Default: true, PreRelease: featuregate.Beta}, @@ -213,6 +219,7 @@ var ( AntreaIPAM, AntreaPolicy, AntreaProxy, + BGPPolicy, CleanupStaleUDPSvcConntrack, Egress, EndpointSlice, @@ -267,8 +274,11 @@ var ( // can have different FeatureSpecs between Linux and Windows, we should // still define a separate defaultAntreaFeatureGates map for Windows. unsupportedFeaturesOnWindows = map[featuregate.Feature]struct{}{ - Egress: {}, - AntreaIPAM: {}, + Egress: {}, + AntreaIPAM: {}, + // BGPPolicy feature is not validated on Windows yet. This can be removed + // in the future if it's fully tested on Windows. + BGPPolicy: {}, Multicast: {}, SecondaryNetwork: {}, ServiceExternalIP: {},