From 2ecbcc72b3603b572e92b84edb1fa6ef201a0cb8 Mon Sep 17 00:00:00 2001 From: Dejan Zele Pejchev Date: Thu, 26 Sep 2024 10:29:58 +0200 Subject: [PATCH] remove duplicated code and fix minor bugs (#326) Signed-off-by: Dejan Zele Pejchev --- go.mod | 1 + .../install/armadaserver_controller.go | 167 ++++------ .../install/binoculars_controller.go | 111 ++----- .../install/binoculars_controller_test.go | 9 +- internal/controller/install/common_helpers.go | 224 +++++++++---- .../controller/install/common_helpers_test.go | 297 +++++++++++++++++- .../install/eventingester_controller.go | 43 +-- .../install/eventingester_controller_test.go | 7 + .../controller/install/executor_controller.go | 152 ++++----- .../install/executor_controller_test.go | 11 +- .../controller/install/lookout_controller.go | 127 +++----- .../install/lookout_controller_test.go | 74 +++-- .../install/lookoutingester_controller.go | 55 +--- .../lookoutingester_controller_test.go | 14 +- .../install/scheduler_controller.go | 102 ++---- .../install/scheduler_controller_test.go | 8 +- .../install/scheduleringester_controller.go | 57 ++-- .../scheduleringester_controller_test.go | 14 +- .../armadaserver_controller_test.go | 2 +- test/integration/lookout_controller_test.go | 2 +- 20 files changed, 807 insertions(+), 670 deletions(-) diff --git a/go.mod b/go.mod index 5bce902a..44e52f28 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect + github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/zapr v1.3.0 // indirect diff --git a/internal/controller/install/armadaserver_controller.go b/internal/controller/install/armadaserver_controller.go index a126ee3a..890d016d 100644 --- a/internal/controller/install/armadaserver_controller.go +++ b/internal/controller/install/armadaserver_controller.go @@ -64,67 +64,34 @@ type ArmadaServerReconciler struct { // move the current state of the cluster closer to the desired state. func (r *ArmadaServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name) + started := time.Now() - logger.Info("Reconciling ArmadaServer object") - - logger.Info("Fetching ArmadaServer object from cache") - var as installv1alpha1.ArmadaServer - if err := r.Client.Get(ctx, req.NamespacedName, &as); err != nil { - if k8serrors.IsNotFound(err) { - logger.Info("ArmadaServer not found in cache, ending reconcile...", "namespace", req.Namespace, "name", req.Name) - return ctrl.Result{}, nil - } + + logger.Info("Reconciling object") + + var server installv1alpha1.ArmadaServer + if miss, err := getObject(ctx, r.Client, &server, req.NamespacedName, logger); err != nil || miss { return ctrl.Result{}, err } - pc, err := installv1alpha1.BuildPortConfig(as.Spec.ApplicationConfig) + pc, err := installv1alpha1.BuildPortConfig(server.Spec.ApplicationConfig) if err != nil { return ctrl.Result{}, err } - as.Spec.PortConfig = pc + server.Spec.PortConfig = pc var components *CommonComponents - components, err = generateArmadaServerInstallComponents(&as, r.Scheme) + components, err = generateArmadaServerInstallComponents(&server, r.Scheme) if err != nil { return ctrl.Result{}, err } - deletionTimestamp := as.ObjectMeta.DeletionTimestamp - // examine DeletionTimestamp to determine if object is under deletion - if deletionTimestamp.IsZero() { - // The object is not being deleted, so if it does not have our finalizer, - // then lets add the finalizer and update the object. This is equivalent - // registering our finalizer. - if !controllerutil.ContainsFinalizer(&as, operatorFinalizer) { - logger.Info("Attaching finalizer to As object", "finalizer", operatorFinalizer) - controllerutil.AddFinalizer(&as, operatorFinalizer) - if err := r.Update(ctx, &as); err != nil { - return ctrl.Result{}, err - } - } - } else { - logger.Info("ArmadaServer object is being deleted", "finalizer", operatorFinalizer) - logger.Info("Namespace-scoped resources will be deleted by Kubernetes based on their OwnerReference") - // The object is being deleted - if controllerutil.ContainsFinalizer(&as, operatorFinalizer) { - // our finalizer is present, so lets handle any external dependency - logger.Info("Running cleanup function for ArmadaServer cluster-scoped components", "finalizer", operatorFinalizer) - if err := r.deleteExternalResources(ctx, components, logger); err != nil { - // if fail to delete the external dependency here, return with error - // so that it can be retried - return ctrl.Result{}, err - } - - // remove our finalizer from the list and update it. - logger.Info("Removing finalizer from ArmadaServer object", "finalizer", operatorFinalizer) - controllerutil.RemoveFinalizer(&as, operatorFinalizer) - if err := r.Update(ctx, &as); err != nil { - return ctrl.Result{}, err - } - } - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil + cleanupF := func(ctx context.Context) error { + return r.deleteExternalResources(ctx, components, logger) + } + finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &server, operatorFinalizer, cleanupF, logger) + if err != nil || finish { + return ctrl.Result{}, err } componentsCopy := components.DeepCopy() @@ -134,90 +101,58 @@ func (r *ArmadaServerReconciler) Reconcile(ctx context.Context, req ctrl.Request return nil } - if components.ServiceAccount != nil { - logger.Info("Upserting ArmadaServer ServiceAccount object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, server.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Secret != nil { - logger.Info("Upserting ArmadaServer Secret object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, server.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if as.Spec.PulsarInit { - for idx := range components.Jobs { - err = func() error { - if components.Jobs[idx] != nil { - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Jobs[idx], mutateFn); err != nil { - return err - } - ctxTimeout, cancel := context.WithTimeout(ctx, migrationTimeout) - defer cancel() - - err := waitForJob(ctxTimeout, r.Client, components.Jobs[idx], migrationPollSleep) - if err != nil { - return err - } + if server.Spec.PulsarInit { + for _, job := range components.Jobs { + err = func(job *batchv1.Job) error { + if err := upsertObjectIfNeeded(ctx, r.Client, job, server.Kind, mutateFn, logger); err != nil { + return err + } + + if err := waitForJob(ctx, r.Client, job, jobPollInterval, jobTimeout); err != nil { + return err } return nil - }() + }(job) if err != nil { return ctrl.Result{}, err } } } - if components.Deployment != nil { - logger.Info("Upserting ArmadaServer Deployment object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, server.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Service != nil { - logger.Info("Upserting ArmadaServer Service object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Service, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Service, server.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.IngressGrpc != nil { - logger.Info("Upserting ArmadaServer GRPC Ingress object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressGrpc, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressGrpc, server.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.IngressHttp != nil { - logger.Info("Upserting ArmadaServer REST Ingress object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressHttp, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressHttp, server.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.PodDisruptionBudget != nil { - logger.Info("Upserting ArmadaServer PodDisruptionBudget object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.PodDisruptionBudget, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.PodDisruptionBudget, server.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.PrometheusRule != nil { - logger.Info("Upserting ArmadaServer PrometheusRule object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.PrometheusRule, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.PrometheusRule, server.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.ServiceMonitor != nil { - logger.Info("Upserting ArmadaServer ServiceMonitor object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceMonitor, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceMonitor, server.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } logger.Info("Successfully reconciled ArmadaServer object", "durationMillis", time.Since(started).Milliseconds()) @@ -283,13 +218,17 @@ func generateArmadaServerInstallComponents(as *installv1alpha1.ArmadaServer, sch } var pr *monitoringv1.PrometheusRule + var sm *monitoringv1.ServiceMonitor if as.Spec.Prometheus != nil && as.Spec.Prometheus.Enabled { pr = createServerPrometheusRule(as.Name, as.Namespace, as.Spec.Prometheus.ScrapeInterval, as.Spec.Labels, as.Spec.Prometheus.Labels) - } + if err := controllerutil.SetOwnerReference(as, pr, scheme); err != nil { + return nil, err + } - sm := createServiceMonitor(as) - if err := controllerutil.SetOwnerReference(as, sm, scheme); err != nil { - return nil, err + sm = createServiceMonitor(as) + if err := controllerutil.SetOwnerReference(as, sm, scheme); err != nil { + return nil, err + } } jobs := []*batchv1.Job{{}} @@ -331,12 +270,12 @@ func createArmadaServerMigrationJobs(as *installv1alpha1.ArmadaServer) ([]*batch appConfig, err := builders.ConvertRawExtensionToYaml(as.Spec.ApplicationConfig) if err != nil { - return []*batchv1.Job{}, err + return nil, err } var asConfig AppConfig err = yaml.Unmarshal([]byte(appConfig), &asConfig) if err != nil { - return []*batchv1.Job{}, err + return nil, err } // First job is to poll/wait for Pulsar to be fully started diff --git a/internal/controller/install/binoculars_controller.go b/internal/controller/install/binoculars_controller.go index 3c145a5c..382b4c75 100644 --- a/internal/controller/install/binoculars_controller.go +++ b/internal/controller/install/binoculars_controller.go @@ -61,17 +61,13 @@ type BinocularsReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile func (r *BinocularsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name) + started := time.Now() - logger.Info("Reconciling Binoculars object") - logger.Info("Fetching Binoculars object from cache") + logger.Info("Reconciling Binoculars object") var binoculars installv1alpha1.Binoculars - if err := r.Client.Get(ctx, req.NamespacedName, &binoculars); err != nil { - if k8serrors.IsNotFound(err) { - logger.Info("Binoculars not found in cache, ending reconcile...", "namespace", req.Namespace, "name", req.Name) - return ctrl.Result{}, nil - } + if miss, err := getObject(ctx, r.Client, &binoculars, req.NamespacedName, logger); err != nil || miss { return ctrl.Result{}, err } @@ -87,41 +83,12 @@ func (r *BinocularsReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } - deletionTimestamp := binoculars.ObjectMeta.DeletionTimestamp - // examine DeletionTimestamp to determine if object is under deletion - if deletionTimestamp.IsZero() { - // The object is not being deleted, so if it does not have our finalizer, - // then lets add the finalizer and update the object. This is equivalent - // registering our finalizer. - if !controllerutil.ContainsFinalizer(&binoculars, operatorFinalizer) { - logger.Info("Attaching finalizer to Binoculars object", "finalizer", operatorFinalizer) - controllerutil.AddFinalizer(&binoculars, operatorFinalizer) - if err := r.Update(ctx, &binoculars); err != nil { - return ctrl.Result{}, err - } - } - } else { - logger.Info("Binoculars object is being deleted", "finalizer", operatorFinalizer) - // The object is being deleted - if controllerutil.ContainsFinalizer(&binoculars, operatorFinalizer) { - // our finalizer is present, so lets handle any external dependency - logger.Info("Running cleanup function for Binoculars object", "finalizer", operatorFinalizer) - if err := r.deleteExternalResources(ctx, components); err != nil { - // if fail to delete the external dependency here, return with error - // so that it can be retried - return ctrl.Result{}, err - } - - // remove our finalizer from the list and update it. - logger.Info("Removing finalizer from Binoculars object", "finalizer", operatorFinalizer) - controllerutil.RemoveFinalizer(&binoculars, operatorFinalizer) - if err := r.Update(ctx, &binoculars); err != nil { - return ctrl.Result{}, err - } - } - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil + cleanupF := func(ctx context.Context) error { + return r.deleteExternalResources(ctx, components) + } + finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &binoculars, operatorFinalizer, cleanupF, logger) + if err != nil || finish { + return ctrl.Result{}, err } componentsCopy := components.DeepCopy() @@ -131,58 +98,40 @@ func (r *BinocularsReconciler) Reconcile(ctx context.Context, req ctrl.Request) return nil } - if components.ServiceAccount != nil { - logger.Info("Upserting Binoculars ServiceAccount object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, binoculars.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.ClusterRole != nil { - logger.Info("Upserting Binoculars ClusterRole object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ClusterRole, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ClusterRole, binoculars.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.ClusterRoleBindings != nil && len(components.ClusterRoleBindings) > 0 { - logger.Info("Upserting Binoculars ClusterRoleBinding object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ClusterRoleBindings[0], mutateFn); err != nil { - return ctrl.Result{}, err + if len(components.ClusterRoleBindings) > 0 { + for _, crb := range components.ClusterRoleBindings { + if err := upsertObjectIfNeeded(ctx, r.Client, crb, binoculars.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err + } } } - if components.Secret != nil { - logger.Info("Upserting Binoculars Secret object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, binoculars.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Deployment != nil { - logger.Info("Upserting Binoculars Deployment object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, binoculars.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Service != nil { - logger.Info("Upserting Binoculars Service object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Service, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Service, binoculars.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.IngressGrpc != nil { - logger.Info("Upserting GRPC Ingress object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressGrpc, mutateFn); err != nil { - return ctrl.Result{}, err - } + + if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressGrpc, binoculars.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.IngressHttp != nil { - logger.Info("Upserting REST Ingress object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressHttp, mutateFn); err != nil { - return ctrl.Result{}, err - } + + if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressHttp, binoculars.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } logger.Info("Successfully reconciled Binoculars object", "durationMillis", time.Since(started).Milliseconds()) diff --git a/internal/controller/install/binoculars_controller_test.go b/internal/controller/install/binoculars_controller_test.go index f661c4db..e0fa87fa 100644 --- a/internal/controller/install/binoculars_controller_test.go +++ b/internal/controller/install/binoculars_controller_test.go @@ -161,14 +161,19 @@ func TestBinocularsReconciler_Reconcile(t *testing.T) { t.Fatal("We should not fail on generating binoculars") } + // Binoculars mockK8sClient := k8sclient.NewMockClient(mockCtrl) mockK8sClient. EXPECT(). Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&v1alpha1.Binoculars{})). Return(nil). SetArg(2, expectedBinoculars) - // Binoculars finalizer - mockK8sClient.EXPECT().Update(gomock.Any(), gomock.AssignableToTypeOf(&installv1alpha1.Binoculars{})).Return(nil) + + // Finalizer + mockK8sClient. + EXPECT(). + Update(gomock.Any(), gomock.AssignableToTypeOf(&installv1alpha1.Binoculars{})). + Return(nil) mockK8sClient. EXPECT(). diff --git a/internal/controller/install/common_helpers.go b/internal/controller/install/common_helpers.go index 3e693b5f..3664c6fa 100644 --- a/internal/controller/install/common_helpers.go +++ b/internal/controller/install/common_helpers.go @@ -5,11 +5,16 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "reflect" "time" - "k8s.io/utils/ptr" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/go-logr/logr" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "github.com/pkg/errors" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" schedulingv1 "k8s.io/api/scheduling/v1" @@ -20,10 +25,7 @@ import ( networkingv1 "k8s.io/api/networking/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/duration" - "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" @@ -31,12 +33,17 @@ import ( "github.com/armadaproject/armada-operator/internal/controller/builders" ) -type AppName string - const ( + // jobTimeout specifies the maximum time to wait for a job to complete. + jobTimeout = time.Second * 120 + // jobPollInterval specifies the interval to poll for job completion. + jobPollInterval = time.Second * 5 + // defaultPrometheusInterval is the default interval for Prometheus scraping. defaultPrometheusInterval = 1 * time.Second - appConfigFlag = "--config" - appConfigFilepath = "/config/application_config.yaml" + // appConfigFlag is the flag to specify the application config file in the container. + appConfigFlag = "--config" + // appConfigFilepath is the path to the application config file in the container. + appConfigFilepath = "/config/application_config.yaml" ) // CommonComponents are the base components for all Armada services @@ -101,6 +108,9 @@ type AppConfig struct { Pulsar PulsarConfig } +// CleanupFunc is a function that will clean up additional resources which are not deleted by owner references. +type CleanupFunc func(context.Context) error + // DeepCopy will deep-copy values from the receiver and return a new reference func (cc *CommonComponents) DeepCopy() *CommonComponents { var clusterRoleBindings []*rbacv1.ClusterRoleBinding @@ -280,23 +290,20 @@ func ExtractPulsarConfig(config runtime.RawExtension) (PulsarConfig, error) { return asConfig.Pulsar, nil } -// waitForJob will wait for some resolution of the job. Provide context with timeout if needed. -func waitForJob(ctx context.Context, cl client.Client, job *batchv1.Job, sleepTime time.Duration) (err error) { - for { - select { - case <-ctx.Done(): - return errors.New("context timeout while waiting for job") - default: +// waitForJob waits for the Job to reach a terminal state (complete or failed). +func waitForJob(ctx context.Context, c client.Client, job *batchv1.Job, pollInterval, timeout time.Duration) error { + return wait.PollUntilContextTimeout( + ctx, + pollInterval, + timeout, + false, + func(ctx context.Context) (bool, error) { key := client.ObjectKeyFromObject(job) - if err = cl.Get(ctx, key, job); err != nil { - return err + if err := c.Get(ctx, key, job); err != nil { + return false, err } - if isJobFinished(job) { - return nil - } - } - time.Sleep(sleepTime) - } + return isJobFinished(job), nil + }) } // isJobFinished will assess if the job is finished (complete of failed). @@ -424,42 +431,7 @@ func createPulsarVolumes(pulsarConfig PulsarConfig) []corev1.Volume { return volumes } -// createPrometheusRule will provide a prometheus monitoring rule for the name and scrapeInterval -func createPrometheusRule(name, namespace string, scrapeInterval *metav1.Duration, labels ...map[string]string) *monitoringv1.PrometheusRule { - if scrapeInterval == nil { - scrapeInterval = &metav1.Duration{Duration: defaultPrometheusInterval} - } - restRequestHistogram := `histogram_quantile(0.95, ` + - `sum(rate(rest_client_request_duration_seconds_bucket{service="` + name + `"}[2m])) by (endpoint, verb, url, le))` - logRate := "sum(rate(log_messages[2m])) by (level)" - durationString := duration.ShortHumanDuration(scrapeInterval.Duration) - objectMetaName := "armada-" + name + "-metrics" - return &monitoringv1.PrometheusRule{ - TypeMeta: metav1.TypeMeta{}, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - Labels: AllLabels(name, labels...), - }, - Spec: monitoringv1.PrometheusRuleSpec{ - Groups: []monitoringv1.RuleGroup{{ - Name: objectMetaName, - Interval: ptr.To(monitoringv1.Duration(durationString)), - Rules: []monitoringv1.Rule{ - { - Record: "armada:" + name + ":rest:request:histogram95", - Expr: intstr.IntOrString{StrVal: restRequestHistogram}, - }, - { - Record: "armada:" + name + ":log:rate", - Expr: intstr.IntOrString{StrVal: logRate}, - }, - }, - }}, - }, - } -} - +// addGoMemLimit will add the GOMEMLIMIT environment variable if the memory limit is set. func addGoMemLimit(env []corev1.EnvVar, resources corev1.ResourceRequirements) []corev1.EnvVar { if resources.Limits.Memory() != nil && resources.Limits.Memory().Value() != 0 { val := resources.Limits.Memory().Value() @@ -468,3 +440,135 @@ func addGoMemLimit(env []corev1.EnvVar, resources corev1.ResourceRequirements) [ } return env } + +// checkAndHandleObjectDeletion handles the deletion of the resource by adding/removing the finalizer. +// If the resource is being deleted, it will remove the finalizer. +// If the resource is not being deleted, it will add the finalizer. +// If finish is true, the reconciliation should finish early. +func checkAndHandleObjectDeletion( + ctx context.Context, + r client.Client, + object client.Object, + finalizer string, + cleanupF CleanupFunc, + logger logr.Logger, +) (finish bool, err error) { + logger = logger.WithValues("finalizer", finalizer) + deletionTimestamp := object.GetDeletionTimestamp() + if deletionTimestamp.IsZero() { + // The object is not being deleted as deletionTimestamp. + // In this case, we should add the finalizer if it is not already present. + if err := addFinalizerIfNeeded(ctx, r, object, finalizer, logger); err != nil { + return true, err + } + } else { + // The object is being deleted so we should run the cleanup function if needed and remove the finalizer. + return handleObjectDeletion(ctx, r, object, finalizer, cleanupF, logger) + } + // The object is not being deleted, continue reconciliation + return false, nil +} + +// addFinalizerIfNeeded will add the finalizer to the object if it is not already present. +func addFinalizerIfNeeded( + ctx context.Context, + client client.Client, + object client.Object, + finalizer string, + logger logr.Logger, +) error { + if !controllerutil.ContainsFinalizer(object, finalizer) { + logger.Info("Attaching cleanup finalizer because object does not have a deletion timestamp set") + controllerutil.AddFinalizer(object, finalizer) + return client.Update(ctx, object) + } + return nil +} + +func handleObjectDeletion( + ctx context.Context, + client client.Client, + object client.Object, + finalizer string, + cleanupF CleanupFunc, + logger logr.Logger, +) (finish bool, err error) { + deletionTimestamp := object.GetDeletionTimestamp() + logger.Info( + "Object is being deleted as it has a non-zero deletion timestamp set", + "deletionTimestamp", deletionTimestamp, + ) + logger.Info( + "Namespace-scoped objects will be deleted by Kubernetes based on their OwnerReference", + "deletionTimestamp", deletionTimestamp, + ) + // The object is being deleted + if controllerutil.ContainsFinalizer(object, finalizer) { + // Run additional cleanup function if it is provided + if cleanupF != nil { + if err := cleanupF(ctx); err != nil { + return true, err + } + } + // Remove our finalizer from the list and update it. + logger.Info("Removing cleanup finalizer from object") + controllerutil.RemoveFinalizer(object, finalizer) + if err := client.Update(ctx, object); err != nil { + return true, err + } + } + + // Stop reconciliation as the item is being deleted + return true, nil +} + +// upsertObjectIfNeeded will create or update the object with the mutateFn if the resource is not nil. +func upsertObjectIfNeeded( + ctx context.Context, + client client.Client, + object client.Object, + componentName string, + mutateFn controllerutil.MutateFn, + logger logr.Logger, +) error { + if !isNil(object) { + logger.Info(fmt.Sprintf("Upserting %s %s object", componentName, object.GetObjectKind())) + if _, err := controllerutil.CreateOrUpdate(ctx, client, object, mutateFn); err != nil { + return err + } + } + return nil +} + +// Helper function to determine if the object is nil even if it's a pointer to a nil value +func isNil(i any) bool { + iv := reflect.ValueOf(i) + if !iv.IsValid() { + return true + } + switch iv.Kind() { + case reflect.Ptr, reflect.Slice, reflect.Map, reflect.Func, reflect.Interface: + return iv.IsNil() + default: + return false + } +} + +// getObject will get the object from Kubernetes and return if it is missing or an error. +func getObject( + ctx context.Context, + client client.Client, + object client.Object, + namespacedName types.NamespacedName, + logger logr.Logger, +) (miss bool, err error) { + logger.Info("Fetching object from cache") + if err := client.Get(ctx, namespacedName, object); err != nil { + if k8serrors.IsNotFound(err) { + logger.Info("Object not found in cache, ending reconcile...") + return true, nil + } + return true, err + } + return false, nil +} diff --git a/internal/controller/install/common_helpers_test.go b/internal/controller/install/common_helpers_test.go index 88107b0f..80ab867e 100644 --- a/internal/controller/install/common_helpers_test.go +++ b/internal/controller/install/common_helpers_test.go @@ -5,6 +5,16 @@ import ( "testing" "time" + "github.com/go-logr/logr" + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/yaml" "context" @@ -20,7 +30,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" - "k8s.io/apimachinery/pkg/api/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -237,7 +247,7 @@ func Test_waitForJob(t *testing.T) { mockK8sClient. EXPECT(). Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&batchv1.Job{})). - Return(errors.NewNotFound(schema.GroupResource{}, "job")) + Return(k8serrors.NewNotFound(schema.GroupResource{}, "job")) }, wantErr: true, }, @@ -250,9 +260,10 @@ func Test_waitForJob(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockK8sClient := k8sclient.NewMockClient(mockCtrl) - sleepTime := time.Millisecond * 1 - tt.setupMockFn(mockK8sClient) + mockClient := k8sclient.NewMockClient(mockCtrl) + pollInterval := time.Millisecond * 10 + timeout := time.Millisecond * 100 + tt.setupMockFn(mockClient) ctx := context.Background() if tt.ctxFn != nil { @@ -264,11 +275,11 @@ func Test_waitForJob(t *testing.T) { Namespace: expectedNamespacedName.Namespace, }, } - rslt := waitForJob(ctx, mockK8sClient, &job, sleepTime) + err := waitForJob(ctx, mockClient, &job, pollInterval, timeout) if tt.wantErr { - assert.Error(t, rslt) + assert.Error(t, err) } else { - assert.NoError(t, rslt) + assert.NoError(t, err) } }) } @@ -796,13 +807,11 @@ func makeCommonComponents() CommonComponents { } func TestAddGoMemLimit(t *testing.T) { - type test struct { + tests := []struct { name string resourcesYaml string expectedGoMemLimit string - } - - tests := []test{ + }{ { name: "1Gi memory limit", resourcesYaml: `limits: @@ -849,3 +858,267 @@ func TestAddGoMemLimit(t *testing.T) { }) } } + +func TestCheckAndHandleResourceDeletion(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + // Create a logger (for tests, we use logr.Discard() to avoid actual logging) + logger := logr.Discard() + + // Table-driven test cases + tests := []struct { + name string + deletionTimestamp *time.Time + finalizerPresent bool + expectFinalizer bool + expectFinish bool + expectError bool + }{ + { + name: "Object not being deleted, finalizer not present", + deletionTimestamp: nil, + finalizerPresent: false, + expectFinalizer: true, + expectFinish: false, + expectError: false, + }, + { + name: "Object not being deleted, finalizer already present", + deletionTimestamp: nil, + finalizerPresent: true, + expectFinalizer: true, + expectFinish: false, + expectError: false, + }, + { + name: "Object being deleted, finalizer present", + deletionTimestamp: ptr.To(time.Now()), + finalizerPresent: true, + expectFinalizer: false, + expectFinish: true, + expectError: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Create a fake object + object := newTestObject(t) + + // Set DeletionTimestamp if the test requires it + if tc.deletionTimestamp != nil { + object.SetDeletionTimestamp(&metav1.Time{Time: *tc.deletionTimestamp}) + } + + // Add or remove finalizer based on the test case + finalizer := "test.finalizer" + if tc.finalizerPresent { + controllerutil.AddFinalizer(object, finalizer) + } + + // Create a fake client + fakeClient := fake.NewClientBuilder().WithObjects(object).Build() + + cleanupFunc := func(ctx context.Context) error { + if tc.deletionTimestamp == nil { + t.Fatalf("cleanup function should not be called") + } + return nil + } + + // Call the function under test + finish, err := checkAndHandleObjectDeletion(ctx, fakeClient, object, finalizer, cleanupFunc, logger) + + // Check for errors + if (err != nil) != tc.expectError { + t.Errorf("Expected error: %v, got: %v", tc.expectError, err) + } + + // Check if reconciliation should finish + if finish != tc.expectFinish { + t.Errorf("Expected finish: %v, got: %v", tc.expectFinish, finish) + } + + // Check if finalizer was added/removed as expected + hasFinalizer := controllerutil.ContainsFinalizer(object, finalizer) + if hasFinalizer != tc.expectFinalizer { + t.Errorf("Expected finalizer: %v, got: %v", tc.expectFinalizer, hasFinalizer) + } + }) + } +} + +func TestGetObjectFromCache(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + // No-op logger for testing + logger := logr.Discard() + + // Test cases + tests := []struct { + name string + objectExists bool + expectMiss bool + expectError bool + returnError error + }{ + { + name: "Object exists in cache", + objectExists: true, + returnError: nil, + expectMiss: false, + expectError: false, + }, + { + name: "Object not found in cache", + objectExists: false, + returnError: k8serrors.NewNotFound(newTestGroupResource(t), "test-resource"), + expectMiss: true, + expectError: false, + }, + { + name: "Error while fetching from cache", + objectExists: false, + returnError: errors.New("some network error"), + expectMiss: true, + expectError: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Create a fake client with the expected error behavior + clientBuilder := fake.NewClientBuilder() + + // Create a fake object + object := newTestObject(t) + + if tc.objectExists { + clientBuilder.WithObjects(object) + } + + if tc.expectError { + clientBuilder.WithInterceptorFuncs(interceptor.Funcs{ + Get: func(ctx context.Context, client client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + return tc.returnError + }, + }) + } + + fakeClient := clientBuilder.Build() + + // Call the function under test + namespacedName := types.NamespacedName{Name: "test-resource", Namespace: "default"} + miss, err := getObject(ctx, fakeClient, object, namespacedName, logger) + if tc.expectError { + assert.ErrorIs(t, err, tc.returnError) + } else { + assert.NoError(t, err) + } + assert.Equal(t, miss, tc.expectMiss) + }) + } +} + +// newTestObject returns a test unstructured.Unstructured object only to be used in tests. +func newTestObject(t *testing.T) *unstructured.Unstructured { + object := &unstructured.Unstructured{} + object.SetNamespace("default") + object.SetName("test-resource") + object.SetUID(uuid.NewUUID()) + object.SetGroupVersionKind(newTestGroupVersionKind(t)) + return object +} + +// newTestGroupVersionKind returns a test schema.GroupVersionKind only to be used in tests. +func newTestGroupVersionKind(t *testing.T) schema.GroupVersionKind { + return schema.GroupVersionKind{ + Group: "test.group", + Version: "v1", + Kind: "TestKind", + } +} + +// newTestGroupResource returns a test schema.GroupResource only to be used in tests. +func newTestGroupResource(t *testing.T) schema.GroupResource { + return schema.GroupResource{ + Group: "test.group", + Resource: "test-resource", + } +} + +func TestIsNil(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + input interface{} + expected bool + }{ + { + name: "Nil interface", + input: nil, + expected: true, + }, + { + name: "Non-nil interface with value", + input: 42, + expected: false, + }, + { + name: "Nil pointer", + input: (*int)(nil), + expected: true, + }, + { + name: "Non-nil pointer", + input: func() *int { val := 42; return &val }(), + expected: false, + }, + { + name: "Nil slice", + input: ([]int)(nil), + expected: true, + }, + { + name: "Empty slice", + input: []int{}, + expected: false, + }, + { + name: "Nil map", + input: (map[string]int)(nil), + expected: true, + }, + { + name: "Empty map", + input: map[string]int{}, + expected: false, + }, + { + name: "Nil function", + input: (func())(nil), + expected: true, + }, + { + name: "Non-nil function", + input: func() {}, + expected: false, + }, + } + + // Iterate over the test cases + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Call the isNil function with the test input + result := isNil(tt.input) + if result != tt.expected { + t.Errorf("isNil(%v) = %v, expected %v", tt.input, result, tt.expected) + } + }) + } +} diff --git a/internal/controller/install/eventingester_controller.go b/internal/controller/install/eventingester_controller.go index 4dba894a..1ba3e9bb 100644 --- a/internal/controller/install/eventingester_controller.go +++ b/internal/controller/install/eventingester_controller.go @@ -24,7 +24,6 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" @@ -55,16 +54,13 @@ type EventIngesterReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile func (r *EventIngesterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name) + started := time.Now() - logger.Info("Reconciling EventIngester object") - logger.Info("Fetching EventIngester object from cache") + logger.Info("Reconciling object") + var eventIngester installv1alpha1.EventIngester - if err := r.Client.Get(ctx, req.NamespacedName, &eventIngester); err != nil { - if k8serrors.IsNotFound(err) { - logger.Info("EventIngester not found in cache, ending reconcile") - return ctrl.Result{}, nil - } + if miss, err := getObject(ctx, r.Client, &eventIngester, req.NamespacedName, logger); err != nil || miss { return ctrl.Result{}, err } @@ -79,13 +75,9 @@ func (r *EventIngesterReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } - deletionTimestamp := eventIngester.ObjectMeta.DeletionTimestamp - // examine DeletionTimestamp to determine if object is under deletion - if !deletionTimestamp.IsZero() { - logger.Info("EventIngester is being deleted") - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil + finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &eventIngester, operatorFinalizer, nil, logger) + if err != nil || finish { + return ctrl.Result{}, err } componentsCopy := components.DeepCopy() @@ -95,25 +87,16 @@ func (r *EventIngesterReconciler) Reconcile(ctx context.Context, req ctrl.Reques return nil } - if components.ServiceAccount != nil { - logger.Info("Upserting EventIngester ServiceAccount object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, eventIngester.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Secret != nil { - logger.Info("Upserting EventIngester Secret object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, eventIngester.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Deployment != nil { - logger.Info("Upserting EventIngester Deployment object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, eventIngester.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } logger.Info("Successfully reconciled EventIngester object", "durationMillis", time.Since(started).Milliseconds()) diff --git a/internal/controller/install/eventingester_controller_test.go b/internal/controller/install/eventingester_controller_test.go index e645182d..b79d355e 100644 --- a/internal/controller/install/eventingester_controller_test.go +++ b/internal/controller/install/eventingester_controller_test.go @@ -57,12 +57,19 @@ func TestEventIngesterReconciler_Reconcile(t *testing.T) { ownerReference := []metav1.OwnerReference{owner} mockK8sClient := k8sclient.NewMockClient(mockCtrl) + // EventIngester mockK8sClient. EXPECT(). Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&v1alpha1.EventIngester{})). Return(nil). SetArg(2, expectedEventIngester) + // Finalizer + mockK8sClient. + EXPECT(). + Update(gomock.Any(), gomock.AssignableToTypeOf(&installv1alpha1.EventIngester{})). + Return(nil) + expectedSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: expectedEventIngester.Name, diff --git a/internal/controller/install/executor_controller.go b/internal/controller/install/executor_controller.go index df2d45f9..08986066 100644 --- a/internal/controller/install/executor_controller.go +++ b/internal/controller/install/executor_controller.go @@ -21,6 +21,8 @@ import ( "fmt" "time" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" "github.com/go-logr/logr" @@ -76,17 +78,14 @@ type ExecutorReconciler struct { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile func (r *ExecutorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name) + started := time.Now() - logger.Info("Reconciling Executor object") - logger.Info("Fetching Executor object from cache") + logger.Info("Reconciling object") + var executor installv1alpha1.Executor - if err := r.Client.Get(ctx, req.NamespacedName, &executor); err != nil { - if k8serrors.IsNotFound(err) { - logger.Info("Executor not found in cache, ending reconcile") - return ctrl.Result{}, nil - } + if miss, err := getObject(ctx, r.Client, &executor, req.NamespacedName, logger); err != nil || miss { return ctrl.Result{}, err } @@ -101,42 +100,12 @@ func (r *ExecutorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } - deletionTimestamp := executor.ObjectMeta.DeletionTimestamp - // examine DeletionTimestamp to determine if object is under deletion - if deletionTimestamp.IsZero() { - // The object is not being deleted, so if it does not have our finalizer, - // then lets add the finalizer and update the object. This is equivalent - // registering our finalizer. - if !controllerutil.ContainsFinalizer(&executor, operatorFinalizer) { - logger.Info("Attaching finalizer to Executor object", "finalizer", operatorFinalizer) - controllerutil.AddFinalizer(&executor, operatorFinalizer) - if err := r.Update(ctx, &executor); err != nil { - return ctrl.Result{}, err - } - } - } else { - logger.Info("Executor object is being deleted", "finalizer", operatorFinalizer) - logger.Info("Namespace-scoped resources will be deleted by Kubernetes based on their OwnerReference") - // The object is being deleted - if controllerutil.ContainsFinalizer(&executor, operatorFinalizer) { - // our finalizer is present, so lets handle any external dependency - logger.Info("Running cleanup function for Executor cluster-scoped components", "finalizer", operatorFinalizer) - if err := r.deleteExternalResources(ctx, components, logger); err != nil { - // if fail to delete the external dependency here, return with error - // so that it can be retried - return ctrl.Result{}, err - } - - // remove our finalizer from the list and update it. - logger.Info("Removing finalizer from Executor object", "finalizer", operatorFinalizer) - controllerutil.RemoveFinalizer(&executor, operatorFinalizer) - if err := r.Update(ctx, &executor); err != nil { - return ctrl.Result{}, err - } - } - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil + cleanupF := func(ctx context.Context) error { + return r.deleteExternalResources(ctx, components, logger) + } + finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &executor, operatorFinalizer, cleanupF, logger) + if err != nil || finish { + return ctrl.Result{}, err } componentsCopy := components.DeepCopy() @@ -146,70 +115,47 @@ func (r *ExecutorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return nil } - if components.ServiceAccount != nil { - logger.Info("Upserting Executor ServiceAccount object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, executor.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.ClusterRole != nil { - logger.Info("Upserting Executor ClusterRole object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ClusterRole, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ClusterRole, executor.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } for _, crb := range components.ClusterRoleBindings { - logger.Info("Upserting additional Executor ClusterRoleBinding object", "name", crb.Name) - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, crb, mutateFn); err != nil { + if err := upsertObjectIfNeeded(ctx, r.Client, crb, executor.Kind, mutateFn, logger); err != nil { return ctrl.Result{}, err } } - if components.Secret != nil { - logger.Info("Upserting Executor Secret object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, executor.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Deployment != nil { - logger.Info("Upserting Executor Deployment object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, executor.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Service != nil { - logger.Info("Upserting Executor Service object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Service, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Service, executor.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } for _, pc := range components.PriorityClasses { - logger.Info("Upserting additional Executor PriorityClass object", "name", pc.Name) - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, pc, mutateFn); err != nil { + if err := upsertObjectIfNeeded(ctx, r.Client, pc, executor.Kind, mutateFn, logger); err != nil { return ctrl.Result{}, err } } - if components.PrometheusRule != nil { - logger.Info("Upserting Executor PrometheusRule object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.PrometheusRule, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.PrometheusRule, executor.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.ServiceMonitor != nil { - logger.Info("Upserting Executor ServiceMonitor object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceMonitor, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceMonitor, executor.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - logger.Info("Successfully reconciled Executor object", "durationMillis", time.Since(started).Milliseconds()) + logger.Info("Successfully reconciled object", "durationMillis", time.Since(started).Milliseconds()) return ctrl.Result{}, nil } @@ -265,7 +211,7 @@ func (r *ExecutorReconciler) generateExecutorInstallComponents(executor *install } components.ServiceMonitor = serviceMonitor - components.PrometheusRule = createPrometheusRule(executor.Name, executor.Namespace, executor.Spec.Prometheus.ScrapeInterval, executor.Spec.Labels, executor.Spec.Prometheus.Labels) + components.PrometheusRule = createExecutorPrometheusRule(executor.Name, executor.Namespace, executor.Spec.Prometheus.ScrapeInterval, executor.Spec.Labels, executor.Spec.Prometheus.Labels) } return components, nil @@ -508,6 +454,42 @@ func (r *ExecutorReconciler) deleteExternalResources(ctx context.Context, compon return nil } +// createExecutorPrometheusRule will provide a prometheus monitoring rule for the name and scrapeInterval +func createExecutorPrometheusRule(name, namespace string, scrapeInterval *metav1.Duration, labels ...map[string]string) *monitoringv1.PrometheusRule { + if scrapeInterval == nil { + scrapeInterval = &metav1.Duration{Duration: defaultPrometheusInterval} + } + restRequestHistogram := `histogram_quantile(0.95, ` + + `sum(rate(rest_client_request_duration_seconds_bucket{service="` + name + `"}[2m])) by (endpoint, verb, url, le))` + logRate := "sum(rate(log_messages[2m])) by (level)" + durationString := duration.ShortHumanDuration(scrapeInterval.Duration) + objectMetaName := "armada-" + name + "-metrics" + return &monitoringv1.PrometheusRule{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: AllLabels(name, labels...), + }, + Spec: monitoringv1.PrometheusRuleSpec{ + Groups: []monitoringv1.RuleGroup{{ + Name: objectMetaName, + Interval: ptr.To(monitoringv1.Duration(durationString)), + Rules: []monitoringv1.Rule{ + { + Record: "armada:" + name + ":rest:request:histogram95", + Expr: intstr.IntOrString{StrVal: restRequestHistogram}, + }, + { + Record: "armada:" + name + ":log:rate", + Expr: intstr.IntOrString{StrVal: logRate}, + }, + }, + }}, + }, + } +} + // SetupWithManager sets up the controller with the Manager. func (r *ExecutorReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/internal/controller/install/executor_controller_test.go b/internal/controller/install/executor_controller_test.go index f03854c2..0d9a4805 100644 --- a/internal/controller/install/executor_controller_test.go +++ b/internal/controller/install/executor_controller_test.go @@ -59,11 +59,13 @@ func TestExecutorReconciler_ReconcileNewExecutor(t *testing.T) { Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&installv1alpha1.Executor{})). Return(nil). SetArg(2, expectedExecutor) - // Executor finalizer + + // Finalizer mockK8sClient. EXPECT(). Update(gomock.Any(), gomock.AssignableToTypeOf(&installv1alpha1.Executor{})). Return(nil) + // ServiceAccount mockK8sClient. EXPECT(). @@ -73,6 +75,7 @@ func TestExecutorReconciler_ReconcileNewExecutor(t *testing.T) { EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.ServiceAccount{})). Return(nil) + // ClusterRole mockK8sClient. EXPECT(). @@ -82,6 +85,7 @@ func TestExecutorReconciler_ReconcileNewExecutor(t *testing.T) { EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&rbacv1.ClusterRole{})). Return(nil) + // ClusterRoleBinding mockK8sClient. EXPECT(). @@ -91,6 +95,7 @@ func TestExecutorReconciler_ReconcileNewExecutor(t *testing.T) { EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&rbacv1.ClusterRoleBinding{})). Return(nil) + // Secret mockK8sClient. EXPECT(). @@ -100,6 +105,7 @@ func TestExecutorReconciler_ReconcileNewExecutor(t *testing.T) { EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{})). Return(nil) + // Deployment mockK8sClient. EXPECT(). @@ -109,6 +115,7 @@ func TestExecutorReconciler_ReconcileNewExecutor(t *testing.T) { EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). Return(nil) + // Service mockK8sClient. EXPECT(). @@ -118,6 +125,7 @@ func TestExecutorReconciler_ReconcileNewExecutor(t *testing.T) { EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.Service{})). Return(nil) + // PrometheusRule mockK8sClient. EXPECT(). @@ -127,6 +135,7 @@ func TestExecutorReconciler_ReconcileNewExecutor(t *testing.T) { EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&monitoringv1.PrometheusRule{})). Return(nil) + // ServiceMonitor mockK8sClient. EXPECT(). diff --git a/internal/controller/install/lookout_controller.go b/internal/controller/install/lookout_controller.go index d80e50b1..48c0e33d 100644 --- a/internal/controller/install/lookout_controller.go +++ b/internal/controller/install/lookout_controller.go @@ -30,7 +30,6 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -40,12 +39,6 @@ import ( "sigs.k8s.io/yaml" ) -// migrationTimeout is how long we'll wait for the Lookout db migration job -const ( - migrationTimeout = time.Second * 120 - migrationPollSleep = time.Second * 5 -) - // LookoutReconciler reconciles a Lookout object type LookoutReconciler struct { client.Client @@ -63,17 +56,18 @@ type LookoutReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile func (r *LookoutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name) + started := time.Now() - logger.Info("Reconciling Lookout object") - logger.Info("Fetching Lookout object from cache") + logger.Info("Reconciling object") var lookout installv1alpha1.Lookout - if err := r.Client.Get(ctx, req.NamespacedName, &lookout); err != nil { - if k8serrors.IsNotFound(err) { - logger.Info("Lookout not found in cache, ending reconcile...", "namespace", req.Namespace, "name", req.Name) - return ctrl.Result{}, nil - } + if miss, err := getObject(ctx, r.Client, &lookout, req.NamespacedName, logger); err != nil || miss { + return ctrl.Result{}, err + } + + finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &lookout, operatorFinalizer, nil, logger) + if err != nil || finish { return ctrl.Result{}, err } @@ -89,37 +83,6 @@ func (r *LookoutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, err } - // examine DeletionTimestamp to determine if object is under deletion - deletionTimestamp := lookout.ObjectMeta.DeletionTimestamp - // examine DeletionTimestamp to determine if object is under deletion - if deletionTimestamp.IsZero() { - // The object is not being deleted, so if it does not have our finalizer, - // then lets add the finalizer and update the object. This is equivalent - // registering our finalizer. - if !controllerutil.ContainsFinalizer(&lookout, operatorFinalizer) { - logger.Info("Attaching finalizer to Lookout object", "finalizer", operatorFinalizer) - controllerutil.AddFinalizer(&lookout, operatorFinalizer) - if err := r.Update(ctx, &lookout); err != nil { - return ctrl.Result{}, err - } - } - } else { - logger.Info("Lookout object is being deleted", "finalizer", operatorFinalizer) - logger.Info("Namespace-scoped resources will be deleted by Kubernetes based on their OwnerReference") - // The object is being deleted - if controllerutil.ContainsFinalizer(&lookout, operatorFinalizer) { - // remove our finalizer from the list and update it. - logger.Info("Removing finalizer from Lookout object", "finalizer", operatorFinalizer) - controllerutil.RemoveFinalizer(&lookout, operatorFinalizer) - if err := r.Update(ctx, &lookout); err != nil { - return ctrl.Result{}, err - } - } - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil - } - componentsCopy := components.DeepCopy() mutateFn := func() error { @@ -127,68 +90,52 @@ func (r *LookoutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return nil } - if components.ServiceAccount != nil { - logger.Info("Upserting Lookout ServiceAccount object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, lookout.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Secret != nil { - logger.Info("Upserting Lookout Secret object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, lookout.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Jobs != nil && len(components.Jobs) > 0 { - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Jobs[0], mutateFn); err != nil { - return ctrl.Result{}, err - } - ctxTimeout, cancel := context.WithTimeout(ctx, migrationTimeout) - defer cancel() - err := waitForJob(ctxTimeout, r.Client, components.Jobs[0], migrationPollSleep) + for _, job := range components.Jobs { + err = func(job *batchv1.Job) error { + if err := upsertObjectIfNeeded(ctx, r.Client, job, lookout.Kind, mutateFn, logger); err != nil { + return err + } + + if err := waitForJob(ctx, r.Client, job, jobPollInterval, jobTimeout); err != nil { + return err + } + + return nil + }(job) if err != nil { return ctrl.Result{}, err } } - if components.Deployment != nil { - logger.Info("Upserting Lookout Deployment object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, lookout.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Service != nil { - logger.Info("Upserting Lookout Service object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Service, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Service, lookout.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.IngressHttp != nil { - logger.Info("Upserting Lookout Ingress Rest object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressHttp, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressHttp, lookout.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.PrometheusRule != nil { - logger.Info("Upserting Lookout PrometheusRule object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.PrometheusRule, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.CronJob, lookout.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.ServiceMonitor != nil { - logger.Info("Upserting Lookout ServiceMonitor object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceMonitor, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceMonitor, lookout.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - logger.Info("Successfully reconciled Lookout object", "durationMillis", time.Since(started).Milliseconds()) + logger.Info("Successfully reconciled resource", "durationMillis", time.Since(started).Milliseconds()) return ctrl.Result{}, nil } @@ -246,8 +193,8 @@ func generateLookoutInstallComponents(lookout *installv1alpha1.Lookout, scheme * } var cronJob *batchv1.CronJob - if lookout.Spec.DbPruningEnabled != nil && *lookout.Spec.DbPruningEnabled { - cronJob, err := createLookoutCronJob(lookout) + if enabled := lookout.Spec.DbPruningEnabled; enabled != nil && *enabled { + cronJob, err = createLookoutCronJob(lookout) if err != nil { return nil, err } diff --git a/internal/controller/install/lookout_controller_test.go b/internal/controller/install/lookout_controller_test.go index 1e1c0545..23a81351 100644 --- a/internal/controller/install/lookout_controller_test.go +++ b/internal/controller/install/lookout_controller_test.go @@ -73,12 +73,8 @@ func TestLookoutReconciler_Reconcile(t *testing.T) { }, } - lookout, err := generateLookoutInstallComponents(&expectedLookout, scheme) - if err != nil { - t.Fatal("We should not fail on generating lookout") - } - mockK8sClient := k8sclient.NewMockClient(mockCtrl) + // Lookout mockK8sClient. EXPECT(). Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&v1alpha1.Lookout{})). @@ -99,8 +95,7 @@ func TestLookoutReconciler_Reconcile(t *testing.T) { mockK8sClient. EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.ServiceAccount{})). - Return(nil). - SetArg(1, *lookout.ServiceAccount) + Return(nil) mockK8sClient. EXPECT(). @@ -109,16 +104,34 @@ func TestLookoutReconciler_Reconcile(t *testing.T) { mockK8sClient. EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{})). - Return(nil). - SetArg(1, *lookout.Secret) + Return(nil) expectedJobName := types.NamespacedName{Namespace: "default", Name: "lookout-migration"} - lookout.Jobs[0].Status = batchv1.JobStatus{ - Conditions: []batchv1.JobCondition{{ - Type: batchv1.JobComplete, - Status: corev1.ConditionTrue, - }}, + expectedMigrationJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "lookout-migration", + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "lookout-migration", + Image: "testrepo:1.0.0", + }, + }, + }, + }, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + }}, + }, } + mockK8sClient. EXPECT(). Get(gomock.Any(), expectedJobName, gomock.AssignableToTypeOf(&batchv1.Job{})). @@ -126,13 +139,12 @@ func TestLookoutReconciler_Reconcile(t *testing.T) { mockK8sClient. EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&batchv1.Job{})). - Return(nil). - SetArg(1, *lookout.Jobs[0]) + Return(nil) mockK8sClient. EXPECT(). Get(gomock.Any(), expectedJobName, gomock.AssignableToTypeOf(&batchv1.Job{})). Return(nil). - SetArg(2, *lookout.Jobs[0]) + SetArg(2, *expectedMigrationJob) mockK8sClient. EXPECT(). @@ -141,8 +153,7 @@ func TestLookoutReconciler_Reconcile(t *testing.T) { mockK8sClient. EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). - Return(nil). - SetArg(1, *lookout.Deployment) + Return(nil) mockK8sClient. EXPECT(). @@ -151,8 +162,7 @@ func TestLookoutReconciler_Reconcile(t *testing.T) { mockK8sClient. EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.Service{})). - Return(nil). - SetArg(1, *lookout.Service) + Return(nil) // IngressHttp expectedIngressName := expectedNamespacedName @@ -164,8 +174,7 @@ func TestLookoutReconciler_Reconcile(t *testing.T) { mockK8sClient. EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&networkingv1.Ingress{})). - Return(nil). - SetArg(1, *lookout.IngressHttp) + Return(nil) // ServiceMonitor mockK8sClient. @@ -177,6 +186,18 @@ func TestLookoutReconciler_Reconcile(t *testing.T) { Create(gomock.Any(), gomock.AssignableToTypeOf(&monitoringv1.ServiceMonitor{})). Return(nil) + // CronJob + expectedCronJobName := expectedNamespacedName + expectedCronJobName.Name = expectedCronJobName.Name + "-db-pruner" + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedCronJobName, gomock.AssignableToTypeOf(&batchv1.CronJob{})). + Return(errors.NewNotFound(schema.GroupResource{}, "armadaserver")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&batchv1.CronJob{})). + Return(nil) + r := LookoutReconciler{ Client: mockK8sClient, Scheme: scheme, @@ -238,10 +259,9 @@ func TestLookoutReconciler_ReconcileErrorDueToApplicationConfig(t *testing.T) { APIVersion: "install.armadaproject.io/v1alpha1", }, ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "lookout", - DeletionTimestamp: &metav1.Time{Time: time.Now()}, - Finalizers: []string{operatorFinalizer}, + Namespace: "default", + Name: "lookout", + Finalizers: []string{operatorFinalizer}, }, Spec: v1alpha1.LookoutSpec{ CommonSpecBase: installv1alpha1.CommonSpecBase{ diff --git a/internal/controller/install/lookoutingester_controller.go b/internal/controller/install/lookoutingester_controller.go index dfb08a3e..5d26c46c 100644 --- a/internal/controller/install/lookoutingester_controller.go +++ b/internal/controller/install/lookoutingester_controller.go @@ -18,14 +18,12 @@ package install import ( "context" - "fmt" "time" "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -52,20 +50,21 @@ type LookoutIngesterReconciler struct { // move the current state of the cluster closer to the desired state. func (r *LookoutIngesterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name) + started := time.Now() - logger.Info("Reconciling LookoutIngester object") - logger.Info("Fetching LookoutIngester object from cache") + logger.Info("Reconciling object") + var lookoutIngester installv1alpha1.LookoutIngester - if err := r.Client.Get(ctx, req.NamespacedName, &lookoutIngester); err != nil { - if k8serrors.IsNotFound(err) { - logger.Info("LookoutIngester not found in cache, ending reconcile") - return ctrl.Result{}, nil - } + if miss, err := getObject(ctx, r.Client, &lookoutIngester, req.NamespacedName, logger); err != nil || miss { + return ctrl.Result{}, err + } + + finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &lookoutIngester, operatorFinalizer, nil, logger) + if err != nil || finish { return ctrl.Result{}, err } - logger.Info(fmt.Sprintf("LookoutIngester Name %s", lookoutIngester.Name)) pc, err := installv1alpha1.BuildPortConfig(lookoutIngester.Spec.ApplicationConfig) if err != nil { return ctrl.Result{}, err @@ -77,19 +76,6 @@ func (r *LookoutIngesterReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, err } - deletionTimestamp := lookoutIngester.ObjectMeta.DeletionTimestamp - - // examine DeletionTimestamp to determine if object is under deletion - if !deletionTimestamp.IsZero() { - logger.Info("LookoutIngester is being deleted") - - // FIXME: Seems like something actually has to happen here? - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil - - } - componentsCopy := components.DeepCopy() mutateFn := func() error { @@ -97,28 +83,19 @@ func (r *LookoutIngesterReconciler) Reconcile(ctx context.Context, req ctrl.Requ return nil } - if components.Secret != nil { - logger.Info("Upserting LookoutIngester Secret object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, lookoutIngester.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.ServiceAccount != nil { - logger.Info("Upserting LookoutIngester ServiceAccount object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, lookoutIngester.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Deployment != nil { - logger.Info("Upserting LookoutIngester Deployment object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, lookoutIngester.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - logger.Info("Successfully reconciled LookoutIngester object", "durationMillis", time.Since(started).Milliseconds()) + logger.Info("Successfully reconciled object", "durationMillis", time.Since(started).Milliseconds()) return ctrl.Result{}, nil } diff --git a/internal/controller/install/lookoutingester_controller_test.go b/internal/controller/install/lookoutingester_controller_test.go index 7d3d8803..d44b17f6 100644 --- a/internal/controller/install/lookoutingester_controller_test.go +++ b/internal/controller/install/lookoutingester_controller_test.go @@ -57,12 +57,19 @@ func TestLookoutIngesterReconciler_Reconcile(t *testing.T) { ownerReference := []metav1.OwnerReference{owner} mockK8sClient := k8sclient.NewMockClient(mockCtrl) + // LookoutIngester mockK8sClient. EXPECT(). Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&v1alpha1.LookoutIngester{})). Return(nil). SetArg(2, expectedLookoutIngester) + // Finalizer + mockK8sClient. + EXPECT(). + Update(gomock.Any(), gomock.AssignableToTypeOf(&installv1alpha1.LookoutIngester{})). + Return(nil) + expectedSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: expectedLookoutIngester.Name, @@ -240,10 +247,9 @@ func TestLookoutIngesterReconciler_ErrorOnApplicationConfig(t *testing.T) { APIVersion: "install.armadaproject.io/v1alpha1", }, ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "LookoutIngester", - DeletionTimestamp: &metav1.Time{Time: time.Now()}, - Finalizers: []string{"batch.tutorial.kubebuilder.io/finalizer"}, + Namespace: "default", + Name: "LookoutIngester", + Finalizers: []string{operatorFinalizer}, }, Spec: v1alpha1.LookoutIngesterSpec{ CommonSpecBase: installv1alpha1.CommonSpecBase{ diff --git a/internal/controller/install/scheduler_controller.go b/internal/controller/install/scheduler_controller.go index 12b0131d..9e66d59b 100644 --- a/internal/controller/install/scheduler_controller.go +++ b/internal/controller/install/scheduler_controller.go @@ -31,7 +31,6 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -65,24 +64,22 @@ type SchedulerReconciler struct { func (r *SchedulerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name) started := time.Now() - logger.Info("Reconciling Scheduler object") - - logger.Info("Fetching Scheduler object from cache") + logger.Info("Reconciling object") var scheduler installv1alpha1.Scheduler - if err := r.Client.Get(ctx, req.NamespacedName, &scheduler); err != nil { - if k8serrors.IsNotFound(err) { - logger.Info("Scheduler not found in cache, ending reconcile...", "namespace", req.Namespace, "name", req.Name) - return ctrl.Result{}, nil - } + if miss, err := getObject(ctx, r.Client, &scheduler, req.NamespacedName, logger); err != nil || miss { + return ctrl.Result{}, err + } + + finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &scheduler, operatorFinalizer, nil, logger) + if err != nil || finish { return ctrl.Result{}, err } - pc, err := installv1alpha1.BuildPortConfig(scheduler.Spec.ApplicationConfig) + scheduler.Spec.PortConfig, err = installv1alpha1.BuildPortConfig(scheduler.Spec.ApplicationConfig) if err != nil { return ctrl.Result{}, err } - scheduler.Spec.PortConfig = pc var components *CommonComponents components, err = generateSchedulerInstallComponents(&scheduler, r.Scheme) @@ -90,37 +87,6 @@ func (r *SchedulerReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } - // examine DeletionTimestamp to determine if object is under deletion - deletionTimestamp := scheduler.ObjectMeta.DeletionTimestamp - // examine DeletionTimestamp to determine if object is under deletion - if deletionTimestamp.IsZero() { - // The object is not being deleted, so if it does not have our finalizer, - // then lets add the finalizer and update the object. This is equivalent - // registering our finalizer. - if !controllerutil.ContainsFinalizer(&scheduler, operatorFinalizer) { - logger.Info("Attaching finalizer to Scheduler object", "finalizer", operatorFinalizer) - controllerutil.AddFinalizer(&scheduler, operatorFinalizer) - if err := r.Update(ctx, &scheduler); err != nil { - return ctrl.Result{}, err - } - } - } else { - logger.Info("Scheduler object is being deleted", "finalizer", operatorFinalizer) - logger.Info("Namespace-scoped resources will be deleted by Kubernetes based on their OwnerReference") - // The object is being deleted - if controllerutil.ContainsFinalizer(&scheduler, operatorFinalizer) { - // remove our finalizer from the list and update it. - logger.Info("Removing finalizer from Scheduler object", "finalizer", operatorFinalizer) - controllerutil.RemoveFinalizer(&scheduler, operatorFinalizer) - if err := r.Update(ctx, &scheduler); err != nil { - return ctrl.Result{}, err - } - } - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil - } - componentsCopy := components.DeepCopy() mutateFn := func() error { @@ -128,58 +94,38 @@ func (r *SchedulerReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return nil } - if components.ServiceAccount != nil { - logger.Info("Upserting Scheduler ServiceAccount object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, scheduler.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Secret != nil { - logger.Info("Upserting Scheduler Secret object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, scheduler.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Jobs != nil && len(components.Jobs) > 0 { - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Jobs[0], mutateFn); err != nil { + if len(components.Jobs) > 0 { + if err := upsertObjectIfNeeded(ctx, r.Client, components.Jobs[0], scheduler.Kind, mutateFn, logger); err != nil { return ctrl.Result{}, err } - ctxTimeout, cancel := context.WithTimeout(ctx, migrationTimeout) - defer cancel() - err := waitForJob(ctxTimeout, r.Client, components.Jobs[0], migrationPollSleep) - if err != nil { + + if err := waitForJob(ctx, r.Client, components.Jobs[0], jobPollInterval, jobTimeout); err != nil { return ctrl.Result{}, err } } - if components.Deployment != nil { - logger.Info("Upserting Scheduler Deployment object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, scheduler.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Service != nil { - logger.Info("Upserting Scheduler Service object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Service, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Service, scheduler.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.IngressGrpc != nil { - logger.Info("Upserting Scheduler Ingress Grpc object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressGrpc, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressGrpc, scheduler.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.ServiceMonitor != nil { - logger.Info("Upserting Scheduler ServiceMonitor object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceMonitor, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceMonitor, scheduler.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } logger.Info("Successfully reconciled Scheduler object", "durationMillis", time.Since(started).Milliseconds()) diff --git a/internal/controller/install/scheduler_controller_test.go b/internal/controller/install/scheduler_controller_test.go index f900fdcc..2570e92e 100644 --- a/internal/controller/install/scheduler_controller_test.go +++ b/internal/controller/install/scheduler_controller_test.go @@ -82,6 +82,7 @@ func TestSchedulerReconciler_Reconcile(t *testing.T) { } mockK8sClient := k8sclient.NewMockClient(mockCtrl) + // Scheduler mockK8sClient. EXPECT(). Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&v1alpha1.Scheduler{})). @@ -241,10 +242,9 @@ func TestSchedulerReconciler_ReconcileErrorDueToApplicationConfig(t *testing.T) APIVersion: "install.armadaproject.io/v1alpha1", }, ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "scheduler", - DeletionTimestamp: &metav1.Time{Time: time.Now()}, - Finalizers: []string{operatorFinalizer}, + Namespace: "default", + Name: "scheduler", + Finalizers: []string{operatorFinalizer}, }, Spec: v1alpha1.SchedulerSpec{ CommonSpecBase: installv1alpha1.CommonSpecBase{ diff --git a/internal/controller/install/scheduleringester_controller.go b/internal/controller/install/scheduleringester_controller.go index 8e3039c3..11ee1cff 100644 --- a/internal/controller/install/scheduleringester_controller.go +++ b/internal/controller/install/scheduleringester_controller.go @@ -24,7 +24,6 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" @@ -55,37 +54,30 @@ type SchedulerIngesterReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile func (r *SchedulerIngesterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name) + started := time.Now() - logger.Info("Reconciling SchedulerIngester object") - - logger.Info("Fetching SchedulerIngester object from cache") - var scheduleringester installv1alpha1.SchedulerIngester - if err := r.Client.Get(ctx, req.NamespacedName, &scheduleringester); err != nil { - if k8serrors.IsNotFound(err) { - logger.Info("SchedulerIngester not found in cache, ending reconcile") - return ctrl.Result{}, nil - } + + logger.Info("Reconciling object") + + var schedulerIngester installv1alpha1.SchedulerIngester + if miss, err := getObject(ctx, r.Client, &schedulerIngester, req.NamespacedName, logger); err != nil || miss { return ctrl.Result{}, err } - pc, err := installv1alpha1.BuildPortConfig(scheduleringester.Spec.ApplicationConfig) - if err != nil { + finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &schedulerIngester, operatorFinalizer, nil, logger) + if err != nil || finish { return ctrl.Result{}, err } - scheduleringester.Spec.PortConfig = pc - components, err := r.generateSchedulerIngesterComponents(&scheduleringester, r.Scheme) + pc, err := installv1alpha1.BuildPortConfig(schedulerIngester.Spec.ApplicationConfig) if err != nil { return ctrl.Result{}, err } + schedulerIngester.Spec.PortConfig = pc - deletionTimestamp := scheduleringester.ObjectMeta.DeletionTimestamp - // examine DeletionTimestamp to determine if object is under deletion - if !deletionTimestamp.IsZero() { - logger.Info("SchedulerIngester is being deleted") - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil + components, err := r.generateSchedulerIngesterComponents(&schedulerIngester, r.Scheme) + if err != nil { + return ctrl.Result{}, err } componentsCopy := components.DeepCopy() @@ -95,28 +87,19 @@ func (r *SchedulerIngesterReconciler) Reconcile(ctx context.Context, req ctrl.Re return nil } - if components.ServiceAccount != nil { - logger.Info("Upserting SchedulerIngester ServiceAccount object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, schedulerIngester.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Secret != nil { - logger.Info("Upserting SchedulerIngester Secret object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, schedulerIngester.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - if components.Deployment != nil { - logger.Info("Upserting SchedulerIngester Deployment object") - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil { - return ctrl.Result{}, err - } + if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, schedulerIngester.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err } - logger.Info("Successfully reconciled SchedulerIngester object", "durationMillis", time.Since(started).Milliseconds()) + logger.Info("Successfully reconciled object", "durationMillis", time.Since(started).Milliseconds()) return ctrl.Result{}, nil } diff --git a/internal/controller/install/scheduleringester_controller_test.go b/internal/controller/install/scheduleringester_controller_test.go index 674bd08a..d121d93d 100644 --- a/internal/controller/install/scheduleringester_controller_test.go +++ b/internal/controller/install/scheduleringester_controller_test.go @@ -57,12 +57,19 @@ func TestSchedulerIngesterReconciler_Reconcile(t *testing.T) { ownerReference := []metav1.OwnerReference{owner} mockK8sClient := k8sclient.NewMockClient(mockCtrl) + // SchedulerIngester mockK8sClient. EXPECT(). Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&v1alpha1.SchedulerIngester{})). Return(nil). SetArg(2, expectedSchedulerIngester) + // Finalizer + mockK8sClient. + EXPECT(). + Update(gomock.Any(), gomock.AssignableToTypeOf(&installv1alpha1.SchedulerIngester{})). + Return(nil) + expectedSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: expectedSchedulerIngester.Name, @@ -233,10 +240,9 @@ func TestSchedulerIngesterReconciler_ReconcileErrorOnApplicationConfig(t *testin APIVersion: "install.armadaproject.io/v1alpha1", }, ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "SchedulerIngester", - DeletionTimestamp: &metav1.Time{Time: time.Now()}, - Finalizers: []string{operatorFinalizer}, + Namespace: "default", + Name: "SchedulerIngester", + Finalizers: []string{operatorFinalizer}, }, Spec: v1alpha1.SchedulerIngesterSpec{ CommonSpecBase: installv1alpha1.CommonSpecBase{ diff --git a/test/integration/armadaserver_controller_test.go b/test/integration/armadaserver_controller_test.go index 6765618a..9ed74260 100644 --- a/test/integration/armadaserver_controller_test.go +++ b/test/integration/armadaserver_controller_test.go @@ -14,7 +14,7 @@ import ( installv1alpha1 "github.com/armadaproject/armada-operator/api/install/v1alpha1" ) -var _ = Describe("Armada Operator", func() { +var _ = Describe("Armada Server Controller", func() { When("User applies a new ArmadaServer YAML using kubectl", func() { It("Kubernetes should create ArmadaServer Kubernetes resources", func() { By("Calling the ArmadaServer Controller Reconcile function", func() { diff --git a/test/integration/lookout_controller_test.go b/test/integration/lookout_controller_test.go index 08cdf794..61c3d7f6 100644 --- a/test/integration/lookout_controller_test.go +++ b/test/integration/lookout_controller_test.go @@ -14,7 +14,7 @@ import ( kclient "sigs.k8s.io/controller-runtime/pkg/client" ) -var _ = Describe("Armada Operator", func() { +var _ = Describe("Lookout Controller", func() { When("User applies Lookout YAML using kubectl", func() { It("Kubernetes should create Lookout Kubernetes resources", func() { By("Calling the Lookout Controller Reconcile function", func() {