Skip to content

Commit

Permalink
[Windows] CNI Server installs OpenFlow entries after PortStatus messa…
Browse files Browse the repository at this point in the history
…ge is received

This change has introduced a worker in `podConfigurator` to listen for the
OpenFlow PortStatus message when a new OpenFlow port is allocated in OVS. After
receiving the message, antrea-agent Windows will install Pod related OpenFlow
entries. If the OpenFlow port is not allocated within 30s after the CmdAdd
request is responded, an event with type "NetworkNotReady" is added on the Pod;
Whenever the Pod networking forwarding rules are installed, an event with type
"NetworkIsReady" is added.

Signed-off-by: Wenying Dong <wenyingd@vmware.com>
  • Loading branch information
wenyingd committed Nov 6, 2024
1 parent 92454fb commit 136785b
Show file tree
Hide file tree
Showing 25 changed files with 1,137 additions and 146 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ func run(o *Options) error {
o.config.CNISocket,
o.config.HostProcPathPrefix,
nodeConfig,
localPodInformer.Get(),
k8sClient,
routeClient,
isChaining,
Expand Down
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module antrea.io/antrea
go 1.23.0

require (
antrea.io/libOpenflow v0.14.0
antrea.io/ofnet v0.12.0
antrea.io/libOpenflow v0.15.0
antrea.io/ofnet v0.14.0
github.com/ClickHouse/clickhouse-go/v2 v2.6.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/Mellanox/sriovnet v1.1.0
Expand Down Expand Up @@ -113,10 +113,9 @@ require (
github.com/aws/smithy-go v1.12.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenk/hub v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect
github.com/cenkalti/hub v1.0.2 // indirect
github.com/cenkalti/rpc2 v1.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
Expand Down
18 changes: 8 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE=
antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0=
antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao=
antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg=
antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI=
antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ=
antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc=
antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -114,14 +114,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA=
github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg=
github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8=
github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU=
github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k=
github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
3 changes: 2 additions & 1 deletion hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ function generate_antrea_client_code {
"${ANTREA_PKG}/pkg/apis/crd/v1alpha2" \
"${ANTREA_PKG}/pkg/apis/crd/v1beta1" \
"${ANTREA_PKG}/pkg/apis/stats" \
"${ANTREA_PKG}/pkg/apis/stats/v1alpha1"
"${ANTREA_PKG}/pkg/apis/stats/v1alpha1" \
"${ANTREA_PKG}/pkg/agent/interfacestore"

$GOPATH/bin/conversion-gen \
--output-file zz_generated.conversion.go \
Expand Down
155 changes: 146 additions & 9 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,24 @@
package cniserver

import (
"bytes"
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"time"

"antrea.io/libOpenflow/openflow15"
current "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/cni/pkg/version"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
Expand Down Expand Up @@ -61,6 +69,10 @@ const (

var (
getNSPath = util.GetNSPath
// retryInterval is the interval to re-install Pod OpenFlow entries if any error happened.
// Note, using a variable rather than constant for retryInterval because we may use a shorter time in the
// test code.
retryInterval = 5 * time.Second
)

type podConfigurator struct {
Expand All @@ -76,9 +88,19 @@ type podConfigurator struct {
// isSecondaryNetwork is true if this instance of podConfigurator is used to configure
// Pod secondary network interfaces.
isSecondaryNetwork bool

containerAccess *containerAccessArbitrator
eventBroadcaster record.EventBroadcaster
recorder record.EventRecorder
podListerSynced cache.InformerSynced
podLister v1.PodLister
kubeClient clientset.Interface
unreadyPortQueue workqueue.TypedDelayingInterface[string]
statusCh chan *openflow15.PortStatus
}

func newPodConfigurator(
kubeClient clientset.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ofClient openflow.Client,
routeClient route.Interface,
Expand All @@ -88,20 +110,27 @@ func newPodConfigurator(
isOvsHardwareOffloadEnabled bool,
disableTXChecksumOffload bool,
podUpdateNotifier channel.Notifier,
podInformer cache.SharedIndexInformer,
containerAccess *containerAccessArbitrator,
) (*podConfigurator, error) {
ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload)
if err != nil {
return nil, err
}
return &podConfigurator{
pc := &podConfigurator{
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
routeClient: routeClient,
ifaceStore: ifaceStore,
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
}, nil
kubeClient: kubeClient,
containerAccess: containerAccess,
}
// Initiate the PortStatus message listener. This function is a no-op except on Windows.
pc.initPortStatusMonitor(podInformer)
return pc, nil
}

func parseContainerIPs(ipcs []*current.IPConfig) ([]net.IP, error) {
Expand Down Expand Up @@ -166,13 +195,13 @@ func getContainerIPsString(ips []net.IP) string {
// not created for a Pod interface.
func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig {
if portData.ExternalIDs == nil {
klog.V(2).Infof("OVS port %s has no external_ids", portData.Name)
klog.V(2).InfoS("OVS port has no external_ids", "port", portData.Name)
return nil
}

containerID, found := portData.ExternalIDs[ovsExternalIDContainerID]
if !found {
klog.V(2).Infof("OVS port %s has no %s in external_ids", portData.Name, ovsExternalIDContainerID)
klog.V(2).InfoS("OVS port has no containerID in external_ids", "port", portData.Name)
return nil
}

Expand All @@ -187,8 +216,7 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in

containerMAC, err := net.ParseMAC(portData.ExternalIDs[ovsExternalIDMAC])
if err != nil {
klog.Errorf("Failed to parse MAC address from OVS external config %s: %v",
portData.ExternalIDs[ovsExternalIDMAC], err)
klog.ErrorS(err, "Failed to parse MAC address from OVS external config")
}
podName, _ := portData.ExternalIDs[ovsExternalIDPodName]
podNamespace, _ := portData.ExternalIDs[ovsExternalIDPodNamespace]
Expand Down Expand Up @@ -279,7 +307,7 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s
func (pc *podConfigurator) removeInterfaces(containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}

Expand Down Expand Up @@ -498,7 +526,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int.
func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interfacestore.InterfaceConfig) error {
containerID := containerConfig.ContainerID
klog.V(2).Infof("Deleting Openflow entries for container %s", containerID)
klog.V(2).InfoS("Deleting Openflow entries for container", "container", containerID)
if !pc.isSecondaryNetwork {
if err := pc.ofClient.UninstallPodFlows(containerConfig.InterfaceName); err != nil {
return fmt.Errorf("failed to delete Openflow entries for container %s: %v", containerID, err)
Expand All @@ -513,6 +541,7 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface
if err := pc.ovsBridgeClient.DeletePort(containerConfig.PortUUID); err != nil {
return fmt.Errorf("failed to delete OVS port for container %s interface %s: %v", containerID, containerConfig.InterfaceName, err)
}

// Remove container configuration from cache.
pc.ifaceStore.DeleteInterface(containerConfig)
if !pc.isSecondaryNetwork {
Expand Down Expand Up @@ -558,7 +587,7 @@ func (pc *podConfigurator) connectInterceptedInterface(
func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}
for _, ip := range containerConfig.IPs {
Expand All @@ -570,3 +599,111 @@ func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace,
return pc.disconnectInterfaceFromOVS(containerConfig)
// TODO recover pre-connect state? repatch vethpair to original bridge etc ?? to make first CNI happy??
}

func (pc *podConfigurator) processNextWorkItem() bool {
key, quit := pc.unreadyPortQueue.Get()
if quit {
return false
}
defer pc.unreadyPortQueue.Done(key)

if err := pc.updateUnreadyPod(key); err != nil {
klog.ErrorS(err, "Failed install OpenFlow entries for OVS port interface", "name", key)
// Put the item back on the workqueue to handle any transient errors.
pc.unreadyPortQueue.AddAfter(key, retryInterval)
}
return true
}

func (pc *podConfigurator) updateUnreadyPod(ovsPort string) error {
ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort)
return nil
}

pc.containerAccess.lockContainer(ifConfig.ContainerID)
defer pc.containerAccess.unlockContainer(ifConfig.ContainerID)
// Get the InterfaceConfig again after the lock to avoid race conditions.
ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort)
return nil
}

if ifConfig.OFPort == 0 {
// Add Pod not-ready event if the pod flows are not successfully installed, and the OpenFlow port is not allocated.
// Returns error so that we can have a retry after 5s.
pc.recordPodEvent(ifConfig, false)
return fmt.Errorf("pod's OpenFlow port is not ready yet")
}

// Install OpenFlow entries for the Pod.
klog.V(2).InfoS("Setting up Openflow entries for OVS port", "port", ovsPort)
if err := pc.ofClient.InstallPodFlows(ovsPort, ifConfig.IPs, ifConfig.MAC, uint32(ifConfig.OFPort), ifConfig.VLANID, nil); err != nil {
// Add Pod not-ready event if the pod flows installation fails.
// Returns error so that we can have a retry after 5s.
pc.recordPodEvent(ifConfig, false)
return fmt.Errorf("failed to add Openflow entries for OVS port %s: %v", ovsPort, err)
}

// Notify the Pod update event to required components.
event := agenttypes.PodUpdate{
PodName: ifConfig.PodName,
PodNamespace: ifConfig.PodNamespace,
IsAdd: true,
ContainerID: ifConfig.ContainerID,
}
pc.podUpdateNotifier.Notify(event)

pc.recordPodEvent(ifConfig, true)
return nil
}

func (pc *podConfigurator) recordPodEvent(ifConfig *interfacestore.InterfaceConfig, installed bool) {
pod, err := pc.podLister.Pods(ifConfig.PodNamespace).Get(ifConfig.PodName)
if err != nil {
klog.InfoS("Unable to get Pod, skip recording Pod event", "Pod", klog.KRef(ifConfig.PodNamespace, ifConfig.PodName))
return
}

if installed {
// Add normal event to record Pod network is ready.
pc.recorder.Eventf(pod, corev1.EventTypeNormal, "NetworkReady", "Installed Pod network forwarding rules")
return
}

pc.recorder.Eventf(pod, corev1.EventTypeWarning, "NetworkNotReady", "Pod network forwarding rules not installed")
}

func (pc *podConfigurator) processPortStatusMessage(status *openflow15.PortStatus) {
// Update Pod OpenFlow entries only after the OpenFlow port state is live.
if status.Desc.State != openflow15.PS_LIVE {
return
}
ovsPort := string(bytes.Trim(status.Desc.Name, "\x00"))
ofPort := status.Desc.PortNo

ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found", "ovsPort", ovsPort)
return
}

func() {
pc.containerAccess.lockContainer(ifConfig.ContainerID)
defer pc.containerAccess.unlockContainer(ifConfig.ContainerID)
// Get the InterfaceConfig again after the lock to avoid race conditions.
ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found", "ovsPort", ovsPort)
return
}
// Update interface config with the ofPort.
newIfConfig := ifConfig.DeepCopy()
newIfConfig.OVSPortConfig.OFPort = int32(ofPort)
pc.ifaceStore.UpdateInterface(newIfConfig)
}()

pc.unreadyPortQueue.Add(ovsPort)
}
9 changes: 9 additions & 0 deletions pkg/agent/cniserver/pod_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

current "github.com/containernetworking/cni/pkg/types/100"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
Expand Down Expand Up @@ -113,3 +114,11 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte
klog.Warningf("Interface for Pod %s/%s not found in the interface store", ifaceConfig.PodNamespace, ifaceConfig.PodName)
}
}

func (pc *podConfigurator) initPortStatusMonitor(_ cache.SharedIndexInformer) {

}

func (pc *podConfigurator) Run(stopCh <-chan struct{}) {
<-stopCh
}
2 changes: 1 addition & 1 deletion pkg/agent/cniserver/pod_configuration_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator
mockOFClient = openflowtest.NewMockClient(controller)
ifaceStore = interfacestore.NewInterfaceStore()
mockRoute = routetest.NewMockInterface(controller)
configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100))
configurator, _ := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil)
configurator.ifConfigurator = testIfaceConfigurator
return configurator
}
Expand Down
Loading

0 comments on commit 136785b

Please sign in to comment.