Fault tolerance for eks deploy (#154)
* Initial implementation

* Rename variable for clarity

* Update README.md

* [skip ci] Update function documentation

Co-authored-by: Yoriyasu Yano <430092+yorinasub17@users.noreply.github.com>

* Refactor deploy_state

* [skip ci] Fix logging on state file delete

* Documentation improvements and code cleanup

* Reduce code complexity, remove extra asg struct

* [skip ci] add more logging

* use ASG references instead of values

* Fix calculation of maxRetries

* Remove extra assertion

Co-authored-by: Yoriyasu Yano <430092+yorinasub17@users.noreply.github.com>
autero1 and yorinasub17 authored Jan 25, 2022
1 parent f1dba88 commit dd46ffd
Showing 7 changed files with 543 additions and 126 deletions.
9 changes: 9 additions & 0 deletions README.md
@@ -256,6 +256,15 @@ Currently `kubergrunt` does not implement any checks for these resources to be i
plan to bake checks into the deployment command to verify that all services have a disruption budget set, and warn
the user of any services that do not have a check.

**`eks deploy` recovery file**

Due to the nature of rolling updates, the `deploy` subcommand performs multiple sequential actions that
depend on the success of previous operations. To mitigate intermittent failures, the `deploy` subcommand creates a
recovery file in the working directory to store the current deploy state. The recovery file is updated after
each stage, and if the `deploy` subcommand fails for some reason, execution resumes from the last successful state.
An existing recovery file can also be ignored with the `--ignore-recovery-file` flag, in which case the recovery
file is re-initialized.
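
As a rough illustration of that startup decision (the helper below and its signature are assumptions for this sketch, not `kubergrunt`'s actual internals; only the `.kubergrunt.state` file name comes from the command documentation):

```go
package main

import (
	"fmt"
	"os"
)

// shouldStartFresh reports whether the deploy should begin from the first
// stage (no usable recovery file) or resume from the last recorded stage.
// Illustrative sketch only; the real state handling is internal to kubergrunt
// and may differ.
func shouldStartFresh(recoveryFile string, ignoreRecoveryFile bool) bool {
	if ignoreRecoveryFile {
		return true // --ignore-recovery-file: re-initialize the recovery file
	}
	_, err := os.Stat(recoveryFile)
	return os.IsNotExist(err) // no recovery file yet: nothing to resume from
}

func main() {
	// The recovery file lives in the working directory.
	fmt.Println(shouldStartFresh(".kubergrunt.state", false))
}
```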

#### sync-core-components

This subcommand will sync the core components of an EKS cluster to match the deployed Kubernetes version by following
9 changes: 9 additions & 0 deletions cmd/eks.go
@@ -24,6 +24,10 @@ var (
Name: "wait",
Usage: "Whether or not to wait for the command to succeed.",
}
ignoreRecoveryFileFlag = cli.BoolFlag{
Name: "ignore-recovery-file",
Usage: "Ignore existing recovery file and start deploy process from the beginning.",
}
eksKubectlContextNameFlag = cli.StringFlag{
Name: KubectlContextNameFlagName,
Usage: "The name to use for the config context that is set up to authenticate with the EKS cluster. Defaults to the cluster ARN.",
@@ -230,6 +234,8 @@ Note that to minimize service disruption from this command, your services should
This command includes retry loops for certain stages (e.g. waiting for the ASG to scale up). These retry loops are configurable with the options --max-retries and --sleep-between-retries. The command will try up to --max-retries times, sleeping for the duration specified by --sleep-between-retries between each failed attempt.
If max-retries is unspecified, this command will use a value that translates to a total wait time of 5 minutes per wave of ASG instances, where each wave is 10 instances. For example, an ASG with 15 instances translates to 2 waves, which leads to a total wait time of 10 minutes. To achieve a 10 minute wait time with the default sleep between retries (15 seconds), max retries needs to be set to 40.
As the deploy command contains multiple stages, it also generates a recovery file (.kubergrunt.state) in the working directory containing the current deploy state. The state file is used to resume the deploy operation from the point of failure, and is automatically deleted upon completion of the command. You can optionally ignore the state file with the --ignore-recovery-file flag, which will generate a new recovery file.
`,
Action: rollOutDeployment,
Flags: []cli.Flag{
@@ -245,6 +251,7 @@ If max-retries is unspecified, this command will use a value that translates to
deleteLocalDataFlag,
waitMaxRetriesFlag,
waitSleepBetweenRetriesFlag,
ignoreRecoveryFileFlag,
},
},
cli.Command{
@@ -405,6 +412,7 @@ func rollOutDeployment(cliContext *cli.Context) error {

drainTimeout := cliContext.Duration(drainTimeoutFlag.Name)
deleteLocalData := cliContext.Bool(deleteLocalDataFlag.Name)
ignoreRecoveryFile := cliContext.Bool(ignoreRecoveryFileFlag.Name)
waitMaxRetries := cliContext.Int(waitMaxRetriesFlag.Name)
waitSleepBetweenRetries := cliContext.Duration(waitSleepBetweenRetriesFlag.Name)

@@ -416,6 +424,7 @@ func rollOutDeployment(cliContext *cli.Context) error {
deleteLocalData,
waitMaxRetries,
waitSleepBetweenRetries,
ignoreRecoveryFile,
)
}

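The default `--max-retries` heuristic described in the `eks deploy` help text above (5 minutes of total wait per wave of 10 instances) works out as in the following sketch; this is an illustrative reimplementation, not necessarily the exact `getDefaultMaxRetries` in the codebase:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// defaultMaxRetries mirrors the documented heuristic: each wave covers up to
// 10 instances and is allotted 5 minutes of total wait time, so the retry
// count is the total wait divided by the sleep interval. Illustrative sketch
// only, not necessarily the exact implementation.
func defaultMaxRetries(desiredCapacity int64, sleepBetweenRetries time.Duration) int {
	const instancesPerWave = 10
	const waitPerWave = 5 * time.Minute

	waves := int64(math.Ceil(float64(desiredCapacity) / float64(instancesPerWave)))
	totalWait := time.Duration(waves) * waitPerWave
	return int(totalWait / sleepBetweenRetries)
}

func main() {
	// 15 instances => 2 waves => 10 minutes of wait; with the default 15s
	// sleep between retries this yields 40 retries, matching the help text.
	fmt.Println(defaultMaxRetries(15, 15*time.Second))
}
```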
55 changes: 34 additions & 21 deletions eks/asg.go
@@ -32,51 +32,67 @@ func GetAsgByName(svc *autoscaling.AutoScaling, asgName string) (*autoscaling.Gr
return groups[0], nil
}

// scaleUp will scale the ASG up and wait until all the nodes are available. Specifically:
// - Set the desired capacity on the ASG
// - Wait for the capacity in the ASG to meet the desired capacity (instances are launched)
// - Wait for the new instances to be ready in Kubernetes
// - Wait for the new instances to be registered with external load balancers
// scaleUp will scale the ASG up, wait until all the nodes are available and return new instance IDs.
func scaleUp(
asgSvc *autoscaling.AutoScaling,
ec2Svc *ec2.EC2,
elbSvc *elb.ELB,
elbv2Svc *elbv2.ELBV2,
kubectlOptions *kubectl.KubectlOptions,
asgName string,
originalInstanceIds []string,
desiredCapacity int64,
oldInstanceIds []string,
maxRetries int,
sleepBetweenRetries time.Duration,
) error {
) ([]string, error) {
logger := logging.GetProjectLogger()

err := setAsgCapacity(asgSvc, asgName, desiredCapacity)
if err != nil {
logger.Errorf("Failed to set ASG capacity to %d", desiredCapacity)
logger.Errorf("If the capacity is set in AWS, undo by lowering back to the original capacity. If the capacity is not yet set, triage the error message below and try again.")
return err
return nil, err
}

// All of the following are read operations and do not affect the state, so it's safe to run these
// each time we execute
err = waitForCapacity(asgSvc, asgName, maxRetries, sleepBetweenRetries)
if err != nil {
logger.Errorf("Timed out waiting for ASG to reach steady state.")
// TODO: can we use stages to pick up from here?
logger.Errorf("Undo by terminating all the new instances and trying again")
return err
return nil, err
}
newInstanceIds, err := getLaunchedInstanceIds(asgSvc, asgName, oldInstanceIds)

newInstanceIds, err := getLaunchedInstanceIds(asgSvc, asgName, originalInstanceIds)
if err != nil {
logger.Errorf("Error retrieving information about the ASG")
// TODO: can we use stages to pick up from here?
logger.Errorf("Undo by terminating all the new instances and trying again")
return err
return nil, err
}
instances, err := instanceDetailsFromIds(ec2Svc, newInstanceIds)

return newInstanceIds, nil
}

// waitAndVerifyNewInstances waits until the given new instances are available and ready. Specifically:
// - Wait for the new instances to be ready in Kubernetes
// - Wait for the new instances to be registered with external load balancers
func waitAndVerifyNewInstances(
ec2Svc *ec2.EC2,
elbSvc *elb.ELB,
elbv2Svc *elbv2.ELBV2,
instanceIds []string,
kubectlOptions *kubectl.KubectlOptions,
maxRetries int,
sleepBetweenRetries time.Duration,
) error {
logger := logging.GetProjectLogger()

instances, err := instanceDetailsFromIds(ec2Svc, instanceIds)
if err != nil {
logger.Errorf("Error retrieving detailed about the instances")
// TODO: can we use stages to pick up from here?
logger.Errorf("Undo by terminating all the new instances and trying again")
return err
}

eksKubeNodeNames := kubeNodeNamesFromInstances(instances)
err = kubectl.WaitForNodesReady(
kubectlOptions,
@@ -86,21 +102,18 @@ func scaleUp(
)
if err != nil {
logger.Errorf("Timed out waiting for the instances to reach ready state in Kubernetes.")
// TODO: can we use stages to pick up from here?
logger.Errorf("Undo by terminating all the new instances and trying again")
return err
}
elbs, err := kubectl.GetAWSLoadBalancers(kubectlOptions)
if err != nil {
logger.Errorf("Error retrieving associated ELB names of the Kubernetes services.")
// TODO: can we use stages to pick up from here?
logger.Errorf("Undo by terminating all the new instances and trying again")
return err
}
err = waitForAnyInstancesRegisteredToELB(elbSvc, elbv2Svc, elbs, newInstanceIds)
err = waitForAnyInstancesRegisteredToELB(elbSvc, elbv2Svc, elbs, instanceIds)
if err != nil {
logger.Errorf("Timed out waiting for the instances to register to the Service ELBs.")
// TODO: can we use stages to pick up from here?
logger.Errorf("Undo by terminating all the new instances and trying again")
return err
}
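The waiting helpers used in this file (`waitForCapacity`, `kubectl.WaitForNodesReady`, `waitForAnyInstancesRegisteredToELB`) all follow the max-retries/sleep-between-retries pattern described in the command help. A generic sketch of that loop, with a closure-based signature that is an illustration rather than the real helpers' API:

```go
package main

import (
	"fmt"
	"time"
)

// retryUntil polls check up to maxRetries times, sleeping between attempts,
// and gives up once the attempts are exhausted. Sketch of the retry pattern
// only; the real kubergrunt helpers take AWS and Kubernetes clients rather
// than a plain closure.
func retryUntil(check func() (bool, error), maxRetries int, sleepBetweenRetries time.Duration) error {
	for attempt := 0; attempt < maxRetries; attempt++ {
		done, err := check()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		time.Sleep(sleepBetweenRetries)
	}
	return fmt.Errorf("timed out after %d attempts", maxRetries)
}

func main() {
	start := time.Now()
	// Example: wait until two seconds have elapsed, checking every 250ms.
	err := retryUntil(func() (bool, error) {
		return time.Since(start) > 2*time.Second, nil
	}, 40, 250*time.Millisecond)
	fmt.Println(err)
}
```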
134 changes: 30 additions & 104 deletions eks/deploy.go
@@ -2,7 +2,6 @@ package eks

import (
"math"
"strings"
"time"

"github.com/aws/aws-sdk-go/service/autoscaling"
@@ -16,12 +15,6 @@ import (
"github.com/gruntwork-io/kubergrunt/logging"
)

type asgInfo struct {
originalCapacity int64
maxSize int64
currentInstanceIDs []string
}

// RollOutDeployment will perform a zero downtime roll out of the current launch configuration associated with the
// provided ASG in the provided EKS cluster. This is accomplished by:
// 1. Double the desired capacity of the Auto Scaling Group that powers the EKS Cluster. This will launch new EKS
@@ -32,7 +25,7 @@
// rescheduled on the new EKS workers.
// 5. Wait for all the pods to migrate off of the old EKS workers.
// 6. Set the desired capacity down to the original value and remove the old EKS workers from the ASG.
// TODO feature request: Break up into stages/checkpoints, and store state along the way so that command can pick up
// The process is broken up into stages/checkpoints, and state is stored along the way so that the command can pick up
// from a stage if something bad happens.
func RollOutDeployment(
region string,
@@ -42,6 +35,7 @@ func RollOutDeployment(
deleteLocalData bool,
maxRetries int,
sleepBetweenRetries time.Duration,
ignoreRecoveryFile bool,
) (returnErr error) {
logger := logging.GetProjectLogger()
logger.Infof("Beginning roll out for EKS cluster worker group %s in %s", eksAsgName, region)
@@ -57,134 +51,66 @@ func RollOutDeployment(
elbv2Svc := elbv2.New(sess)
logger.Infof("Successfully authenticated with AWS")

// Retrieve the ASG object and gather required info we will need later
asgInfo, err := getAsgInfo(asgSvc, eksAsgName)
stateFile := defaultStateFile

// Retrieve state if one exists or construct a new one
state, err := initDeployState(stateFile, ignoreRecoveryFile, maxRetries, sleepBetweenRetries)
if err != nil {
return err
}

// Calculate default max retries
if maxRetries == 0 {
maxRetries = getDefaultMaxRetries(asgInfo.originalCapacity, sleepBetweenRetries)
logger.Infof(
"No max retries set. Defaulted to %d based on sleep between retries duration of %s and scale up count %d.",
maxRetries,
sleepBetweenRetries,
asgInfo.originalCapacity,
)
err = state.gatherASGInfo(asgSvc, []string{eksAsgName})
if err != nil {
return err
}

// Make sure ASG is in steady state
if asgInfo.originalCapacity != int64(len(asgInfo.currentInstanceIDs)) {
logger.Infof("Ensuring ASG is in steady state (current capacity = desired capacity)")
err = waitForCapacity(asgSvc, eksAsgName, maxRetries, sleepBetweenRetries)
if err != nil {
logger.Error("Error waiting for ASG to reach steady state. Try again after the ASG is in a steady state.")
return err
}
logger.Infof("Verified ASG is in steady state (current capacity = desired capacity)")
asgInfo, err = getAsgInfo(asgSvc, eksAsgName)
if err != nil {
return err
}
err = state.setMaxCapacity(asgSvc)
if err != nil {
return err
}

// Make sure there is enough max size capacity to scale up
maxCapacityForUpdate := asgInfo.originalCapacity * 2
if asgInfo.maxSize < maxCapacityForUpdate {
// Make sure we attempt to restore the original ASG max size at the end of the function, regardless of error.
defer func() {
err := setAsgMaxSize(asgSvc, eksAsgName, asgInfo.maxSize)
// Only return error from this routine if we are not already bubbling an error back from previous calls in
// the function.
if err != nil && returnErr == nil {
returnErr = err
}
}()

// Update the ASG max size to have enough capacity to handle the update.
err := setAsgMaxSize(asgSvc, eksAsgName, maxCapacityForUpdate)
if err != nil {
return err
}
err = state.scaleUp(asgSvc)
if err != nil {
return err
}

logger.Infof("Starting with the following list of instances in ASG:")
logger.Infof("%s", strings.Join(asgInfo.currentInstanceIDs, ","))

logger.Infof("Launching new nodes with new launch config on ASG %s", eksAsgName)
err = scaleUp(
asgSvc,
ec2Svc,
elbSvc,
elbv2Svc,
kubectlOptions,
eksAsgName,
maxCapacityForUpdate,
asgInfo.currentInstanceIDs,
maxRetries,
sleepBetweenRetries,
)
err = state.waitForNodes(ec2Svc, elbSvc, elbv2Svc, kubectlOptions)
if err != nil {
return err
}
logger.Infof("Successfully launched new nodes with new launch config on ASG %s", eksAsgName)

logger.Infof("Cordoning old instances in cluster ASG %s to prevent Pod scheduling", eksAsgName)
err = cordonNodesInAsg(ec2Svc, kubectlOptions, asgInfo.currentInstanceIDs)
err = state.cordonNodes(ec2Svc, kubectlOptions)
if err != nil {
logger.Errorf("Error while cordoning nodes.")
logger.Errorf("Continue to cordon nodes that failed manually, and then terminate the underlying instances to complete the rollout.")
return err
}
logger.Infof("Successfully cordoned old instances in cluster ASG %s", eksAsgName)

logger.Infof("Draining Pods on old instances in cluster ASG %s", eksAsgName)
err = drainNodesInAsg(ec2Svc, kubectlOptions, asgInfo.currentInstanceIDs, drainTimeout, deleteLocalData)
err = state.drainNodes(ec2Svc, kubectlOptions, drainTimeout, deleteLocalData)
if err != nil {
logger.Errorf("Error while draining nodes.")
logger.Errorf("Continue to drain nodes that failed manually, and then terminate the underlying instances to complete the rollout.")
return err
}
logger.Infof("Successfully drained all scheduled Pods on old instances in cluster ASG %s", eksAsgName)

logger.Infof("Removing old nodes from ASG %s", eksAsgName)
err = detachInstances(asgSvc, eksAsgName, asgInfo.currentInstanceIDs)
err = state.detachInstances(asgSvc)
if err != nil {
logger.Errorf("Error while detaching the old instances.")
logger.Errorf("Continue to detach the old instances and then terminate the underlying instances to complete the rollout.")
return err
}
err = terminateInstances(ec2Svc, asgInfo.currentInstanceIDs)

err = state.terminateInstances(ec2Svc)
if err != nil {
logger.Errorf("Error while terminating the old instances.")
logger.Errorf("Continue to terminate the underlying instances to complete the rollout.")
return err
}
logger.Infof("Successfully removed old nodes from ASG %s", eksAsgName)

logger.Infof("Successfully finished roll out for EKS cluster worker group %s in %s", eksAsgName, region)
return nil
}
err = state.restoreCapacity(asgSvc)
if err != nil {
return err
}

// Retrieves current state of the ASG and returns the original Capacity and the IDs of the instances currently
// associated with it.
func getAsgInfo(asgSvc *autoscaling.AutoScaling, asgName string) (asgInfo, error) {
logger := logging.GetProjectLogger()
logger.Infof("Retrieving current ASG info")
asg, err := GetAsgByName(asgSvc, asgName)
err = state.delete()
if err != nil {
return asgInfo{}, err
logger.Warnf("Error deleting state file %s: %s", stateFile, err.Error())
logger.Warn("Remove the file manually")
}
originalCapacity := *asg.DesiredCapacity
maxSize := *asg.MaxSize
currentInstances := asg.Instances
currentInstanceIDs := idsFromAsgInstances(currentInstances)
logger.Infof("Successfully retrieved current ASG info.")
logger.Infof("\tCurrent desired capacity: %d", originalCapacity)
logger.Infof("\tCurrent max size: %d", maxSize)
logger.Infof("\tCurrent capacity: %d", len(currentInstances))
return asgInfo{originalCapacity: originalCapacity, maxSize: maxSize, currentInstanceIDs: currentInstanceIDs}, nil
logger.Infof("Successfully finished roll out for EKS cluster worker group %s in %s", eksAsgName, region)
return nil
}

// Calculates the default max retries based on a heuristic of 5 minutes per wave. This assumes that the ASG scales up in
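The `initDeployState` and `state.*` helpers called in `RollOutDeployment` above live in a file whose diff is not expanded in this view, so the following is only a hedged sketch of the checkpoint pattern they imply: run a stage, record it in the recovery file, skip stages already recorded on a resumed run, and delete the file once everything succeeds. The struct, field names, and JSON format here are assumptions, not the real `deploy_state` code:

```go
package main

import (
	"encoding/json"
	"os"
)

// deployState is a hypothetical stand-in for the recovery state; the real
// field names and on-disk format are not shown in this diff.
type deployState struct {
	Path            string          `json:"-"`
	CompletedStages map[string]bool `json:"completed_stages"`
}

// runStage executes fn only if the named stage has not already succeeded on a
// previous attempt, then checkpoints the recovery file so a later failure can
// resume from the next stage.
func (s *deployState) runStage(name string, fn func() error) error {
	if s.CompletedStages[name] {
		return nil
	}
	if err := fn(); err != nil {
		return err
	}
	s.CompletedStages[name] = true
	data, err := json.MarshalIndent(s, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(s.Path, data, 0644)
}

// delete removes the recovery file once the roll out has fully completed.
func (s *deployState) delete() error {
	return os.Remove(s.Path)
}

func main() {
	state := &deployState{Path: ".kubergrunt.state", CompletedStages: map[string]bool{}}
	// Stage names mirror the calls above: scale up, wait for nodes, cordon,
	// drain, detach, terminate, restore capacity.
	_ = state.runStage("scaleUp", func() error { return nil })
	_ = state.delete()
}
```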
