From ea0be88aa4f9804500b3b218a4bbe0f82b3915fb Mon Sep 17 00:00:00 2001 From: AzureAhai Date: Thu, 8 Feb 2024 13:50:29 -0800 Subject: [PATCH] Adding a MigrateSate() function to the cnireconciler --- cns/cnireconciler/podinfoprovider.go | 37 ++++++++++-------- cns/cnireconciler/podinfoprovider_test.go | 4 +- cns/service/main.go | 46 +++++++++++------------ 3 files changed, 46 insertions(+), 41 deletions(-) diff --git a/cns/cnireconciler/podinfoprovider.go b/cns/cnireconciler/podinfoprovider.go index 3b81bf7f737..69bb7db519e 100644 --- a/cns/cnireconciler/podinfoprovider.go +++ b/cns/cnireconciler/podinfoprovider.go @@ -17,14 +17,8 @@ import ( // NewCNIPodInfoProvider returns an implementation of cns.PodInfoByIPProvider // that execs out to the CNI and uses the response to build the PodInfo map. -// if EnableStateMigration flag is set to true it will also returns a map of containerID->EndpointInfo -func NewCNIPodInfoProvider(enableStateMigration bool) (cns.PodInfoByIPProvider, map[string]*restserver.EndpointInfo, error) { - podInfoByIPProvider, cniState, err := newCNIPodInfoProvider(exec.New()) - if enableStateMigration { - endpointState := cniStateToCnsEndpointState(cniState) - return podInfoByIPProvider, endpointState, err - } - return podInfoByIPProvider, nil, err +func NewCNIPodInfoProvider() (cns.PodInfoByIPProvider, error) { + return newCNIPodInfoProvider(exec.New()) } func NewCNSPodInfoProvider(endpointStore store.KeyValueStore) (cns.PodInfoByIPProvider, error) { @@ -48,18 +42,15 @@ func newCNSPodInfoProvider(endpointStore store.KeyValueStore) (cns.PodInfoByIPPr }), nil } -func newCNIPodInfoProvider(exc exec.Interface) (cns.PodInfoByIPProvider, *api.AzureCNIState, error) { - cli := client.New(exc) +func newCNIPodInfoProvider(exec exec.Interface) (cns.PodInfoByIPProvider, error) { + cli := client.New(exec) state, err := cli.GetEndpointState() if err != nil { - return nil, nil, fmt.Errorf("failed to invoke CNI client.GetEndpointState(): %w", err) - } - for containerID, endpointInfo := range state.ContainerInterfaces { - logger.Printf("state dump from CNI: [%+v], [%+v]", containerID, endpointInfo) + return nil, fmt.Errorf("failed to invoke CNI client.GetEndpointState(): %w", err) } return cns.PodInfoByIPProviderFunc(func() (map[string]cns.PodInfo, error) { return cniStateToPodInfoByIP(state) - }), state, err + }), nil } // cniStateToPodInfoByIP converts an AzureCNIState dumped from a CNI exec @@ -71,6 +62,7 @@ func cniStateToPodInfoByIP(state *api.AzureCNIState) (map[string]cns.PodInfo, er for _, endpoint := range state.ContainerInterfaces { for _, epIP := range endpoint.IPAddresses { podInfo := cns.NewPodInfo(endpoint.ContainerID, endpoint.PodEndpointId, endpoint.PodName, endpoint.PodNamespace) + ipKey := epIP.IP.String() if prevPodInfo, ok := podInfoByIP[ipKey]; ok { return nil, errors.Wrapf(cns.ErrDuplicateIP, "duplicate ip %s found for different pods: pod: %+v, pod: %+v", ipKey, podInfo, prevPodInfo) @@ -113,6 +105,21 @@ func endpointStateToPodInfoByIP(state map[string]*restserver.EndpointInfo) (map[ return podInfoByIP, nil } +// MigrateCNISate returns an endpoint state of CNS by reading the CNI state file +func MigrateCNISate() (map[string]*restserver.EndpointInfo, error) { + return migrateCNISate(exec.New()) +} + +func migrateCNISate(exc exec.Interface) (map[string]*restserver.EndpointInfo, error) { + cli := client.New(exc) + state, err := cli.GetEndpointState() + if err != nil { + return nil, fmt.Errorf("failed to invoke CNI client.GetEndpointState(): %w", err) + } + endpointState := cniStateToCnsEndpointState(state) + return endpointState, nil +} + // cniStateToCnsEndpointState converts an AzureCNIState dumped from a CNI exec // into a EndpointInfo map, using the containerID as keys in the map. // The map then will be saved on CNS endpoint state diff --git a/cns/cnireconciler/podinfoprovider_test.go b/cns/cnireconciler/podinfoprovider_test.go index da51b840e86..8d10b1c5865 100644 --- a/cns/cnireconciler/podinfoprovider_test.go +++ b/cns/cnireconciler/podinfoprovider_test.go @@ -76,14 +76,14 @@ func TestNewCNIPodInfoProvider(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - got, endpointState, err := newCNIPodInfoProvider(tt.exec) + got, err := newCNIPodInfoProvider(tt.exec) if tt.wantErr { assert.Error(t, err) return } assert.NoError(t, err) podInfoByIP, _ := got.PodInfoByIP() - assert.Equal(t, tt.want, podInfoByIP, endpointState) + assert.Equal(t, tt.want, podInfoByIP) }) } } diff --git a/cns/service/main.go b/cns/service/main.go index f0b2cade4c3..aa83aba12cb 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1234,14 +1234,23 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn switch { case cnsconfig.ManageEndpointState: logger.Printf("Initializing from self managed endpoint store") - podInfoByIPProvider, err = InitializeStateFromCNS(cnsconfig, httpRestServiceImplementation.EndpointStateStore) + if cnsconfig.EnableStateMigration && !httpRestServiceImplementation.EndpointStateStore.Exists() { // perform state migration + err = PopulateCNSEndpointState(httpRestServiceImplementation.EndpointStateStore) + } if err != nil { - return errors.Wrap(err, "failed to create CNI PodInfoProvider") + return errors.Wrap(err, "failed to create CNS EndpointState From CNI") + } + podInfoByIPProvider, err = cnireconciler.NewCNSPodInfoProvider(httpRestServiceImplementation.EndpointStateStore) // get reference to endpoint state store from rest server + if err != nil { + if errors.Is(err, store.ErrKeyNotFound) { + logger.Printf("[Azure CNS] No endpoint state found, skipping initializing CNS state") + } else { + return errors.Wrap(err, "failed to create CNS PodInfoProvider") + } } - case cnsconfig.InitializeFromCNI: logger.Printf("Initializing from CNI") - podInfoByIPProvider, _, err = cnireconciler.NewCNIPodInfoProvider(false) + podInfoByIPProvider, err = cnireconciler.NewCNIPodInfoProvider() if err != nil { return errors.Wrap(err, "failed to create CNI PodInfoProvider") } @@ -1514,27 +1523,16 @@ func createOrUpdateNodeInfoCRD(ctx context.Context, restConfig *rest.Config, nod return nil } -// InitializeStateFromCNS initilizes CNS Endpoint State from CNS or perform the Migration of state from statefull CNI. -func InitializeStateFromCNS(cnsconfig *configuration.CNSConfig, endpointStateStore store.KeyValueStore) (cns.PodInfoByIPProvider, error) { - if cnsconfig.EnableStateMigration && !endpointStateStore.Exists() { // initilize form CNI and perform state migration - logger.Printf("StatelessCNI Migration is enabled") - logger.Printf("initializing from Statefull CNI") - var endpointState map[string]*restserver.EndpointInfo - podInfoByIPProvider, endpointState, err := cnireconciler.NewCNIPodInfoProvider(cnsconfig.EnableStateMigration) - if err != nil { - return nil, errors.Wrap(err, "failed to create CNI PodInfoProvider") - } - err = endpointStateStore.Write(restserver.EndpointStoreKey, endpointState) - if err != nil { - return nil, fmt.Errorf("failed to write endpoint state to store: %w", err) - } - return podInfoByIPProvider, nil +// PopulateCNSEndpointState initilizes CNS Endpoint State by Migrating the CNI state. +func PopulateCNSEndpointState(endpointStateStore store.KeyValueStore) error { + logger.Printf("State Migration is enabled") + endpointState, err := cnireconciler.MigrateCNISate() + if err != nil { + return errors.Wrap(err, "failed to create CNS Endpoint state from CNI") } - // initilize form CNS and avoid state migration - logger.Printf("Initializing from self managed endpoint store") - podInfoByIPProvider, err := cnireconciler.NewCNSPodInfoProvider(endpointStateStore) // get reference to endpoint state store from rest server + err = endpointStateStore.Write(restserver.EndpointStoreKey, endpointState) if err != nil { - return nil, errors.Wrap(err, "failed to create CNS PodInfoProvider") + return fmt.Errorf("failed to write endpoint state to store: %w", err) } - return podInfoByIPProvider, nil + return nil }