Skip to content

Commit

Permalink
Adding a MigrateSate() function to the cnireconciler
Browse files Browse the repository at this point in the history
  • Loading branch information
behzad-mir committed Feb 8, 2024
1 parent ac43bb2 commit ea0be88
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 41 deletions.
37 changes: 22 additions & 15 deletions cns/cnireconciler/podinfoprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@ import (

// NewCNIPodInfoProvider returns an implementation of cns.PodInfoByIPProvider
// that execs out to the CNI and uses the response to build the PodInfo map.
// if EnableStateMigration flag is set to true it will also returns a map of containerID->EndpointInfo
func NewCNIPodInfoProvider(enableStateMigration bool) (cns.PodInfoByIPProvider, map[string]*restserver.EndpointInfo, error) {
podInfoByIPProvider, cniState, err := newCNIPodInfoProvider(exec.New())
if enableStateMigration {
endpointState := cniStateToCnsEndpointState(cniState)
return podInfoByIPProvider, endpointState, err
}
return podInfoByIPProvider, nil, err
func NewCNIPodInfoProvider() (cns.PodInfoByIPProvider, error) {
return newCNIPodInfoProvider(exec.New())
}

func NewCNSPodInfoProvider(endpointStore store.KeyValueStore) (cns.PodInfoByIPProvider, error) {
Expand All @@ -48,18 +42,15 @@ func newCNSPodInfoProvider(endpointStore store.KeyValueStore) (cns.PodInfoByIPPr
}), nil
}

func newCNIPodInfoProvider(exc exec.Interface) (cns.PodInfoByIPProvider, *api.AzureCNIState, error) {
cli := client.New(exc)
func newCNIPodInfoProvider(exec exec.Interface) (cns.PodInfoByIPProvider, error) {
cli := client.New(exec)
state, err := cli.GetEndpointState()
if err != nil {
return nil, nil, fmt.Errorf("failed to invoke CNI client.GetEndpointState(): %w", err)
}
for containerID, endpointInfo := range state.ContainerInterfaces {
logger.Printf("state dump from CNI: [%+v], [%+v]", containerID, endpointInfo)
return nil, fmt.Errorf("failed to invoke CNI client.GetEndpointState(): %w", err)
}
return cns.PodInfoByIPProviderFunc(func() (map[string]cns.PodInfo, error) {
return cniStateToPodInfoByIP(state)
}), state, err
}), nil
}

// cniStateToPodInfoByIP converts an AzureCNIState dumped from a CNI exec
Expand All @@ -71,6 +62,7 @@ func cniStateToPodInfoByIP(state *api.AzureCNIState) (map[string]cns.PodInfo, er
for _, endpoint := range state.ContainerInterfaces {
for _, epIP := range endpoint.IPAddresses {
podInfo := cns.NewPodInfo(endpoint.ContainerID, endpoint.PodEndpointId, endpoint.PodName, endpoint.PodNamespace)

ipKey := epIP.IP.String()
if prevPodInfo, ok := podInfoByIP[ipKey]; ok {
return nil, errors.Wrapf(cns.ErrDuplicateIP, "duplicate ip %s found for different pods: pod: %+v, pod: %+v", ipKey, podInfo, prevPodInfo)
Expand Down Expand Up @@ -113,6 +105,21 @@ func endpointStateToPodInfoByIP(state map[string]*restserver.EndpointInfo) (map[
return podInfoByIP, nil
}

// MigrateCNISate returns an endpoint state of CNS by reading the CNI state file
func MigrateCNISate() (map[string]*restserver.EndpointInfo, error) {
return migrateCNISate(exec.New())
}

func migrateCNISate(exc exec.Interface) (map[string]*restserver.EndpointInfo, error) {
cli := client.New(exc)
state, err := cli.GetEndpointState()
if err != nil {
return nil, fmt.Errorf("failed to invoke CNI client.GetEndpointState(): %w", err)
}
endpointState := cniStateToCnsEndpointState(state)
return endpointState, nil
}

// cniStateToCnsEndpointState converts an AzureCNIState dumped from a CNI exec
// into a EndpointInfo map, using the containerID as keys in the map.
// The map then will be saved on CNS endpoint state
Expand Down
4 changes: 2 additions & 2 deletions cns/cnireconciler/podinfoprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ func TestNewCNIPodInfoProvider(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
got, endpointState, err := newCNIPodInfoProvider(tt.exec)
got, err := newCNIPodInfoProvider(tt.exec)
if tt.wantErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
podInfoByIP, _ := got.PodInfoByIP()
assert.Equal(t, tt.want, podInfoByIP, endpointState)
assert.Equal(t, tt.want, podInfoByIP)
})
}
}
Expand Down
46 changes: 22 additions & 24 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,14 +1234,23 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
switch {
case cnsconfig.ManageEndpointState:
logger.Printf("Initializing from self managed endpoint store")
podInfoByIPProvider, err = InitializeStateFromCNS(cnsconfig, httpRestServiceImplementation.EndpointStateStore)
if cnsconfig.EnableStateMigration && !httpRestServiceImplementation.EndpointStateStore.Exists() { // perform state migration
err = PopulateCNSEndpointState(httpRestServiceImplementation.EndpointStateStore)
}
if err != nil {
return errors.Wrap(err, "failed to create CNI PodInfoProvider")
return errors.Wrap(err, "failed to create CNS EndpointState From CNI")
}
podInfoByIPProvider, err = cnireconciler.NewCNSPodInfoProvider(httpRestServiceImplementation.EndpointStateStore) // get reference to endpoint state store from rest server
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
logger.Printf("[Azure CNS] No endpoint state found, skipping initializing CNS state")
} else {
return errors.Wrap(err, "failed to create CNS PodInfoProvider")
}
}

case cnsconfig.InitializeFromCNI:
logger.Printf("Initializing from CNI")
podInfoByIPProvider, _, err = cnireconciler.NewCNIPodInfoProvider(false)
podInfoByIPProvider, err = cnireconciler.NewCNIPodInfoProvider()
if err != nil {
return errors.Wrap(err, "failed to create CNI PodInfoProvider")
}
Expand Down Expand Up @@ -1514,27 +1523,16 @@ func createOrUpdateNodeInfoCRD(ctx context.Context, restConfig *rest.Config, nod
return nil
}

// InitializeStateFromCNS initilizes CNS Endpoint State from CNS or perform the Migration of state from statefull CNI.
func InitializeStateFromCNS(cnsconfig *configuration.CNSConfig, endpointStateStore store.KeyValueStore) (cns.PodInfoByIPProvider, error) {
if cnsconfig.EnableStateMigration && !endpointStateStore.Exists() { // initilize form CNI and perform state migration
logger.Printf("StatelessCNI Migration is enabled")
logger.Printf("initializing from Statefull CNI")
var endpointState map[string]*restserver.EndpointInfo
podInfoByIPProvider, endpointState, err := cnireconciler.NewCNIPodInfoProvider(cnsconfig.EnableStateMigration)
if err != nil {
return nil, errors.Wrap(err, "failed to create CNI PodInfoProvider")
}
err = endpointStateStore.Write(restserver.EndpointStoreKey, endpointState)
if err != nil {
return nil, fmt.Errorf("failed to write endpoint state to store: %w", err)
}
return podInfoByIPProvider, nil
// PopulateCNSEndpointState initilizes CNS Endpoint State by Migrating the CNI state.
func PopulateCNSEndpointState(endpointStateStore store.KeyValueStore) error {
logger.Printf("State Migration is enabled")
endpointState, err := cnireconciler.MigrateCNISate()
if err != nil {
return errors.Wrap(err, "failed to create CNS Endpoint state from CNI")
}
// initilize form CNS and avoid state migration
logger.Printf("Initializing from self managed endpoint store")
podInfoByIPProvider, err := cnireconciler.NewCNSPodInfoProvider(endpointStateStore) // get reference to endpoint state store from rest server
err = endpointStateStore.Write(restserver.EndpointStoreKey, endpointState)
if err != nil {
return nil, errors.Wrap(err, "failed to create CNS PodInfoProvider")
return fmt.Errorf("failed to write endpoint state to store: %w", err)
}
return podInfoByIPProvider, nil
return nil
}

0 comments on commit ea0be88

Please sign in to comment.