Skip to content

Commit

Permalink
[Windows] CNI Server installs OpenFlow entries after PortStatus messa…
Browse files Browse the repository at this point in the history
…ge is received

This change introduces a worker in `podConfigurator` that listens for the
OpenFlow PortStatus message sent when a new OpenFlow port is allocated in OVS.
After receiving the message, antrea-agent on Windows installs the Pod-related
OpenFlow entries. If the OpenFlow port is not allocated within 30s after the
response to the CmdAdd request is sent, a warning event with reason
"NetworkNotReady" is added on the Pod; once the Pod network forwarding rules
are installed, a normal event with reason "NetworkReady" is added.

Signed-off-by: Wenying Dong <wenyingd@vmware.com>
  • Loading branch information
wenyingd committed Nov 1, 2024
1 parent 71d57f1 commit 84227c7
Show file tree
Hide file tree
Showing 24 changed files with 1,027 additions and 138 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ func run(o *Options) error {
o.config.CNISocket,
o.config.HostProcPathPrefix,
nodeConfig,
localPodInformer.Get(),
k8sClient,
routeClient,
isChaining,
Expand Down
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module antrea.io/antrea
go 1.23.0

require (
antrea.io/libOpenflow v0.14.0
antrea.io/ofnet v0.12.0
antrea.io/libOpenflow v0.15.0
antrea.io/ofnet v0.14.0
github.com/ClickHouse/clickhouse-go/v2 v2.6.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/Mellanox/sriovnet v1.1.0
Expand Down Expand Up @@ -113,10 +113,9 @@ require (
github.com/aws/smithy-go v1.12.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenk/hub v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect
github.com/cenkalti/hub v1.0.2 // indirect
github.com/cenkalti/rpc2 v1.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
Expand Down
18 changes: 8 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE=
antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0=
antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao=
antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg=
antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI=
antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ=
antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc=
antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -114,14 +114,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA=
github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg=
github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8=
github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU=
github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k=
github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/cniserver/interface_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (

const (
// notFoundHNSEndpoint is the literal text "The endpoint was not found".
// NOTE(review): presumably matched against HNS error messages — confirm at the usage site.
notFoundHNSEndpoint = "The endpoint was not found"

// podNotReadyTimeInSeconds is a time.Duration of 30 seconds. Despite the
// "InSeconds" suffix it is not a plain integer count of seconds, so it must
// not be multiplied by time.Second again.
// NOTE(review): presumably the delay after which a Pod without an allocated
// OpenFlow port is reported as not ready — confirm at the usage site.
podNotReadyTimeInSeconds = 30 * time.Second
)

var (
Expand Down
135 changes: 126 additions & 9 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@
package cniserver

import (
"bytes"
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"time"

"antrea.io/libOpenflow/openflow15"
current "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/cni/pkg/version"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
Expand Down Expand Up @@ -61,6 +68,10 @@ const (

var (
// getNSPath refers to util.GetNSPath.
// NOTE(review): presumably held in a variable so tests can substitute it — confirm.
getNSPath = util.GetNSPath
// retryInterval is the interval to re-install Pod OpenFlow entries if any error happened.
// Note, using a variable rather than constant for retryInterval because we may use a shorter time in the
// test code.
retryInterval = 5 * time.Second
)

type podConfigurator struct {
Expand All @@ -76,9 +87,18 @@ type podConfigurator struct {
// isSecondaryNetwork is true if this instance of podConfigurator is used to configure
// Pod secondary network interfaces.
isSecondaryNetwork bool

containerAccess *containerAccessArbitrator
eventBroadcaster record.EventBroadcaster
record record.EventRecorder
podLister v1.PodLister
kubeClient clientset.Interface
unreadyPortQueue workqueue.TypedDelayingInterface[string]
statusCh chan *openflow15.PortStatus
}

func newPodConfigurator(
kubeClient clientset.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ofClient openflow.Client,
routeClient route.Interface,
Expand All @@ -88,20 +108,27 @@ func newPodConfigurator(
isOvsHardwareOffloadEnabled bool,
disableTXChecksumOffload bool,
podUpdateNotifier channel.Notifier,
podLister v1.PodLister,
containerAccess *containerAccessArbitrator,
) (*podConfigurator, error) {
ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload)
if err != nil {
return nil, err
}
return &podConfigurator{
pc := &podConfigurator{
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
routeClient: routeClient,
ifaceStore: ifaceStore,
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
}, nil
kubeClient: kubeClient,
podLister: podLister,
containerAccess: containerAccess,
}
pc.initForOS()
return pc, nil
}

func parseContainerIPs(ipcs []*current.IPConfig) ([]net.IP, error) {
Expand Down Expand Up @@ -166,13 +193,13 @@ func getContainerIPsString(ips []net.IP) string {
// not created for a Pod interface.
func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig {
if portData.ExternalIDs == nil {
klog.V(2).Infof("OVS port %s has no external_ids", portData.Name)
klog.V(2).InfoS("OVS port has no external_ids", "port", portData.Name)
return nil
}

containerID, found := portData.ExternalIDs[ovsExternalIDContainerID]
if !found {
klog.V(2).Infof("OVS port %s has no %s in external_ids", portData.Name, ovsExternalIDContainerID)
klog.V(2).InfoS("OVS port has no containerID in external_ids", "port", portData.Name)
return nil
}

Expand All @@ -187,8 +214,7 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in

containerMAC, err := net.ParseMAC(portData.ExternalIDs[ovsExternalIDMAC])
if err != nil {
klog.Errorf("Failed to parse MAC address from OVS external config %s: %v",
portData.ExternalIDs[ovsExternalIDMAC], err)
klog.ErrorS(err, "Failed to parse MAC address from OVS external config")
}
podName, _ := portData.ExternalIDs[ovsExternalIDPodName]
podNamespace, _ := portData.ExternalIDs[ovsExternalIDPodNamespace]
Expand Down Expand Up @@ -279,7 +305,7 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s
func (pc *podConfigurator) removeInterfaces(containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}

Expand Down Expand Up @@ -498,7 +524,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int.
func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interfacestore.InterfaceConfig) error {
containerID := containerConfig.ContainerID
klog.V(2).Infof("Deleting Openflow entries for container %s", containerID)
klog.V(2).InfoS("Deleting Openflow entries for container", "container", containerID)
if !pc.isSecondaryNetwork {
if err := pc.ofClient.UninstallPodFlows(containerConfig.InterfaceName); err != nil {
return fmt.Errorf("failed to delete Openflow entries for container %s: %v", containerID, err)
Expand All @@ -513,6 +539,7 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface
if err := pc.ovsBridgeClient.DeletePort(containerConfig.PortUUID); err != nil {
return fmt.Errorf("failed to delete OVS port for container %s interface %s: %v", containerID, containerConfig.InterfaceName, err)
}

// Remove container configuration from cache.
pc.ifaceStore.DeleteInterface(containerConfig)
if !pc.isSecondaryNetwork {
Expand Down Expand Up @@ -558,7 +585,7 @@ func (pc *podConfigurator) connectInterceptedInterface(
func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}
for _, ip := range containerConfig.IPs {
Expand All @@ -570,3 +597,93 @@ func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace,
return pc.disconnectInterfaceFromOVS(containerConfig)
// TODO recover pre-connect state? repatch vethpair to original bridge etc ?? to make first CNI happy??
}

// processNextWorkItem takes one OVS port name off unreadyPortQueue and tries to
// install the Pod OpenFlow entries for it through updateUnreadyPod. If that
// fails, the port name is re-queued after retryInterval so the installation is
// attempted again. It returns false only when the queue's Get reports quit;
// otherwise it returns true, whether or not the installation succeeded.
func (pc *podConfigurator) processNextWorkItem() bool {
	key, quit := pc.unreadyPortQueue.Get()
	if quit {
		return false
	}
	// Every key obtained from Get must be marked Done, including on the error
	// path below where the key is scheduled for a delayed retry.
	defer pc.unreadyPortQueue.Done(key)

	if err := pc.updateUnreadyPod(key); err != nil {
		klog.ErrorS(err, "Failed to install OpenFlow entries for OVS port interface", "name", key)
		// Put the item back on the workqueue to handle any transient errors.
		pc.unreadyPortQueue.AddAfter(key, retryInterval)
	}
	return true
}

// updateUnreadyPod installs the Pod OpenFlow entries for the interface attached
// to the given OVS port and, on success, notifies podUpdateNotifier and records
// a "ready" event on the Pod.
//
// It returns nil when the interface is no longer in the interface store (there
// is nothing to do) or when the flows were installed. It returns an error when
// the OpenFlow port has not been allocated yet (OFPort == 0) or when flow
// installation fails; in both cases a "not ready" event is recorded on the Pod
// first. The installation error is wrapped with %w so callers can inspect it
// with errors.Is / errors.As.
func (pc *podConfigurator) updateUnreadyPod(ovsPort string) error {
	ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort)
	if !found {
		klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort)
		return nil
	}

	if ifConfig.OFPort == 0 {
		// The OpenFlow port is not allocated yet: record a not-ready event on
		// the Pod and return an error so that the caller can retry later.
		pc.processPodEvents(ifConfig, false)
		return fmt.Errorf("pod's OpenFlow port is not ready yet")
	}

	// Install OpenFlow entries for the Pod.
	klog.V(2).InfoS("Setting up Openflow entries for OVS port", "port", ovsPort)
	if err := pc.ofClient.InstallPodFlows(ovsPort, ifConfig.IPs, ifConfig.MAC, uint32(ifConfig.OFPort), ifConfig.VLANID, nil); err != nil {
		// Flow installation failed: record a not-ready event on the Pod and
		// return an error so that the caller can retry later.
		pc.processPodEvents(ifConfig, false)
		return fmt.Errorf("failed to add Openflow entries for OVS port %s: %w", ovsPort, err)
	}

	// Notify the Pod update event to required components.
	event := agenttypes.PodUpdate{
		PodName:      ifConfig.PodName,
		PodNamespace: ifConfig.PodNamespace,
		IsAdd:        true,
		ContainerID:  ifConfig.ContainerID,
	}
	pc.podUpdateNotifier.Notify(event)

	// processPodEvents only logs when the Pod cannot be fetched from the
	// lister and returns nothing, so a missing Pod does not trigger a retry.
	pc.processPodEvents(ifConfig, true)
	return nil
}

// processPodEvents records a Kubernetes event on the Pod that owns ifConfig.
// When installed is true a Normal event with reason "NetworkReady" is recorded;
// otherwise a Warning event with reason "NetworkNotReady" is recorded. If the
// Pod cannot be fetched from the lister, the failure is logged and no event is
// recorded.
func (pc *podConfigurator) processPodEvents(ifConfig *interfacestore.InterfaceConfig, installed bool) {
	podNamespace, podName := ifConfig.PodNamespace, ifConfig.PodName
	pod, err := pc.podLister.Pods(podNamespace).Get(podName)
	if err != nil {
		klog.ErrorS(err, "Failed to get Pod", "Pod", klog.KRef(podNamespace, podName))
		return
	}

	if !installed {
		// Forwarding rules are missing: surface this as a warning on the Pod.
		pc.record.Eventf(pod, corev1.EventTypeWarning, "NetworkNotReady", "Pod network forwarding rules not installed")
		return
	}

	// Forwarding rules are in place: record that the Pod network is ready.
	pc.record.Eventf(pod, corev1.EventTypeNormal, "NetworkReady", "Installed Pod network forwarding rules")
}

// processPortStatusMessage handles an OpenFlow PortStatus message. When the
// reported port state equals PS_LIVE and the port name matches an interface in
// the interface store, it stores the reported OpenFlow port number on that
// interface's config and enqueues the port name on unreadyPortQueue so the Pod
// flows get installed. Messages with any other state, or for ports unknown to
// the interface store, are ignored.
func (pc *podConfigurator) processPortStatusMessage(status *openflow15.PortStatus) {
	// Update Pod OpenFlow entries only after the OpenFlow port state is live.
	if status.Desc.State != openflow15.PS_LIVE {
		return
	}
	// The port name is carried as a NUL-padded byte array; strip the padding.
	ovsPort := string(bytes.Trim(status.Desc.Name, "\x00"))
	ofPort := status.Desc.PortNo

	ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort)
	if !found {
		return
	}
	containerID := ifConfig.ContainerID
	pc.containerAccess.lockContainer(containerID)
	defer pc.containerAccess.unlockContainer(containerID)

	// The first lookup happened before the container lock was held, so the
	// entry may have been changed or removed while waiting for the lock.
	// Re-read it under the lock so that a stale copy is never written back.
	ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort)
	if !found || ifConfig.ContainerID != containerID {
		return
	}

	// Update interface config with the ofPort.
	newIfConfig := ifConfig.DeepCopy()
	newIfConfig.OVSPortConfig.OFPort = int32(ofPort)
	pc.ifaceStore.UpdateInterface(newIfConfig)
	pc.unreadyPortQueue.Add(ovsPort)
}
8 changes: 8 additions & 0 deletions pkg/agent/cniserver/pod_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,11 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte
klog.Warningf("Interface for Pod %s/%s not found in the interface store", ifaceConfig.PodNamespace, ifaceConfig.PodName)
}
}

// initForOS is the OS-specific initialization hook of podConfigurator. The
// Linux implementation has nothing to set up, so it is intentionally a no-op.
func (pc *podConfigurator) initForOS() {}

// Run blocks until a receive on stopCh completes (a value is sent or the
// channel is closed) and then returns. The Linux implementation does no other
// work here.
func (pc *podConfigurator) Run(stopCh <-chan struct{}) {
<-stopCh
}
2 changes: 1 addition & 1 deletion pkg/agent/cniserver/pod_configuration_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator
mockOFClient = openflowtest.NewMockClient(controller)
ifaceStore = interfacestore.NewInterfaceStore()
mockRoute = routetest.NewMockInterface(controller)
configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100))
configurator, _ := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil)
configurator.ifConfigurator = testIfaceConfigurator
return configurator
}
Expand Down
Loading

0 comments on commit 84227c7

Please sign in to comment.