Skip to content

Commit

Permalink
Addressing the comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
behzad-mir committed Jan 26, 2024
1 parent 4aea862 commit 195356c
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ ACN_VERSION ?= $(shell git describe --exclude "azure-ipam*" --exclude "dropgz
AZURE_IPAM_VERSION ?= $(notdir $(shell git describe --match "azure-ipam*" --tags --always))
CNI_VERSION ?= $(ACN_VERSION)
CNI_DROPGZ_VERSION ?= $(notdir $(shell git describe --match "dropgz*" --tags --always))
CNS_VERSION ?= $(ACN_VERSION)
CNS_VERSION ?= $(ACN_VERSION)-p
NPM_VERSION ?= $(ACN_VERSION)
ZAPAI_VERSION ?= $(notdir $(shell git describe --match "zapai*" --tags --always))

Expand Down
4 changes: 2 additions & 2 deletions cni/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ func (plugin *NetPlugin) Get(args *cniSkel.CmdArgs) error {
}

// Query the endpoint.
if epInfo, err = plugin.nm.GetEndpointInfo(networkID, endpointID); err != nil {
if epInfo, err = plugin.nm.GetEndpointInfo(networkID, endpointID, args.IfName); err != nil {
logger.Error("Failed to query endpoint", zap.Error(err))
return err
}
Expand Down Expand Up @@ -1051,7 +1051,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {

endpointID := plugin.nm.GetEndpointID(args.ContainerID, args.IfName)
// Query the endpoint.
if epInfo, err = plugin.nm.GetEndpointInfo(networkID, endpointID); err != nil {
if epInfo, err = plugin.nm.GetEndpointInfo(networkID, endpointID, args.IfName); err != nil {
logger.Info("GetEndpoint",
zap.String("endpoint", endpointID),
zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion cni/network/network_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
func (plugin *NetPlugin) handleConsecutiveAdd(args *cniSkel.CmdArgs, endpointId string, networkId string,
nwInfo *network.NetworkInfo, nwCfg *cni.NetworkConfig,
) (*cniTypesCurr.Result, error) {
epInfo, _ := plugin.nm.GetEndpointInfo(networkId, endpointId)
epInfo, _ := plugin.nm.GetEndpointInfo(networkId, endpointId, "")
if epInfo == nil {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion cnm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (plugin *netPlugin) endpointOperInfo(w http.ResponseWriter, r *http.Request
}

// Process request.
epInfo, err := plugin.nm.GetEndpointInfo(req.NetworkID, req.EndpointID)
epInfo, err := plugin.nm.GetEndpointInfo(req.NetworkID, req.EndpointID, "")
if err != nil {
plugin.SendErrorResponse(w, err)
return
Expand Down
5 changes: 2 additions & 3 deletions cns/cnireconciler/podinfoprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func cniStateToPodInfoByIP(state *api.AzureCNIState) (map[string]cns.PodInfo, er
for _, endpoint := range state.ContainerInterfaces {
for _, epIP := range endpoint.IPAddresses {
podInfo := cns.NewPodInfo(endpoint.ContainerID, endpoint.PodEndpointId, endpoint.PodName, endpoint.PodNamespace)
logger.Printf("podInfoByIp [%+v]", podInfoByIP)
ipKey := epIP.IP.String()
if prevPodInfo, ok := podInfoByIP[ipKey]; ok {
return nil, errors.Wrapf(cns.ErrDuplicateIP, "duplicate ip %s found for different pods: pod: %+v, pod: %+v", ipKey, podInfo, prevPodInfo)
Expand All @@ -78,7 +77,6 @@ func cniStateToPodInfoByIP(state *api.AzureCNIState) (map[string]cns.PodInfo, er
podInfoByIP[ipKey] = podInfo
}
}
logger.Printf("podInfoByIP [%+v]", podInfoByIP)
return podInfoByIP, nil
}

Expand Down Expand Up @@ -117,7 +115,7 @@ func endpointStateToPodInfoByIP(state map[string]*restserver.EndpointInfo) (map[
// into a EndpointInfo map, using the containerID as keys in the map.
// The map then will be saved on CNS endpoint state
func cniStateToCnsEndpointState(state *api.AzureCNIState) (map[string]*restserver.EndpointInfo, error) {
logger.Printf("Generating CNS ENdpoint State")
logger.Printf("Generating CNS Endpoint State")
endpointState := map[string]*restserver.EndpointInfo{}
for epID, endpoint := range state.ContainerInterfaces {
endpointInfo := &restserver.EndpointInfo{PodName: endpoint.PodName, PodNamespace: endpoint.PodNamespace, IfnameToIPMap: make(map[string]*restserver.IPInfo)}
Expand Down Expand Up @@ -147,6 +145,7 @@ func cniStateToCnsEndpointState(state *api.AzureCNIState) (map[string]*restserve
endpointID, Ifname := extractEndpointInfo(epID, endpoint.ContainerID)
endpointInfo.IfnameToIPMap[Ifname] = ipInfo
endpointState[endpointID] = endpointInfo
logger.Printf("CNS endpoint state extracted from CNI: [%+v]", *endpointInfo)
}
return endpointState, nil
}
Expand Down
8 changes: 7 additions & 1 deletion cns/cnireconciler/podinfoprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ func TestNewCNIPodInfoProvider(t *testing.T) {
{
name: "good",
exec: newCNIStateFakeExec(
`{"ContainerInterfaces":{"3f813b02-eth0":{"PodName":"metrics-server-77c8679d7d-6ksdh","IfName":"eth0","PodNamespace":"kube-system","PodEndpointID":"3f813b02-eth0","ContainerID":"3f813b029429b4e41a09ab33b6f6d365d2ed704017524c78d1d0dece33cdaf46","IPAddresses":[{"IP":"10.241.0.17","Mask":"//8AAA=="}]},"6e688597-eth0":{"PodName":"tunnelfront-5d96f9b987-65xbn","IfName":"eth0","PodNamespace":"kube-system","PodEndpointID":"6e688597-eth0","ContainerID":"6e688597eafb97c83c84e402cc72b299bfb8aeb02021e4c99307a037352c0bed","IPAddresses":[{"IP":"10.241.0.13","Mask":"//8AAA=="}]}}}`,
`{"ContainerInterfaces":{"3f813b02-eth0":{"PodName":"metrics-server-77c8679d7d-6ksdh","IfName":"eth0",
"PodNamespace":"kube-system","PodEndpointID":"3f813b02-eth0",
"ContainerID":"3f813b029429b4e41a09ab33b6f6d365d2ed704017524c78d1d0dece33cdaf46",
"IPAddresses":[{"IP":"10.241.0.17","Mask":"//8AAA=="}]},
"6e688597-eth0":{"PodName":"tunnelfront-5d96f9b987-65xbn","IfName":"eth0","PodNamespace":"kube-system",
"PodEndpointID":"6e688597-eth0","ContainerID":"6e688597eafb97c83c84e402cc72b299bfb8aeb02021e4c99307a037352c0bed",
"IPAddresses":[{"IP":"10.241.0.13","Mask":"//8AAA=="}]}}}`,
),
want: map[string]cns.PodInfo{
"10.241.0.13": cns.NewPodInfo("6e688597eafb97c83c84e402cc72b299bfb8aeb02021e4c99307a037352c0bed", "6e688597-eth0", "tunnelfront-5d96f9b987-65xbn", "kube-system"),
Expand Down
52 changes: 31 additions & 21 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ func main() {

logger.Printf("Set GlobalPodInfoScheme %v (InitializeFromCNI=%t)", cns.GlobalPodInfoScheme, cnsconfig.InitializeFromCNI)

err = InitializeCRDState(rootCtx, httpRestService, cnsconfig, endpointStateStore)
err = InitializeCRDState(rootCtx, httpRestService, cnsconfig)
if err != nil {
logger.Errorf("Failed to start CRD Controller, err:%v.\n", err)
return
Expand Down Expand Up @@ -1176,7 +1176,7 @@ func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter,
}

// InitializeCRDState builds and starts the CRD controllers.
func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cnsconfig *configuration.CNSConfig, endpointStateStore store.KeyValueStore) error {
func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cnsconfig *configuration.CNSConfig) error {
// convert interface type to implementation type
httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService)
if !ok {
Expand Down Expand Up @@ -1228,26 +1228,9 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
switch {
case cnsconfig.ManageEndpointState:
logger.Printf("Initializing from self managed endpoint store")
podInfoByIPProvider, err = cnireconciler.NewCNSPodInfoProvider(httpRestServiceImplementation.EndpointStateStore) // get reference to endpoint state store from rest server
podInfoByIPProvider, err = InitializeStateFromCNS(cnsconfig, httpRestServiceImplementation.EndpointStateStore)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
logger.Printf("[Azure CNS] No endpoint state found, skipping initializing CNS state")
if cnsconfig.StatelessCNIMigration {
logger.Printf("StatelessCNI Migration is enabled")
logger.Printf("initializing from Statefull CNI")
var endpointState map[string]*restserver.EndpointInfo
podInfoByIPProvider, endpointState, err = cnireconciler.NewCNIPodInfoProvider()
if err != nil {
return errors.Wrap(err, "failed to create CNI PodInfoProvider")
}
err = endpointStateStore.Write(restserver.EndpointStoreKey, endpointState)
if err != nil {
return fmt.Errorf("failed to write endpoint state to store: %w", err)
}
}
} else {
return errors.Wrap(err, "failed to create CNS PodInfoProvider")
}
return errors.Wrap(err, "failed to create CNI PodInfoProvider")
}

case cnsconfig.InitializeFromCNI:
Expand Down Expand Up @@ -1515,3 +1498,30 @@ func createOrUpdateNodeInfoCRD(ctx context.Context, restConfig *rest.Config, nod

return nil
}

// InitializeStateFromCNS initilizes CNS Endpoint State from CNS or perform the Migration of state from statefull CNI.
func InitializeStateFromCNS(cnsconfig *configuration.CNSConfig, endpointStateStore store.KeyValueStore) (cns.PodInfoByIPProvider, error) {
logger.Printf("Initializing from self managed endpoint store")
podInfoByIPProvider, err := cnireconciler.NewCNSPodInfoProvider(endpointStateStore) // get reference to endpoint state store from rest server
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
logger.Printf("[Azure CNS] No endpoint state found, skipping initializing CNS state")
if cnsconfig.StatelessCNIMigration {
logger.Printf("StatelessCNI Migration is enabled")
logger.Printf("initializing from Statefull CNI")
var endpointState map[string]*restserver.EndpointInfo
podInfoByIPProvider, endpointState, err = cnireconciler.NewCNIPodInfoProvider()
if err != nil {
return nil, errors.Wrap(err, "failed to create CNI PodInfoProvider")
}
err = endpointStateStore.Write(restserver.EndpointStoreKey, endpointState)
if err != nil {
return nil, fmt.Errorf("failed to write endpoint state to store: %w", err)
}
}
} else {
return nil, errors.Wrap(err, "failed to create CNS PodInfoProvider")
}
}
return podInfoByIPProvider, nil
}
2 changes: 1 addition & 1 deletion network/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (epInfo *EndpointInfo) GetEndpointInfoByIP(ipAddresses []net.IPNet, network
return endpointInfo, nil
}

// IsEndpointStateComplete returns true if both HNSEndpointID and HostVethName are missing.
// IsEndpointStateInComplete returns true if both HNSEndpointID and HostVethName are missing.
func (epInfo *EndpointInfo) IsEndpointStateIncomplete() bool {
if epInfo.HNSEndpointID == "" && epInfo.IfName == "" {
return true
Expand Down
8 changes: 4 additions & 4 deletions network/endpoint_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package network

import (
"context"
"errors"
"fmt"
"net"
"strings"
Expand All @@ -16,6 +15,7 @@ import (
"github.com/Azure/azure-container-networking/platform"
"github.com/Microsoft/hcsshim"
"github.com/Microsoft/hcsshim/hcn"
"github.com/pkg/errors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -502,11 +502,11 @@ func (epInfo *EndpointInfo) GetEndpointInfoByIPImpl(ipAddresses []net.IPNet, net
// check if network exists, only create the network does not exist
hnsResponse, err := Hnsv2.GetNetworkByName(networkID)
if err != nil {
return epInfo, errors.Wrap(err, "HNS Network not found")
return epInfo, errors.Wrapf(err, "HNS Network not found")
}
hcnEndpoints, err := Hnsv2.ListEndpointsOfNetwork(hnsResponse.Id)
if err != nil {
return epInfo, errors.Wrap(err, "failed to fetch HNS endpoints for the given network")
return epInfo, errors.Wrapf(err, "failed to fetch HNS endpoints for the given network")
}
for _, hcnEndpoint := range hcnEndpoints {

Check failure on line 511 in network/endpoint_windows.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

rangeValCopy: each iteration copies 392 bytes (consider pointers or indexing) (gocritic)
for _, ipConfiguration := range hcnEndpoint.IpConfigurations {
Expand All @@ -519,5 +519,5 @@ func (epInfo *EndpointInfo) GetEndpointInfoByIPImpl(ipAddresses []net.IPNet, net
}
}
}
return epInfo, errors.wrap(err, "No HNSEndpointID matches the IPAddress: "+ipAddresses[0].IP.String())
return epInfo, errors.Wrapf(err, "No HNSEndpointID matches the IPAddress: "+ipAddresses[0].IP.String())
}
69 changes: 38 additions & 31 deletions network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type NetworkManager interface {

CreateEndpoint(client apipaClient, networkID string, epInfo []*EndpointInfo) error
DeleteEndpoint(networkID string, endpointID string, epInfo *EndpointInfo) error
GetEndpointInfo(networkID string, endpointID string) (*EndpointInfo, error)
GetEndpointInfo(networkID string, endpointID string, ifName string) (*EndpointInfo, error)
GetAllEndpoints(networkID string) (map[string]*EndpointInfo, error)
GetEndpointInfoBasedOnPODDetails(networkID string, podName string, podNameSpace string, doExactMatchForPodName bool) (*EndpointInfo, error)
AttachEndpoint(networkID string, endpointID string, sandboxKey string) (*endpoint, error)
Expand Down Expand Up @@ -420,6 +420,38 @@ func (nm *networkManager) UpdateEndpointState(ep *endpoint) error {
return nil
}

// GetEndpointState will make a call to CNS GetEndpointState API in the stateless CNI mode to fetch the endpointInfo
func (nm *networkManager) GetEndpointState(networkID, endpointID, ifName string) (*EndpointInfo, error) {
endpointResponse, err := nm.CnsClient.GetEndpoint(context.TODO(), endpointID, ifName)
if err != nil {
return nil, errors.Wrapf(err, "Get endpoint API returend with error")
}
epInfo := &EndpointInfo{
Id: endpointID,
IfIndex: EndpointIfIndex, // Azure CNI supports only one interface
IfName: endpointResponse.EndpointInfo.HostVethName,
ContainerID: endpointID,
PODName: endpointResponse.EndpointInfo.PodName,
PODNameSpace: endpointResponse.EndpointInfo.PodNamespace,
NetworkContainerID: endpointID,
HNSEndpointID: endpointResponse.EndpointInfo.HnsEndpointID,
}

for _, ip := range endpointResponse.EndpointInfo.IfnameToIPMap {
epInfo.IPAddresses = ip.IPv4
epInfo.IPAddresses = append(epInfo.IPAddresses, ip.IPv6...)

}
if epInfo.IsEndpointStateIncomplete() {
epInfo, err = epInfo.GetEndpointInfoByIP(epInfo.IPAddresses, networkID)
if err != nil {
return nil, errors.Wrapf(err, "Get endpoint API returend with error")
}
}
logger.Info("returning getEndpoint API with", zap.String("Endpoint Info: ", epInfo.PrettyString()), zap.String("HNISID : ", epInfo.HNSEndpointID))
return epInfo, nil
}

// DeleteEndpoint deletes an existing container endpoint.
func (nm *networkManager) DeleteEndpoint(networkID, endpointID string, epInfo *EndpointInfo) error {
nm.Lock()
Expand Down Expand Up @@ -475,48 +507,23 @@ func (nm *networkManager) DeleteEndpointState(networkID string, epInfo *Endpoint
}

// GetEndpointInfo returns information about the given endpoint.
func (nm *networkManager) GetEndpointInfo(networkId string, endpointId string) (*EndpointInfo, error) {
func (nm *networkManager) GetEndpointInfo(networkID, endpointID, ifName string) (*EndpointInfo, error) {
nm.Lock()
defer nm.Unlock()

if nm.IsStatelessCNIMode() {
logger.Info("calling cns getEndpoint API")
endpointResponse, err := nm.CnsClient.GetEndpoint(context.TODO(), endpointId, "eth0")
if err != nil {
return nil, errors.Wrapf(err, "Get endpoint API returend with error")
}
epInfo := &EndpointInfo{
Id: endpointId,
IfIndex: EndpointIfIndex, // Azure CNI supports only one interface
IfName: endpointResponse.EndpointInfo.HostVethName,
ContainerID: endpointId,
PODName: endpointResponse.EndpointInfo.PodName,
PODNameSpace: endpointResponse.EndpointInfo.PodNamespace,
NetworkContainerID: endpointId,
HNSEndpointID: endpointResponse.EndpointInfo.HnsEndpointID,
}

for _, ip := range endpointResponse.EndpointInfo.IfnameToIPMap {
epInfo.IPAddresses = ip.IPv4
epInfo.IPAddresses = append(epInfo.IPAddresses, ip.IPv6...)
epInfo, err := nm.GetEndpointState(networkID, endpointID, ifName)

}
if epInfo.IsEndpointStateIncomplete() {
epInfo, err = epInfo.GetEndpointInfoByIP(epInfo.IPAddresses, networkId)
if err != nil {
return nil, errors.Wrapf(err, "Get endpoint API returend with error")
}
}
logger.Info("returning getEndpoint API with", zap.String("Endpoint Info: ", epInfo.PrettyString()), zap.String("HNISID : ", epInfo.HNSEndpointID))
return epInfo, nil
return epInfo, err
}

nw, err := nm.getNetwork(networkId)
nw, err := nm.getNetwork(networkID)
if err != nil {
return nil, err
}

ep, err := nw.getEndpoint(endpointId)
ep, err := nw.getEndpoint(endpointID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion network/manager_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (nm *MockNetworkManager) GetAllEndpoints(networkID string) (map[string]*End
}

// GetEndpointInfo mock
func (nm *MockNetworkManager) GetEndpointInfo(networkID string, endpointID string) (*EndpointInfo, error) {
func (nm *MockNetworkManager) GetEndpointInfo(networkID, endpointID, _ string) (*EndpointInfo, error) {

Check warning on line 102 in network/manager_mock.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

unused-parameter: parameter 'networkID' seems to be unused, consider removing or renaming it as _ (revive)

Check warning on line 102 in network/manager_mock.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

unused-parameter: parameter 'networkID' seems to be unused, consider removing or renaming it as _ (revive)
if info, exists := nm.TestEndpointInfoMap[endpointID]; exists {
return info, nil
}
Expand Down

0 comments on commit 195356c

Please sign in to comment.