From afaec803c14cd63481d705e3465060f3d53fb06d Mon Sep 17 00:00:00 2001 From: Wenying Dong Date: Wed, 25 Sep 2024 14:29:26 +0800 Subject: [PATCH] [Windows] CNI Server installs OpenFlow entries after PortStatus message is received Signed-off-by: Wenying Dong --- cmd/antrea-agent/agent.go | 1 + go.mod | 9 +- go.sum | 18 +- pkg/agent/cniserver/pod_configuration.go | 150 +++++++- .../cniserver/pod_configuration_linux.go | 12 + .../cniserver/pod_configuration_linux_test.go | 2 +- pkg/agent/cniserver/pod_configuration_test.go | 323 ++++++++++++++++++ .../cniserver/pod_configuration_windows.go | 47 ++- pkg/agent/cniserver/pod_monitor.go | 111 ++++++ pkg/agent/cniserver/pod_monitor_linux.go | 33 ++ pkg/agent/cniserver/pod_monitor_test.go | 301 ++++++++++++++++ pkg/agent/cniserver/pod_monitor_windows.go | 56 +++ pkg/agent/cniserver/secondary.go | 2 +- pkg/agent/cniserver/server.go | 19 +- pkg/agent/cniserver/server_linux_test.go | 12 +- pkg/agent/cniserver/server_windows_test.go | 163 +++++---- pkg/agent/interfacestore/interface_cache.go | 5 + .../testing/mock_interfacestore.go | 12 + pkg/agent/interfacestore/types.go | 1 + pkg/agent/openflow/client.go | 7 + pkg/agent/openflow/client_test.go | 11 + pkg/agent/openflow/testing/mock_openflow.go | 13 + pkg/agent/types/annotations.go | 4 + pkg/ovs/openflow/interfaces.go | 3 + pkg/ovs/openflow/ofctrl_bridge.go | 24 ++ pkg/ovs/openflow/testing/mock_openflow.go | 12 + test/integration/agent/cniserver_test.go | 39 ++- 27 files changed, 1254 insertions(+), 136 deletions(-) create mode 100644 pkg/agent/cniserver/pod_configuration_test.go create mode 100644 pkg/agent/cniserver/pod_monitor.go create mode 100644 pkg/agent/cniserver/pod_monitor_linux.go create mode 100644 pkg/agent/cniserver/pod_monitor_test.go create mode 100644 pkg/agent/cniserver/pod_monitor_windows.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 60bd61503aa..162e4458237 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go 
@@ -600,6 +600,7 @@ func run(o *Options) error { o.config.CNISocket, o.config.HostProcPathPrefix, nodeConfig, + localPodInformer.Get(), k8sClient, routeClient, isChaining, diff --git a/go.mod b/go.mod index 64498a17633..609bf4ac347 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module antrea.io/antrea go 1.23.0 require ( - antrea.io/libOpenflow v0.14.0 - antrea.io/ofnet v0.12.0 + antrea.io/libOpenflow v0.15.0 + antrea.io/ofnet v0.14.0 github.com/ClickHouse/clickhouse-go/v2 v2.6.1 github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/Mellanox/sriovnet v1.1.0 @@ -113,10 +113,9 @@ require ( github.com/aws/smithy-go v1.12.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect - github.com/cenk/hub v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cenkalti/hub v1.0.1 // indirect - github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect + github.com/cenkalti/hub v1.0.2 // indirect + github.com/cenkalti/rpc2 v1.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/go.sum b/go.sum index d74fb535f20..5b902c99d61 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ -antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE= -antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0= -antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao= -antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg= +antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI= +antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ= +antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc= +antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo= cloud.google.com/go v0.26.0/go.mod 
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -114,14 +114,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= -github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA= -github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg= -github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs= -github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg= -github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M= +github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8= +github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU= +github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k= +github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/pkg/agent/cniserver/pod_configuration.go 
b/pkg/agent/cniserver/pod_configuration.go index e44e739fd61..1a306df0989 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -20,11 +20,18 @@ import ( "net" "strings" "sync" + "time" current "github.com/containernetworking/cni/pkg/types/100" "github.com/containernetworking/cni/pkg/version" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + k8swait "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -61,6 +68,10 @@ const ( var ( getNSPath = util.GetNSPath + // retryInterval is the interval to re-install Pod OpenFlow entries if any error happened. + // Note, using a variable rather than constant for retryInterval because we may use a shorter time in the + // test code. + retryInterval = 5 * time.Second ) type podConfigurator struct { @@ -76,9 +87,17 @@ type podConfigurator struct { // isSecondaryNetwork is true if this instance of podConfigurator is used to configure // Pod secondary network interfaces. 
isSecondaryNetwork bool + podIfMonitor *podIfaceMonitor + + eventBroadcaster record.EventBroadcaster + record record.EventRecorder + podLister v1.PodLister + kubeClient clientset.Interface + queue workqueue.TypedDelayingInterface[string] } func newPodConfigurator( + kubeClient clientset.Interface, ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, routeClient route.Interface, @@ -88,11 +107,23 @@ func newPodConfigurator( isOvsHardwareOffloadEnabled bool, disableTXChecksumOffload bool, podUpdateNotifier channel.Notifier, + podLister v1.PodLister, ) (*podConfigurator, error) { ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload) if err != nil { return nil, err } + queue := workqueue.NewTypedDelayingQueueWithConfig[string]( + workqueue.TypedDelayingQueueConfig[string]{ + Name: "podMonitor", + }, + ) + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder( + scheme.Scheme, + corev1.EventSource{Component: "AntreaAgentPodConfigurator"}, + ) + ifMonitor := newPodInterfaceMonitor(ofClient, ifaceStore, queue) return &podConfigurator{ ovsBridgeClient: ovsBridgeClient, ofClient: ofClient, @@ -101,6 +132,12 @@ func newPodConfigurator( gatewayMAC: gatewayMAC, ifConfigurator: ifConfigurator, podUpdateNotifier: podUpdateNotifier, + podIfMonitor: ifMonitor, + eventBroadcaster: eventBroadcaster, + record: recorder, + kubeClient: kubeClient, + podLister: podLister, + queue: queue, }, nil } @@ -166,13 +203,13 @@ func getContainerIPsString(ips []net.IP) string { // not created for a Pod interface. 
func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig { if portData.ExternalIDs == nil { - klog.V(2).Infof("OVS port %s has no external_ids", portData.Name) + klog.V(2).InfoS("OVS port has no external_ids", "port", portData.Name) return nil } containerID, found := portData.ExternalIDs[ovsExternalIDContainerID] if !found { - klog.V(2).Infof("OVS port %s has no %s in external_ids", portData.Name, ovsExternalIDContainerID) + klog.V(2).InfoS("OVS port has no containerID in external_ids", "port", portData.Name) return nil } @@ -187,8 +224,7 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in containerMAC, err := net.ParseMAC(portData.ExternalIDs[ovsExternalIDMAC]) if err != nil { - klog.Errorf("Failed to parse MAC address from OVS external config %s: %v", - portData.ExternalIDs[ovsExternalIDMAC], err) + klog.ErrorS(err, "Failed to parse MAC address from OVS external config") } podName, _ := portData.ExternalIDs[ovsExternalIDPodName] podNamespace, _ := portData.ExternalIDs[ovsExternalIDPodNamespace] @@ -279,7 +315,7 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s func (pc *podConfigurator) removeInterfaces(containerID string) error { containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if !found { - klog.V(2).Infof("Did not find the port for container %s in local cache", containerID) + klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID) return nil } @@ -498,7 +534,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain // disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int. 
func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interfacestore.InterfaceConfig) error { containerID := containerConfig.ContainerID - klog.V(2).Infof("Deleting Openflow entries for container %s", containerID) + klog.V(2).InfoS("Deleting Openflow entries for container", "container", containerID) if !pc.isSecondaryNetwork { if err := pc.ofClient.UninstallPodFlows(containerConfig.InterfaceName); err != nil { return fmt.Errorf("failed to delete Openflow entries for container %s: %v", containerID, err) @@ -513,6 +549,12 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface if err := pc.ovsBridgeClient.DeletePort(containerConfig.PortUUID); err != nil { return fmt.Errorf("failed to delete OVS port for container %s interface %s: %v", containerID, containerConfig.InterfaceName, err) } + + // Remove unready Pod info from local cache when Pod is deleted. This is called only on Windows. + if pc.podIfMonitor != nil { + pc.podIfMonitor.deleteUnreadyPod(containerConfig.InterfaceName) + } + // Remove container configuration from cache. pc.ifaceStore.DeleteInterface(containerConfig) if !pc.isSecondaryNetwork { @@ -558,7 +600,7 @@ func (pc *podConfigurator) connectInterceptedInterface( func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error { containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if !found { - klog.V(2).Infof("Did not find the port for container %s in local cache", containerID) + klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID) return nil } for _, ip := range containerConfig.IPs { @@ -570,3 +612,97 @@ func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, return pc.disconnectInterfaceFromOVS(containerConfig) // TODO recover pre-connect state? repatch vethpair to original bridge etc ?? to make first CNI happy?? 
} + +func (pc *podConfigurator) Run(stopCh <-chan struct{}) { + defer pc.queue.ShutDown() + pc.startEventBroadcaster(stopCh) + go k8swait.Until(pc.worker, time.Second, stopCh) + pc.podIfMonitor.Run(stopCh) + + <-stopCh +} + +func (pc *podConfigurator) processNextWorkItem() bool { + key, quit := pc.queue.Get() + if quit { + return false + } + defer pc.queue.Done(key) + + if err := pc.updateUnreadyPod(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + pc.queue.AddAfter(key, retryInterval) + } + return true +} + +// worker is a long-running function that will continually call the processNextWorkItem function in +// order to read and process a message on the workqueue. +func (pc *podConfigurator) worker() { + for pc.processNextWorkItem() { + } +} + +func (pc *podConfigurator) updateUnreadyPod(ovsPort string) error { + if !pc.podIfMonitor.unreadyInterfaceExists(ovsPort) { + klog.InfoS("Interface does not exist in un-ready state", "name", ovsPort) + return nil + } + + ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found in local cache, remove from unready cache", "name", ovsPort) + pc.podIfMonitor.deleteUnreadyPod(ovsPort) + return nil + } + + if ifConfig.OFPort == 0 { + // Add Pod not-ready event if the pod flows are not successfully installed, and the OpenFlow port is not allocated. + // Returns error so that we can have a retry after 5s. + _ = pc.processPodEvents(ifConfig, false) + return fmt.Errorf("pod's OpenFlow port is not ready yet") + } + + if !pc.podIfMonitor.interfaceFlowsInstalled(ovsPort) { + // Install OpenFlow entries for the Pod. + klog.V(2).InfoS("Setting up Openflow entries for OVS port", "port", ovsPort) + if err := pc.ofClient.InstallPodFlows(ovsPort, ifConfig.IPs, ifConfig.MAC, uint32(ifConfig.OFPort), ifConfig.VLANID, nil); err != nil { + // Add Pod not-ready event if the pod flows installation are failed. 
+ // Returns error so that we can have a retry after 5s. + _ = pc.processPodEvents(ifConfig, false) + return fmt.Errorf("failed to add Openflow entries for OVS port %s: %v", ovsPort, err) + } + + // Notify the Pod update event to required components. + event := agenttypes.PodUpdate{ + PodName: ifConfig.PodName, + PodNamespace: ifConfig.PodNamespace, + IsAdd: true, + ContainerID: ifConfig.ContainerID, + } + pc.podUpdateNotifier.Notify(event) + } + + if err := pc.processPodEvents(ifConfig, true); err != nil { + return err + } + pc.podIfMonitor.deleteUnreadyPod(ovsPort) + return nil +} + +func (pc *podConfigurator) processPodEvents(ifConfig *interfacestore.InterfaceConfig, installed bool) error { + pod, err := pc.podLister.Pods(ifConfig.PodNamespace).Get(ifConfig.PodName) + if err != nil { + klog.ErrorS(err, "Failed to get Pod, retrying", "Pod", klog.KRef(ifConfig.PodNamespace, ifConfig.PodName)) + return err + } + + if installed { + // Add normal event to record Pod network is ready. + pc.record.Eventf(pod, corev1.EventTypeNormal, "NetworkIsReady", "Installed Pod '%s/%s' network forwarding rules", ifConfig.PodNamespace, ifConfig.PodName) + return nil + } + + pc.record.Eventf(pod, corev1.EventTypeWarning, "NetworkNotReady", "Pod '%s/%s' network forwarding rules not installed", ifConfig.PodNamespace, ifConfig.PodName) + return nil +} diff --git a/pkg/agent/cniserver/pod_configuration_linux.go b/pkg/agent/cniserver/pod_configuration_linux.go index fe281f06330..74c865240e9 100644 --- a/pkg/agent/cniserver/pod_configuration_linux.go +++ b/pkg/agent/cniserver/pod_configuration_linux.go @@ -21,6 +21,7 @@ import ( "fmt" current "github.com/containernetworking/cni/pkg/types/100" + typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -113,3 +114,14 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte klog.Warningf("Interface for Pod %s/%s not found in the interface store", 
ifaceConfig.PodNamespace, ifaceConfig.PodName) } } + +func (pc *podConfigurator) startEventBroadcaster(stopCh <-chan struct{}) { + go func() { + defer pc.eventBroadcaster.Shutdown() + pc.eventBroadcaster.StartStructuredLogging(0) + pc.eventBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{ + Interface: pc.kubeClient.CoreV1().Events(""), + }) + <-stopCh + }() +} diff --git a/pkg/agent/cniserver/pod_configuration_linux_test.go b/pkg/agent/cniserver/pod_configuration_linux_test.go index 4f51bd4892b..3c7d6a347d2 100644 --- a/pkg/agent/cniserver/pod_configuration_linux_test.go +++ b/pkg/agent/cniserver/pod_configuration_linux_test.go @@ -687,7 +687,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + configurator, _ := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) configurator.ifConfigurator = testIfaceConfigurator return configurator } diff --git a/pkg/agent/cniserver/pod_configuration_test.go b/pkg/agent/cniserver/pod_configuration_test.go new file mode 100644 index 00000000000..c09fdc6833b --- /dev/null +++ b/pkg/agent/cniserver/pod_configuration_test.go @@ -0,0 +1,323 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cniserver + +import ( + "fmt" + "testing" + "time" + + "antrea.io/libOpenflow/openflow15" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + fakeclientset "k8s.io/client-go/kubernetes/fake" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + "antrea.io/antrea/pkg/agent/interfacestore" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" +) + +type mockClients struct { + kubeClient *fakeclientset.Clientset + informerFactory informers.SharedInformerFactory + localPodInformer cache.SharedIndexInformer + podLister corelisters.PodLister + ofClient *openflowtest.MockClient + recorder *record.FakeRecorder +} + +func newMockClients(ctrl *gomock.Controller, nodeName string, objects ...runtime.Object) *mockClients { + kubeClient := fakeclientset.NewClientset() + for _, obj := range objects { + if obj != nil { + kubeClient.Tracker().Add(obj) + } + } + informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) + + localPodInformer := coreinformers.NewFilteredPodInformer( + kubeClient, + metav1.NamespaceAll, + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + 
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String() + }, + ) + podLister := corelisters.NewPodLister(localPodInformer.GetIndexer()) + ofClient := openflowtest.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(100) + recorder.IncludeObject = false + + return &mockClients{ + kubeClient: kubeClient, + informerFactory: informerFactory, + localPodInformer: localPodInformer, + podLister: podLister, + ofClient: ofClient, + recorder: recorder, + } +} + +func (c *mockClients) startInformers(stopCh chan struct{}) { + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + go c.localPodInformer.Run(stopCh) + cache.WaitForCacheSync(stopCh, c.localPodInformer.HasSynced) +} + +func mockRetryInterval() func() { + oriRetryInterval := retryInterval + retryInterval = time.Millisecond * 500 + return func() { + retryInterval = oriRetryInterval + } +} + +func assertEqualEvents(t *testing.T, expected string, actual <-chan string) { + c := time.After(wait.ForeverTestTimeout) + select { + case a := <-actual: + require.Equal(t, expected, a) + case <-c: + t.Errorf("Expected event %q, got nothing", expected) + // continue iterating to print all expected events + } +} + +func newTestPodConfigurator(testClients *mockClients, waiter *asyncWaiter) *podConfigurator { + interfaceStore := interfacestore.NewInterfaceStore() + eventBroadcaster := record.NewBroadcaster() + queue := workqueue.NewTypedDelayingQueueWithConfig[string]( + workqueue.TypedDelayingQueueConfig[string]{ + Name: "podConfigurator", + }, + ) + podCfg := &podConfigurator{ + kubeClient: testClients.kubeClient, + ofClient: testClients.ofClient, + podLister: testClients.podLister, + ifaceStore: interfaceStore, + eventBroadcaster: eventBroadcaster, + record: testClients.recorder, + queue: queue, + podIfMonitor: &podIfaceMonitor{ + ifaceStore: interfacestore.NewInterfaceStore(), + unreadyInterfaces: make(map[string]bool), + statusCh: make(chan *openflow15.PortStatus), + 
queue: queue, + }, + } + if waiter != nil { + podCfg.podUpdateNotifier = waiter.notifier + } + return podCfg +} + +func TestUpdateUnreadyPod(t *testing.T) { + defer mockRetryInterval()() + + for _, tc := range []struct { + name string + podIfaceUnReady bool + ofPortAssigned bool + podIfaceIsCached bool + installFlow bool + flowInstalled bool + installOpenFlowErr error + expErr string + expEvent string + expDeleted bool + }{ + { + name: "updated Port is not in unready state", + podIfaceUnReady: false, + podIfaceIsCached: true, + installFlow: false, + expDeleted: true, + }, { + name: "updated Port is not in interface store", + podIfaceUnReady: true, + podIfaceIsCached: false, + expDeleted: true, + installFlow: false, + }, { + name: "OpenFlow port is not assigned", + podIfaceUnReady: true, + podIfaceIsCached: true, + ofPortAssigned: false, + installFlow: false, + expDeleted: false, + expErr: "pod's OpenFlow port is not ready yet", + }, { + name: "failed to install OpenFlow entries for updated Port", + podIfaceUnReady: true, + podIfaceIsCached: true, + ofPortAssigned: true, + installFlow: true, + installOpenFlowErr: fmt.Errorf("failure to install flow"), + expDeleted: false, + expErr: "failed to add Openflow entries for OVS port test: failure to install flow", + expEvent: "Warning NetworkNotReady Pod 'test/A-1' network forwarding rules not installed", + }, { + name: "succeeded", + podIfaceUnReady: true, + podIfaceIsCached: true, + ofPortAssigned: true, + installFlow: true, + installOpenFlowErr: nil, + expEvent: "Normal NetworkIsReady Installed Pod 'test/A-1' network forwarding rules", + expDeleted: true, + }, { + name: "retry to add event after OpenFlow is installed", + podIfaceUnReady: true, + podIfaceIsCached: true, + ofPortAssigned: true, + flowInstalled: true, + installFlow: false, + installOpenFlowErr: nil, + expEvent: "Normal NetworkIsReady Installed Pod 'test/A-1' network forwarding rules", + expDeleted: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + 
controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + waiter := newAsyncWaiter(testPodNameA, podInfraContainerID) + defer waiter.close() + + testClients := newMockClients(controller, nodeName, pod) + testClients.startInformers(stopCh) + fakeOFClient := testClients.ofClient + + configurator := newTestPodConfigurator(testClients, waiter) + + flowInstalled := false + + ifConfig := *podIfaceConfig + if tc.ofPortAssigned { + ifConfig.OVSPortConfig.OFPort = int32(1) + } + + if tc.podIfaceUnReady { + configurator.podIfMonitor.addUnreadyPodInterface(&ifConfig) + if tc.flowInstalled { + configurator.podIfMonitor.markInterfaceFlowsInstalled(podIfName) + } + } + + if tc.podIfaceIsCached { + configurator.ifaceStore.AddInterface(&ifConfig) + } + + if tc.installFlow { + fakeOFClient.EXPECT().InstallPodFlows(podIfName, podIPs, podMac, portStatusMsg.Desc.PortNo, uint16(0), nil).Times(1).Return(tc.installOpenFlowErr) + if tc.installOpenFlowErr == nil { + flowInstalled = true + } + } + + err := configurator.updateUnreadyPod(podIfName) + if tc.expErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.expErr) + } + + if flowInstalled { + waiter.wait() + } + + found := configurator.podIfMonitor.unreadyInterfaceExists(podIfName) + if tc.expDeleted { + require.False(t, found) + } else { + require.True(t, found) + } + + if tc.expEvent != "" { + assertEqualEvents(t, tc.expEvent, testClients.recorder.Events) + } + }) + } +} + +func TestProcessNextWorkItem(t *testing.T) { + defer mockRetryInterval()() + + for _, tc := range []struct { + name string + installOpenFlowErr error + expEvent string + expRequeue bool + }{ + { + name: "failed to install OpenFlow entries for updated Port", + installOpenFlowErr: fmt.Errorf("failure to install flow"), + expRequeue: true, + }, { + name: "succeeded", + installOpenFlowErr: nil, + expRequeue: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + 
stopCh := make(chan struct{}) + defer close(stopCh) + + waiter := newAsyncWaiter(testPodNameA, podInfraContainerID) + defer waiter.close() + + testClients := newMockClients(controller, nodeName, pod) + testClients.startInformers(stopCh) + fakeOFClient := testClients.ofClient + + configurator := newTestPodConfigurator(testClients, waiter) + defer configurator.queue.ShutDown() + + ifConfig := *podIfaceConfig + ifConfig.OVSPortConfig.OFPort = int32(portStatusMsg.Desc.PortNo) + configurator.ifaceStore.AddInterface(&ifConfig) + configurator.podIfMonitor.addUnreadyPodInterface(&ifConfig) + + fakeOFClient.EXPECT().InstallPodFlows(podIfName, podIPs, podMac, portStatusMsg.Desc.PortNo, uint16(0), nil).Times(1).Return(tc.installOpenFlowErr) + configurator.queue.Add(podIfName) + + configurator.processNextWorkItem() + if tc.installOpenFlowErr != nil { + key, shutDown := configurator.queue.Get() + require.False(t, shutDown) + assert.Equal(t, key, podIfName) + } else { + waiter.wait() + exists := configurator.podIfMonitor.unreadyInterfaceExists(podIfName) + require.False(t, exists) + } + }) + } +} diff --git a/pkg/agent/cniserver/pod_configuration_windows.go b/pkg/agent/cniserver/pod_configuration_windows.go index 44734e4f20e..39ae7684dfe 100644 --- a/pkg/agent/cniserver/pod_configuration_windows.go +++ b/pkg/agent/cniserver/pod_configuration_windows.go @@ -18,15 +18,12 @@ package cniserver import ( - "fmt" - current "github.com/containernetworking/cni/pkg/types/100" + typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" "antrea.io/antrea/pkg/agent/interfacestore" - "antrea.io/antrea/pkg/agent/types" - "antrea.io/antrea/pkg/util/k8s" ) // connectInterfaceToOVSAsync waits for an interface to be created and connects it to OVS br-int asynchronously @@ -34,29 +31,15 @@ import ( // CNI call completes. 
func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) error {
 	ovsPortName := ifConfig.InterfaceName
+	// Add the OVS port into the "unreadyInterfaces" map. This operation is performed before we update OVSDB,
+	// otherwise we need to think about the race condition between the current goroutine and the listener.
+	// Note, we may keep the OVS port in the "unreadyInterfaces" map even if the OVSDB update operation fails,
+	// because it is also a case that the Pod's networking is not ready.
+	pc.podIfMonitor.addUnreadyPodInterface(ifConfig)
 	return pc.ifConfigurator.addPostInterfaceCreateHook(ifConfig.ContainerID, ovsPortName, containerAccess, func() error {
 		if err := pc.ovsBridgeClient.SetInterfaceType(ovsPortName, "internal"); err != nil {
 			return err
 		}
-		ofPort, err := pc.ovsBridgeClient.GetOFPort(ovsPortName, true)
-		if err != nil {
-			return err
-		}
-		containerID := ifConfig.ContainerID
-		klog.V(2).Infof("Setting up Openflow entries for container %s", containerID)
-		if err := pc.ofClient.InstallPodFlows(ovsPortName, ifConfig.IPs, ifConfig.MAC, uint32(ofPort), ifConfig.VLANID, nil); err != nil {
-			return fmt.Errorf("failed to add Openflow entries for container %s: %v", containerID, err)
-		}
-		// Update interface config with the ofPort.
-		ifConfig.OVSPortConfig.OFPort = ofPort
-		// Notify the Pod update event to required components.
-		event := types.PodUpdate{
-			PodName:      ifConfig.PodName,
-			PodNamespace: ifConfig.PodNamespace,
-			IsAdd:        true,
-			ContainerID:  ifConfig.ContainerID,
-		}
-		pc.podUpdateNotifier.Notify(event)
 		return nil
 	})
 }
@@ -75,7 +58,7 @@ func (pc *podConfigurator) connectInterfaceToOVS(
 	// Because of this, we need to wait asynchronously for the interface to be created: we create the OVS port
 	// and set the OVS Interface type "" first, and change the OVS Interface type to "internal" to connect to the
 	// container interface after it is created.
After OVS connects to the container interface, an OFPort is allocated.
-	klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID)
+	klog.V(2).InfoS("Adding OVS port for container", "port", ovsPortName, "container", containerID)
 	ovsAttachInfo := BuildOVSPortExternalIDs(containerConfig)
 	portUUID, err := pc.createOVSPort(ovsPortName, ovsAttachInfo, containerConfig.VLANID)
 	if err != nil {
@@ -105,7 +88,7 @@ func (pc *podConfigurator) configureInterfaces(
 	// See: https://github.com/kubernetes/kubernetes/issues/57253#issuecomment-358897721.
 	interfaceConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
 	if found {
-		klog.V(2).Infof("Found an existing OVS port for container %s, returning", containerID)
+		klog.V(2).InfoS("Found an existing OVS port for container, returning", "container", containerID)
 		mac := interfaceConfig.MAC.String()
 		hostIface := &current.Interface{
 			Name:         interfaceConfig.InterfaceName,
@@ -128,9 +111,19 @@ func (pc *podConfigurator) configureInterfaces(
 func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) {
 	for i := range ifConfigs {
 		ifaceConfig := ifConfigs[i]
-		pod := k8s.NamespacedName(ifaceConfig.PodNamespace, ifaceConfig.PodName)
 		if err := pc.connectInterfaceToOVSAsync(ifaceConfig, containerAccess); err != nil {
-			klog.Errorf("Failed to reconcile Pod %s: %v", pod, err)
+			klog.ErrorS(err, "Failed to reconcile Pod", "Pod", klog.KRef(ifaceConfig.PodNamespace, ifaceConfig.PodName))
 		}
 	}
 }
+
+func (pc *podConfigurator) startEventBroadcaster(stopCh <-chan struct{}) {
+	go func() {
+		defer pc.eventBroadcaster.Shutdown()
+		pc.eventBroadcaster.StartStructuredLogging(0)
+		pc.eventBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{
+			Interface: pc.kubeClient.CoreV1().Events(""),
+		})
+		<-stopCh
+	}()
+}
diff --git a/pkg/agent/cniserver/pod_monitor.go b/pkg/agent/cniserver/pod_monitor.go
new file mode 100644
index 00000000000..1fb74da3d41
--- /dev/null +++ b/pkg/agent/cniserver/pod_monitor.go @@ -0,0 +1,111 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cniserver + +import ( + "bytes" + "sync" + "time" + + "antrea.io/libOpenflow/openflow15" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/interfacestore" +) + +const ( + podNotReadyTimeInSeconds = 30 * time.Second +) + +type podIfaceMonitor struct { + ifaceStore interfacestore.InterfaceStore + + // unreadyInterfaces is a map to store the OVS ports which is waiting for the PortStatus from OpenFlow switch. + // Key is the OVS port, and value is the OVS port's flows are installed or not. + // It is used only on Windows now. + unreadyInterfaces map[string]bool + // unreadyInterfaceMutex is a mutex to guard unreadyInterfaces. + unreadyInterfaceMutex sync.Mutex + statusCh chan *openflow15.PortStatus + + queue workqueue.TypedDelayingInterface[string] +} + +func (m *podIfaceMonitor) processPortStatusMessage(status *openflow15.PortStatus) { + // Update Pod OpenFlow entries only after the OpenFlow port state is live. + if status.Desc.State != openflow15.PS_LIVE { + return + } + ovsPort := string(bytes.Trim(status.Desc.Name, "\x00")) + ofPort := status.Desc.PortNo + + exist := m.unreadyInterfaceExists(ovsPort) + if !exist { + return + } + + ifConfig, found := m.ifaceStore.GetInterfaceByName(ovsPort) + if found { + // Update interface config with the ofPort. 
+ ifConfig.OVSPortConfig.OFPort = int32(ofPort) + m.ifaceStore.UpdateInterface(ifConfig) + } + // Add the ovsPort into queue even if there is no related InterfaceConfig in the store, then + // the worker can delete the unreadyPod if exists. + m.queue.Add(ovsPort) +} + +func (m *podIfaceMonitor) addUnreadyPodInterface(ifConfig *interfacestore.InterfaceConfig) { + klog.InfoS("Added OVS port into unready interfaces", "ovsPort", ifConfig.InterfaceName, + "Pod", klog.KRef(ifConfig.PodNamespace, ifConfig.PodName)) + m.unreadyInterfaceMutex.Lock() + defer m.unreadyInterfaceMutex.Unlock() + m.unreadyInterfaces[ifConfig.InterfaceName] = false + m.queue.AddAfter(ifConfig.InterfaceName, podNotReadyTimeInSeconds) +} + +func (m *podIfaceMonitor) deleteUnreadyPod(port string) { + m.unreadyInterfaceMutex.Lock() + defer m.unreadyInterfaceMutex.Unlock() + delete(m.unreadyInterfaces, port) +} + +func (m *podIfaceMonitor) unreadyInterfaceExists(port string) bool { + m.unreadyInterfaceMutex.Lock() + defer m.unreadyInterfaceMutex.Unlock() + _, found := m.unreadyInterfaces[port] + return found +} + +func (m *podIfaceMonitor) markInterfaceFlowsInstalled(port string) { + m.unreadyInterfaceMutex.Lock() + defer m.unreadyInterfaceMutex.Unlock() + _, found := m.unreadyInterfaces[port] + if !found { + return + } + m.unreadyInterfaces[port] = true +} + +func (m *podIfaceMonitor) interfaceFlowsInstalled(port string) bool { + m.unreadyInterfaceMutex.Lock() + defer m.unreadyInterfaceMutex.Unlock() + installed, found := m.unreadyInterfaces[port] + if !found { + return false + } + return installed +} diff --git a/pkg/agent/cniserver/pod_monitor_linux.go b/pkg/agent/cniserver/pod_monitor_linux.go new file mode 100644 index 00000000000..70526b8a45e --- /dev/null +++ b/pkg/agent/cniserver/pod_monitor_linux.go @@ -0,0 +1,33 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cniserver + +import ( + "k8s.io/client-go/util/workqueue" + + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" +) + +func newPodInterfaceMonitor(_ openflow.Client, + _ interfacestore.InterfaceStore, + _ workqueue.TypedDelayingInterface[string], +) *podIfaceMonitor { + return &podIfaceMonitor{} +} + +func (m *podIfaceMonitor) Run(stopCh <-chan struct{}) { + return +} diff --git a/pkg/agent/cniserver/pod_monitor_test.go b/pkg/agent/cniserver/pod_monitor_test.go new file mode 100644 index 00000000000..0767b83ee06 --- /dev/null +++ b/pkg/agent/cniserver/pod_monitor_test.go @@ -0,0 +1,301 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cniserver + +import ( + "fmt" + "net" + "sync" + "testing" + + "antrea.io/libOpenflow/openflow15" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" + + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/util/channel" +) + +var ( + podIfName = "test" + podIPs = []net.IP{net.ParseIP("192.168.9.10")} + podMac, _ = net.ParseMAC("00:15:5D:B2:6F:38") + podInfraContainerID = "261a1970-5b6c-11ed-8caf-000c294e5d03" + podIfaceConfig = &interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: testPodNamespace, + PodName: testPodNameA, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + }, + IPs: podIPs, + MAC: podMac, + } + + pod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: testPodNameA, + Namespace: testPodNamespace, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + }, + } + + portStatusMsg = &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LIVE, + }, + } +) + +type asyncWaiter struct { + podName string + containerID string + waitCh chan struct{} + stopCh chan struct{} + notifier *channel.SubscribableChannel +} + +func (w *asyncWaiter) notify(e interface{}) { + podUpdate := e.(types.PodUpdate) + if podUpdate.PodName == w.podName && podUpdate.ContainerID == w.containerID { + w.waitCh <- struct{}{} + } +} + +func (w *asyncWaiter) wait() { + <-w.waitCh +} + +func (w *asyncWaiter) close() { + close(w.waitCh) + close(w.stopCh) +} + +func newAsyncWaiter(podName, containerID string) *asyncWaiter { + waiter := &asyncWaiter{ + podName: podName, + 
containerID: containerID, + waitCh: make(chan struct{}), + stopCh: make(chan struct{}), + notifier: channel.NewSubscribableChannel("PodUpdate", 100), + } + waiter.notifier.Subscribe(waiter.notify) + go waiter.notifier.Run(waiter.stopCh) + return waiter +} + +func TestMultiAccessOnUnreadyInterface(t *testing.T) { + p1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "p1", + Namespace: testPodNamespace, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + }, + } + p2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "p2", + Namespace: testPodNamespace, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + }, + } + controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + clients := newMockClients(controller, nodeName, p1, p2) + clients.startInformers(stopCh) + monitor := &podIfaceMonitor{ + ifaceStore: interfacestore.NewInterfaceStore(), + unreadyInterfaces: make(map[string]bool), + statusCh: make(chan *openflow15.PortStatus), + queue: workqueue.NewTypedDelayingQueueWithConfig[string]( + workqueue.TypedDelayingQueueConfig[string]{ + Name: "podConfigurator", + }, + ), + } + monitor.addUnreadyPodInterface(&interfacestore.InterfaceConfig{ + InterfaceName: "port1", + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodName: p1.Name, + PodNamespace: p1.Namespace, + }, + }) + + wg := sync.WaitGroup{} + wg.Add(3) + go func() { + defer wg.Done() + monitor.addUnreadyPodInterface(&interfacestore.InterfaceConfig{ + InterfaceName: "port2", + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodName: p2.Name, + PodNamespace: p2.Namespace, + }, + }) + }() + + go func() { + defer wg.Done() + monitor.deleteUnreadyPod("port1") + }() + + go func() { + defer wg.Done() + monitor.processPortStatusMessage(portStatusMsg) + }() + + wg.Wait() +} + +func TestProcessPortStatusMessage(t *testing.T) { + for _, tc := range []struct { + name string + status *openflow15.PortStatus + ovsPortName string + 
ifaceUnready bool + ifaceInStore bool + expEnqueue bool + }{ + { + name: "Add OF port if port status is live", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(podIfName), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceUnready: true, + ifaceInStore: true, + expEnqueue: true, + }, { + name: "Add OF port with suffix in name", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceUnready: true, + ifaceInStore: true, + expEnqueue: true, + }, { + name: "Ignore OF port if port is not live", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LINK_DOWN, + }, + }, + ovsPortName: podIfName, + ifaceUnready: true, + ifaceInStore: false, + expEnqueue: false, + }, { + name: "Ignore OF port if port is ready", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(podIfName), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceUnready: false, + ifaceInStore: false, + expEnqueue: false, + }, { + name: "Enqueue OF port even if not existing in interface store", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(podIfName), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceUnready: true, + ifaceInStore: false, + expEnqueue: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + queue := workqueue.NewTypedDelayingQueueWithConfig[string]( + workqueue.TypedDelayingQueueConfig[string]{ + Name: "podMonitor", + }) + monitor := &podIfaceMonitor{ + ifaceStore: interfacestore.NewInterfaceStore(), + unreadyInterfaces: make(map[string]bool), + statusCh: make(chan *openflow15.PortStatus), + queue: queue, + } + defer 
monitor.queue.ShutDown()
+
+			if tc.ifaceUnready {
+				monitor.addUnreadyPodInterface(podIfaceConfig)
+			}
+			if tc.ifaceInStore {
+				monitor.ifaceStore.AddInterface(podIfaceConfig)
+			}
+
+			monitor.processPortStatusMessage(tc.status)
+			if tc.expEnqueue {
+				key, shutdown := queue.Get()
+				assert.Equal(t, tc.ovsPortName, key)
+				assert.False(t, shutdown)
+			}
+
+			if tc.ifaceInStore {
+				ifaceCfg, ok := monitor.ifaceStore.GetInterfaceByName(podIfaceConfig.InterfaceName)
+				require.True(t, ok)
+				if tc.ifaceUnready {
+					assert.Equal(t, ifaceCfg.OFPort, int32(tc.status.Desc.PortNo))
+				} else {
+					assert.Equal(t, int32(0), ifaceCfg.OFPort)
+				}
+			}
+		})
+	}
+}
diff --git a/pkg/agent/cniserver/pod_monitor_windows.go b/pkg/agent/cniserver/pod_monitor_windows.go
new file mode 100644
index 00000000000..a39d5fbda3c
--- /dev/null
+++ b/pkg/agent/cniserver/pod_monitor_windows.go
@@ -0,0 +1,56 @@
+// Copyright 2024 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cniserver
+
+import (
+	"time"
+
+	"antrea.io/libOpenflow/openflow15"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+
+	"antrea.io/antrea/pkg/agent/interfacestore"
+	"antrea.io/antrea/pkg/agent/openflow"
+)
+
+func newPodInterfaceMonitor(ofClient openflow.Client,
+	ifaceStore interfacestore.InterfaceStore,
+	queue workqueue.TypedDelayingInterface[string],
+) *podIfaceMonitor {
+	statusCh := make(chan *openflow15.PortStatus)
+	ofClient.SubscribeOFPortStatusMessage(statusCh)
+	return &podIfaceMonitor{
+		ifaceStore:        ifaceStore,
+		unreadyInterfaces: make(map[string]bool),
+		statusCh:          statusCh,
+		queue:             queue,
+	}
+}
+
+func (m *podIfaceMonitor) Run(stopCh <-chan struct{}) {
+	klog.Info("Starting the monitor to wait for new OpenFlow ports")
+
+	go wait.Until(func() {
+		for {
+			select {
+			case <-stopCh:
+				return
+			case status := <-m.statusCh:
+ m.processPortStatusMessage(status) + } + } + }, time.Second, stopCh) +} diff --git a/pkg/agent/cniserver/secondary.go b/pkg/agent/cniserver/secondary.go index a4176baba15..d20487f3f28 100644 --- a/pkg/agent/cniserver/secondary.go +++ b/pkg/agent/cniserver/secondary.go @@ -26,7 +26,7 @@ import ( ) func NewSecondaryInterfaceConfigurator(ovsBridgeClient ovsconfig.OVSBridgeClient, interfaceStore interfacestore.InterfaceStore) (*podConfigurator, error) { - pc, err := newPodConfigurator(ovsBridgeClient, nil, nil, interfaceStore, nil, ovsconfig.OVSDatapathSystem, false, false, nil) + pc, err := newPodConfigurator(nil, ovsBridgeClient, nil, nil, interfaceStore, nil, ovsconfig.OVSDatapathSystem, false, false, nil, nil) if err == nil { pc.isSecondaryNetwork = true } diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index bc77dcc7224..99d4176a06c 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -31,6 +31,8 @@ import ( "google.golang.org/grpc" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -111,6 +113,9 @@ type CNIServer struct { serverVersion string nodeConfig *config.NodeConfig hostProcPathPrefix string + podInformer cache.SharedIndexInformer + podLister corelisters.PodLister + podListerSynced cache.InformerSynced kubeClient clientset.Interface containerAccess *containerAccessArbitrator podConfigurator *podConfigurator @@ -628,6 +633,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) ( func New( cniSocket, hostProcPathPrefix string, nodeConfig *config.NodeConfig, + podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, routeClient route.Interface, isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, @@ -639,6 +645,9 @@ func New( serverVersion: 
cni.AntreaCNIVersion, nodeConfig: nodeConfig, hostProcPathPrefix: hostProcPathPrefix, + podInformer: podInformer, + podLister: corelisters.NewPodLister(podInformer.GetIndexer()), + podListerSynced: podInformer.HasSynced, kubeClient: kubeClient, containerAccess: newContainerAccessArbitrator(), routeClient: routeClient, @@ -660,9 +669,9 @@ func (s *CNIServer) Initialize( ) error { var err error s.podConfigurator, err = newPodConfigurator( - ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, + s.kubeClient, ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), - s.disableTXChecksumOffload, podUpdateNotifier) + s.disableTXChecksumOffload, podUpdateNotifier, s.podLister) if err != nil { return fmt.Errorf("error during initialize podConfigurator: %v", err) } @@ -676,6 +685,12 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) { klog.InfoS("Starting CNI server") defer klog.InfoS("Shutting down CNI server") + if !cache.WaitForNamedCacheSync("AntreaCNIServer", stopCh, s.podListerSynced) { + return + } + + go s.podConfigurator.Run(stopCh) + listener, err := util.ListenLocalSocket(s.cniSocket) if err != nil { klog.Fatalf("Failed to bind on %s: %v", s.cniSocket, err) diff --git a/pkg/agent/cniserver/server_linux_test.go b/pkg/agent/cniserver/server_linux_test.go index 856024dd32c..f4777cb67e7 100644 --- a/pkg/agent/cniserver/server_linux_test.go +++ b/pkg/agent/cniserver/server_linux_test.go @@ -95,7 +95,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Ifname = ifname cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "" - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) 
response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -106,7 +106,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "0000:03:00.6" prevResult.Interfaces = []*current.Interface{hostIface, containerIface} - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -119,7 +119,7 @@ func TestRemoveInterface(t *testing.T) { ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + podConfigurator, err := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) require.Nil(t, err, "No error expected in podConfigurator constructor") containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") @@ -201,7 +201,7 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ip gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, 
"system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) cniServer.enableSecondaryNetworkIPAM = enableSecondaryNetworkIPAM cniServer.isChaining = isChaining cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} @@ -612,18 +612,18 @@ func TestCmdCheck(t *testing.T) { func TestReconcile(t *testing.T) { controller := gomock.NewController(t) + kubeClient := fakeclientset.NewClientset(pod1, pod2, pod3) mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) cniServer.podConfigurator.ifConfigurator = newTestInterfaceConfigurator() cniServer.nodeConfig = &config.NodeConfig{ Name: nodeName, } - kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go index 70487b083e8..3d89c001579 100644 --- a/pkg/agent/cniserver/server_windows_test.go +++ b/pkg/agent/cniserver/server_windows_test.go @@ -22,13 +22,15 @@ import ( "testing" "time" + 
"antrea.io/libOpenflow/openflow15" "github.com/Microsoft/hcsshim" "github.com/Microsoft/hcsshim/hcn" cnitypes "github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" - "k8s.io/apimachinery/pkg/util/wait" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fakeclientset "k8s.io/client-go/kubernetes/fake" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -37,12 +39,11 @@ import ( "antrea.io/antrea/pkg/agent/cniserver/types" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" - openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" - agenttypes "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" winnettest "antrea.io/antrea/pkg/agent/util/winnet/testing" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" + "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" ) @@ -173,17 +174,15 @@ type hnsTestUtil struct { hnsEndpoint *hcsshim.HNSEndpoint hcnEndpoint *hcn.HostComputeEndpoint isDocker bool - isAttached bool hnsEndpointCreatErr error endpointAttachErr error } -func newHnsTestUtil(endpointID string, existingHnsEndpoints []hcsshim.HNSEndpoint, isDocker, isAttached bool, hnsEndpointCreatErr, endpointAttachErr error) *hnsTestUtil { +func newHnsTestUtil(endpointID string, existingHnsEndpoints []hcsshim.HNSEndpoint, isDocker bool, hnsEndpointCreatErr, endpointAttachErr error) *hnsTestUtil { return &hnsTestUtil{ endpointID: endpointID, existingHnsEndpoints: existingHnsEndpoints, isDocker: isDocker, - isAttached: isAttached, hnsEndpointCreatErr: hnsEndpointCreatErr, endpointAttachErr: endpointAttachErr, } @@ -217,9 +216,6 @@ func (t *hnsTestUtil) deleteHnsEndpoint(endpoint *hcsshim.HNSEndpoint) (*hcsshim func (t *hnsTestUtil) attachEndpointInNamespace(ep 
*hcn.HostComputeEndpoint, namespace string) error { t.hcnEndpoint.HostComputeNamespace = namespace - if t.endpointAttachErr == nil { - t.addHostInterface() - } return t.endpointAttachErr } @@ -257,9 +253,10 @@ func (t *hnsTestUtil) addHostInterface() { }() } -func newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNotifier *channel.SubscribableChannel) *CNIServer { +func newMockCNIServer(t *testing.T, controller *gomock.Controller, clients *mockClients, podUpdateNotifier *channel.SubscribableChannel) *CNIServer { + kubeClient := fakeclientset.NewClientset() mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) - mockOFClient = openflowtest.NewMockClient(controller) + mockOFClient = clients.ofClient mockRoute = routetest.NewMockInterface(controller) mockWinnet = winnettest.NewMockInterface(controller) ifaceStore = interfacestore.NewInterfaceStore() @@ -269,7 +266,8 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNoti gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier) + mockOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).AnyTimes() + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier, clients.podLister) cniServer.podConfigurator.ifConfigurator.(*ifConfigurator).winnet = mockWinnet return cniServer } @@ -315,7 +313,6 @@ func TestCmdAdd(t *testing.T) { hnsEndpointCreateErr error endpointAttachErr error ifaceExist bool - isAttached bool existingHnsEndpoints []hcsshim.HNSEndpoint endpointExists bool connectOVS bool @@ -341,7 +338,6 @@ func 
TestCmdAdd(t *testing.T) { oriIPAMResult: oriIPAMResult, connectOVS: true, containerIfaceExist: true, - isAttached: true, }, { name: "containerd-attach-failure", podName: "pod10", @@ -364,13 +360,22 @@ func TestCmdAdd(t *testing.T) { ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) + stopCh := make(chan struct{}) + defer close(stopCh) isDocker := isDockerContainer(tc.netns) - testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.isAttached, tc.hnsEndpointCreateErr, tc.endpointAttachErr) + testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.hnsEndpointCreateErr, tc.endpointAttachErr) testUtil.setFunctions() defer testUtil.restore() waiter := newAsyncWaiter(tc.podName, tc.infraContainerID) - server := newMockCNIServer(t, controller, waiter.notifier) + clients := newMockClients(controller, nodeName) + clients.startInformers(stopCh) + + server := newMockCNIServer(t, controller, clients, waiter.notifier) + server.podConfigurator.kubeClient.CoreV1().Pods(testPodNamespace).Create(ctx, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: tc.podName, Namespace: testPodNamespace}, Spec: corev1.PodSpec{NodeName: nodeName}}, metav1.CreateOptions{}) + go server.podConfigurator.Run(stopCh) + requestMsg, ovsPortName := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.infraContainerID, tc.netns, nil) if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(getHnsEndpoint(generateUUID(), ovsPortName)) @@ -389,10 +394,31 @@ func TestCmdAdd(t *testing.T) { } ovsPortID := generateUUID() if tc.connectOVS { + ofPortNumber := uint32(100) + portStatusCh := server.podConfigurator.podIfMonitor.statusCh mockOVSBridgeClient.EXPECT().CreatePort(ovsPortName, ovsPortName, gomock.Any()).Return(ovsPortID, nil).Times(1) - mockOVSBridgeClient.EXPECT().SetInterfaceType(ovsPortName, "internal").Return(nil).Times(1) - 
mockOVSBridgeClient.EXPECT().GetOFPort(ovsPortName, true).Return(int32(100), nil).Times(1) - mockOFClient.EXPECT().InstallPodFlows(ovsPortName, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mockOVSBridgeClient.EXPECT().SetInterfaceType(ovsPortName, "internal").Return(nil).Times(1).Do( + func(name, ifType string) ovsconfig.Error { + go func() { + // Simulate OVS successfully connects to the vNIC, then a PortStatus message is + // supposed to be received. + select { + case <-time.After(time.Millisecond * 50): + portStatusCh <- &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: ofPortNumber, + Length: 72, + Name: []byte(name), + State: openflow15.PS_LIVE, + }, + } + } + }() + return nil + }, + ) + mockOFClient.EXPECT().InstallPodFlows(ovsPortName, gomock.Any(), gomock.Any(), uint32(ofPortNumber), gomock.Any(), gomock.Any()).Return(nil) mockRoute.EXPECT().AddLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) } resp, err := server.CmdAdd(ctx, requestMsg) @@ -421,17 +447,8 @@ func TestCmdAdd(t *testing.T) { _, exists := ifaceStore.GetContainerInterface(containerID) assert.Equal(t, exists, tc.containerIfaceExist) if tc.connectOVS { + testUtil.addHostInterface() waiter.wait() - // Wait for the completion of async function "setInterfaceMTUFunc", otherwise it may lead to the - // race condition failure. 
- wait.PollUntilContextTimeout(context.Background(), time.Millisecond*10, time.Second, true, - func(ctx context.Context) (done bool, err error) { - mtuSet, exist := hostIfaces.Load(ovsPortName) - if !exist { - return false, nil - } - return mtuSet.(bool), nil - }) } waiter.close() }) @@ -480,6 +497,8 @@ func TestCmdDel(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) @@ -491,11 +510,13 @@ func TestCmdDel(t *testing.T) { if tc.endpointExists { existingHnsEndpoints = append(existingHnsEndpoints, *hnsEndpoint) } - testUtil := newHnsTestUtil(hnsEndpoint.Id, existingHnsEndpoints, isDocker, true, nil, nil) + testUtil := newHnsTestUtil(hnsEndpoint.Id, existingHnsEndpoints, isDocker, nil, nil) testUtil.setFunctions() defer testUtil.restore() waiter := newAsyncWaiter(testPodNameA, containerID) - server := newMockCNIServer(t, controller, waiter.notifier) + clients := newMockClients(controller, nodeName) + clients.startInformers(stopCh) + server := newMockCNIServer(t, controller, clients, waiter.notifier) ovsPortID := generateUUID() if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(hnsEndpoint) @@ -665,12 +686,16 @@ func TestCmdCheck(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) defer mockGetNetInterfaceByName(tc.netInterface)() - cniserver := newMockCNIServer(t, controller, channel.NewSubscribableChannel("podUpdate", 100)) + clients := newMockClients(controller, nodeName) + clients.startInformers(stopCh) + cniserver := newMockCNIServer(t, controller, clients, channel.NewSubscribableChannel("podUpdate", 
100)) requestMsg, _ := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.containerID, tc.netns, tc.prevResult) ipamMock.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(1) ifaceStore.AddInterface(tc.existingIface) @@ -685,68 +710,38 @@ func TestCmdCheck(t *testing.T) { } } -type asyncWaiter struct { - podName string - containerID string - waitCh chan struct{} - stopCh chan struct{} - notifier *channel.SubscribableChannel -} - -func (w *asyncWaiter) notify(e interface{}) { - podUpdate := e.(agenttypes.PodUpdate) - if podUpdate.PodName == w.podName && podUpdate.ContainerID == w.containerID { - w.waitCh <- struct{}{} - } -} - -func (w *asyncWaiter) wait() { - <-w.waitCh -} - -func (w *asyncWaiter) close() { - close(w.waitCh) - close(w.stopCh) -} - -func newAsyncWaiter(podName, containerID string) *asyncWaiter { - waiter := &asyncWaiter{ - podName: podName, - containerID: containerID, - waitCh: make(chan struct{}), - stopCh: make(chan struct{}), - notifier: channel.NewSubscribableChannel("PodUpdate", 100), - } - waiter.notifier.Subscribe(waiter.notify) - go waiter.notifier.Run(waiter.stopCh) - return waiter -} - func TestReconcile(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + clients := newMockClients(controller, nodeName, pod1, pod2, pod3) + clients.startInformers(stopCh) + kubeClient := clients.kubeClient mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) - mockOFClient = openflowtest.NewMockClient(controller) + mockOFClient = clients.ofClient ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) defer mockHostInterfaceExists()() defer mockGetHnsNetworkByName()() missingEndpoint := getHnsEndpoint(generateUUID(), "iface4") - testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, true, nil, nil) + testUtil := newHnsTestUtil(missingEndpoint.Id, 
[]hcsshim.HNSEndpoint{*missingEndpoint}, false, nil, nil) testUtil.createHnsEndpoint(missingEndpoint) testUtil.setFunctions() defer testUtil.restore() + mockOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).AnyTimes() cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) } waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID) - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier) + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier, clients.podLister) cniServer.nodeConfig = &config.NodeConfig{Name: nodeName} + go cniServer.podConfigurator.Run(stopCh) // Re-install Pod1 flows podFlowsInstalled := make(chan string, 2) @@ -760,8 +755,26 @@ func TestReconcile(t *testing.T) { mockRoute.EXPECT().DeleteLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) // Re-connect to Pod4 hostIfaces.Store(fmt.Sprintf("vEthernet (%s)", unconnectedInterface.InterfaceName), true) - mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, "internal").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().GetOFPort(unconnectedInterface.InterfaceName, true).Return(int32(5), nil).Times(1) + mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, "internal").Return(nil).Times(1).Do( + func(name, ifType string) ovsconfig.Error { + // Simulate OVS successfully connects to the vNIC, then a PortStatus message is + // supposed to be received. 
+ select { + case <-time.After(time.Millisecond * 50): + portStatusCh := cniServer.podConfigurator.podIfMonitor.statusCh + portStatusCh <- &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: uint32(5), + Length: 72, + Name: []byte(name), + State: openflow15.PS_LIVE, + }, + } + } + return nil + }, + ) mockOFClient.EXPECT().InstallPodFlows(unconnectedInterface.InterfaceName, unconnectedInterface.IPs, unconnectedInterface.MAC, uint32(5), uint16(0), nil). Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { podFlowsInstalled <- interfaceName diff --git a/pkg/agent/interfacestore/interface_cache.go b/pkg/agent/interfacestore/interface_cache.go index bba8cf2068a..5971ee4f008 100644 --- a/pkg/agent/interfacestore/interface_cache.go +++ b/pkg/agent/interfacestore/interface_cache.go @@ -105,6 +105,11 @@ func (c *interfaceCache) AddInterface(interfaceConfig *InterfaceConfig) { } } +// UpdateInterface updates interfaceConfig into local cache. +func (c *interfaceCache) UpdateInterface(interfaceConfig *InterfaceConfig) { + c.cache.Update(interfaceConfig) +} + // DeleteInterface deletes interface from local cache. func (c *interfaceCache) DeleteInterface(interfaceConfig *InterfaceConfig) { c.cache.Delete(interfaceConfig) diff --git a/pkg/agent/interfacestore/testing/mock_interfacestore.go b/pkg/agent/interfacestore/testing/mock_interfacestore.go index cf6ad7f2c2d..64347d6709b 100644 --- a/pkg/agent/interfacestore/testing/mock_interfacestore.go +++ b/pkg/agent/interfacestore/testing/mock_interfacestore.go @@ -278,3 +278,15 @@ func (mr *MockInterfaceStoreMockRecorder) ListInterfaces() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListInterfaces", reflect.TypeOf((*MockInterfaceStore)(nil).ListInterfaces)) } + +// UpdateInterface mocks base method. 
+func (m *MockInterfaceStore) UpdateInterface(interfaceConfig *interfacestore.InterfaceConfig) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateInterface", interfaceConfig) +} + +// UpdateInterface indicates an expected call of UpdateInterface. +func (mr *MockInterfaceStoreMockRecorder) UpdateInterface(interfaceConfig any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateInterface", reflect.TypeOf((*MockInterfaceStore)(nil).UpdateInterface), interfaceConfig) +} diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index 9d0ca2afe57..7f0b26c9fc7 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -113,6 +113,7 @@ type InterfaceConfig struct { type InterfaceStore interface { Initialize(interfaces []*InterfaceConfig) AddInterface(interfaceConfig *InterfaceConfig) + UpdateInterface(interfaceConfig *InterfaceConfig) ListInterfaces() []*InterfaceConfig DeleteInterface(interfaceConfig *InterfaceConfig) GetInterface(interfaceKey string) (*InterfaceConfig, bool) diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index e6e77d0cebd..4cd1134a457 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -408,6 +408,9 @@ type Client interface { // or ip, port, protocol and direction. It is used to bypass NetworkPolicy enforcement on a VM for the particular // traffic. InstallPolicyBypassFlows(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) error + + // SubscribeOFPortStatusMessage registers a channel to listen for the OpenFlow PortStatus message. + SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) } // GetFlowTableStatus returns an array of flow table status. 
@@ -1697,3 +1700,7 @@ func (c *client) getMeterStats() { klog.ErrorS(err, "Failed to get OVS meter stats") } } + +func (c *client) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) { + c.bridge.SubscribePortStatusConsumer(statusCh) +} diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 658193b1220..ab5bd20dcca 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -2906,3 +2906,14 @@ func TestCachedFlowIsDrop(t *testing.T) { msg = flows[0].GetMessage().(*openflow15.FlowMod) assert.False(t, isDropFlow(msg)) } + +func TestSubscribeOFPortStatusMessage(t *testing.T) { + ctrl := gomock.NewController(t) + ch := make(chan *openflow15.PortStatus) + bridge := ovsoftest.NewMockBridge(ctrl) + c := client{ + bridge: bridge, + } + bridge.EXPECT().SubscribePortStatusConsumer(ch).Times(1) + c.SubscribeOFPortStatusMessage(ch) +} diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index bd3ee7b835a..eafaa90f344 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -36,6 +36,7 @@ import ( openflow0 "antrea.io/antrea/pkg/ovs/openflow" ip "antrea.io/antrea/pkg/util/ip" proxy "antrea.io/antrea/third_party/proxy" + openflow15 "antrea.io/libOpenflow/openflow15" protocol "antrea.io/libOpenflow/protocol" util "antrea.io/libOpenflow/util" ofctrl "antrea.io/ofnet/ofctrl" @@ -849,6 +850,18 @@ func (mr *MockClientMockRecorder) StartPacketInHandler(stopCh any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartPacketInHandler", reflect.TypeOf((*MockClient)(nil).StartPacketInHandler), stopCh) } +// SubscribeOFPortStatusMessage mocks base method. 
+func (m *MockClient) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubscribeOFPortStatusMessage", statusCh) +} + +// SubscribeOFPortStatusMessage indicates an expected call of SubscribeOFPortStatusMessage. +func (mr *MockClientMockRecorder) SubscribeOFPortStatusMessage(statusCh any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeOFPortStatusMessage", reflect.TypeOf((*MockClient)(nil).SubscribeOFPortStatusMessage), statusCh) +} + // SubscribePacketIn mocks base method. func (m *MockClient) SubscribePacketIn(reason uint8, pktInQueue *openflow0.PacketInQueue) error { m.ctrl.T.Helper() diff --git a/pkg/agent/types/annotations.go b/pkg/agent/types/annotations.go index cc74150f280..cc685682229 100644 --- a/pkg/agent/types/annotations.go +++ b/pkg/agent/types/annotations.go @@ -41,4 +41,8 @@ const ( // L7FlowExporterAnnotationKey is the key of the L7 network flow export annotation that enables L7 network flow export for annotated Pod or Namespace based on the value of annotation which is direction of traffic. L7FlowExporterAnnotationKey string = "visibility.antrea.io/l7-export" + + // PodNotReadyAnnotationKey represents the key of the Pod annotation that specifies the Pod's networking is not ready. + // This annotation is used only on Windows. + PodNotReadyAnnotationKey string = "pod.antrea.io/not-ready" ) diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 1c5c60fd22d..edf8ced802b 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -146,6 +146,9 @@ type Bridge interface { ResumePacket(packetIn *ofctrl.PacketIn) error // BuildPacketOut returns a new PacketOutBuilder. BuildPacketOut() PacketOutBuilder + // SubscribePortStatusConsumer registers a consumer to listen to OpenFlow PortStatus message. + // We only support a single consumer for now. 
+ SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) } // TableStatus represents the status of a specific flow table. The status is useful for debugging. diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index c0f6ede4daa..e2f60e098ca 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -202,6 +202,9 @@ type OFBridge struct { // pktConsumers is a map from PacketIn category to the channel that is used to publish the PacketIn message. pktConsumers sync.Map + // portStatusConsumerCh is a channel to notify the agent that a PortStatus message is received + portStatusConsumerCh chan *openflow15.PortStatus + mpReplyChsMutex sync.RWMutex mpReplyChs map[uint32]chan *openflow15.MultipartReply // tunMetadataLengthMap is used to store the tlv-map settings on the OVS bridge. Key is the index of tunnel metedata, @@ -718,6 +721,27 @@ func (b *OFBridge) RetryInterval() time.Duration { return b.retryInterval } +func (b *OFBridge) PortStatusRcvd(status *openflow15.PortStatus) { + if b.portStatusConsumerCh == nil { + return + } + // Correspond to MessageStream.outbound log level. + if klog.V(7).Enabled() { + klog.InfoS("Received PortStatus", "portStatus", status) + } else { + klog.V(4).InfoS("Received PortStatus") + } + switch status.Reason { + // We only process add/modified status for now. 
+ case openflow15.PR_ADD, openflow15.PR_MODIFY: + b.portStatusConsumerCh <- status + } +} + +func (b *OFBridge) SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) { + b.portStatusConsumerCh = statusCh +} + func (b *OFBridge) setPacketInFormatTo2() { b.ofSwitch.SetPacketInFormat(openflow15.OFPUTIL_PACKET_IN_NXT2) } diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index af5033b0b9a..6cc0ab612da 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -342,6 +342,18 @@ func (mr *MockBridgeMockRecorder) SubscribePacketIn(category, pktInQueue any) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePacketIn", reflect.TypeOf((*MockBridge)(nil).SubscribePacketIn), category, pktInQueue) } +// SubscribePortStatusConsumer mocks base method. +func (m *MockBridge) SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubscribePortStatusConsumer", statusCh) +} + +// SubscribePortStatusConsumer indicates an expected call of SubscribePortStatusConsumer. +func (mr *MockBridgeMockRecorder) SubscribePortStatusConsumer(statusCh any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePortStatusConsumer", reflect.TypeOf((*MockBridge)(nil).SubscribePortStatusConsumer), statusCh) +} + // MockTable is a mock of Table interface. 
type MockTable struct { ctrl *gomock.Controller diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 3ce87ad6b10..ffe20e11c4d 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -44,7 +44,10 @@ import ( mock "go.uber.org/mock/gomock" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + coreinformers "k8s.io/client-go/informers/core/v1" k8sFake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" "k8s.io/component-base/metrics/legacyregistry" "antrea.io/antrea/pkg/agent/cniserver" @@ -572,10 +575,21 @@ func newTester() *cmdAddDelTester { ifaceStore := interfacestore.NewInterfaceStore() tester.podNetworkWait = wait.NewGroup() tester.flowRestoreCompleteWait = wait.NewGroup() + kubeClient := k8sFake.NewSimpleClientset() + localPodInformer := coreinformers.NewFilteredPodInformer( + kubeClient, + metav1.NamespaceAll, + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", "fakeNode").String() + }, + ) tester.server = cniserver.New(testSock, "", getTestNodeConfig(false), - k8sFake.NewSimpleClientset(), + localPodInformer, + kubeClient, routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, tester.podNetworkWait.Increment(), @@ -740,10 +754,21 @@ func setupChainTest( routeMock = routetest.NewMockInterface(controller) podNetworkWait := wait.NewGroup() flowRestoreCompleteWait := wait.NewGroup() + kubeClient := k8sFake.NewSimpleClientset() + localPodInformer := coreinformers.NewFilteredPodInformer( + kubeClient, + metav1.NamespaceAll, + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", "fakeNode").String() + }, + ) server 
= cniserver.New(testSock, "", testNodeConfig, - k8sFake.NewSimpleClientset(), + localPodInformer, + kubeClient, routeMock, true, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, podNetworkWait, flowRestoreCompleteWait) @@ -929,10 +954,20 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { podNetworkWait := wait.NewGroup() flowRestoreCompleteWait := wait.NewGroup() k8sClient := k8sFake.NewSimpleClientset(pod) + localPodInformer := coreinformers.NewFilteredPodInformer( + k8sClient, + metav1.NamespaceAll, + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", "fakeNode").String() + }, + ) server := cniserver.New( testSock, "", testNodeConfig, + localPodInformer, k8sClient, routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450},