Skip to content

Commit

Permalink
Set up golang context when initiating controller manager
Browse files Browse the repository at this point in the history
Set up golang context when initiating controller manager and also pass it down
to VSphereCluster controller

Signed-off-by: Gong Zhang <gongz@vmware.com>
  • Loading branch information
zhanggbj committed Sep 12, 2023
1 parent 2188e96 commit 9d02623
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 85 deletions.
2 changes: 1 addition & 1 deletion controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func setup() {
panic(fmt.Sprintf("unable to create ClusterCacheReconciler controller: %v", err))
}

if err := AddClusterControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}, controllerOpts); err != nil {
if err := AddClusterControllerToManager(ctx, testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VsphereCluster controller: %v", err))
}
if err := AddMachineControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereMachine{}, controllerOpts); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions controllers/vmware/test/controllers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,15 @@ func getManager(cfg *rest.Config, networkProvider string) manager.Manager {

controllerOpts := controller.Options{MaxConcurrentReconciles: 10}

opts.AddToManager = func(controllerCtx *capvcontext.ControllerManagerContext, mgr ctrlmgr.Manager) error {
if err := controllers.AddClusterControllerToManager(controllerCtx, mgr, &vmwarev1.VSphereCluster{}, controllerOpts); err != nil {
opts.AddToManager = func(ctx context.Context, controllerCtx *capvcontext.ControllerManagerContext, mgr ctrlmgr.Manager) error {
if err := controllers.AddClusterControllerToManager(ctx, controllerCtx, mgr, &vmwarev1.VSphereCluster{}, controllerOpts); err != nil {
return err
}

return controllers.AddMachineControllerToManager(controllerCtx, mgr, &vmwarev1.VSphereMachine{}, controllerOpts)
}

mgr, err := manager.New(opts)
mgr, err := manager.New(ctx, opts)
Expect(err).NotTo(HaveOccurred())
return mgr
}
Expand Down
8 changes: 4 additions & 4 deletions controllers/vspherecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import (

// AddClusterControllerToManager adds the cluster controller to the provided
// manager.
func AddClusterControllerToManager(controllerCtx *capvcontext.ControllerManagerContext, mgr manager.Manager, clusterControlledType client.Object, options controller.Options) error {
func AddClusterControllerToManager(ctx context.Context, controllerCtx *capvcontext.ControllerManagerContext, mgr manager.Manager, clusterControlledType client.Object, options controller.Options) error {
supervisorBased, err := util.IsSupervisorType(clusterControlledType)
if err != nil {
return err
Expand Down Expand Up @@ -110,7 +110,7 @@ func AddClusterControllerToManager(controllerCtx *capvcontext.ControllerManagerC
ControllerContext: controllerContext,
clusterModuleReconciler: NewReconciler(controllerContext),
}
clusterToInfraFn := clusterToInfrastructureMapFunc(controllerCtx)
clusterToInfraFn := clusterToInfrastructureMapFunc(ctx, controllerCtx)
c, err := ctrl.NewControllerManagedBy(mgr).
// Watch the controlled, infrastructure resource.
For(clusterControlledType).
Expand Down Expand Up @@ -173,7 +173,7 @@ func AddClusterControllerToManager(controllerCtx *capvcontext.ControllerManagerC
return nil
}

func clusterToInfrastructureMapFunc(controllerCtx *capvcontext.ControllerManagerContext) handler.MapFunc {
func clusterToInfrastructureMapFunc(ctx context.Context, controllerCtx *capvcontext.ControllerManagerContext) handler.MapFunc {
gvk := infrav1.GroupVersion.WithKind(reflect.TypeOf(&infrav1.VSphereCluster{}).Elem().Name())
return clusterutilv1.ClusterToInfrastructureMapFunc(controllerCtx, gvk, controllerCtx.Client, &infrav1.VSphereCluster{})
return clusterutilv1.ClusterToInfrastructureMapFunc(ctx, gvk, controllerCtx.Client, &infrav1.VSphereCluster{})
}
76 changes: 39 additions & 37 deletions controllers/vspherecluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ type clusterReconciler struct {
}

// Reconcile ensures the back-end state reflects the Kubernetes resource state intent.
func (r clusterReconciler) Reconcile(_ context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
func (r clusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
// Get the VSphereCluster resource for this request.
vsphereCluster := &infrav1.VSphereCluster{}
if err := r.Client.Get(r, req.NamespacedName, vsphereCluster); err != nil {
if err := r.Client.Get(ctx, req.NamespacedName, vsphereCluster); err != nil {
if apierrors.IsNotFound(err) {
r.Logger.V(4).Info("VSphereCluster not found, won't reconcile", "key", req.NamespacedName)
return reconcile.Result{}, nil
Expand All @@ -73,7 +73,7 @@ func (r clusterReconciler) Reconcile(_ context.Context, req ctrl.Request) (_ ctr
}

// Fetch the CAPI Cluster.
cluster, err := clusterutilv1.GetOwnerCluster(r, r.Client, vsphereCluster.ObjectMeta)
cluster, err := clusterutilv1.GetOwnerCluster(ctx, r.Client, vsphereCluster.ObjectMeta)
if err != nil {
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -118,13 +118,13 @@ func (r clusterReconciler) Reconcile(_ context.Context, req ctrl.Request) (_ ctr
}
}()

if err := setOwnerRefsOnVsphereMachines(clusterContext); err != nil {
if err := setOwnerRefsOnVsphereMachines(ctx, clusterContext); err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to set owner refs on VSphereMachine objects")
}

// Handle deleted clusters
if !vsphereCluster.DeletionTimestamp.IsZero() {
return r.reconcileDelete(clusterContext)
return r.reconcileDelete(ctx, clusterContext)
}

// If the VSphereCluster doesn't have our finalizer, add it.
Expand All @@ -135,13 +135,13 @@ func (r clusterReconciler) Reconcile(_ context.Context, req ctrl.Request) (_ ctr
}

// Handle non-deleted clusters
return r.reconcileNormal(clusterContext)
return r.reconcileNormal(ctx, clusterContext)
}

func (r clusterReconciler) reconcileDelete(clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
func (r clusterReconciler) reconcileDelete(ctx context.Context, clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
clusterCtx.Logger.Info("Reconciling VSphereCluster delete")

vsphereMachines, err := infrautilv1.GetVSphereMachinesInCluster(clusterCtx, clusterCtx.Client, clusterCtx.Cluster.Namespace, clusterCtx.Cluster.Name)
vsphereMachines, err := infrautilv1.GetVSphereMachinesInCluster(ctx, clusterCtx.Client, clusterCtx.Cluster.Namespace, clusterCtx.Cluster.Name)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err,
"unable to list VSphereMachines part of VSphereCluster %s/%s", clusterCtx.VSphereCluster.Namespace, clusterCtx.VSphereCluster.Name)
Expand All @@ -160,10 +160,10 @@ func (r clusterReconciler) reconcileDelete(clusterCtx *capvcontext.ClusterContex
// Remove the finalizer since VM creation wouldn't proceed
r.Logger.Info("Removing finalizer from VSphereMachine", "namespace", vsphereMachine.Namespace, "name", vsphereMachine.Name)
ctrlutil.RemoveFinalizer(vsphereMachine, infrav1.MachineFinalizer)
if err := r.Client.Update(clusterCtx, vsphereMachine); err != nil {
if err := r.Client.Update(ctx, vsphereMachine); err != nil {
return reconcile.Result{}, err
}
if err := r.Client.Delete(clusterCtx, vsphereMachine); err != nil && !apierrors.IsNotFound(err) {
if err := r.Client.Delete(ctx, vsphereMachine); err != nil && !apierrors.IsNotFound(err) {
clusterCtx.Logger.Error(err, "Failed to delete for VSphereMachine", "namespace", vsphereMachine.Namespace, "name", vsphereMachine.Name)
deletionErrors = append(deletionErrors, err)
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func (r clusterReconciler) reconcileDelete(clusterCtx *capvcontext.ClusterContex
Namespace: clusterCtx.VSphereCluster.Namespace,
Name: clusterCtx.VSphereCluster.Spec.IdentityRef.Name,
}
err := clusterCtx.Client.Get(clusterCtx, secretKey, secret)
err := clusterCtx.Client.Get(ctx, secretKey, secret)
if err != nil {
if apierrors.IsNotFound(err) {
ctrlutil.RemoveFinalizer(clusterCtx.VSphereCluster, infrav1.ClusterFinalizer)
Expand All @@ -208,10 +208,10 @@ func (r clusterReconciler) reconcileDelete(clusterCtx *capvcontext.ClusterContex
if ctrlutil.ContainsFinalizer(secret, legacyIdentityFinalizer) {
ctrlutil.RemoveFinalizer(secret, legacyIdentityFinalizer)
}
if err := clusterCtx.Client.Update(clusterCtx, secret); err != nil {
if err := clusterCtx.Client.Update(ctx, secret); err != nil {
return reconcile.Result{}, err
}
if err := clusterCtx.Client.Delete(clusterCtx, secret); err != nil {
if err := clusterCtx.Client.Delete(ctx, secret); err != nil {
return reconcile.Result{}, err
}
}
Expand All @@ -222,10 +222,10 @@ func (r clusterReconciler) reconcileDelete(clusterCtx *capvcontext.ClusterContex
return reconcile.Result{}, nil
}

func (r clusterReconciler) reconcileNormal(clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
func (r clusterReconciler) reconcileNormal(ctx context.Context, clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
clusterCtx.Logger.Info("Reconciling VSphereCluster")

ok, err := r.reconcileDeploymentZones(clusterCtx)
ok, err := r.reconcileDeploymentZones(ctx, clusterCtx)
if err != nil {
return reconcile.Result{}, err
}
Expand All @@ -234,12 +234,12 @@ func (r clusterReconciler) reconcileNormal(clusterCtx *capvcontext.ClusterContex
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
}

if err := r.reconcileIdentitySecret(clusterCtx); err != nil {
if err := r.reconcileIdentitySecret(ctx, clusterCtx); err != nil {
conditions.MarkFalse(clusterCtx.VSphereCluster, infrav1.VCenterAvailableCondition, infrav1.VCenterUnreachableReason, clusterv1.ConditionSeverityError, err.Error())
return reconcile.Result{}, err
}

vcenterSession, err := r.reconcileVCenterConnectivity(clusterCtx)
vcenterSession, err := r.reconcileVCenterConnectivity(ctx, clusterCtx)
if err != nil {
conditions.MarkFalse(clusterCtx.VSphereCluster, infrav1.VCenterAvailableCondition, infrav1.VCenterUnreachableReason, clusterv1.ConditionSeverityError, err.Error())
return reconcile.Result{}, errors.Wrapf(err,
Expand Down Expand Up @@ -276,22 +276,22 @@ func (r clusterReconciler) reconcileNormal(clusterCtx *capvcontext.ClusterContex
}

// Wait until the API server is online and accessible.
if !r.isAPIServerOnline(clusterCtx) {
if !r.isAPIServerOnline(ctx, clusterCtx) {
return reconcile.Result{}, nil
}

return reconcile.Result{}, nil
}

func (r clusterReconciler) reconcileIdentitySecret(clusterCtx *capvcontext.ClusterContext) error {
func (r clusterReconciler) reconcileIdentitySecret(ctx context.Context, clusterCtx *capvcontext.ClusterContext) error {
vsphereCluster := clusterCtx.VSphereCluster
if identity.IsSecretIdentity(vsphereCluster) {
secret := &corev1.Secret{}
secretKey := client.ObjectKey{
Namespace: vsphereCluster.Namespace,
Name: vsphereCluster.Spec.IdentityRef.Name,
}
err := clusterCtx.Client.Get(clusterCtx, secretKey, secret)
err := clusterCtx.Client.Get(ctx, secretKey, secret)
if err != nil {
return err
}
Expand All @@ -313,7 +313,7 @@ func (r clusterReconciler) reconcileIdentitySecret(clusterCtx *capvcontext.Clust
if !ctrlutil.ContainsFinalizer(secret, infrav1.SecretIdentitySetFinalizer) {
ctrlutil.AddFinalizer(secret, infrav1.SecretIdentitySetFinalizer)
}
err = r.Client.Update(clusterCtx, secret)
err = r.Client.Update(ctx, secret)
if err != nil {
return err
}
Expand All @@ -322,7 +322,7 @@ func (r clusterReconciler) reconcileIdentitySecret(clusterCtx *capvcontext.Clust
return nil
}

func (r clusterReconciler) reconcileVCenterConnectivity(clusterCtx *capvcontext.ClusterContext) (*session.Session, error) {
func (r clusterReconciler) reconcileVCenterConnectivity(ctx context.Context, clusterCtx *capvcontext.ClusterContext) (*session.Session, error) {
params := session.NewParams().
WithServer(clusterCtx.VSphereCluster.Spec.Server).
WithThumbprint(clusterCtx.VSphereCluster.Spec.Thumbprint).
Expand All @@ -332,13 +332,13 @@ func (r clusterReconciler) reconcileVCenterConnectivity(clusterCtx *capvcontext.
})

if clusterCtx.VSphereCluster.Spec.IdentityRef != nil {
creds, err := identity.GetCredentials(clusterCtx, r.Client, clusterCtx.VSphereCluster, r.Namespace)
creds, err := identity.GetCredentials(ctx, r.Client, clusterCtx.VSphereCluster, r.Namespace)
if err != nil {
return nil, err
}

params = params.WithUserInfo(creds.Username, creds.Password)
return session.GetOrCreate(clusterCtx, params)
return session.GetOrCreate(ctx, params)
}

params = params.WithUserInfo(clusterCtx.Username, clusterCtx.Password)
Expand All @@ -354,7 +354,7 @@ func (r clusterReconciler) reconcileVCenterVersion(clusterCtx *capvcontext.Clust
return nil
}

func (r clusterReconciler) reconcileDeploymentZones(clusterCtx *capvcontext.ClusterContext) (bool, error) {
func (r clusterReconciler) reconcileDeploymentZones(ctx context.Context, clusterCtx *capvcontext.ClusterContext) (bool, error) {
// If there is no failure domain selector, we should simply skip it
if clusterCtx.VSphereCluster.Spec.FailureDomainSelector == nil {
return true, nil
Expand All @@ -368,7 +368,7 @@ func (r clusterReconciler) reconcileDeploymentZones(clusterCtx *capvcontext.Clus
}

var deploymentZoneList infrav1.VSphereDeploymentZoneList
err = r.Client.List(clusterCtx, &deploymentZoneList, &opts)
err = r.Client.List(ctx, &deploymentZoneList, &opts)
if err != nil {
return false, errors.Wrap(err, "unable to list deployment zones")
}
Expand Down Expand Up @@ -438,9 +438,10 @@ func (r clusterReconciler) reconcileVSphereClusterWhenAPIServerIsOnline(clusterC
}
apiServerTriggers[clusterCtx.Cluster.UID] = struct{}{}
go func() {
ctx := context.Background()
// Block until the target API server is online.
clusterCtx.Logger.Info("start polling API server for online check")
wait.PollUntilContextCancel(context.Background(), time.Second*1, true, func(context.Context) (bool, error) { return r.isAPIServerOnline(clusterCtx), nil }) //nolint:errcheck
wait.PollUntilContextCancel(ctx, time.Second*1, true, func(context.Context) (bool, error) { return r.isAPIServerOnline(ctx, clusterCtx), nil }) //nolint:errcheck
clusterCtx.Logger.Info("stop polling API server for online check")
clusterCtx.Logger.Info("triggering GenericEvent", "reason", "api-server-online")
eventChannel := clusterCtx.GetGenericEventChannelFor(clusterCtx.VSphereCluster.GetObjectKind().GroupVersionKind())
Expand All @@ -452,17 +453,17 @@ func (r clusterReconciler) reconcileVSphereClusterWhenAPIServerIsOnline(clusterC
// remove the key from the map that prevents multiple goroutines from
// polling the API server to see if it is online.
clusterCtx.Logger.Info("start polling for control plane initialized")
wait.PollUntilContextCancel(context.Background(), time.Second*1, true, func(context.Context) (bool, error) { return r.isControlPlaneInitialized(clusterCtx), nil }) //nolint:errcheck
wait.PollUntilContextCancel(ctx, time.Second*1, true, func(context.Context) (bool, error) { return r.isControlPlaneInitialized(ctx, clusterCtx), nil }) //nolint:errcheck
clusterCtx.Logger.Info("stop polling for control plane initialized")
apiServerTriggersMu.Lock()
delete(apiServerTriggers, clusterCtx.Cluster.UID)
apiServerTriggersMu.Unlock()
}()
}

func (r clusterReconciler) isAPIServerOnline(clusterCtx *capvcontext.ClusterContext) bool {
if kubeClient, err := infrautilv1.NewKubeClient(clusterCtx, clusterCtx.Client, clusterCtx.Cluster); err == nil {
if _, err := kubeClient.CoreV1().Nodes().List(clusterCtx, metav1.ListOptions{}); err == nil {
func (r clusterReconciler) isAPIServerOnline(ctx context.Context, clusterCtx *capvcontext.ClusterContext) bool {
if kubeClient, err := infrautilv1.NewKubeClient(ctx, clusterCtx.Client, clusterCtx.Cluster); err == nil {
if _, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}); err == nil {
// The target cluster is online. To make sure the correct control
// plane endpoint information is logged, it is necessary to fetch
// an up-to-date Cluster resource. If this fails, then set the
Expand All @@ -471,7 +472,7 @@ func (r clusterReconciler) isAPIServerOnline(clusterCtx *capvcontext.ClusterCont
// if the API server is online.
cluster := &clusterv1.Cluster{}
clusterKey := client.ObjectKey{Namespace: clusterCtx.Cluster.Namespace, Name: clusterCtx.Cluster.Name}
if err := clusterCtx.Client.Get(clusterCtx, clusterKey, cluster); err != nil {
if err := clusterCtx.Client.Get(ctx, clusterKey, cluster); err != nil {
cluster = clusterCtx.Cluster.DeepCopy()
cluster.Spec.ControlPlaneEndpoint.Host = clusterCtx.VSphereCluster.Spec.ControlPlaneEndpoint.Host
cluster.Spec.ControlPlaneEndpoint.Port = clusterCtx.VSphereCluster.Spec.ControlPlaneEndpoint.Port
Expand All @@ -486,10 +487,10 @@ func (r clusterReconciler) isAPIServerOnline(clusterCtx *capvcontext.ClusterCont
return false
}

func (r clusterReconciler) isControlPlaneInitialized(clusterCtx *capvcontext.ClusterContext) bool {
func (r clusterReconciler) isControlPlaneInitialized(ctx context.Context, clusterCtx *capvcontext.ClusterContext) bool {
cluster := &clusterv1.Cluster{}
clusterKey := client.ObjectKey{Namespace: clusterCtx.Cluster.Namespace, Name: clusterCtx.Cluster.Name}
if err := clusterCtx.Client.Get(clusterCtx, clusterKey, cluster); err != nil {
if err := clusterCtx.Client.Get(ctx, clusterKey, cluster); err != nil {
if !apierrors.IsNotFound(err) {
clusterCtx.Logger.Error(err, "failed to get updated cluster object while checking if control plane is initialized")
return false
Expand All @@ -500,8 +501,8 @@ func (r clusterReconciler) isControlPlaneInitialized(clusterCtx *capvcontext.Clu
return conditions.IsTrue(clusterCtx.Cluster, clusterv1.ControlPlaneInitializedCondition)
}

func setOwnerRefsOnVsphereMachines(clusterCtx *capvcontext.ClusterContext) error {
vsphereMachines, err := infrautilv1.GetVSphereMachinesInCluster(clusterCtx, clusterCtx.Client, clusterCtx.Cluster.Namespace, clusterCtx.Cluster.Name)
func setOwnerRefsOnVsphereMachines(ctx context.Context, clusterCtx *capvcontext.ClusterContext) error {
vsphereMachines, err := infrautilv1.GetVSphereMachinesInCluster(ctx, clusterCtx.Client, clusterCtx.Cluster.Namespace, clusterCtx.Cluster.Name)
if err != nil {
return errors.Wrapf(err,
"unable to list VSphereMachines part of VSphereCluster %s/%s", clusterCtx.VSphereCluster.Namespace, clusterCtx.VSphereCluster.Name)
Expand All @@ -524,13 +525,14 @@ func setOwnerRefsOnVsphereMachines(clusterCtx *capvcontext.ClusterContext) error
UID: clusterCtx.VSphereCluster.UID,
}))

if err := patchHelper.Patch(clusterCtx, vsphereMachine); err != nil {
if err := patchHelper.Patch(ctx, vsphereMachine); err != nil {
patchErrors = append(patchErrors, err)
}
}
return kerrors.NewAggregate(patchErrors)
}

// TODO: zhanggbj: Add golang context after refactoring ClusterModule controller and reconciler.
func (r clusterReconciler) reconcileClusterModules(clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
if feature.Gates.Enabled(feature.NodeAntiAffinity) {
return r.clusterModuleReconciler.Reconcile(clusterCtx)
Expand Down
Loading

0 comments on commit 9d02623

Please sign in to comment.