diff --git a/pkg/bootstrap/steps/install_wge.go b/pkg/bootstrap/steps/install_wge.go index ec05f2e975..2fdf64ba28 100644 --- a/pkg/bootstrap/steps/install_wge.go +++ b/pkg/bootstrap/steps/install_wge.go @@ -11,6 +11,8 @@ import ( "github.com/weaveworks/weave-gitops-enterprise/pkg/bootstrap/utils" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/cli-utils/pkg/object" ) const ( @@ -36,12 +38,18 @@ const ( gitopssetsHealthBindAddress = ":8081" ) +var Components = []string{"cluster-controller-manager", + "gitopssets-controller-manager", + "weave-gitops-enterprise-mccp-cluster-bootstrap-controller", + "weave-gitops-enterprise-mccp-cluster-service"} + // NewInstallWGEStep step to install Weave GitOps Enterprise func NewInstallWGEStep() BootstrapStep { return BootstrapStep{ - Name: "Install Weave GitOps Enterprise", - Input: []StepInput{}, - Step: installWge, + Name: "Install Weave GitOps Enterprise", + Input: []StepInput{}, + Step: installWge, + Verify: verifyComponents, } } @@ -233,3 +241,37 @@ func constructWGEhelmRelease(valuesFile valuesFile, chartVersion string) (string return utils.CreateHelmReleaseYamlString(wgeHelmRelease) } + +func verifyComponents(output []StepOutput, c *Config) error { + c.Logger.Waitingf("waiting for components to be healthy") + err := reportComponentsHealth(c, Components, WGEDefaultNamespace, 5*time.Minute) + if err != nil { + return err + } + return nil +} + +func reportComponentsHealth(c *Config, componentNames []string, namespace string, timeout time.Duration) error { + // Initialize the status checker + checker, err := utils.NewStatusChecker(c.KubernetesClient, 5*time.Second, timeout, c.Logger) + if err != nil { + return err + } + + // Construct a list of resources to check + var identifiers []object.ObjMetadata + for _, name := range componentNames { + identifiers = append(identifiers, object.ObjMetadata{ + Namespace: namespace, + Name: name, + GroupKind: schema.GroupKind{Group: "apps", Kind: "Deployment"}, + }) + } + + // Perform the health check + if err := checker.Assess(identifiers...); err != nil { + return err + } + + return nil +} diff --git a/pkg/bootstrap/steps/install_wge_test.go b/pkg/bootstrap/steps/install_wge_test.go index 3b15c30126..88acb79c33 100644 --- a/pkg/bootstrap/steps/install_wge_test.go +++ b/pkg/bootstrap/steps/install_wge_test.go @@ -148,6 +148,8 @@ func TestInstallWge_Execute(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { step := NewInstallWGEStep() + // skip verify step + step.Verify = nil gotOutputs, err := step.Execute(&tt.config) if tt.wantErr != "" { if msg := err.Error(); msg != tt.wantErr { diff --git a/pkg/bootstrap/steps/step.go b/pkg/bootstrap/steps/step.go index 076fd877fe..5b373c21f4 100644 --- a/pkg/bootstrap/steps/step.go +++ b/pkg/bootstrap/steps/step.go @@ -14,9 +14,10 @@ import ( // It is abstracted to have a generic way to handle them, so we could achieve easier // extensibility, consistency and maintainability. type BootstrapStep struct { - Name string - Input []StepInput - Step func(input []StepInput, c *Config) ([]StepOutput, error) + Name string + Input []StepInput + Step func(input []StepInput, c *Config) ([]StepOutput, error) + Verify func(output []StepOutput, c *Config) error } // StepInput represents an input a step requires to execute it. for example user needs to introduce a string or a password. @@ -135,6 +136,14 @@ func (s BootstrapStep) Execute(c *Config) ([]StepOutput, error) { if err != nil { return []StepOutput{}, fmt.Errorf("cannot process output '%s': %v", s.Name, err) } + + //verify the result of the step if the function is defined in the step + if s.Verify != nil { + if err := s.Verify(outputs, c); err != nil { + return []StepOutput{}, fmt.Errorf("cannot verify '%s': %v", s.Name, err) + } + } + return outputs, nil } diff --git a/pkg/bootstrap/utils/status.go b/pkg/bootstrap/utils/status.go new file mode 100644 index 0000000000..c9e7a0170b --- /dev/null +++ b/pkg/bootstrap/utils/status.go @@ -0,0 +1,95 @@ +package utils + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/weaveworks/weave-gitops/pkg/logger" + "sigs.k8s.io/controller-runtime/pkg/client" + k8s_client "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/cli-utils/pkg/kstatus/polling" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/object" +) + +// StatusChecker is a wrapper around the StatusPoller +// that provides a way to poll the status of a set of resources +type StatusChecker struct { + pollInterval time.Duration + timeout time.Duration + client client.Client + statusPoller *polling.StatusPoller + logger logger.Logger +} + +// NewStatusChecker returns a new StatusChecker that will use the provided client to poll the status of the resources +func NewStatusChecker(client k8s_client.Client, pollInterval time.Duration, timeout time.Duration, log logger.Logger) (*StatusChecker, error) { + + return &StatusChecker{ + pollInterval: pollInterval, + timeout: timeout, + client: client, + statusPoller: polling.NewStatusPoller(client, client.RESTMapper(), polling.Options{}), + logger: log, + }, nil +} + +// Assess will poll the status of the provided resources until all resources have reached the desired status +func (sc *StatusChecker) Assess(identifiers ...object.ObjMetadata) error { + ctx, cancel := context.WithTimeout(context.Background(), sc.timeout) + defer cancel() + + opts := polling.PollOptions{PollInterval: sc.pollInterval} + eventsChan := sc.statusPoller.Poll(ctx, identifiers, opts) + + coll := collector.NewResourceStatusCollector(identifiers) + done := coll.ListenWithObserver(eventsChan, desiredStatusNotifierFunc(cancel, status.CurrentStatus)) + + <-done + + // we use sorted identifiers to loop over the resource statuses because a Go's map is unordered. + // sorting identifiers by object's name makes sure that the logs look stable for every run + sort.SliceStable(identifiers, func(i, j int) bool { + return strings.Compare(identifiers[i].Name, identifiers[j].Name) < 0 + }) + for _, id := range identifiers { + rs := coll.ResourceStatuses[id] + switch rs.Status { + case status.CurrentStatus: + sc.logger.Successf("%s: %s ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind)) + case status.NotFoundStatus: + sc.logger.Failuref("%s: %s not found", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind)) + default: + sc.logger.Failuref("%s: %s not ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind)) + } + } + + if coll.Error != nil || ctx.Err() == context.DeadlineExceeded { + return fmt.Errorf("timed out waiting for condition") + } + return nil +} + +// desiredStatusNotifierFunc returns an Observer function for the +// ResourceStatusCollector that will cancel the context (using the cancelFunc) +// when all resources have reached the desired status. +func desiredStatusNotifierFunc(cancelFunc context.CancelFunc, + desired status.Status) collector.ObserverFunc { + return func(rsc *collector.ResourceStatusCollector, _ event.Event) { + var rss []*event.ResourceStatus + for _, rs := range rsc.ResourceStatuses { + rss = append(rss, rs) + } + aggStatus := aggregator.AggregateStatus(rss, desired) + if aggStatus == desired { + cancelFunc() + } + } +}