Skip to content

Commit

Permalink
Applying changes to CNIReonciler
Browse files Browse the repository at this point in the history
  • Loading branch information
behzad-mir committed Jan 29, 2024
1 parent b3ccc93 commit 153c841
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 44 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ ACN_VERSION ?= $(shell git describe --exclude "azure-ipam*" --exclude "dropgz
AZURE_IPAM_VERSION ?= $(notdir $(shell git describe --match "azure-ipam*" --tags --always))
CNI_VERSION ?= $(ACN_VERSION)
CNI_DROPGZ_VERSION ?= $(notdir $(shell git describe --match "dropgz*" --tags --always))
CNS_VERSION ?= $(ACN_VERSION)-m
CNS_VERSION ?= $(ACN_VERSION)
NPM_VERSION ?= $(ACN_VERSION)
ZAPAI_VERSION ?= $(notdir $(shell git describe --match "zapai*" --tags --always))

Expand Down
3 changes: 0 additions & 3 deletions cni/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ type PodNetworkInterfaceInfo struct {
PodEndpointId string
ContainerID string
IPAddresses []net.IPNet
//HostIfName string
//HNSEndpointID string
//IfName string
}

type AzureCNIState struct {
Expand Down
4 changes: 1 addition & 3 deletions cni/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ func (c *client) GetEndpointState() (*api.AzureCNIState, error) {
if err := json.Unmarshal(output, state); err != nil {
return nil, fmt.Errorf("failed to decode response from Azure CNI when retrieving state: [%w], response from CNI: [%s]", err, string(output))
}
for containerID, endpointInfo := range state.ContainerInterfaces {
logger.Info("writing endpoint state from stateful CNI ", zap.String("podname:", containerID), zap.String("podname:", endpointInfo.PodName), zap.Any("endpoint:", endpointInfo))
}

return state, nil
}

Expand Down
3 changes: 0 additions & 3 deletions cni/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,6 @@ func (plugin *NetPlugin) GetAllEndpointState(networkid string) (*api.AzureCNISta
PodEndpointId: ep.Id,
ContainerID: ep.ContainerID,
IPAddresses: ep.IPAddresses,
//HostIfName: ep.HostIfName,
//HNSEndpointID: ep.HNSEndpointID,
//IfName: ep.IfName,
}

st.ContainerInterfaces[id] = info
Expand Down
1 change: 1 addition & 0 deletions cns/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,4 +363,5 @@ type GetHomeAzResponse struct {
type EndpointRequest struct {
HnsEndpointID string `json:"hnsEndpointID"`
HostVethName string `json:"hostVethName"`
IFName string `json:"IFName"`
}
16 changes: 13 additions & 3 deletions cns/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,11 +1024,20 @@ func (c *Client) GetHomeAz(ctx context.Context) (*cns.GetHomeAzResponse, error)
}

// GetEndpoint calls the EndpointHandlerAPI in CNS to retrieve the state of a given EndpointID
func (c *Client) GetEndpoint(ctx context.Context, endpointID string) (*restserver.GetEndpointResponse, error) {
func (c *Client) GetEndpoint(ctx context.Context, endpointID, ifName string) (*restserver.GetEndpointResponse, error) {
// build the request
getEndpoint := cns.EndpointRequest{
IFName: ifName,
}
var body bytes.Buffer

if err := json.NewEncoder(&body).Encode(getEndpoint); err != nil {
return nil, errors.Wrap(err, "failed to encode getEndpoint")
}

u := c.routes[cns.EndpointAPI]
uString := u.String() + endpointID
req, err := http.NewRequestWithContext(ctx, http.MethodGet, uString, http.NoBody)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, uString, &body)
if err != nil {
return nil, errors.Wrap(err, "failed to build request")
}
Expand Down Expand Up @@ -1059,11 +1068,12 @@ func (c *Client) GetEndpoint(ctx context.Context, endpointID string) (*restserve

// UpdateEndpoint calls the EndpointHandlerAPI in CNS
// to update the state of a given EndpointID with either HNSEndpointID or HostVethName
func (c *Client) UpdateEndpoint(ctx context.Context, endpointID, hnsID, vethName string) (*cns.Response, error) {
func (c *Client) UpdateEndpoint(ctx context.Context, endpointID, hnsID, vethName, ifName string) (*cns.Response, error) {
// build the request
updateEndpoint := cns.EndpointRequest{
HnsEndpointID: hnsID,
HostVethName: vethName,
IFName: ifName,
}
var body bytes.Buffer

Expand Down
15 changes: 13 additions & 2 deletions cns/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2777,6 +2777,7 @@ func TestUpdateEndpoint(t *testing.T) {
containerID string
hnsID string
vethName string
ifName string
response *RequestCapture
expReq *cns.EndpointRequest
shouldErr bool
Expand All @@ -2786,6 +2787,7 @@ func TestUpdateEndpoint(t *testing.T) {
"",
"",
"",
"",
&RequestCapture{
Next: &mockdo{},
},
Expand All @@ -2797,13 +2799,15 @@ func TestUpdateEndpoint(t *testing.T) {
"foo",
"bar",
"",
"too",
&RequestCapture{
Next: &mockdo{
httpStatusCodeToReturn: http.StatusOK,
},
},
&cns.EndpointRequest{
HnsEndpointID: "bar",
IFName: "too",
},
false,
},
Expand All @@ -2812,13 +2816,15 @@ func TestUpdateEndpoint(t *testing.T) {
"foo",
"",
"bar",
"too",
&RequestCapture{
Next: &mockdo{
httpStatusCodeToReturn: http.StatusOK,
},
},
&cns.EndpointRequest{
HostVethName: "bar",
IFName: "too",
},
false,
},
Expand All @@ -2827,6 +2833,7 @@ func TestUpdateEndpoint(t *testing.T) {
"foo",
"",
"bar",
"",
&RequestCapture{
Next: &mockdo{
httpStatusCodeToReturn: http.StatusBadRequest,
Expand All @@ -2851,7 +2858,7 @@ func TestUpdateEndpoint(t *testing.T) {
}

// execute the method under test
res, err := client.UpdateEndpoint(context.TODO(), test.containerID, test.hnsID, test.vethName)
res, err := client.UpdateEndpoint(context.TODO(), test.containerID, test.hnsID, test.vethName, test.ifName)
if err != nil && !test.shouldErr {
t.Fatal("unexpected error: err: ", err, res.Message)
}
Expand Down Expand Up @@ -2897,12 +2904,14 @@ func TestGetEndpoint(t *testing.T) {
getEndpointTests := []struct {
name string
containerID string
ifName string
response *RequestCapture
shouldErr bool
}{
{
"empty",
"",
"",
&RequestCapture{
Next: &mockdo{},
},
Expand All @@ -2911,6 +2920,7 @@ func TestGetEndpoint(t *testing.T) {
{
"with EndpointID",
"foo",
"foo",
&RequestCapture{
Next: &mockdo{
httpStatusCodeToReturn: http.StatusOK,
Expand All @@ -2921,6 +2931,7 @@ func TestGetEndpoint(t *testing.T) {
{
"Bad Request",
"foo",
"foo",
&RequestCapture{
Next: &mockdo{
httpStatusCodeToReturn: http.StatusBadRequest,
Expand All @@ -2942,7 +2953,7 @@ func TestGetEndpoint(t *testing.T) {
}

// execute the method under test
res, err := client.GetEndpoint(context.TODO(), test.containerID)
res, err := client.GetEndpoint(context.TODO(), test.containerID, test.ifName)
if err != nil && !test.shouldErr {
t.Fatal("unexpected error: err: ", err, res.Response.Message)
}
Expand Down
47 changes: 31 additions & 16 deletions cns/cnireconciler/podinfoprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cnireconciler
import (
"fmt"
"net"
"strings"

"github.com/Azure/azure-container-networking/cni/api"
"github.com/Azure/azure-container-networking/cni/client"
Expand All @@ -14,6 +15,8 @@ import (
"k8s.io/utils/exec"
)

const InterfaceName = "eth0"

// NewCNIPodInfoProvider returns an implementation of cns.PodInfoByIPProvider
// that execs out to the CNI and uses the response to build the PodInfo map.
func NewCNIPodInfoProvider() (cns.PodInfoByIPProvider, map[string]*restserver.EndpointInfo, error) {
Expand Down Expand Up @@ -41,16 +44,20 @@ func newCNSPodInfoProvider(endpointStore store.KeyValueStore) (cns.PodInfoByIPPr
}), nil
}

func newCNIPodInfoProvider(exec exec.Interface) (cns.PodInfoByIPProvider, map[string]*restserver.EndpointInfo, error) {
cli := client.New(exec)
func newCNIPodInfoProvider(exc exec.Interface) (cns.PodInfoByIPProvider, map[string]*restserver.EndpointInfo, error) {
cli := client.New(exc)
state, err := cli.GetEndpointState()
if err != nil {
return nil, nil, fmt.Errorf("failed to invoke CNI client.GetEndpointState(): %w", err)
}
endpointState, err := cniStateToCnsEndpointState(state)
for containerID, endpointInfo := range state.ContainerInterfaces {
logger.Printf("state dump from CNI: [%+v], [%+v]", containerID, endpointInfo)
}
var endpointState map[string]*restserver.EndpointInfo
endpointState, err = cniStateToCnsEndpointState(state)
return cns.PodInfoByIPProviderFunc(func() (map[string]cns.PodInfo, error) {
return cniStateToPodInfoByIP(state)
}), endpointState, nil
}), endpointState, err
}

// cniStateToPodInfoByIP converts an AzureCNIState dumped from a CNI exec
Expand Down Expand Up @@ -112,16 +119,16 @@ func endpointStateToPodInfoByIP(state map[string]*restserver.EndpointInfo) (map[
func cniStateToCnsEndpointState(state *api.AzureCNIState) (map[string]*restserver.EndpointInfo, error) {
logger.Printf("Generating CNS ENdpoint State")
endpointState := map[string]*restserver.EndpointInfo{}
for _, endpoint := range state.ContainerInterfaces {
for epID, endpoint := range state.ContainerInterfaces {
endpointInfo := &restserver.EndpointInfo{PodName: endpoint.PodName, PodNamespace: endpoint.PodNamespace, IfnameToIPMap: make(map[string]*restserver.IPInfo)}
ipInfo := &restserver.IPInfo{}
for _, epIP := range endpoint.IPAddresses {
if epIP.IP.To4() == nil { // is an ipv6 address
ipconfig := net.IPNet{IP: epIP.IP, Mask: epIP.Mask}
for _, ipconf := range ipInfo.IPv6 {
if ipconf.IP.Equal(ipconfig.IP) {
logger.Printf("Found existing ipv6 ipconfig for infra container %s", endpoint.ContainerID)
return nil, nil
logger.Errorf("Found existing ipv6 ipconfig for infra container %s", endpoint.ContainerID)
return nil, restserver.ErrExistingIpconfigFound
}
}
ipInfo.IPv6 = append(ipInfo.IPv6, ipconfig)
Expand All @@ -130,20 +137,28 @@ func cniStateToCnsEndpointState(state *api.AzureCNIState) (map[string]*restserve
ipconfig := net.IPNet{IP: epIP.IP, Mask: epIP.Mask}
for _, ipconf := range ipInfo.IPv4 {
if ipconf.IP.Equal(ipconfig.IP) {
logger.Printf("Found existing ipv4 ipconfig for infra container %s", endpoint.ContainerID)
return nil, nil
logger.Errorf("Found existing ipv4 ipconfig for infra container %s", endpoint.ContainerID)
return nil, restserver.ErrExistingIpconfigFound
}
}
ipInfo.IPv4 = append(ipInfo.IPv4, ipconfig)
}
}
endpointInfo.IfnameToIPMap["eth0"] = ipInfo
logger.Printf("writing endpoint podName from stateful CNI %v", endpoint.PodName)
logger.Printf("writing endpoint info from stateful CNI [%+v]", *endpointInfo)
endpointState[endpoint.ContainerID] = endpointInfo
}
for containerID, endpointInfo := range endpointState {
logger.Printf("writing endpoint state from stateful CNI [%+v]:[%+v]", containerID, *endpointInfo)
endpointID, Ifname := extractEndpointInfo(epID, endpoint.ContainerID)
endpointInfo.IfnameToIPMap[Ifname] = ipInfo
endpointState[endpointID] = endpointInfo
}
return endpointState, nil
}

// extractEndpointInfo extract Interface Name and endpointID for each endpoint based the CNI state
func extractEndpointInfo(epID, containerID string) (endpointID, interfaceName string) {
ifName := InterfaceName
if strings.Contains(epID, "-eth") {
ifName = epID[len(epID)-4:]
}
if containerID == "" {
return epID, ifName
}
return containerID, ifName
}
27 changes: 24 additions & 3 deletions cns/restserver/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ var (
ErrNoNCs = errors.New("no NCs found in the CNS internal state")
ErrOptManageEndpointState = errors.New("CNS is not set to manage the endpoint state")
ErrEndpointStateNotFound = errors.New("endpoint state could not be found in the statefile")
ErrExistingIpconfigFound = errors.New("Found existing ipconfig for infra container")
)

const ContainerIDLength = 8

// requestIPConfigHandlerHelper validates the request, assign IPs and return the IPConfigs
func (service *HTTPRestService) requestIPConfigHandlerHelper(ctx context.Context, ipconfigsRequest cns.IPConfigsRequest) (*cns.IPConfigsResponse, error) {
// For SWIFT v2 scenario, the validator function will also modify the ipconfigsRequest.
Expand Down Expand Up @@ -1008,9 +1011,22 @@ func (service *HTTPRestService) EndpointHandlerAPI(w http.ResponseWriter, r *htt
// GetEndpointHandler handles the incoming GetEndpoint requests with http Get method
func (service *HTTPRestService) GetEndpointHandler(w http.ResponseWriter, r *http.Request) {
logger.Printf("[GetEndpointState] GetEndpoint for %s", r.URL.Path)

var req cns.EndpointRequest
err := service.Listener.Decode(w, r, &req)
endpointID := strings.TrimPrefix(r.URL.Path, cns.EndpointPath)
endpointInfo, err := service.GetEndpointHelper(endpointID)
logger.Request(service.Name, &req, err)
// Check if the request is valid
if err != nil || req.IFName == "" {
response := cns.Response{
ReturnCode: types.InvalidRequest,
Message: fmt.Sprintf("[getEndpoint] getEndpoint failed with error: %s", err.Error()),
}
w.Header().Set(cnsReturnCode, response.ReturnCode.String())
err = service.Listener.Encode(w, &response)
logger.Response(service.Name, response, response.ReturnCode, err)
return
}
endpointInfo, err := service.GetEndpointHelper(endpointID, req)
if err != nil {
response := GetEndpointResponse{
Response: Response{
Expand Down Expand Up @@ -1044,7 +1060,7 @@ func (service *HTTPRestService) GetEndpointHandler(w http.ResponseWriter, r *htt
}

// GetEndpointHelper returns the state of the given endpointId
func (service *HTTPRestService) GetEndpointHelper(endpointID string) (*EndpointInfo, error) {
func (service *HTTPRestService) GetEndpointHelper(endpointID string, req cns.EndpointRequest) (*EndpointInfo, error) {
logger.Printf("[GetEndpointState] Get endpoint state for infra container %s", endpointID)

// Skip if a store is not provided.
Expand All @@ -1068,6 +1084,11 @@ func (service *HTTPRestService) GetEndpointHelper(endpointID string) (*EndpointI
logger.Warnf("[GetEndpointState] Found existing endpoint state for container %s", endpointID)
return endpointInfo, nil
}
legacyEndpointID := endpointID[:ContainerIDLength] + "-" + req.IFName
if endpointInfo, ok := service.EndpointState[legacyEndpointID]; ok {
logger.Warnf("[GetEndpointState] Found existing endpoint state for container %s", legacyEndpointID)
return endpointInfo, nil
}
return nil, ErrEndpointStateNotFound
}

Expand Down
6 changes: 1 addition & 5 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,11 +1246,7 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
if err != nil {
return errors.Wrap(err, "failed to create CNI PodInfoProvider")
}
logger.Printf("writing endpoint info from stateful CNI [%+v]", endpointState)
for containerID, endpointInfo := range endpointState {
logger.Printf("writing endpoint state from stateful CNI [%+v]:[%+v]", containerID, *endpointInfo)
}
err := endpointStateStore.Write(restserver.EndpointStoreKey, endpointState)
err = endpointStateStore.Write(restserver.EndpointStoreKey, endpointState)
if err != nil {
return fmt.Errorf("failed to write endpoint state to store: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions network/endpoint_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,11 +502,11 @@ func (epInfo *EndpointInfo) GetEndpointInfoByIPImpl(ipAddresses []net.IPNet, net
// check if network exists, only create the network does not exist
hnsResponse, err := Hnsv2.GetNetworkByName(networkID)
if err != nil {
return epInfo, err
return epInfo, errors.Wrap(err, "HNS Network not found")
}
hcnEndpoints, err := Hnsv2.ListEndpointsOfNetwork(hnsResponse.Id)
if err != nil {
return epInfo, err
return epInfo, errors.Wrap(err, "failed to fetch HNS endpoints for the given network")
}
for _, hcnEndpoint := range hcnEndpoints {
for _, ipConfiguration := range hcnEndpoint.IpConfigurations {
Expand All @@ -519,5 +519,5 @@ func (epInfo *EndpointInfo) GetEndpointInfoByIPImpl(ipAddresses []net.IPNet, net
}
}
}
return epInfo, errors.New("No HNSEndpointID matches the IPAddress: " + ipAddresses[0].IP.String())
return epInfo, errors.wrap(err, "No HNSEndpointID matches the IPAddress: "+ipAddresses[0].IP.String())
}
4 changes: 2 additions & 2 deletions network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (nm *networkManager) CreateEndpoint(cli apipaClient, networkID string, epIn
// It will add HNSEndpointID or HostVeth name to the endpoint state
func (nm *networkManager) UpdateEndpointState(ep *endpoint) error {
logger.Info("Calling cns updateEndpoint API with ", zap.String("containerID: ", ep.ContainerID), zap.String("HnsId: ", ep.HnsId), zap.String("HostIfName: ", ep.HostIfName))
response, err := nm.CnsClient.UpdateEndpoint(context.TODO(), ep.ContainerID, ep.HnsId, ep.HostIfName)
response, err := nm.CnsClient.UpdateEndpoint(context.TODO(), ep.ContainerID, ep.HnsId, ep.HostIfName, ep.IfName)
if err != nil {
return errors.Wrapf(err, "Update endpoint API returend with error")
}
Expand Down Expand Up @@ -481,7 +481,7 @@ func (nm *networkManager) GetEndpointInfo(networkId string, endpointId string) (

if nm.IsStatelessCNIMode() {
logger.Info("calling cns getEndpoint API")
endpointResponse, err := nm.CnsClient.GetEndpoint(context.TODO(), endpointId)
endpointResponse, err := nm.CnsClient.GetEndpoint(context.TODO(), endpointId, "eth0")
if err != nil {
return nil, errors.Wrapf(err, "Get endpoint API returend with error")
}
Expand Down

0 comments on commit 153c841

Please sign in to comment.