From dd46ffd135d0389fd30797f332be58966bdae6f2 Mon Sep 17 00:00:00 2001 From: Petri Autero Date: Tue, 25 Jan 2022 18:42:45 +0200 Subject: [PATCH] Fault tolerance for eks deploy (#154) * Initial implementation * Rename variable for clarity * Update README.md * [skip ci] Update function documentation Co-authored-by: Yoriyasu Yano <430092+yorinasub17@users.noreply.github.com> * Refactor deploy_state * [skip ci] Fix logging on state file delete * Documentation improvements and code cleanup * Reduce code complexity, remove extra asg struct * [skip ci] add more logging * use ASG references instead of values * Fix calculation of maxRetries * Remove extra assertion Co-authored-by: Yoriyasu Yano <430092+yorinasub17@users.noreply.github.com> --- README.md | 9 + cmd/eks.go | 9 + eks/asg.go | 55 +++--- eks/deploy.go | 134 ++++----------- eks/deploy_state.go | 360 +++++++++++++++++++++++++++++++++++++++ eks/deploy_state_test.go | 100 +++++++++++ eks/drain.go | 2 +- 7 files changed, 543 insertions(+), 126 deletions(-) create mode 100644 eks/deploy_state.go create mode 100644 eks/deploy_state_test.go diff --git a/README.md b/README.md index c3f46ad..8b166f5 100644 --- a/README.md +++ b/README.md @@ -256,6 +256,15 @@ Currently `kubergrunt` does not implement any checks for these resources to be i plan to bake in checks into the deployment command to verify that all services have a disruption budget set, and warn the user of any services that do not have a check. +**`eks deploy` recovery file** + +Due to the nature of rolling update, the `deploy` subcommand performs multiple sequential actions that +depend on success of the previous operations. To mitigate intermittent failures, the `deploy` subcommand creates a +recovery file in the working directory for storing current deploy state. The recovery file is updated after +each stage and if the `deploy` subcommand fails for some reason, execution resumes from the last successful state. +The existing recovery file can also be ignored with the `--ignore-recovery-file` flag. In this case the recovery +file will be re-initialized. + #### sync-core-components This subcommand will sync the core components of an EKS cluster to match the deployed Kubernetes version by following diff --git a/cmd/eks.go b/cmd/eks.go index e183cd6..e2f84e0 100644 --- a/cmd/eks.go +++ b/cmd/eks.go @@ -24,6 +24,10 @@ var ( Name: "wait", Usage: "Whether or not to wait for the command to succeed.", } + ignoreRecoveryFileFlag = cli.BoolFlag{ + Name: "ignore-recovery-file", + Usage: "Ignore existing recovery file and start deploy process from the beginning.", + } eksKubectlContextNameFlag = cli.StringFlag{ Name: KubectlContextNameFlagName, Usage: "The name to use for the config context that is set up to authenticate with the EKS cluster. Defaults to the cluster ARN.", @@ -230,6 +234,8 @@ Note that to minimize service disruption from this command, your services should This command includes retry loops for certain stages (e.g waiting for the ASG to scale up). This retry loop is configurable with the options --max-retries and --sleep-between-retries. The command will try up to --max-retries times, sleeping for the duration specified by --sleep-between-retries inbetween each failed attempt. If max-retries is unspecified, this command will use a value that translates to a total wait time of 5 minutes per wave of ASG, where each wave is 10 instances. For example, if the number of instances in the ASG is 15 instances, this translates to 2 waves, which leads to a total wait time of 10 minutes. To achieve a 10 minute wait time with the default sleep between retries (15 seconds), the max retries needs to be set to 40. + +As the deploy command contains multiple stages, this command also generates a recovery file (.kubergrunt.state) containing the current deploy state in the working directory. The state file is used to resume the deploy operation from the point of failure, and is automatically deleted upon completion of the command. You can optionally ignore the state file with --ignore-recovery-file flag, which will generate a new recovery file. `, Action: rollOutDeployment, Flags: []cli.Flag{ @@ -245,6 +251,7 @@ If max-retries is unspecified, this command will use a value that translates to deleteLocalDataFlag, waitMaxRetriesFlag, waitSleepBetweenRetriesFlag, + ignoreRecoveryFileFlag, }, }, cli.Command{ @@ -405,6 +412,7 @@ func rollOutDeployment(cliContext *cli.Context) error { drainTimeout := cliContext.Duration(drainTimeoutFlag.Name) deleteLocalData := cliContext.Bool(deleteLocalDataFlag.Name) + ignoreRecoveryFile := cliContext.Bool(ignoreRecoveryFileFlag.Name) waitMaxRetries := cliContext.Int(waitMaxRetriesFlag.Name) waitSleepBetweenRetries := cliContext.Duration(waitSleepBetweenRetriesFlag.Name) @@ -416,6 +424,7 @@ func rollOutDeployment(cliContext *cli.Context) error { deleteLocalData, waitMaxRetries, waitSleepBetweenRetries, + ignoreRecoveryFile, ) } diff --git a/eks/asg.go b/eks/asg.go index 02f4046..b920a66 100644 --- a/eks/asg.go +++ b/eks/asg.go @@ -32,51 +32,67 @@ func GetAsgByName(svc *autoscaling.AutoScaling, asgName string) (*autoscaling.Gr return groups[0], nil } -// scaleUp will scale the ASG up and wait until all the nodes are available. Specifically: -// - Set the desired capacity on the ASG -// - Wait for the capacity in the ASG to meet the desired capacity (instances are launched) -// - Wait for the new instances to be ready in Kubernetes -// - Wait for the new instances to be registered with external load balancers +// scaleUp will scale the ASG up, wait until all the nodes are available and return new instance IDs. func scaleUp( asgSvc *autoscaling.AutoScaling, - ec2Svc *ec2.EC2, - elbSvc *elb.ELB, - elbv2Svc *elbv2.ELBV2, - kubectlOptions *kubectl.KubectlOptions, asgName string, + originalInstanceIds []string, desiredCapacity int64, - oldInstanceIds []string, maxRetries int, sleepBetweenRetries time.Duration, -) error { +) ([]string, error) { logger := logging.GetProjectLogger() + err := setAsgCapacity(asgSvc, asgName, desiredCapacity) if err != nil { logger.Errorf("Failed to set ASG capacity to %d", desiredCapacity) logger.Errorf("If the capacity is set in AWS, undo by lowering back to the original capacity. If the capacity is not yet set, triage the error message below and try again.") - return err + return nil, err } + + // All of the following are read operations and do not affect the state, so it's safe to run these + // each time we execute err = waitForCapacity(asgSvc, asgName, maxRetries, sleepBetweenRetries) if err != nil { logger.Errorf("Timed out waiting for ASG to reach steady state.") - // TODO: can we use stages to pick up from here? logger.Errorf("Undo by terminating all the new instances and trying again") - return err + return nil, err } - newInstanceIds, err := getLaunchedInstanceIds(asgSvc, asgName, oldInstanceIds) + + newInstanceIds, err := getLaunchedInstanceIds(asgSvc, asgName, originalInstanceIds) if err != nil { logger.Errorf("Error retrieving information about the ASG") // TODO: can we use stages to pick up from here? logger.Errorf("Undo by terminating all the new instances and trying again") - return err + return nil, err } - instances, err := instanceDetailsFromIds(ec2Svc, newInstanceIds) + + return newInstanceIds, nil +} + +// waitAndVerifyNewInstances will scale the ASG up and wait until all the nodes are available. Specifically: +// - Wait for the capacity in the ASG to meet the desired capacity (instances are launched) +// - Wait for the new instances to be ready in Kubernetes +// - Wait for the new instances to be registered with external load balancers +func waitAndVerifyNewInstances( + ec2Svc *ec2.EC2, + elbSvc *elb.ELB, + elbv2Svc *elbv2.ELBV2, + instanceIds []string, + kubectlOptions *kubectl.KubectlOptions, + maxRetries int, + sleepBetweenRetries time.Duration, +) error { + logger := logging.GetProjectLogger() + + instances, err := instanceDetailsFromIds(ec2Svc, instanceIds) if err != nil { logger.Errorf("Error retrieving detailed about the instances") // TODO: can we use stages to pick up from here? logger.Errorf("Undo by terminating all the new instances and trying again") return err } + eksKubeNodeNames := kubeNodeNamesFromInstances(instances) err = kubectl.WaitForNodesReady( kubectlOptions, @@ -86,21 +102,18 @@ func scaleUp( ) if err != nil { logger.Errorf("Timed out waiting for the instances to reach ready state in Kubernetes.") - // TODO: can we use stages to pick up from here? logger.Errorf("Undo by terminating all the new instances and trying again") return err } elbs, err := kubectl.GetAWSLoadBalancers(kubectlOptions) if err != nil { logger.Errorf("Error retrieving associated ELB names of the Kubernetes services.") - // TODO: can we use stages to pick up from here? logger.Errorf("Undo by terminating all the new instances and trying again") return err } - err = waitForAnyInstancesRegisteredToELB(elbSvc, elbv2Svc, elbs, newInstanceIds) + err = waitForAnyInstancesRegisteredToELB(elbSvc, elbv2Svc, elbs, instanceIds) if err != nil { logger.Errorf("Timed out waiting for the instances to register to the Service ELBs.") - // TODO: can we use stages to pick up from here? logger.Errorf("Undo by terminating all the new instances and trying again") return err } diff --git a/eks/deploy.go b/eks/deploy.go index a8e540e..b63ee5b 100644 --- a/eks/deploy.go +++ b/eks/deploy.go @@ -2,7 +2,6 @@ package eks import ( "math" - "strings" "time" "github.com/aws/aws-sdk-go/service/autoscaling" @@ -16,12 +15,6 @@ import ( "github.com/gruntwork-io/kubergrunt/logging" ) -type asgInfo struct { - originalCapacity int64 - maxSize int64 - currentInstanceIDs []string -} - // RollOutDeployment will perform a zero downtime roll out of the current launch configuration associated with the // provided ASG in the provided EKS cluster. This is accomplished by: // 1. Double the desired capacity of the Auto Scaling Group that powers the EKS Cluster. This will launch new EKS @@ -32,7 +25,7 @@ type asgInfo struct { // rescheduled on the new EKS workers. // 5. Wait for all the pods to migrate off of the old EKS workers. // 6. Set the desired capacity down to the original value and remove the old EKS workers from the ASG. -// TODO feature request: Break up into stages/checkpoints, and store state along the way so that command can pick up +// The process is broken up into stages/checkpoints, state is stored along the way so that command can pick up // from a stage if something bad happens. func RollOutDeployment( region string, @@ -42,6 +35,7 @@ func RollOutDeployment( deleteLocalData bool, maxRetries int, sleepBetweenRetries time.Duration, + ignoreRecoveryFile bool, ) (returnErr error) { logger := logging.GetProjectLogger() logger.Infof("Beginning roll out for EKS cluster worker group %s in %s", eksAsgName, region) @@ -57,134 +51,66 @@ func RollOutDeployment( elbv2Svc := elbv2.New(sess) logger.Infof("Successfully authenticated with AWS") - // Retrieve the ASG object and gather required info we will need later - asgInfo, err := getAsgInfo(asgSvc, eksAsgName) + stateFile := defaultStateFile + + // Retrieve state if one exists or construct a new one + state, err := initDeployState(stateFile, ignoreRecoveryFile, maxRetries, sleepBetweenRetries) if err != nil { return err } - // Calculate default max retries - if maxRetries == 0 { - maxRetries = getDefaultMaxRetries(asgInfo.originalCapacity, sleepBetweenRetries) - logger.Infof( - "No max retries set. Defaulted to %d based on sleep between retries duration of %s and scale up count %d.", - maxRetries, - sleepBetweenRetries, - asgInfo.originalCapacity, - ) + err = state.gatherASGInfo(asgSvc, []string{eksAsgName}) + if err != nil { + return err } - // Make sure ASG is in steady state - if asgInfo.originalCapacity != int64(len(asgInfo.currentInstanceIDs)) { - logger.Infof("Ensuring ASG is in steady state (current capacity = desired capacity)") - err = waitForCapacity(asgSvc, eksAsgName, maxRetries, sleepBetweenRetries) - if err != nil { - logger.Error("Error waiting for ASG to reach steady state. Try again after the ASG is in a steady state.") - return err - } - logger.Infof("Verified ASG is in steady state (current capacity = desired capacity)") - asgInfo, err = getAsgInfo(asgSvc, eksAsgName) - if err != nil { - return err - } + err = state.setMaxCapacity(asgSvc) + if err != nil { + return err } - // Make sure there is enough max size capacity to scale up - maxCapacityForUpdate := asgInfo.originalCapacity * 2 - if asgInfo.maxSize < maxCapacityForUpdate { - // Make sure we attempt to restore the original ASG max size at the end of the function, regardless of error. - defer func() { - err := setAsgMaxSize(asgSvc, eksAsgName, asgInfo.maxSize) - // Only return error from this routine if we are not already bubbling an error back from previous calls in - // the function. - if err != nil && returnErr == nil { - returnErr = err - } - }() - - // Update the ASG max size to have enough capacity to handle the update. - err := setAsgMaxSize(asgSvc, eksAsgName, maxCapacityForUpdate) - if err != nil { - return err - } + err = state.scaleUp(asgSvc) + if err != nil { + return err } - logger.Infof("Starting with the following list of instances in ASG:") - logger.Infof("%s", strings.Join(asgInfo.currentInstanceIDs, ",")) - - logger.Infof("Launching new nodes with new launch config on ASG %s", eksAsgName) - err = scaleUp( - asgSvc, - ec2Svc, - elbSvc, - elbv2Svc, - kubectlOptions, - eksAsgName, - maxCapacityForUpdate, - asgInfo.currentInstanceIDs, - maxRetries, - sleepBetweenRetries, - ) + err = state.waitForNodes(ec2Svc, elbSvc, elbv2Svc, kubectlOptions) if err != nil { return err } - logger.Infof("Successfully launched new nodes with new launch config on ASG %s", eksAsgName) - logger.Infof("Cordoning old instances in cluster ASG %s to prevent Pod scheduling", eksAsgName) - err = cordonNodesInAsg(ec2Svc, kubectlOptions, asgInfo.currentInstanceIDs) + err = state.cordonNodes(ec2Svc, kubectlOptions) if err != nil { - logger.Errorf("Error while cordoning nodes.") - logger.Errorf("Continue to cordon nodes that failed manually, and then terminate the underlying instances to complete the rollout.") return err } - logger.Infof("Successfully cordoned old instances in cluster ASG %s", eksAsgName) - logger.Infof("Draining Pods on old instances in cluster ASG %s", eksAsgName) - err = drainNodesInAsg(ec2Svc, kubectlOptions, asgInfo.currentInstanceIDs, drainTimeout, deleteLocalData) + err = state.drainNodes(ec2Svc, kubectlOptions, drainTimeout, deleteLocalData) if err != nil { - logger.Errorf("Error while draining nodes.") - logger.Errorf("Continue to drain nodes that failed manually, and then terminate the underlying instances to complete the rollout.") return err } - logger.Infof("Successfully drained all scheduled Pods on old instances in cluster ASG %s", eksAsgName) - logger.Infof("Removing old nodes from ASG %s", eksAsgName) - err = detachInstances(asgSvc, eksAsgName, asgInfo.currentInstanceIDs) + err = state.detachInstances(asgSvc) if err != nil { - logger.Errorf("Error while detaching the old instances.") - logger.Errorf("Continue to detach the old instances and then terminate the underlying instances to complete the rollout.") return err } - err = terminateInstances(ec2Svc, asgInfo.currentInstanceIDs) + + err = state.terminateInstances(ec2Svc) if err != nil { - logger.Errorf("Error while terminating the old instances.") - logger.Errorf("Continue to terminate the underlying instances to complete the rollout.") return err } - logger.Infof("Successfully removed old nodes from ASG %s", eksAsgName) - logger.Infof("Successfully finished roll out for EKS cluster worker group %s in %s", eksAsgName, region) - return nil -} + err = state.restoreCapacity(asgSvc) + if err != nil { + return err + } -// Retrieves current state of the ASG and returns the original Capacity and the IDs of the instances currently -// associated with it. -func getAsgInfo(asgSvc *autoscaling.AutoScaling, asgName string) (asgInfo, error) { - logger := logging.GetProjectLogger() - logger.Infof("Retrieving current ASG info") - asg, err := GetAsgByName(asgSvc, asgName) + err = state.delete() if err != nil { - return asgInfo{}, err + logger.Warnf("Error deleting state file %s: %s", stateFile, err.Error()) + logger.Warn("Remove the file manually") } - originalCapacity := *asg.DesiredCapacity - maxSize := *asg.MaxSize - currentInstances := asg.Instances - currentInstanceIDs := idsFromAsgInstances(currentInstances) - logger.Infof("Successfully retrieved current ASG info.") - logger.Infof("\tCurrent desired capacity: %d", originalCapacity) - logger.Infof("\tCurrent max size: %d", maxSize) - logger.Infof("\tCurrent capacity: %d", len(currentInstances)) - return asgInfo{originalCapacity: originalCapacity, maxSize: maxSize, currentInstanceIDs: currentInstanceIDs}, nil + logger.Infof("Successfully finished roll out for EKS cluster worker group %s in %s", eksAsgName, region) + return nil } // Calculates the default max retries based on a heuristic of 5 minutes per wave. This assumes that the ASG scales up in diff --git a/eks/deploy_state.go b/eks/deploy_state.go new file mode 100644 index 0000000..4e18fa5 --- /dev/null +++ b/eks/deploy_state.go @@ -0,0 +1,360 @@ +package eks + +import ( + "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/elb" + "github.com/aws/aws-sdk-go/service/elbv2" + "github.com/gruntwork-io/go-commons/errors" + "github.com/gruntwork-io/kubergrunt/kubectl" + "github.com/gruntwork-io/kubergrunt/logging" + "github.com/sirupsen/logrus" + "io/ioutil" + "k8s.io/apimachinery/pkg/util/json" + "os" + "strings" + "time" +) + +// Store the state to current directory by default +const defaultStateFile = "./.kubergrunt.state" + +// DeployState is a basic state machine representing current state of eks deploy subcommand. +// The entire deploy flow is split into multiple sub-stages and state is persisted after each stage. +type DeployState struct { + GatherASGInfoDone bool + SetMaxCapacityDone bool + ScaleUpDone bool + WaitForNodesDone bool + CordonNodesDone bool + DrainNodesDone bool + DetachInstancesDone bool + TerminateInstancesDone bool + RestoreCapacityDone bool + + Path string + ASGs []ASG + + maxRetries int + sleepBetweenRetries time.Duration + + logger *logrus.Entry +} + +// ASG represents the Auto Scaling Group currently being worked on. +type ASG struct { + Name string + OriginalCapacity int64 + MaxCapacityForUpdate int64 + OriginalMaxCapacity int64 + OriginalInstances []string + NewInstances []string +} + +// initDeployState initializes DeployState struct by either reading existing state file from disk, +// or if one doesn't exist, create a new one. Does not persist the state to disk. +func initDeployState(file string, ignoreExistingFile bool, maxRetries int, sleepBetweenRetries time.Duration) (*DeployState, error) { + logger := logging.GetProjectLogger() + var deployState *DeployState + + if ignoreExistingFile { + logger.Info("Ignore existing state file.") + deployState = newDeployState(file) + } else { + logger.Debugf("Looking for existing recovery file %s", file) + data, err := ioutil.ReadFile(file) + if err != nil { + logger.Debugf("No state present, creating new: %s", err.Error()) + deployState = newDeployState(file) + } else { + var parsedState DeployState + err = json.Unmarshal(data, &parsedState) + if err != nil { + return nil, err + } + deployState = &parsedState + } + } + + deployState.logger = logger + deployState.maxRetries = maxRetries + deployState.sleepBetweenRetries = sleepBetweenRetries + + return deployState, nil +} + +// persist saves the DeployState struct to disk +func (state *DeployState) persist() error { + file := state.Path + state.logger.Debugf("storing state file %s", file) + + data, err := json.Marshal(state) + + if err != nil { + return errors.WithStackTrace(err) + } + + err = ioutil.WriteFile(file, data, 0644) + if err != nil { + return errors.WithStackTrace(err) + } + return nil +} + +// delete deletes the DeployState struct from disk +func (state *DeployState) delete() error { + file := state.Path + state.logger.Debugf("Deleting state file %s", file) + + err := os.Remove(file) + + if err != nil { + return errors.WithStackTrace(err) + } + + return nil +} + +// newDeployState creates an empty DeployState struct +func newDeployState(path string) *DeployState { + return &DeployState{ + Path: path, + ASGs: []ASG{}, + } +} + +// gatherASGInfo gathers information about the Auto Scaling group currently being worked on. It ensures +// that the ASG is fully operational with all requested instances running and saves the original configuration +// (incl. max size, original capacity, instance IDs, etc.) that will be used in subsequent stages +func (state *DeployState) gatherASGInfo(asgSvc *autoscaling.AutoScaling, eksAsgNames []string) error { + if state.GatherASGInfoDone { + // Even when we've collected the ASG info, we have to ensure max retries is set + state.maxRetries = ensureMaxRetries(state.maxRetries, state.sleepBetweenRetries, state.ASGs[0].OriginalCapacity) + state.logger.Debug("ASG Info already gathered - skipping") + return nil + } + eksAsgName := eksAsgNames[0] + // Retrieve the ASG object and gather required info we will need later + asgInfo, err := getAsgInfo(asgSvc, eksAsgName) + if err != nil { + return err + } + + // Calculate default max retries + state.maxRetries = ensureMaxRetries(state.maxRetries, state.sleepBetweenRetries, asgInfo.OriginalCapacity) + + // Make sure ASG is in steady state + if asgInfo.OriginalCapacity != int64(len(asgInfo.OriginalInstances)) { + state.logger.Infof("Ensuring ASG is in steady state (current capacity = desired capacity)") + err = waitForCapacity(asgSvc, eksAsgName, state.maxRetries, state.sleepBetweenRetries) + if err != nil { + state.logger.Error("Error waiting for ASG to reach steady state. Try again after the ASG is in a steady state.") + return err + } + state.logger.Infof("Verified ASG is in steady state (current capacity = desired capacity)") + asgInfo, err = getAsgInfo(asgSvc, eksAsgName) + if err != nil { + return err + } + } + + state.GatherASGInfoDone = true + state.ASGs = append(state.ASGs, asgInfo) + return state.persist() +} + +// ensureMaxRetries ensures we always have a proper value for maxRetries, either set by the end user +// or calculated based on the original capacity +func ensureMaxRetries(maxRetries int, sleepBetweenRetries time.Duration, originalCapacity int64) int { + logger := logging.GetProjectLogger() + // Calculate default max retries + if maxRetries == 0 { + defaultMaxRetries := getDefaultMaxRetries(originalCapacity, sleepBetweenRetries) + logger.Infof( + "No max retries set. Defaulted to %d based on sleep between retries duration of %s and scale up count %d.", + defaultMaxRetries, + sleepBetweenRetries, + originalCapacity, + ) + return defaultMaxRetries + } else { + return maxRetries + } +} + +// setMaxCapacity will set the max size of the auto scaling group. +func (state *DeployState) setMaxCapacity(asgSvc *autoscaling.AutoScaling) error { + if state.SetMaxCapacityDone { + state.logger.Debug("Max capacity already set - skipping") + return nil + } + asg := &state.ASGs[0] + maxCapacityForUpdate := asg.OriginalCapacity * 2 + if asg.OriginalMaxCapacity < maxCapacityForUpdate { + err := setAsgMaxSize(asgSvc, asg.Name, maxCapacityForUpdate) + if err != nil { + return err + } + } + asg.MaxCapacityForUpdate = maxCapacityForUpdate + state.SetMaxCapacityDone = true + return state.persist() +} + +// scaleUp will scale up the ASG and wait until all the nodes are available. +func (state *DeployState) scaleUp(asgSvc *autoscaling.AutoScaling) error { + if state.ScaleUpDone { + state.logger.Debug("Scale up already done - skipping") + return nil + } + asg := &state.ASGs[0] + state.logger.Info("Starting with the following list of instances in ASG:") + state.logger.Infof("%s", strings.Join(asg.OriginalInstances, ",")) + + state.logger.Infof("Launching new nodes with new launch config on ASG %s", asg.Name) + newInstanceIds, err := scaleUp(asgSvc, asg.Name, asg.OriginalInstances, asg.MaxCapacityForUpdate, state.maxRetries, state.sleepBetweenRetries) + if err != nil { + return err + } + state.logger.Infof("Successfully launched new nodes with new launch config on ASG %s", asg.Name) + state.ScaleUpDone = true + asg.NewInstances = newInstanceIds + return state.persist() +} + +// waitForNodes will wait until all the new nodes are available. Specifically: +// - Wait for the capacity in the ASG to meet the desired capacity (instances are launched) +// - Wait for the new instances to be ready in Kubernetes +// - Wait for the new instances to be registered with external load balancers +func (state *DeployState) waitForNodes(ec2Svc *ec2.EC2, elbSvc *elb.ELB, elbv2Svc *elbv2.ELBV2, kubectlOptions *kubectl.KubectlOptions) error { + if state.WaitForNodesDone { + state.logger.Debug("Wait for nodes already done - skipping") + return nil + } + asg := &state.ASGs[0] + err := waitAndVerifyNewInstances(ec2Svc, elbSvc, elbv2Svc, asg.NewInstances, kubectlOptions, state.maxRetries, state.sleepBetweenRetries) + if err != nil { + state.logger.Errorf("Error while waiting for new nodes to be ready.") + state.logger.Errorf("Either resume with the recovery file or terminate the new instances.") + return err + } + state.logger.Infof("Successfully confirmed new nodes were launched with new launch config on ASG %s", asg.Name) + state.WaitForNodesDone = true + return state.persist() +} + +// cordonNodes will cordon all the original nodes in the ASG so that Kubernetes won't schedule new Pods on them. +func (state *DeployState) cordonNodes(ec2Svc *ec2.EC2, kubectlOptions *kubectl.KubectlOptions) error { + if state.CordonNodesDone { + state.logger.Debug("Nodes already cordoned - skipping") + return nil + } + asg := &state.ASGs[0] + state.logger.Infof("Cordoning old instances in cluster ASG %s to prevent Pod scheduling", asg.Name) + err := cordonNodesInAsg(ec2Svc, kubectlOptions, asg.OriginalInstances) + if err != nil { + state.logger.Errorf("Error while cordoning nodes.") + state.logger.Errorf("Either resume with the recovery file or continue to cordon nodes that failed manually, and then terminate the underlying instances to complete the rollout.") + return err + } + state.logger.Infof("Successfully cordoned old instances in cluster ASG %s", asg.Name) + state.CordonNodesDone = true + return state.persist() +} + +// drainNodes drains all the original nodes in Kubernetes. +func (state *DeployState) drainNodes(ec2Svc *ec2.EC2, kubectlOptions *kubectl.KubectlOptions, drainTimeout time.Duration, deleteLocalData bool) error { + if state.DrainNodesDone { + state.logger.Debug("Nodes already drained - skipping") + return nil + } + asg := &state.ASGs[0] + state.logger.Infof("Draining Pods on old instances in cluster ASG %s", asg.Name) + err := drainNodesInAsg(ec2Svc, kubectlOptions, asg.OriginalInstances, drainTimeout, deleteLocalData) + if err != nil { + state.logger.Errorf("Error while draining nodes.") + state.logger.Errorf("Either resume with the recovery file or continue to drain nodes that failed manually, and then terminate the underlying instances to complete the rollout.") + return err + } + state.logger.Infof("Successfully drained all scheduled Pods on old instances in cluster ASG %s", asg.Name) + state.DrainNodesDone = true + return state.persist() +} + +// detachInstances detaches the original instances from the ASG and auto decrements the ASG desired capacity +func (state *DeployState) detachInstances(asgSvc *autoscaling.AutoScaling) error { + if state.DetachInstancesDone { + state.logger.Debug("Instances already detached - skipping") + return nil + } + asg := &state.ASGs[0] + state.logger.Infof("Removing old nodes from ASG %s: %s", asg.Name, strings.Join(asg.OriginalInstances, ",")) + err := detachInstances(asgSvc, asg.Name, asg.OriginalInstances) + if err != nil { + state.logger.Errorf("Error while detaching the old instances.") + state.logger.Errorf("Either resume with the recovery file or continue to detach the old instances and then terminate the underlying instances to complete the rollout.") + return err + } + state.DetachInstancesDone = true + return state.persist() +} + +// terminateInstances terminates the original instances in the ASG. +func (state *DeployState) terminateInstances(ec2Svc *ec2.EC2) error { + if state.TerminateInstancesDone { + state.logger.Debug("Instances already terminated - skipping") + return nil + } + asg := &state.ASGs[0] + state.logger.Infof("Terminating old nodes: %s", strings.Join(asg.OriginalInstances, ",")) + err := terminateInstances(ec2Svc, asg.OriginalInstances) + if err != nil { + state.logger.Errorf("Error while terminating the old instances.") + state.logger.Errorf("Either resume with the recovery file or continue to terminate the underlying instances to complete the rollout.") + return err + } + state.logger.Infof("Successfully removed old nodes from ASG %s", asg.Name) + state.TerminateInstancesDone = true + return state.persist() +} + +// restoreCapacity restores the max size of the ASG to its original value. +func (state *DeployState) restoreCapacity(asgSvc *autoscaling.AutoScaling) error { + if state.RestoreCapacityDone { + state.logger.Debug("Capacity already restored - skipping") + return nil + } + asg := &state.ASGs[0] + err := setAsgMaxSize(asgSvc, asg.Name, asg.OriginalMaxCapacity) + if err != nil { + state.logger.Errorf("Error while restoring ASG %s max size to %v.", asg.Name, asg.OriginalMaxCapacity) + state.logger.Errorf("Either resume with the recovery file or adjust ASG max size manually to complete the rollout.") + return err + } + return state.persist() +} + +// Retrieves current state of the ASG and returns the original Capacity and the IDs of the instances currently +// associated with it. +func getAsgInfo(asgSvc *autoscaling.AutoScaling, asgName string) (ASG, error) { + logger := logging.GetProjectLogger() + logger.Infof("Retrieving current ASG info") + asg, err := GetAsgByName(asgSvc, asgName) + if err != nil { + return ASG{}, err + } + originalCapacity := *asg.DesiredCapacity + maxSize := *asg.MaxSize + currentInstances := asg.Instances + currentInstanceIDs := idsFromAsgInstances(currentInstances) + logger.Infof("Successfully retrieved current ASG info.") + logger.Infof("\tCurrent desired capacity: %d", originalCapacity) + logger.Infof("\tCurrent max size: %d", maxSize) + logger.Infof("\tCurrent capacity: %d", len(currentInstances)) + return ASG{ + Name: asgName, + OriginalCapacity: originalCapacity, + OriginalMaxCapacity: maxSize, + OriginalInstances: currentInstanceIDs, + }, nil +} diff --git a/eks/deploy_state_test.go b/eks/deploy_state_test.go new file mode 100644 index 0000000..2d4860f --- /dev/null +++ b/eks/deploy_state_test.go @@ -0,0 +1,100 @@ +package eks + +import ( + "github.com/gruntwork-io/kubergrunt/logging" + "github.com/stretchr/testify/assert" + "io/ioutil" + "net/url" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestParseNonExistingDeployState(t *testing.T) { + t.Parallel() + fileName := "./.na" + state, err := initDeployState(fileName, false, 3, 30*time.Second) + require.NoError(t, err) + defer os.Remove(fileName) + + assert.Equal(t, fileName, state.Path) + assert.Equal(t, 3, state.maxRetries) + assert.Equal(t, 30*time.Second, state.sleepBetweenRetries) + + assert.False(t, state.SetMaxCapacityDone) + assert.False(t, state.TerminateInstancesDone) + assert.False(t, state.GatherASGInfoDone) + assert.False(t, state.RestoreCapacityDone) + assert.False(t, state.DrainNodesDone) + assert.False(t, state.CordonNodesDone) + assert.False(t, state.DetachInstancesDone) + assert.False(t, state.WaitForNodesDone) + assert.False(t, state.ScaleUpDone) +} + +func TestParseExistingDeployState(t *testing.T) { + t.Parallel() + + stateFile := generateTempStateFile(t) + state, err := initDeployState(stateFile, false, 3, 30*time.Second) + require.NoError(t, err) + defer os.Remove(stateFile) + + assert.True(t, state.GatherASGInfoDone) + assert.False(t, state.SetMaxCapacityDone) + assert.Equal(t, 1, len(state.ASGs)) + assert.Equal(t, 3, state.maxRetries) + assert.Equal(t, 30*time.Second, state.sleepBetweenRetries) + + asg := state.ASGs[0] + + assert.Equal(t, "my-test-asg", asg.Name) + assert.Equal(t, int64(2), asg.OriginalCapacity) + assert.Equal(t, int64(4), asg.OriginalMaxCapacity) + assert.Equal(t, 2, len(asg.OriginalInstances)) + assert.Equal(t, 1, len(asg.NewInstances)) +} + +func TestParseExistingDeployStateIgnoreCurrent(t *testing.T) { + t.Parallel() + + stateFile := generateTempStateFile(t) + state, err := initDeployState(stateFile, true, 3, 30*time.Second) + require.NoError(t, err) + defer os.Remove(stateFile) + + assert.False(t, state.GatherASGInfoDone) + assert.Equal(t, 0, len(state.ASGs)) +} + +func generateTempStateFile(t *testing.T) string { + escapedTestName := url.PathEscape(t.Name()) + tmpfile, err := ioutil.TempFile("", escapedTestName) + require.NoError(t, err) + defer tmpfile.Close() + + asg := ASG{ + Name: "my-test-asg", + OriginalCapacity: 2, + OriginalMaxCapacity: 4, + OriginalInstances: []string{ + "instance-1", + "instance-2", + }, + NewInstances: []string{ + "instance-3", + }, + } + + state := &DeployState{ + logger: logging.GetProjectLogger(), + GatherASGInfoDone: true, + Path: tmpfile.Name(), + ASGs: []ASG{asg}, + } + + state.persist() + return tmpfile.Name() +} diff --git a/eks/drain.go b/eks/drain.go index 4f2c55d..a1fee49 100644 --- a/eks/drain.go +++ b/eks/drain.go @@ -41,7 +41,7 @@ func DrainASG( if err != nil { return err } - allInstanceIDs = append(allInstanceIDs, asgInfo.currentInstanceIDs...) + allInstanceIDs = append(allInstanceIDs, asgInfo.OriginalInstances...) } logger.Infof("Found %d instances across all requested ASGs.", len(allInstanceIDs))