diff --git a/api/v1beta1/module_types.go b/api/v1beta1/module_types.go index 74f7b94..95a356e 100644 --- a/api/v1beta1/module_types.go +++ b/api/v1beta1/module_types.go @@ -17,15 +17,22 @@ limitations under the License. package v1beta1 import ( + "encoding/json" "strings" + "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. +const ( + RunRequestAnnotationKey = `terraform-applier.uw.systems/run-request` +) + // The potential reasons for events and current state const ( ReasonRunTriggered = "RunTriggered" @@ -33,6 +40,7 @@ const ( ReasonForcedApplyTriggered = "ForcedApplyTriggered" ReasonPollingRunTriggered = "PollingRunTriggered" ReasonScheduledRunTriggered = "ScheduledRunTriggered" + ReasonPRPlanTriggered = "PullRequestPlanTriggered" ReasonRunPreparationFailed = "RunPreparationFailed" ReasonDelegationFailed = "DelegationFailed" @@ -62,6 +70,8 @@ const ( ForcedPlan = "ForcedPlan" // ForcedApply indicates a forced (triggered on the UI) terraform apply. ForcedApply = "ForcedApply" + // PRPlan indicates a terraform plan triggered by a pull request on the module's repo path. + PRPlan = "PullRequestPlan" ) // Overall state of Module run @@ -306,7 +316,41 @@ func (m *Module) IsPlanOnly() bool { return m.Spec.PlanOnly != nil && *m.Spec.PlanOnly } -func GetRunReason(runType string) string { +func (m *Module) NewRunRequest(reqType string) *Request { + req := Request{ + NamespacedName: types.NamespacedName{ + Namespace: m.Namespace, + Name: m.Name, + }, + RequestedAt: &metav1.Time{Time: time.Now()}, + ID: NewRequestID(), + Type: reqType, + } + + return &req +} + +// PendingRunRequest returns the pending request, if any, from the module's annotation.
+func (m *Module) PendingRunRequest() (*Request, bool) { + valueString, exists := m.ObjectMeta.Annotations[RunRequestAnnotationKey] + if !exists { + return nil, false + } + value := Request{} + if err := json.Unmarshal([]byte(valueString), &value); err != nil { + // unmarshal errors are ignored as they should not happen; if one does, + // it can be treated as no request pending and the module can override it + // with a new valid request + return nil, false + } + value.NamespacedName = types.NamespacedName{ + Namespace: m.Namespace, + Name: m.Name, + } + return &value, true +} + +func RunReason(runType string) string { switch runType { case ScheduledRun: return ReasonScheduledRunTriggered @@ -316,6 +360,8 @@ func GetRunReason(runType string) string { return ReasonForcedPlanTriggered case ForcedApply: return ReasonForcedApplyTriggered + case PRPlan: + return ReasonPRPlanTriggered } return ReasonRunTriggered } diff --git a/api/v1beta1/request.go b/api/v1beta1/request.go new file mode 100644 index 0000000..35937da --- /dev/null +++ b/api/v1beta1/request.go @@ -0,0 +1,96 @@ +package v1beta1 + +import ( + "crypto/rand" + "encoding/base64" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var ( + ErrRunRequestExist = fmt.Errorf("another pending run request found") + ErrNoRunRequestFound = fmt.Errorf("no pending run requests found") + ErrRunRequestMismatch = fmt.Errorf("run request ID doesn't match pending request ID") +) + +// Request represents a terraform run request +type Request struct { + NamespacedName types.NamespacedName `json:"-"` + ID string `json:"id,omitempty"` + RequestedAt *metav1.Time `json:"reqAt,omitempty"` + Type string `json:"type,omitempty"` + PR *PullRequest `json:"pr,omitempty"` +} + +type PullRequest struct { + Number int `json:"num,omitempty"` + HeadBranch string `json:"headBranch,omitempty"` + CommentID string `json:"commentID,omitempty"` +} + +func (req *Request) Validate() error { + if req.NamespacedName.Namespace == "" { + return fmt.Errorf("namespace is required") + } + if req.NamespacedName.Name == "" { + return fmt.Errorf("name is required") + } + if req.RequestedAt.IsZero() { + return fmt.Errorf("valid timestamp is required for 'RequestedAt'") + } + + switch req.Type { + case ScheduledRun, + PollingRun, + ForcedPlan, + ForcedApply, + PRPlan: + default: + return fmt.Errorf("unknown Request type provided") + } + + return nil +} + +// IsPlanOnly reports whether the request is plan-only +func (req *Request) IsPlanOnly(module *Module) bool { + // for scheduled and polling runs respect the module spec + if req.Type == ScheduledRun || + req.Type == PollingRun { + return module.IsPlanOnly() + } + + // this is an override triggered by the user + if req.Type == ForcedApply { + return false + } + + // these are plan-only override requests + if req.Type == PRPlan || + req.Type == ForcedPlan { + return true + } + + // it's always safe to default to plan-only + return true +} + +// RepoRef returns the revision of the repository for the module source code +// based on the request type +func (req *Request) RepoRef(module *Module) string { + // PR plan runs use the PR's head branch + if req.Type == PRPlan { + return req.PR.HeadBranch + } + + return module.Spec.RepoRef +} + +// NewRequestID generates a random string to use as a request ID +func NewRequestID() string { + b := make([]byte, 6) + rand.Read(b) + return base64.StdEncoding.EncodeToString(b) +} diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 7db9e18..df98372 100644 ---
a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -182,6 +182,21 @@ func (in *OutputStats) DeepCopy() *OutputStats { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PullRequest) DeepCopyInto(out *PullRequest) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PullRequest. +func (in *PullRequest) DeepCopy() *PullRequest { + if in == nil { + return nil + } + out := new(PullRequest) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RBAC) DeepCopyInto(out *RBAC) { *out = *in @@ -202,6 +217,31 @@ func (in *RBAC) DeepCopy() *RBAC { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Request) DeepCopyInto(out *Request) { + *out = *in + out.NamespacedName = in.NamespacedName + if in.RequestedAt != nil { + in, out := &in.RequestedAt, &out.RequestedAt + *out = (*in).DeepCopy() + } + if in.PR != nil { + in, out := &in.PR, &out.PR + *out = new(PullRequest) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Request. +func (in *Request) DeepCopy() *Request { + if in == nil { + return nil + } + out := new(Request) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Subject) DeepCopyInto(out *Subject) { *out = *in diff --git a/controllers/module_controller.go b/controllers/module_controller.go index 1413c69..e5f1a86 100644 --- a/controllers/module_controller.go +++ b/controllers/module_controller.go @@ -20,11 +20,9 @@ import ( "context" "fmt" "log/slog" - "sync" "time" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -33,7 +31,7 @@ import ( "github.com/robfig/cron/v3" tfaplv1beta1 "github.com/utilitywarehouse/terraform-applier/api/v1beta1" "github.com/utilitywarehouse/terraform-applier/git" - "github.com/utilitywarehouse/terraform-applier/runner" + "github.com/utilitywarehouse/terraform-applier/metrics" "github.com/utilitywarehouse/terraform-applier/sysutil" corev1 "k8s.io/api/core/v1" ) @@ -47,10 +45,12 @@ type ModuleReconciler struct { Recorder record.EventRecorder Clock sysutil.ClockInterface Repos git.Repositories - Queue chan<- runner.Request + Queue chan<- *tfaplv1beta1.Request Log *slog.Logger MinIntervalBetweenRuns time.Duration - RunStatus *sync.Map + MaxConcurrentRuns int + RunStatus *sysutil.RunStatus + Metrics metrics.PrometheusInterface } //+kubebuilder:rbac:groups="",resources=events,verbs=create;patch @@ -74,8 +74,8 @@ func (r *ModuleReconciler) Reconcile(ctx context.Context, req reconcile.Request) log.Log(ctx, trace, "reconciling...") - var module tfaplv1beta1.Module - if err := r.Get(ctx, req.NamespacedName, &module); err != nil { + module, err := sysutil.GetModule(ctx, r.Client, req.NamespacedName) + if err != nil { log.Error("unable to fetch terraform module", "err", err) // we'll ignore not-found errors, since they can't be fixed by an immediate requeue return ctrl.Result{}, client.IgnoreNotFound(err) @@ -93,88 +93,108 @@ func (r *ModuleReconciler) Reconcile(ctx context.Context, req 
reconcile.Request) if module.Spec.RepoURL == "" { msg := fmt.Sprintf("repoURL is required, please add repoURL instead of repoName:%s", module.Spec.RepoName) log.Error(msg) - r.setFailedStatus(req, &module, tfaplv1beta1.ReasonSpecsParsingFailure, msg) + r.setFailedStatus(req, module, tfaplv1beta1.ReasonSpecsParsingFailure, msg) // we don't really care about requeuing until we get an update that // fixes the repoURL, so don't return an error return ctrl.Result{}, nil } - // pollIntervalDuration is used as minimum duration for the next run + // pollIntervalDuration is used as the minimum duration for re-queue pollIntervalDuration := time.Duration(module.Spec.PollInterval) * time.Second - // check module's run status + // check if module is actually running on this controller... + if _, ok := r.RunStatus.Load(req.NamespacedName.String()); ok { + // it is running so use next poll interval as minimum queue duration + return ctrl.Result{RequeueAfter: pollIntervalDuration}, nil + } + + // at this stage the module is not being deleted and it's not currently running + // + + // check module's run status, it is possible that the process got killed before + // the runner could update module status, or the module status update API could + // have failed if module.Status.CurrentState == string(tfaplv1beta1.StatusRunning) { - // make sure module is actually running at the moment - // it is possible that process got killed before it could update module status or - // module status update could have failed and hence it stayed in running state - _, ok := r.RunStatus.Load(req.NamespacedName.String()) - if ok { - // it is running so use next poll internal as minimum queue duration - return ctrl.Result{RequeueAfter: pollIntervalDuration}, nil - } - // module is not currently running so change status and continue - msg := "wrong status found, module is not currently running" + // module is not actually running so change status and continue + msg := "wrong status found, module is not actually running" log.Error(msg) - r.setFailedStatus(req, &module, tfaplv1beta1.ReasonUnknown, msg) + r.setFailedStatus(req, module, tfaplv1beta1.ReasonUnknown, msg) } - var isPlanOnly bool - if module.Spec.PlanOnly != nil && *module.Spec.PlanOnly { - isPlanOnly = true + // case 1: + // check for run triggers + // + runReq, ok := module.PendingRunRequest() + if ok { + log.Debug("processing pending run request", "req", runReq, "delay", time.Since(runReq.RequestedAt.Time)) + // use next poll interval as minimum queue duration as status change will not trigger Reconcile + return r.triggerRunORRequeue(runReq, pollIntervalDuration) + } + // case 2: + // check for initial run + // if module.Status.RunCommitHash == "" { - log.Debug("starting initial run") - r.Queue <- runner.Request{NamespacedName: req.NamespacedName, Type: tfaplv1beta1.PollingRun, PlanOnly: isPlanOnly} + log.Debug("requesting initial run") // use next poll interval as minimum queue duration as status change will not trigger Reconcile - return ctrl.Result{RequeueAfter: pollIntervalDuration}, nil + return r.triggerRunORRequeue(module.NewRunRequest(tfaplv1beta1.PollingRun), pollIntervalDuration) } - // check for new changes on modules path + // case 3: + // check for new git hash changes on the module's path + // hash, err := r.Repos.Hash(ctx, module.Spec.RepoURL, module.Spec.RepoRef, module.Spec.Path) if err != nil { msg := fmt.Sprintf("unable to get current hash of the repo err:%s", err) log.Error(msg) - r.setFailedStatus(req, &module, tfaplv1beta1.ReasonGitFailure, msg) + r.setFailedStatus(req, module,
tfaplv1beta1.ReasonGitFailure, msg) // since the issue is not related to module specs, requeue again in case it's fixed return ctrl.Result{RequeueAfter: pollIntervalDuration}, nil } if hash != module.Status.RunCommitHash { - log.Debug("revision is changed on module path triggering run", "lastRun", module.Status.RunCommitHash, "current", hash) - r.Queue <- runner.Request{NamespacedName: req.NamespacedName, Type: tfaplv1beta1.PollingRun, PlanOnly: isPlanOnly} + log.Debug("requesting run as revision is changed on module path", "lastRun", module.Status.RunCommitHash, "current", hash) // use next poll interval as minimum queue duration as status change will not trigger Reconcile - return ctrl.Result{RequeueAfter: pollIntervalDuration}, nil + return r.triggerRunORRequeue(module.NewRunRequest(tfaplv1beta1.PollingRun), pollIntervalDuration) } + // case 4: + // check if a scheduled run is required + // + // If no schedule is provided, just requeue for the next git check if module.Spec.Schedule == "" { return ctrl.Result{RequeueAfter: pollIntervalDuration}, nil } // figure out the next times that we need to run or last missed runs time if any. - numOfMissedRuns, nextRun, err := NextSchedule(&module, r.Clock.Now(), r.MinIntervalBetweenRuns) + numOfMissedRuns, nextRun, err := NextSchedule(module, r.Clock.Now(), r.MinIntervalBetweenRuns) if err != nil { msg := fmt.Sprintf("unable to figure out CronJob schedule: err:%s", err) log.Error(msg) - r.setFailedStatus(req, &module, tfaplv1beta1.ReasonSpecsParsingFailure, msg) + r.setFailedStatus(req, module, tfaplv1beta1.ReasonSpecsParsingFailure, msg) // we don't really care about requeuing until we get an update that // fixes the schedule, so don't return an error return ctrl.Result{}, nil } if numOfMissedRuns > 0 { - log.Debug("starting scheduled run", "missed-runs", numOfMissedRuns) - r.Queue <- runner.Request{NamespacedName: req.NamespacedName, Type: tfaplv1beta1.ScheduledRun} + log.Debug("requesting scheduled run", "missed-runs", numOfMissedRuns) + // use next poll interval as minimum queue duration as status change will not trigger Reconcile + return r.triggerRunORRequeue(module.NewRunRequest(tfaplv1beta1.ScheduledRun), pollIntervalDuration) } + // default: + // No action required so requeue module + // + // Calculate shortest duration to next run requeueAfter := nextRun.Sub(r.Clock.Now()) if pollIntervalDuration < requeueAfter { requeueAfter = pollIntervalDuration } - // Requeue module again even after triggering run as status change will not trigger Reconcile return ctrl.Result{RequeueAfter: requeueAfter}, nil } @@ -251,6 +271,20 @@ func NextSchedule(module *tfaplv1beta1.Module, now time.Time, minIntervalBetween return numOfMissedRuns, sched.Next(now), nil } +// triggerRunORRequeue checks controller capacity before triggering a run; +// if the max limit is reached it will re-queue the module to try again +func (r *ModuleReconciler) triggerRunORRequeue(runReq *tfaplv1beta1.Request, requeueAfter time.Duration) (ctrl.Result, error) { + runningModuleCount := r.RunStatus.Len() + if r.MaxConcurrentRuns <= 0 || runningModuleCount < r.MaxConcurrentRuns { + r.Queue <- runReq + r.Metrics.SetRunPending(runReq.NamespacedName.Name, runReq.NamespacedName.Namespace, false) + } else { + r.Metrics.SetRunPending(runReq.NamespacedName.Name, runReq.NamespacedName.Namespace, true) + r.Log.Warn("skipping run, max concurrent run limit reached", "module", runReq.NamespacedName, "count", runningModuleCount) + } + return ctrl.Result{RequeueAfter: requeueAfter}, nil +} + func (r *ModuleReconciler)
setFailedStatus(req ctrl.Request, module *tfaplv1beta1.Module, reason, msg string) { module.Status.CurrentState = string(tfaplv1beta1.StatusErrored) @@ -262,19 +296,7 @@ func (r *ModuleReconciler) setFailedStatus(req ctrl.Request, module *tfaplv1beta r.Recorder.Event(module, corev1.EventTypeWarning, reason, msg) - if err := r.patchStatus(context.Background(), req.NamespacedName, module.Status); err != nil { + if err := sysutil.PatchModuleStatus(context.Background(), r.Client, req.NamespacedName, module.Status); err != nil { r.Log.With("module", req).Error("unable to set failed status", "err", err) } } - -func (r *ModuleReconciler) patchStatus(ctx context.Context, objectKey types.NamespacedName, newStatus tfaplv1beta1.ModuleStatus) error { - module := new(tfaplv1beta1.Module) - if err := r.Get(ctx, objectKey, module); err != nil { - return err - } - - patch := client.MergeFrom(module.DeepCopy()) - module.Status = newStatus - - return r.Status().Patch(ctx, module, patch, client.FieldOwner("terraform-applier")) -} diff --git a/go.mod b/go.mod index 0e61dd6..fffa176 100644 --- a/go.mod +++ b/go.mod @@ -19,10 +19,12 @@ require ( github.com/utilitywarehouse/git-mirror v0.1.0 github.com/utilitywarehouse/go-operational v0.0.0-20220413104526-79ce40a50281 golang.org/x/oauth2 v0.18.0 + golang.org/x/time v0.5.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.29.3 k8s.io/apimachinery v0.29.3 k8s.io/client-go v0.29.3 + k8s.io/klog/v2 v2.120.1 sigs.k8s.io/controller-runtime v0.17.2 sigs.k8s.io/controller-runtime/tools/setup-envtest v0.0.0-20240215124517-56159419231e sigs.k8s.io/controller-tools v0.14.0 @@ -100,7 +102,6 @@ require ( golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.18.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect @@ -109,7 +110,6 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.29.2 // indirect k8s.io/component-base v0.29.2 // indirect - k8s.io/klog/v2 v2.120.1 // indirect k8s.io/kube-openapi v0.0.0-20240227032403-f107216b40e2 // indirect k8s.io/utils v0.0.0-20240102154912-e7106e64919e // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/integration_test/module_controller_filter_test.go b/integration_test/module_controller_filter_test.go index c50351f..05f3087 100644 --- a/integration_test/module_controller_filter_test.go +++ b/integration_test/module_controller_filter_test.go @@ -39,6 +39,8 @@ var _ = Describe("Module controller without runner with label selector", func() Return(commitHash, nil).AnyTimes() testRepos.EXPECT().LogMsg(gomock.Any(), "https://host.xy/dummy/repo.git", "HEAD", "hello-filter-test"). 
Return(commitMsg, nil).AnyTimes() + + testMetrics.EXPECT().SetRunPending(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() }) It("Should send module with valid selector label selector to job queue", func() { diff --git a/integration_test/module_controller_no_runner_test.go b/integration_test/module_controller_no_runner_test.go index e17a6b2..48aaea0 100644 --- a/integration_test/module_controller_no_runner_test.go +++ b/integration_test/module_controller_no_runner_test.go @@ -30,6 +30,8 @@ var _ = Describe("Module controller without runner", func() { // remove any label selector testFilter.LabelSelectorKey = "" testFilter.LabelSelectorValue = "" + + testMetrics.EXPECT().SetRunPending(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() }) It("Should send module to job queue on schedule", func() { @@ -317,5 +319,52 @@ var _ = Describe("Module controller without runner", func() { // delete module to stopping requeue Expect(k8sClient.Delete(ctx, module)).Should(Succeed()) }) + + It("Should send module to job queue on pending run request", func() { + const ( + moduleName = "test-module5" + repoURL = "https://host.xy/dummy/repo2.git" + path = "dev/" + moduleName + ) + + By("By creating a new Module") + ctx := context.Background() + module := &tfaplv1beta1.Module{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "terraform-applier.uw.systems/v1beta1", + Kind: "Module", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: moduleName, + Namespace: moduleNamespace, + Annotations: map[string]string{ + tfaplv1beta1.RunRequestAnnotationKey: `{"id":"ueLMEbQj","reqAt":"2024-04-11T14:55:04Z","type":"ForcedPlan"}`, + }, + }, + Spec: tfaplv1beta1.ModuleSpec{ + RepoURL: repoURL, + Path: path, + }, + } + Expect(k8sClient.Create(ctx, module)).Should(Succeed()) + + moduleLookupKey := types.NamespacedName{Name: moduleName, Namespace: moduleNamespace} + + By("By making sure job was sent to jobQueue") + // wait for just about 60 sec default poll interval + Eventually(func() types.NamespacedName { + timer := time.NewTimer(time.Second) + for { + select { + case req := <-testControllerQueue: + return req.NamespacedName + case <-timer.C: + return types.NamespacedName{} + } + } + }, time.Second*70, interval).Should(Equal(moduleLookupKey)) + // delete module to stop requeue + Expect(k8sClient.Delete(ctx, module)).Should(Succeed()) + }) }) }) diff --git a/integration_test/module_controller_with_runner_test.go b/integration_test/module_controller_with_runner_test.go index 47ecaf6..8bf266c 100644 --- a/integration_test/module_controller_with_runner_test.go +++ b/integration_test/module_controller_with_runner_test.go @@ -63,6 +63,7 @@ var _ = Describe("Module controller with Runner", func() { testMetrics.EXPECT().UpdateModuleRunDuration(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() testMetrics.EXPECT().UpdateModuleSuccess(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() testMetrics.EXPECT().UpdateTerraformExitCodeCount(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + testMetrics.EXPECT().SetRunPending(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() // clear state file if exits os.Remove(testStateFilePath) diff --git a/integration_test/suite_test.go b/integration_test/suite_test.go index a599f98..d47e01c 100644 --- a/integration_test/suite_test.go +++ b/integration_test/suite_test.go @@ -22,7 +22,6 @@ import ( "os" "os/exec" "path/filepath" - "sync" "testing" "time" @@ -74,14 +73,14 @@ var ( goMockCtrl *gomock.Controller testLogger *slog.Logger // testControllerQueue only used for
controller behaviour testing - testControllerQueue chan runner.Request - testFilterControllerQueue chan runner.Request + testControllerQueue chan *tfaplv1beta1.Request + testFilterControllerQueue chan *tfaplv1beta1.Request testStateFilePath string testFilter *controllers.Filter // testRunnerQueue only used for send job to runner of runner testing - testRunnerQueue chan runner.Request + testRunnerQueue chan *tfaplv1beta1.Request testRepos *git.MockRepositories testMetrics *metrics.MockPrometheusInterface testDelegate *runner.MockDelegateInterface @@ -153,12 +152,12 @@ var _ = BeforeSuite(func() { T: time.Date(01, 01, 01, 0, 0, 0, 0, time.UTC), } - runStatus := new(sync.Map) + runStatus := sysutil.NewRunStatus() minIntervalBetweenRunsDuration := 1 * time.Minute - testControllerQueue = make(chan runner.Request) - testFilterControllerQueue = make(chan runner.Request) - testRunnerQueue = make(chan runner.Request) + testControllerQueue = make(chan *tfaplv1beta1.Request) + testFilterControllerQueue = make(chan *tfaplv1beta1.Request) + testRunnerQueue = make(chan *tfaplv1beta1.Request) goMockCtrl = gomock.NewController(RecoveringGinkgoT()) @@ -178,6 +177,7 @@ var _ = BeforeSuite(func() { Log: testLogger.With("logger", "manager"), MinIntervalBetweenRuns: minIntervalBetweenRunsDuration, RunStatus: runStatus, + Metrics: testMetrics, } testFilter = &controllers.Filter{ diff --git a/main.go b/main.go index 6aedff1..a72dbca 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,6 @@ import ( "os/signal" "path/filepath" "strings" - "sync" "syscall" "time" @@ -102,6 +101,12 @@ var ( Value: 60, Usage: "The minimum interval in seconds, user can set between 2 consecutive runs. This value defines the frequency of runs.", }, + &cli.IntFlag{ + Name: "max-concurrent-runs", + EnvVars: []string{"MAX_CONCURRENT_RUNS"}, + Value: 10, + Usage: "The maximum number of concurrent module runs allowed on controller. 
If it's 0, there is no limit", }, &cli.IntFlag{ Name: "termination-grace-period", EnvVars: []string{"TERMINATION_GRACE_PERIOD"}, @@ -373,6 +378,14 @@ func setupGlobalEnv(c *cli.Context) { // terraform depends on git for pulling remote modules globalRunEnv["PATH"] = os.Getenv("PATH") + // setup plugin cache + pluginCache, err := os.MkdirTemp("", "plugin-cache") + if err != nil { + logger.Error("unable to create plugin cache dir", "err", err) + os.Exit(1) + } + globalRunEnv["TF_PLUGIN_CACHE_DIR"] = pluginCache + for _, env := range strings.Split(c.String("controller-runtime-env"), ",") { globalRunEnv[env] = os.Getenv(env) } @@ -412,8 +425,7 @@ current-context: in-cluster users: [] preferences: {} ` - err := os.WriteFile(configPath, []byte(defaultCurrentKubeConfig), 0666) - if err != nil { + if err := os.WriteFile(configPath, []byte(defaultCurrentKubeConfig), 0666); err != nil { logger.Error("unable to create custom in cluster config file", "err", err) os.Exit(1) } @@ -486,9 +498,9 @@ func run(c *cli.Context) { ctrl.SetLogger(logr.FromSlogHandler(logger.Handler())) // runStatus keeps track of currently running modules - runStatus := new(sync.Map) + runStatus := sysutil.NewRunStatus() - moduleQueue := make(chan runner.Request) + moduleQueue := make(chan *tfaplv1beta1.Request) done := make(chan bool, 1) clock := &sysutil.Clock{} @@ -596,7 +608,9 @@ func run(c *cli.Context) { Repos: repos, Log: logger.With("logger", "manager"), MinIntervalBetweenRuns: time.Duration(c.Int("min-interval-between-runs")) * time.Second, + MaxConcurrentRuns: c.Int("max-concurrent-runs"), RunStatus: runStatus, + Metrics: metrics, }).SetupWithManager(mgr, filter); err != nil { logger.Error("unable to create module controller", "err", err) os.Exit(1) } @@ -655,7 +669,6 @@ func run(c *cli.Context) { ListenAddress: c.String("webserver-bind-address"), ClusterClt: mgr.GetClient(), KubeClient: kubeClient, - RunQueue: moduleQueue, RunStatus: runStatus, Log: logger.With("logger", "webserver"), } diff --git a/metrics/mock_prometheus.go b/metrics/mock_prometheus.go index cd42bef..e60190c 100644 --- a/metrics/mock_prometheus.go +++ b/metrics/mock_prometheus.go @@ -33,6 +33,18 @@ func (m *MockPrometheusInterface) EXPECT() *MockPrometheusInterfaceMockRecorder return m.recorder } +// SetRunPending mocks base method. +func (m *MockPrometheusInterface) SetRunPending(arg0, arg1 string, arg2 bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetRunPending", arg0, arg1, arg2) +} + +// SetRunPending indicates an expected call of SetRunPending. +func (mr *MockPrometheusInterfaceMockRecorder) SetRunPending(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetRunPending", reflect.TypeOf((*MockPrometheusInterface)(nil).SetRunPending), arg0, arg1, arg2) +} + // UpdateModuleRunDuration mocks base method. func (m *MockPrometheusInterface) UpdateModuleRunDuration(arg0, arg1 string, arg2 float64, arg3 bool) { m.ctrl.T.Helper() diff --git a/metrics/prometheus.go b/metrics/prometheus.go index fe621cc..466e8bf 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -19,6 +19,7 @@ type PrometheusInterface interface { UpdateTerraformExitCodeCount(string, string, string, int) UpdateModuleSuccess(string, string, bool) UpdateModuleRunDuration(string, string, float64, bool) + SetRunPending(string, string, bool) } // Prometheus implements instrumentation of metrics for terraform-applier.
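For reference, the new gauge follows the usual 0/1 pending pattern: `triggerRunORRequeue` sets it to 1 when a run is skipped for capacity and back to 0 once the run is handed to the queue. A minimal, self-contained sketch of that pattern — the `terraform_applier` prefix is an assumption standing in for `metricsNamespace`, which is not shown in this hunk:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// illustrative stand-in for p.moduleRunPending; label names match the diff
	pending := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "terraform_applier", // assumed value of metricsNamespace
		Name:      "module_run_pending",
	}, []string{"module", "namespace"})

	labels := prometheus.Labels{"module": "my-module", "namespace": "dev"}
	pending.With(labels).Set(1)              // run skipped: max-concurrent-runs reached
	fmt.Println(testutil.ToFloat64(pending)) // 1
	pending.With(labels).Set(0)              // run handed to the queue
	fmt.Println(testutil.ToFloat64(pending)) // 0
}
```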
@@ -31,6 +32,7 @@ type Prometheus struct { terraformExitCodeCount *prometheus.CounterVec moduleRunCount *prometheus.CounterVec moduleRunDuration *prometheus.HistogramVec + moduleRunPending *prometheus.GaugeVec moduleRunSuccess *prometheus.GaugeVec moduleRunTimestamp *prometheus.GaugeVec } @@ -68,6 +70,18 @@ func (p *Prometheus) Init() { "success", }, ) + p.moduleRunPending = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "module_run_pending", + Help: "is the module ready to run but not yet started?", + }, + []string{ + // Name of the module + "module", + // Namespace of the module that was run + "namespace", + }, + ) p.moduleRunSuccess = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: metricsNamespace, Name: "module_last_run_success", @@ -112,6 +126,7 @@ func (p *Prometheus) Init() { p.moduleRunCount, p.moduleRunDuration, p.moduleRunSuccess, + p.moduleRunPending, p.moduleRunTimestamp, p.terraformExitCodeCount, ) @@ -164,3 +179,15 @@ func (p *Prometheus) setRunSuccess(module, namespace string, success bool) { "namespace": namespace, }).Set(as) } + +// SetRunPending sets the pending status for a module +func (p *Prometheus) SetRunPending(module, namespace string, pending bool) { + as := float64(0) + if pending { + as = 1 + } + p.moduleRunPending.With(prometheus.Labels{ + "module": module, + "namespace": namespace, + }).Set(as) +} diff --git a/runner/delegation.go b/runner/delegation.go index b89b0f3..5e265ca 100644 --- a/runner/delegation.go +++ b/runner/delegation.go @@ -7,8 +7,8 @@ import ( "os" tfaplv1beta1 "github.com/utilitywarehouse/terraform-applier/api/v1beta1" + "github.com/utilitywarehouse/terraform-applier/sysutil" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/util/cert" @@ -34,7 +34,7 @@ func (d *Delegate) SetupDelegation(ctx context.Context, jwt string) (kubernetes.
} func (d *Delegate) DelegateToken(ctx context.Context, kubeClt kubernetes.Interface, module *tfaplv1beta1.Module) (string, error) { - secret, err := kubeClt.CoreV1().Secrets(module.Namespace).Get(ctx, module.Spec.DelegateServiceAccountSecretRef, metav1.GetOptions{}) + secret, err := sysutil.GetSecret(ctx, kubeClt, module.Namespace, module.Spec.DelegateServiceAccountSecretRef) if err != nil { return "", fmt.Errorf(`unable to get delegate token secret "%s/%s" err:%w`, module.Namespace, module.Spec.DelegateServiceAccountSecretRef, err) } @@ -85,16 +85,14 @@ func fetchEnvVars(ctx context.Context, client kubernetes.Interface, module *tfap } if env.ValueFrom.ConfigMapKeyRef != nil { - - cm, err := client.CoreV1().ConfigMaps(module.Namespace).Get(ctx, env.ValueFrom.ConfigMapKeyRef.Name, metav1.GetOptions{}) + cm, err := sysutil.GetConfigMaps(ctx, client, module.Namespace, env.ValueFrom.ConfigMapKeyRef.Name) if err != nil { return nil, fmt.Errorf("unable to get valueFrom configMap:%s err:%w", env.ValueFrom.ConfigMapKeyRef.Name, err) } kvPairs[env.Name] = cm.Data[env.ValueFrom.ConfigMapKeyRef.Key] } else if env.ValueFrom.SecretKeyRef != nil { - - secret, err := client.CoreV1().Secrets(module.Namespace).Get(ctx, env.ValueFrom.SecretKeyRef.Name, metav1.GetOptions{}) + secret, err := sysutil.GetSecret(ctx, client, module.Namespace, env.ValueFrom.SecretKeyRef.Name) if err != nil { return nil, fmt.Errorf("unable to get valueFrom configMap:%s err:%w", env.ValueFrom.SecretKeyRef.Name, err) } diff --git a/runner/runner.go b/runner/runner.go index 74fb9a6..7d74be5 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -26,26 +26,20 @@ var ( reApplyStatus = regexp.MustCompile(`.*(Apply complete! .* destroyed)`) ) -type Request struct { - types.NamespacedName - Type string - PlanOnly bool -} - type Runner struct { Clock sysutil.ClockInterface ClusterClt client.Client Recorder record.EventRecorder KubeClt kubernetes.Interface Repos git.Repositories - Queue <-chan Request + Queue <-chan *tfaplv1beta1.Request Log *slog.Logger Delegate DelegateInterface Metrics metrics.PrometheusInterface TerraformExecPath string TerminationGracePeriod time.Duration AWSSecretsEngineConfig vault.AWSSecretsEngineInterface - RunStatus *sync.Map + RunStatus *sysutil.RunStatus GlobalENV map[string]string } @@ -71,9 +65,14 @@ func (r *Runner) Start(ctx context.Context, done chan bool) { case req := <-r.Queue: wg.Add(1) - go func(req Request) { + go func(req *tfaplv1beta1.Request) { defer wg.Done() + if err := req.Validate(); err != nil { + r.Log.Error("run triggered with invalid request", "req", req, "err", err) + return + } + start := time.Now() r.Log.Info("starting run", "module", req.NamespacedName, "type", req.Type) @@ -85,15 +84,15 @@ func (r *Runner) Start(ctx context.Context, done chan bool) { } else { r.Log.Error("run completed with error", "module", req.NamespacedName) } - r.Metrics.UpdateModuleSuccess(req.Name, req.Namespace, success) - r.Metrics.UpdateModuleRunDuration(req.Name, req.Namespace, time.Since(start).Seconds(), success) + r.Metrics.UpdateModuleSuccess(req.NamespacedName.Name, req.NamespacedName.Namespace, success) + r.Metrics.UpdateModuleRunDuration(req.NamespacedName.Name, req.NamespacedName.Namespace, time.Since(start).Seconds(), success) }(req) } } } // process will prepare and run module it returns bool indicating failed run -func (r *Runner) process(req Request, cancelChan <-chan struct{}) bool { +func (r *Runner) process(req *tfaplv1beta1.Request, cancelChan <-chan struct{}) bool { log := 
r.Log.With("module", req.NamespacedName) // make sure module is not already running @@ -106,13 +105,20 @@ func (r *Runner) process(req Request, cancelChan <-chan struct{}) bool { r.RunStatus.Store(req.NamespacedName.String(), true) defer r.RunStatus.Delete(req.NamespacedName.String()) + // remove pending run request regardless of run outcome + defer func() { + if err := sysutil.RemoveRequest(context.Background(), r.ClusterClt, req); err != nil { + log.Error("unable to remove run request", "err", err) + } + }() + // create new context ctx, cancel := context.WithCancel(context.Background()) defer cancel() // get Object - module := new(tfaplv1beta1.Module) - if err := r.ClusterClt.Get(ctx, req.NamespacedName, module); err != nil { + module, err := sysutil.GetModule(ctx, r.ClusterClt, req.NamespacedName) + if err != nil { log.Error("unable to fetch terraform module", "err", err) return false } @@ -239,7 +245,7 @@ func (r *Runner) process(req Request, cancelChan <-chan struct{}) bool { // it returns bool indicating success or failure func (r *Runner) runTF( ctx context.Context, - req Request, + req *tfaplv1beta1.Request, module *tfaplv1beta1.Module, te TFExecuter, backendConf map[string]string, @@ -318,7 +324,7 @@ func (r *Runner) runTF( } // return if plan only mode - if req.PlanOnly { + if req.IsPlanOnly(module) { if err = r.SetRunFinishedStatus(req.NamespacedName, module, tfaplv1beta1.ReasonPlanedDriftDetected, "PlanOnly/"+planStatus, r.Clock.Now()); err != nil { log.Error("unable to set drift status", "err", err) return false @@ -369,10 +375,10 @@ func (r *Runner) runTF( func (r *Runner) SetProgressingStatus(objectKey types.NamespacedName, m *tfaplv1beta1.Module, msg string) error { m.Status.CurrentState = string(tfaplv1beta1.StatusRunning) m.Status.StateMessage = tfaplv1beta1.NormaliseStateMsg(msg) - return r.patchStatus(context.Background(), objectKey, m.Status) + return sysutil.PatchModuleStatus(context.Background(), r.ClusterClt, objectKey, m.Status) } -func (r *Runner) SetRunStartedStatus(req Request, m *tfaplv1beta1.Module, msg, commitHash, commitMsg, remoteURL string, now time.Time) error { +func (r *Runner) SetRunStartedStatus(req *tfaplv1beta1.Request, m *tfaplv1beta1.Module, msg, commitHash, commitMsg, remoteURL string, now time.Time) error { m.Status.CurrentState = string(tfaplv1beta1.StatusRunning) m.Status.RunType = req.Type @@ -383,11 +389,11 @@ func (r *Runner) SetRunStartedStatus(req Request, m *tfaplv1beta1.Module, msg, c m.Status.RunCommitMsg = commitMsg m.Status.RemoteURL = remoteURL m.Status.StateMessage = tfaplv1beta1.NormaliseStateMsg(msg) - m.Status.StateReason = tfaplv1beta1.GetRunReason(req.Type) + m.Status.StateReason = tfaplv1beta1.RunReason(req.Type) - r.Recorder.Eventf(m, corev1.EventTypeNormal, tfaplv1beta1.GetRunReason(req.Type), "%s: type:%s, commit:%s", msg, req.Type, commitHash) + r.Recorder.Eventf(m, corev1.EventTypeNormal, tfaplv1beta1.RunReason(req.Type), "%s: type:%s, commit:%s", msg, req.Type, commitHash) - return r.patchStatus(context.Background(), req.NamespacedName, m.Status) + return sysutil.PatchModuleStatus(context.Background(), r.ClusterClt, req.NamespacedName, m.Status) } func (r *Runner) SetRunFinishedStatus(objectKey types.NamespacedName, m *tfaplv1beta1.Module, reason, msg string, now time.Time) error { @@ -398,10 +404,10 @@ func (r *Runner) SetRunFinishedStatus(objectKey types.NamespacedName, m *tfaplv1 r.Recorder.Event(m, corev1.EventTypeNormal, reason, msg) - return r.patchStatus(context.Background(), objectKey, m.Status) + return 
sysutil.PatchModuleStatus(context.Background(), r.ClusterClt, objectKey, m.Status) } -func (r *Runner) setFailedStatus(req Request, module *tfaplv1beta1.Module, reason, msg string, now time.Time) { +func (r *Runner) setFailedStatus(req *tfaplv1beta1.Request, module *tfaplv1beta1.Module, reason, msg string, now time.Time) { module.Status.CurrentState = string(tfaplv1beta1.StatusErrored) module.Status.RunDuration = &metav1.Duration{Duration: now.Sub(module.Status.RunStartedAt.Time).Round(time.Second)} @@ -410,49 +416,11 @@ func (r *Runner) setFailedStatus(req Request, module *tfaplv1beta1.Module, reaso r.Recorder.Event(module, corev1.EventTypeWarning, reason, fmt.Sprintf("%q", msg)) - if err := r.patchStatus(context.Background(), req.NamespacedName, module.Status); err != nil { - r.Log.With("module", req.NamespacedName).Error("unable to set failed status", "err", err) + if err := sysutil.PatchModuleStatus(context.Background(), r.ClusterClt, req.NamespacedName, module.Status); err != nil { + r.Log.With("module", req).Error("unable to set failed status", "err", err) } } -func (r *Runner) patchStatus(ctx context.Context, objectKey types.NamespacedName, newStatus tfaplv1beta1.ModuleStatus) error { - maxAttempt := 5 - waitMultiplier := 5 //sec - - log := r.Log.With("module", objectKey) - - var attempt int - for { - attempt++ - - err := r.tryPatchStatus(ctx, objectKey, newStatus) - if err == nil { - return nil - } - if attempt > maxAttempt { - return fmt.Errorf("unable to set status, max attempted reached : err:%w", err) - } - - retryAfter := attempt * waitMultiplier - - log.Warn("unable to set status will try again", "attempt", attempt, "retryAfter", retryAfter, "err", err) - - time.Sleep(time.Second * time.Duration(retryAfter)) - } -} - -func (r *Runner) tryPatchStatus(ctx context.Context, objectKey types.NamespacedName, newStatus tfaplv1beta1.ModuleStatus) error { - module := new(tfaplv1beta1.Module) - if err := r.ClusterClt.Get(ctx, objectKey, module); err != nil { - return err - } - - patch := client.MergeFrom(module.DeepCopy()) - module.Status = newStatus - - return r.ClusterClt.Status().Patch(ctx, module, patch, client.FieldOwner("terraform-applier")) -} - func isChannelClosed(cancelChan <-chan struct{}) bool { select { case _, ok := <-cancelChan: diff --git a/sysutil/filesystem.go b/sysutil/filesystem.go index c01027a..72711ba 100644 --- a/sysutil/filesystem.go +++ b/sysutil/filesystem.go @@ -5,25 +5,8 @@ import ( "io" "os" "path" - "path/filepath" ) -// ListDirs walks the directory tree rooted at the path and adds all non-directory file paths to a []string. 
-func ListDirs(rootPath string) ([]string, error) { - var dirs []string - files, err := os.ReadDir(rootPath) - if err != nil { - return dirs, fmt.Errorf("Could not read %s error=(%v)", rootPath, err) - } - - for _, file := range files { - if file.IsDir() { - dirs = append(dirs, filepath.Join(rootPath, file.Name())) - } - } - return dirs, nil -} - // CopyFile copies a file func CopyFile(src, dst string) error { var err error diff --git a/sysutil/k8s_api.go b/sysutil/k8s_api.go new file mode 100644 index 0000000..47515fd --- /dev/null +++ b/sysutil/k8s_api.go @@ -0,0 +1,144 @@ +package sysutil + +import ( + "context" + "fmt" + "time" + + tfaplv1beta1 "github.com/utilitywarehouse/terraform-applier/api/v1beta1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + k8sAPICallRetryInterval = 5 * time.Second // How much time to wait in between retrying a k8s API call + k8sAPICallRetryTimeout = 5 * time.Minute // How long to wait until we determine that the k8s API is definitively unavailable +) + +var ( + retriableError = func(err error) bool { + return errors.IsConflict(err) || + errors.IsServiceUnavailable(err) || + errors.IsServerTimeout(err) || + errors.IsTimeout(err) || + errors.IsTooManyRequests(err) + } +) + +// PollUntilTimeout uses wait.PollUntilContextTimeout to retry; +// this function should be used to get objects +func PollUntilTimeout(ctx context.Context, fn func(ctx context.Context) error) error { + var apiErr error + err := wait.PollUntilContextTimeout(ctx, k8sAPICallRetryInterval, k8sAPICallRetryTimeout, true, func(ctx context.Context) (bool, error) { + apiErr = fn(ctx) + switch { + case apiErr == nil: + return true, nil + case retriableError(apiErr): + return false, nil + default: + return false, apiErr + } + }) + if wait.Interrupted(err) { + err = apiErr + } + return err +} + +// CallWithBackOff uses wait.ExponentialBackoffWithContext with the default retry; +// this function should be used to update or patch objects +func CallWithBackOff(ctx context.Context, fn func(ctx context.Context) error) error { + var apiErr error + err := wait.ExponentialBackoffWithContext(ctx, retry.DefaultRetry, func(ctx context.Context) (bool, error) { + apiErr = fn(ctx) + switch { + case apiErr == nil: + return true, nil + case retriableError(apiErr): + return false, nil + default: + return false, apiErr + } + }) + if wait.Interrupted(err) { + err = apiErr + } + return err +} + +// GetModule will use PollUntilContextTimeout to get the requested module +func GetModule(ctx context.Context, client client.Client, key types.NamespacedName) (*tfaplv1beta1.Module, error) { + module := new(tfaplv1beta1.Module) + + err := PollUntilTimeout(ctx, func(ctx context.Context) (err error) { + return client.Get(ctx, key, module) + }) + if err != nil { + return nil, fmt.Errorf("timed out trying to get module err:%w", err) + } + return module, nil +} + +// GetSecret will use PollUntilContextTimeout to get the requested secret +func GetSecret(ctx context.Context, client kubernetes.Interface, namespace, name string) (*corev1.Secret, error) { + var secret *corev1.Secret + + err := PollUntilTimeout(ctx, func(ctx context.Context) (err error) { + secret, err = client.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) + return err + }) + if err != nil { + return nil,
fmt.Errorf("timed out trying to get secret err:%w", err) + } + return secret, nil +} + +// GetConfigMaps will use PollUntilContextTimeout to get requested ConfigMaps +func GetConfigMaps(ctx context.Context, client kubernetes.Interface, namespace, name string) (*corev1.ConfigMap, error) { + var cm *corev1.ConfigMap + + err := PollUntilTimeout(ctx, func(ctx context.Context) (err error) { + cm, err = client.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{}) + return err + }) + if err != nil { + return nil, fmt.Errorf("timed out trying to get configMap err:%w", err) + } + + return cm, nil +} + +// PatchModuleStatus will re-try patching with back-off +func PatchModuleStatus(ctx context.Context, c client.Client, objectKey types.NamespacedName, newStatus tfaplv1beta1.ModuleStatus) error { + tryPatch := func(ctx context.Context) error { + // refetch module on every try, since + // if you got a conflict on the last update attempt then you need to get + // the current version before making your own changes. + module, err := GetModule(ctx, c, objectKey) + if err != nil { + return err + } + + // Make whatever updates to the resource are needed + patch := client.MergeFrom(module.DeepCopy()) + module.Status = newStatus + + // You have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return c.Status().Patch(ctx, module, patch, client.FieldOwner("terraform-applier")) + } + + err := CallWithBackOff(ctx, tryPatch) + if err != nil { + return fmt.Errorf("unable to set status, max attempted reached err:%w", err) + } + + return nil +} diff --git a/sysutil/k8s_req_api.go b/sysutil/k8s_req_api.go new file mode 100644 index 0000000..cd317a1 --- /dev/null +++ b/sysutil/k8s_req_api.go @@ -0,0 +1,89 @@ +package sysutil + +import ( + "context" + "encoding/json" + "fmt" + + tfaplv1beta1 "github.com/utilitywarehouse/terraform-applier/api/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// EnsureRequest will try to add run request annotation with back-off +func EnsureRequest(ctx context.Context, client client.Client, req *tfaplv1beta1.Request) error { + if err := req.Validate(); err != nil { + return err + } + + tryUpdate := func(ctx context.Context) error { + // refetch module on every try + module, err := GetModule(ctx, client, req.NamespacedName) + if err != nil { + return err + } + + existingReq, ok := module.PendingRunRequest() + if ok { + // if annotated request ID is matching then nothing to do + if req.ID == existingReq.ID { + return nil + } else { + return tfaplv1beta1.ErrRunRequestExist + } + } + + if module.ObjectMeta.Annotations == nil { + module.ObjectMeta.Annotations = make(map[string]string) + } + valueBytes, err := json.Marshal(&req) + if err != nil { + return err + } + module.ObjectMeta.Annotations[tfaplv1beta1.RunRequestAnnotationKey] = string(valueBytes) + + // return err itself here (not wrapped inside another error) + // so that ExponentialBackoffWithContext can identify it correctly. 
+ return client.Update(ctx, module) + } + + err := CallWithBackOff(ctx, tryUpdate) + if err != nil { + return fmt.Errorf("unable to set run request err:%w", err) + } + + return nil +} + +// RemoveRequest will try to remove the given run request; +// it will error if the given request ID doesn't match the existing pending request ID +func RemoveRequest(ctx context.Context, client client.Client, req *tfaplv1beta1.Request) error { + tryUpdate := func(ctx context.Context) error { + // refetch module on every try + module, err := GetModule(ctx, client, req.NamespacedName) + if err != nil { + return err + } + + existingReq, ok := module.PendingRunRequest() + if !ok { + return tfaplv1beta1.ErrNoRunRequestFound + } + + if req.ID != existingReq.ID { + return tfaplv1beta1.ErrRunRequestMismatch + } + + delete(module.ObjectMeta.Annotations, tfaplv1beta1.RunRequestAnnotationKey) + + // return err itself here (not wrapped inside another error) + // so that ExponentialBackoffWithContext can identify it correctly. + return client.Update(ctx, module) + } + + err := CallWithBackOff(ctx, tryUpdate) + if err != nil { + return fmt.Errorf("unable to remove pending run request err:%w", err) + } + + return nil +} diff --git a/sysutil/runstatus.go b/sysutil/runstatus.go new file mode 100644 index 0000000..37230e6 --- /dev/null +++ b/sysutil/runstatus.go @@ -0,0 +1,52 @@ +package sysutil + +import "sync" + +// RunStatus is a map with a lock so it's safe for concurrent use. +// sync.Map is not used as it doesn't have a Len() function, and a normal map +// with a lock will do for our limited use case +type RunStatus struct { + *sync.RWMutex + status map[string]interface{} +} + +func NewRunStatus() *RunStatus { + return &RunStatus{ + &sync.RWMutex{}, + make(map[string]interface{}), + } +} + +// Delete deletes the value for a key. +func (rs *RunStatus) Delete(key string) { + rs.Lock() + defer rs.Unlock() + + delete(rs.status, key) +} + +// Len returns the current length of the map +func (rs *RunStatus) Len() int { + rs.RLock() + defer rs.RUnlock() + + return len(rs.status) +} + +// Load returns the value stored in the map for a key, or nil if no value is present. +// The ok result indicates whether value was found in the map. +func (rs *RunStatus) Load(key string) (interface{}, bool) { + rs.RLock() + defer rs.RUnlock() + + v, ok := rs.status[key] + return v, ok +} + +// Store sets the value for a key.
+func (rs *RunStatus) Store(key string, value interface{}) { + rs.Lock() + defer rs.Unlock() + + rs.status[key] = value +} diff --git a/webserver/template_test.go b/webserver/template_test.go index 2f56a25..b39330a 100644 --- a/webserver/template_test.go +++ b/webserver/template_test.go @@ -55,8 +55,11 @@ func Test_ExecuteTemplate(t *testing.T) { }, }, { - TypeMeta: metav1.TypeMeta{APIVersion: "terraform-applier.uw.systems/v1beta1", Kind: "Module"}, - ObjectMeta: metav1.ObjectMeta{Name: "groups", Namespace: "bar"}, + TypeMeta: metav1.TypeMeta{APIVersion: "terraform-applier.uw.systems/v1beta1", Kind: "Module"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "groups", Namespace: "bar", + Annotations: map[string]string{tfaplv1beta1.RunRequestAnnotationKey: `'{"id":"VMqlQIIX","reqAt":"2024-04-11T15:05:46Z","type":"ForcedPlan"}'`}, + }, Spec: tfaplv1beta1.ModuleSpec{ RepoURL: "ssh://git@github.com/utilitywarehouse/terraform-applier.git", RepoRef: "as-test-module", diff --git a/webserver/templates/status.html b/webserver/templates/status.html index 51f07a1..d6e6b00 100644 --- a/webserver/templates/status.html +++ b/webserver/templates/status.html @@ -179,6 +179,13 @@

{{.Name}}
+ Pending request
+ {{index .ObjectMeta.Annotations "terraform-applier.uw.systems/run-request"}}
Message
{{.Status.StateMessage}}
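Taken together with the `api/v1beta1` changes above, the annotation rendered here is just the JSON-encoded `Request`. A small sketch of the round-trip — module name and namespace are made up, and the printed ID will differ since `NewRequestID` is random:

```go
package main

import (
	"encoding/json"
	"fmt"

	tfaplv1beta1 "github.com/utilitywarehouse/terraform-applier/api/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	module := &tfaplv1beta1.Module{
		ObjectMeta: metav1.ObjectMeta{Name: "groups", Namespace: "bar"},
	}

	// what EnsureRequest stores under the annotation key
	req := module.NewRunRequest(tfaplv1beta1.ForcedPlan)
	val, _ := json.Marshal(req)
	fmt.Println(string(val)) // e.g. {"id":"ueLMEbQj","reqAt":"...","type":"ForcedPlan"}

	// what the reconciler reads back via PendingRunRequest
	module.Annotations = map[string]string{
		tfaplv1beta1.RunRequestAnnotationKey: string(val),
	}
	pending, ok := module.PendingRunRequest()
	fmt.Println(ok, pending.ID == req.ID) // true true
}
```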
diff --git a/webserver/webserver.go b/webserver/webserver.go index 12a6b16..633b407 100644 --- a/webserver/webserver.go +++ b/webserver/webserver.go @@ -10,11 +10,10 @@ import ( "io" "log/slog" "net/http" - "sync" "github.com/gorilla/mux" tfaplv1beta1 "github.com/utilitywarehouse/terraform-applier/api/v1beta1" - "github.com/utilitywarehouse/terraform-applier/runner" + "github.com/utilitywarehouse/terraform-applier/sysutil" "github.com/utilitywarehouse/terraform-applier/webserver/oidc" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -37,8 +36,7 @@ type WebServer struct { Authenticator *oidc.Authenticator ClusterClt client.Client KubeClient kubernetes.Interface - RunQueue chan<- runner.Request - RunStatus *sync.Map + RunStatus *sysutil.RunStatus Log *slog.Logger } @@ -95,8 +93,7 @@ type ForceRunHandler struct { Authenticator *oidc.Authenticator ClusterClt client.Client KubeClt kubernetes.Interface - RunQueue chan<- runner.Request - RunStatus *sync.Map + RunStatus *sysutil.RunStatus Log *slog.Logger } @@ -124,6 +121,8 @@ func (f *ForceRunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var user *oidc.UserInfo var err error + // authentication + // check if user logged in if f.Authenticator != nil { user, err = f.Authenticator.UserInfo(r.Context(), r) if err != nil { @@ -166,10 +165,6 @@ func (f *ForceRunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Namespace: payload["namespace"], Name: payload["module"], } - isPlanOnly := true - if payload["planOnly"] == "false" { - isPlanOnly = false - } var module tfaplv1beta1.Module err = f.ClusterClt.Get(r.Context(), namespacedName, &module) @@ -181,6 +176,8 @@ func (f *ForceRunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // authorisation + // check if user has access if f.Authenticator != nil { // this should not happen but just in case if user == nil { @@ -210,7 +207,7 @@ func (f *ForceRunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - f.Log.Info("force run triggered", "module", namespacedName, "isPlanOnly", isPlanOnly, "user", user.Email) + f.Log.Info("requesting force run...", "module", namespacedName, "user", user.Email) } // make sure module is not already running @@ -223,20 +220,34 @@ func (f *ForceRunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - req := runner.Request{ - NamespacedName: namespacedName, - Type: tfaplv1beta1.ForcedPlan, - PlanOnly: isPlanOnly, - } - if !isPlanOnly { - req.Type = tfaplv1beta1.ForcedApply + reqType := tfaplv1beta1.ForcedPlan + if payload["planOnly"] == "false" { + reqType = tfaplv1beta1.ForcedApply } - f.RunQueue <- req + req := module.NewRunRequest(reqType) - data.Result = "success" - data.Message = "Run queued" - w.WriteHeader(http.StatusOK) + err = sysutil.EnsureRequest(r.Context(), f.ClusterClt, req) + switch { + case err == nil: + data.Result = "success" + data.Message = "Run queued" + f.Log.Info("force run requested", "module", namespacedName, "req", req) + w.WriteHeader(http.StatusOK) + return + case errors.Is(err, tfaplv1beta1.ErrRunRequestExist): + data.Result = "error" + data.Message = "Unable to request run as another request is pending" + f.Log.Error("unable to request force run", "module", namespacedName, "err", err) + w.WriteHeader(http.StatusConflict) + return + default: + data.Result = "error" + data.Message = "internal error" + f.Log.Error("unable to request force run", "module", namespacedName, "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } } // Start 
starts the webserver using the given port, and sets up handlers for: @@ -262,7 +273,6 @@ func (ws *WebServer) Start(ctx context.Context) error { ws.Authenticator, ws.ClusterClt, ws.KubeClient, - ws.RunQueue, ws.RunStatus, ws.Log, }
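A possible end-to-end usage of the new request API, e.g. from a future PR-comment webhook — the handler itself is hypothetical; only `NewRunRequest`, `PullRequest`, `EnsureRequest` and the sentinel errors come from this diff:

```go
package webhook // hypothetical consumer of the new request API

import (
	"context"
	"errors"
	"fmt"

	tfaplv1beta1 "github.com/utilitywarehouse/terraform-applier/api/v1beta1"
	"github.com/utilitywarehouse/terraform-applier/sysutil"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// requestPRPlan annotates the module with a PRPlan run request; the module
// controller picks it up on the next reconcile and queues the run against
// the PR's head branch (see Request.RepoRef).
func requestPRPlan(ctx context.Context, c client.Client, module *tfaplv1beta1.Module, prNum int, headBranch string) error {
	req := module.NewRunRequest(tfaplv1beta1.PRPlan)
	req.PR = &tfaplv1beta1.PullRequest{Number: prNum, HeadBranch: headBranch}

	err := sysutil.EnsureRequest(ctx, c, req)
	if errors.Is(err, tfaplv1beta1.ErrRunRequestExist) {
		// another request is already pending; surface it so the caller can retry later
		return fmt.Errorf("module %s already has a pending run: %w", req.NamespacedName, err)
	}
	return err
}
```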