Skip to content

Commit

Permalink
Refactored upgrade process
Browse files Browse the repository at this point in the history
  • Loading branch information
willie-yao committed Aug 10, 2023
1 parent f415da6 commit a4da10e
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 96 deletions.
101 changes: 52 additions & 49 deletions internal/controllers/topology/cluster/desired_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (r *Reconciler) computeDesiredState(ctx context.Context, s *scope.Scope) (*
// If required, compute the desired state of the MachinePools from the list of MachinePoolTopologies
// defined in the cluster.
if s.Blueprint.HasMachinePools() {
desiredState.MachinePools, err = computeMachinePools(ctx, s, desiredState.ControlPlane)
desiredState.MachinePools, err = r.computeMachinePools(ctx, s)
if err != nil {
return nil, errors.Wrapf(err, "failed to compute MachinePools")
}
Expand Down Expand Up @@ -881,17 +881,12 @@ func isMachineDeploymentDeferred(clusterTopology *clusterv1.Topology, mdTopology
}

// computeMachinePools computes the desired state of the list of MachinePools.
func computeMachinePools(ctx context.Context, s *scope.Scope, desiredControlPlaneState *scope.ControlPlaneState) (scope.MachinePoolsStateMap, error) {
// Mark all the machine pools that are currently rolling out.
// This captured information will be used for
// - Building the TopologyReconciled condition.
// - Making upgrade decisions on machine pools.
s.UpgradeTracker.MachinePools.MarkRollingOut(s.Current.MachinePools.RollingOut()...)
func (r *Reconciler) computeMachinePools(ctx context.Context, s *scope.Scope) (scope.MachinePoolsStateMap, error) {
machinePoolsStateMap := make(scope.MachinePoolsStateMap)
for _, mpTopology := range s.Blueprint.Topology.Workers.MachinePools {
desiredMachinePool, err := computeMachinePool(ctx, s, desiredControlPlaneState, mpTopology)
desiredMachinePool, err := computeMachinePool(ctx, s, mpTopology)
if err != nil {
return nil, errors.Wrapf(err, "failed to compute MachinePool for topology %q", mpTopology.Name)
return nil, errors.Wrapf(err, "failed to compute MachineDepoyment for topology %q", mpTopology.Name)
}
machinePoolsStateMap[mpTopology.Name] = desiredMachinePool
}
Expand All @@ -901,7 +896,7 @@ func computeMachinePools(ctx context.Context, s *scope.Scope, desiredControlPlan
// computeMachinePool computes the desired state for a MachinePoolTopology.
// The generated machinePool object is calculated using the values from the machinePoolTopology and
// the machinePool class.
func computeMachinePool(_ context.Context, s *scope.Scope, desiredControlPlaneState *scope.ControlPlaneState, machinePoolTopology clusterv1.MachinePoolTopology) (*scope.MachinePoolState, error) {
func computeMachinePool(_ context.Context, s *scope.Scope, machinePoolTopology clusterv1.MachinePoolTopology) (*scope.MachinePoolState, error) {
desiredMachinePool := &scope.MachinePoolState{}

// Gets the blueprint for the MachinePool class.
Expand Down Expand Up @@ -972,7 +967,7 @@ func computeMachinePool(_ context.Context, s *scope.Scope, desiredControlPlaneSt
// Add ClusterTopologyMachinePoolLabel to the generated InfrastructureMachinePool object
infraMachinePoolObjectLabels[clusterv1.ClusterTopologyMachinePoolNameLabel] = machinePoolTopology.Name
desiredMachinePool.InfrastructureMachinePoolObject.SetLabels(infraMachinePoolObjectLabels)
version := computeMachinePoolVersion(s, desiredControlPlaneState, machinePoolTopology, currentMachinePool)
version := computeMachinePoolVersion(s, machinePoolTopology, currentMachinePool)

// Compute values that can be set both in the MachinePoolClass and in the MachinePoolTopology
failureDomains := machinePoolClass.FailureDomains
Expand Down Expand Up @@ -1070,14 +1065,12 @@ func computeMachinePool(_ context.Context, s *scope.Scope, desiredControlPlaneSt
// computeMachinePoolVersion calculates the version of the desired machine pool.
// The version is calculated using the state of the current machine pools,
// the current control plane and the version defined in the topology.
// Nb: No MachinePool upgrades will be triggered while any MachinePool is in the middle
// of an upgrade. Even if the number of MachinePools that are being upgraded is less
// than the number of allowed concurrent upgrades.
func computeMachinePoolVersion(s *scope.Scope, desiredControlPlaneState *scope.ControlPlaneState, machinePoolTopology clusterv1.MachinePoolTopology, currentMPState *scope.MachinePoolState) string {
func computeMachinePoolVersion(s *scope.Scope, machinePoolTopology clusterv1.MachinePoolTopology, currentMPState *scope.MachinePoolState) string {
desiredVersion := s.Blueprint.Topology.Version
// If creating a new machine pool, we can pick up the desired version
// Note: We are not blocking the creation of new machine pools when
// the control plane or any of the machine pools are upgrading/scaling.
// If creating a new machine pool, mark it as pending if the control plane is not
// yet stable. Creating a new MP while the control plane is upgrading can lead to unexpected race conditions.
// Example: join could fail if the load balancers are slow in detecting when CP machines are
// being deleted.
if currentMPState == nil || currentMPState.Object == nil {
if !isControlPlaneStable(s) || s.HookResponseTracker.IsBlocking(runtimehooksv1.AfterControlPlaneUpgrade) {
s.UpgradeTracker.MachinePools.MarkPendingCreate(machinePoolTopology.Name)
Expand All @@ -1094,6 +1087,13 @@ func computeMachinePoolVersion(s *scope.Scope, desiredControlPlaneState *scope.C
return currentVersion
}

// Return early if the upgrade for the MachinePool is deferred.
if isMachinePoolDeferred(s.Blueprint.Topology, machinePoolTopology) {
s.UpgradeTracker.MachinePools.MarkDeferredUpgrade(currentMPState.Object.Name)
s.UpgradeTracker.MachinePools.MarkPendingUpgrade(currentMPState.Object.Name)
return currentVersion
}

// Return early if the AfterControlPlaneUpgrade hook returns a blocking response.
if s.HookResponseTracker.IsBlocking(runtimehooksv1.AfterControlPlaneUpgrade) {
s.UpgradeTracker.MachinePools.MarkPendingUpgrade(currentMPState.Object.Name)
Expand All @@ -1107,49 +1107,52 @@ func computeMachinePoolVersion(s *scope.Scope, desiredControlPlaneState *scope.C
}

// Return early if the Control Plane is not stable. Do not pick up the desiredVersion yet.
// Return the current version of the machine deployment. We will pick up the new version after the control
// Return the current version of the machine pool. We will pick up the new version after the control
// plane is stable.
if !isControlPlaneStable(s) {
s.UpgradeTracker.MachinePools.MarkPendingUpgrade(currentMPState.Object.Name)
return currentVersion
}

// Check if we are about to upgrade the control plane. In that case, do not upgrade the machine pool yet.
// Wait for the new upgrade operation on the control plane to finish before picking up the new version for the
// machine pool.
currentCPVersion, err := contract.ControlPlane().Version().Get(s.Current.ControlPlane.Object)
if err != nil {
return currentVersion
}
desiredCPVersion, err := contract.ControlPlane().Version().Get(desiredControlPlaneState.Object)
if err != nil {
return currentVersion
}
if *currentCPVersion != *desiredCPVersion {
// The versions of the current and desired control planes do no match,
// implies we are about to upgrade the control plane.
s.UpgradeTracker.MachinePools.MarkPendingUpgrade(currentMPState.Object.Name)
return currentVersion
// Control plane and machine pools are stable.
// Ready to pick up the topology version.
s.UpgradeTracker.MachinePools.MarkUpgrading(currentMPState.Object.Name)
return desiredVersion
}

// isMachinePoolDeferred returns true if the upgrade for the mpTopology is deferred.
// This is the case when either:
// - the mpTopology has the ClusterTopologyDeferUpgradeAnnotation annotation.
// - the mpTopology has the ClusterTopologyHoldUpgradeSequenceAnnotation annotation.
// - another mp topology which is before mpTopology in the workers.machinePools list has the
// ClusterTopologyHoldUpgradeSequenceAnnotation annotation.
func isMachinePoolDeferred(clusterTopology *clusterv1.Topology, mpTopology clusterv1.MachinePoolTopology) bool {
// If mpTopology has the ClusterTopologyDeferUpgradeAnnotation annotation => mp is deferred.
if _, ok := mpTopology.Metadata.Annotations[clusterv1.ClusterTopologyDeferUpgradeAnnotation]; ok {
return true
}

// If the ControlPlane is pending picking up an upgrade then do not pick up the new version yet.
if s.UpgradeTracker.ControlPlane.IsPendingUpgrade {
s.UpgradeTracker.MachinePools.MarkPendingUpgrade(currentMPState.Object.Name)
return currentVersion
// If mpTopology has the ClusterTopologyHoldUpgradeSequenceAnnotation annotation => mp is deferred.
if _, ok := mpTopology.Metadata.Annotations[clusterv1.ClusterTopologyHoldUpgradeSequenceAnnotation]; ok {
return true
}

// At this point the control plane is stable (not scaling, not upgrading, not being upgraded).
// Checking to see if the machine pool are also stable.
// If any of the MachinePools is rolling out, do not upgrade the machine pool yet.
if s.Current.MachinePools.IsAnyRollingOut() {
s.UpgradeTracker.MachinePools.MarkPendingUpgrade(currentMPState.Object.Name)
return currentVersion
for _, mp := range clusterTopology.Workers.MachinePools {
// If another mp topology with the ClusterTopologyHoldUpgradeSequenceAnnotation annotation
// is found before the mpTopology => mp is deferred.
if _, ok := mp.Metadata.Annotations[clusterv1.ClusterTopologyHoldUpgradeSequenceAnnotation]; ok {
return true
}

// If mpTopology is found before a mp topology with the ClusterTopologyHoldUpgradeSequenceAnnotation
// annotation => mp is not deferred.
if mp.Name == mpTopology.Name {
return false
}
}

// Control plane and machine pools are stable.
// Ready to pick up the topology version.
s.UpgradeTracker.MachinePools.MarkRollingOut(currentMPState.Object.Name)
return desiredVersion
// This case should be impossible as mpTopology should have been found in workers.machinePools.
return false
}

type templateToInput struct {
Expand Down
67 changes: 45 additions & 22 deletions internal/controllers/topology/cluster/scope/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
"sigs.k8s.io/cluster-api/util/labels/format"
)

// ClusterState holds all the objects representing the state of a managed Cluster topology.
Expand Down Expand Up @@ -131,22 +132,20 @@ func (md *MachineDeploymentState) IsUpgrading(ctx context.Context, c client.Clie
// MachinePoolsStateMap holds a collection of MachinePool states.
type MachinePoolsStateMap map[string]*MachinePoolState

// RollingOut returns the list of the machine pools
// that are rolling out.
func (mps MachinePoolsStateMap) RollingOut() []string {
// Upgrading returns the list of the machine pools
// that are upgrading.
func (mps MachinePoolsStateMap) Upgrading(ctx context.Context, c client.Client) ([]string, error) {
names := []string{}
for _, mp := range mps {
if mp.IsRollingOut() {
upgrading, err := mp.IsUpgrading(ctx, c)
if err != nil {
return nil, errors.Wrap(err, "failed to list upgrading MachinePools")
}
if upgrading {
names = append(names, mp.Object.Name)
}
}
return names
}

// IsAnyRollingOut returns true if at least one of the
// machine deployments is rolling out. False, otherwise.
func (mps MachinePoolsStateMap) IsAnyRollingOut() bool {
return len(mps.RollingOut()) != 0
return names, nil
}

// MachinePoolState holds all the objects representing the state of a managed pool.
Expand All @@ -161,15 +160,39 @@ type MachinePoolState struct {
InfrastructureMachinePoolObject *unstructured.Unstructured
}

// IsRollingOut determines if the machine pool is upgrading.
// A machine pool is considered upgrading if:
// - if any of the replicas of the machine pool is not ready.
func (mp *MachinePoolState) IsRollingOut() bool {
return !rollOutComplete(mp.Object, &mp.Object.Status) || *mp.Object.Spec.Replicas != mp.Object.Status.ReadyReplicas
}

func rollOutComplete(mp *expv1.MachinePool, newStatus *expv1.MachinePoolStatus) bool {
return newStatus.Replicas == *(mp.Spec.Replicas) &&
newStatus.AvailableReplicas == *(mp.Spec.Replicas) &&
newStatus.ObservedGeneration >= mp.Generation
// IsUpgrading determines if the MachinePool is upgrading.
// A machine deployment is considered upgrading if at least one of the Machines of this
// MachinePool has a different version.
func (mp *MachinePoolState) IsUpgrading(ctx context.Context, c client.Client) (bool, error) {
// If the MachinePool has no version there is no definitive way to check if it is upgrading. Therefore, return false.
// Note: This case should not happen.
if mp.Object.Spec.Template.Spec.Version == nil {
return false, nil
}
infraMachineSelector := metav1.LabelSelector{
MatchLabels: map[string]string{
clusterv1.MachinePoolNameLabel: format.MustFormatValue(mp.Object.Name),
clusterv1.ClusterNameLabel: mp.Object.Spec.ClusterName,
},
}
selectorMap, err := metav1.LabelSelectorAsMap(&infraMachineSelector)
if err != nil {
return false, errors.Wrapf(err, "failed to check if MachinePool %s is upgrading: failed to convert label selector to map", mp.Object.Name)
}
machinePools := &expv1.MachinePoolList{}
if err := c.List(ctx, machinePools, client.InNamespace(mp.Object.Namespace), client.MatchingLabels(selectorMap)); err != nil {
return false, errors.Wrapf(err, "failed to check if MachinePool %s is upgrading: failed to list MachinePools", mp.Object.Name)
}
mpVersion := *mp.Object.Spec.Template.Spec.Version
// Check if the versions of the all the MachinePoolMachines match the MachinePool version.
for i := range machinePools.Items {
machinePool := machinePools.Items[i]
if machinePool.Spec.Template.Spec.Version == nil {
return false, fmt.Errorf("failed to check if MachinePool %s is upgrading: MachinePool %s has no version", mp.Object.Name, machinePool.Name)
}
if *machinePool.Spec.Template.Spec.Version != mpVersion {
return true, nil
}
}
return false, nil
}
50 changes: 25 additions & 25 deletions internal/controllers/topology/cluster/scope/upgradetracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ func NewUpgradeTracker(opts ...UpgradeTrackerOption) *UpgradeTracker {
},
MachinePools: MachinePoolUpgradeTracker{
pendingCreateTopologyNames: sets.Set[string]{},
pendingRollingOutNames: sets.Set[string]{},
pendingUpgradeNames: sets.Set[string]{},
deferredNames: sets.Set[string]{},
rollingOutNames: sets.Set[string]{},
maxMachinePoolRollOutConcurrency: options.maxMPUpgradeConcurrency,
upgradingNames: sets.Set[string]{},
maxMachinePoolUpgradeConcurrency: options.maxMPUpgradeConcurrency,
},
}
}
Expand Down Expand Up @@ -255,48 +255,48 @@ type MachinePoolUpgradeTracker struct {
// names can keep changing for each reconcile loop leading to continuous updates to the TopologyReconciled condition.
pendingCreateTopologyNames sets.Set[string]

// pendingRollingOutNames is the set of MachinePool names that are not going to pick up the new version
// pendingUpgradeNames is the set of MachinePool names that are not going to pick up the new version
// in the current reconcile loop.
// By marking a MachinePool as pendingRollingOut we skip reconciling the MachinePool.
pendingRollingOutNames sets.Set[string]
// By marking a MachinePool as pendingUpgrade we skip reconciling the MachinePool.
pendingUpgradeNames sets.Set[string]

// deferredNames is the set of MachinePool names that are not going to pick up the new version
// in the current reconcile loop because they are deferred by the user.
// Note: If a MachinePool is marked as deferred it should also be marked as pendingUpgrade.
deferredNames sets.Set[string]

// rollingOutNames is the set of MachinePool names that are rolling out. This set contains the names of
// MachinePools that are currently rolling out and the names of MachinePools that will pick up the upgrade
// upgradingNames is the set of MachinePool names that are upgrading. This set contains the names of
// MachinePools that are currently upgrading and the names of MachinePools that will pick up the upgrade
// in the current reconcile loop.
// Note: This information is used to:
// - decide if ControlPlane can be upgraded.
// - calculate MachinePool upgrade concurrency.
// - update TopologyReconciled Condition.
// - decide if the AfterClusterUpgrade hook can be called.
rollingOutNames sets.Set[string]
upgradingNames sets.Set[string]

// maxMachinePoolRollOutConcurrency defines the maximum number of MachinePools that should be in an
// rolling out state. This includes the MachinePools that are currently rolling out and the MachinePools that
// maxMachinePoolUpgradeConcurrency defines the maximum number of MachinePools that should be in an
// upgrading state. This includes the MachinePools that are currently upgrading and the MachinePools that
// will start the upgrade after the current reconcile loop.
maxMachinePoolRollOutConcurrency int
maxMachinePoolUpgradeConcurrency int
}

// MarkRollingOut marks a MachinePool as currently rolling out or about to upgrade.
func (m *MachinePoolUpgradeTracker) MarkRollingOut(names ...string) {
// MarkUpgrading marks a MachinePool as currently upgrading or about to upgrade.
func (m *MachinePoolUpgradeTracker) MarkUpgrading(names ...string) {
for _, name := range names {
m.rollingOutNames.Insert(name)
m.upgradingNames.Insert(name)
}
}

// RollingOutNames returns the list of machine pools that are rolling out or
// are about to roll out.
func (m *MachinePoolUpgradeTracker) RollingOutNames() []string {
return sets.List(m.rollingOutNames)
// UpgradingNames returns the list of machine pools that are upgrading or
// are about to upgrade.
func (m *MachinePoolUpgradeTracker) UpgradingNames() []string {
return sets.List(m.upgradingNames)
}

// UpgradeConcurrencyReached returns true if the number of MachinePools rolling out is at the concurrency limit.
// UpgradeConcurrencyReached returns true if the number of MachinePools upgrading is at the concurrency limit.
func (m *MachinePoolUpgradeTracker) UpgradeConcurrencyReached() bool {
return m.rollingOutNames.Len() >= m.maxMachinePoolRollOutConcurrency
return m.upgradingNames.Len() >= m.maxMachinePoolUpgradeConcurrency
}

// MarkPendingCreate marks a machine pool topology that is pending to be created.
Expand Down Expand Up @@ -327,24 +327,24 @@ func (m *MachinePoolUpgradeTracker) PendingCreateTopologyNames() []string {
// This is generally used to capture machine pools that have not yet
// picked up the topology version.
func (m *MachinePoolUpgradeTracker) MarkPendingUpgrade(name string) {
m.pendingRollingOutNames.Insert(name)
m.pendingUpgradeNames.Insert(name)
}

// IsPendingUpgrade returns true is the MachinePool marked as pending upgrade.
func (m *MachinePoolUpgradeTracker) IsPendingUpgrade(name string) bool {
return m.pendingRollingOutNames.Has(name)
return m.pendingUpgradeNames.Has(name)
}

// IsAnyPendingUpgrade returns true if any of the machine pools are pending
// an upgrade. Returns false, otherwise.
func (m *MachinePoolUpgradeTracker) IsAnyPendingUpgrade() bool {
return len(m.pendingRollingOutNames) != 0
return len(m.pendingUpgradeNames) != 0
}

// PendingUpgradeNames returns the list of machine pool names that
// are pending an upgrade.
func (m *MachinePoolUpgradeTracker) PendingUpgradeNames() []string {
return sets.List(m.pendingRollingOutNames)
return sets.List(m.pendingUpgradeNames)
}

// MarkDeferredUpgrade marks that the upgrade for a MachinePool
Expand Down

0 comments on commit a4da10e

Please sign in to comment.