Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add components verification step to bootstraping WGE #3675

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions pkg/bootstrap/steps/install_wge.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/weaveworks/weave-gitops-enterprise/pkg/bootstrap/utils"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/object"
)

const (
Expand All @@ -36,14 +38,20 @@ const (
gitopssetsHealthBindAddress = ":8081"
)

var Components = []string{"cluster-controller-manager",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the suggested components to be checked, do u think we should add more?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that we should try to see whether the list could be dynamic over static

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the installwge step, the components are being defined as part of the helm chart and its dependencies

https://github.com/weaveworks/weave-gitops-enterprise/blob/main/charts/mccp/Chart.yaml

an strategy to resolve the components could be created out of it

"gitopssets-controller-manager",
"weave-gitops-enterprise-mccp-cluster-bootstrap-controller",
"weave-gitops-enterprise-mccp-cluster-service"}

// NewInstallWGEStep step to install Weave GitOps Enterprise
func NewInstallWGEStep() BootstrapStep {
inputs := []StepInput{}

return BootstrapStep{
Name: "Install Weave GitOps Enterprise",
Input: inputs,
Step: installWge,
Name: "Install Weave GitOps Enterprise",
Input: inputs,
Step: installWge,
Verify: verifyComponents,
}
}

Expand Down Expand Up @@ -235,3 +243,37 @@ func constructWGEhelmRelease(valuesFile valuesFile, chartVersion string) (string

return utils.CreateHelmReleaseYamlString(wgeHelmRelease)
}

func verifyComponents(output []StepOutput, c *Config) error {
c.Logger.Waitingf("waiting for components to be healthy")
err := reportComponentsHealth(c, Components, WGEDefaultNamespace, 5*time.Minute)
if err != nil {
return err
}
return nil
}

func reportComponentsHealth(c *Config, componentNames []string, namespace string, timeout time.Duration) error {
// Initialize the status checker
checker, err := utils.NewStatusChecker(c.KubernetesClient, 5*time.Second, timeout, c.Logger)
if err != nil {
return err
}

// Construct a list of resources to check
var identifiers []object.ObjMetadata
for _, name := range componentNames {
identifiers = append(identifiers, object.ObjMetadata{
Namespace: namespace,
Name: name,
GroupKind: schema.GroupKind{Group: "apps", Kind: "Deployment"},
})
}

// Perform the health check
if err := checker.Assess(identifiers...); err != nil {
return err
}

return nil
}
2 changes: 2 additions & 0 deletions pkg/bootstrap/steps/install_wge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ func TestInstallWge_Execute(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
step := NewInstallWGEStep()
// skip verify step
step.Verify = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess that missing testing here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to unit test this logic ... possible paths

a) mock the behaviour via interface
b) given this is based on kstatus, see how to unit test kstatus https://github.com/kubernetes-sigs/cli-utils/blob/master/pkg/kstatus/README.md

gotOutputs, err := step.Execute(&tt.config)
if tt.wantErr != "" {
if msg := err.Error(); msg != tt.wantErr {
Expand Down
17 changes: 13 additions & 4 deletions pkg/bootstrap/steps/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
// It is abstracted to have a generic way to handle them, so we could achieve easier
// extensibility, consistency and maintainability.
type BootstrapStep struct {
Name string
Input []StepInput
Step func(input []StepInput, c *Config) ([]StepOutput, error)
Stdin io.ReadCloser
Name string
Input []StepInput
Step func(input []StepInput, c *Config) ([]StepOutput, error)
Verify func(output []StepOutput, c *Config) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add the internal documentation on guidance to use it

Stdin io.ReadCloser
}

// StepInput represents an input a step requires to execute it. for example user needs to introduce a string or a password.
Expand Down Expand Up @@ -76,6 +77,14 @@ func (s BootstrapStep) Execute(c *Config) ([]StepOutput, error) {
if err != nil {
return []StepOutput{}, fmt.Errorf("cannot process output '%s': %v", s.Name, err)
}

//verify the result of the step if the function is defined in the step
if s.Verify != nil {
if err := s.Verify(outputs, c); err != nil {
return []StepOutput{}, fmt.Errorf("cannot verify '%s': %v", s.Name, err)
}
}

return outputs, nil
}

Expand Down
91 changes: 91 additions & 0 deletions pkg/bootstrap/utils/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package utils

import (
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/weaveworks/weave-gitops/pkg/logger"
"sigs.k8s.io/controller-runtime/pkg/client"
k8s_client "sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

type StatusChecker struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing comments as exported

pollInterval time.Duration
timeout time.Duration
client client.Client
statusPoller *polling.StatusPoller
logger logger.Logger
}

func NewStatusChecker(client k8s_client.Client, pollInterval time.Duration, timeout time.Duration, log logger.Logger) (*StatusChecker, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing comments


return &StatusChecker{
pollInterval: pollInterval,
timeout: timeout,
client: client,
statusPoller: polling.NewStatusPoller(client, client.RESTMapper(), polling.Options{}),
logger: log,
}, nil
}

func (sc *StatusChecker) Assess(identifiers ...object.ObjMetadata) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing comments as exported

ctx, cancel := context.WithTimeout(context.Background(), sc.timeout)
defer cancel()

opts := polling.PollOptions{PollInterval: sc.pollInterval}
eventsChan := sc.statusPoller.Poll(ctx, identifiers, opts)

coll := collector.NewResourceStatusCollector(identifiers)
done := coll.ListenWithObserver(eventsChan, desiredStatusNotifierFunc(cancel, status.CurrentStatus))

<-done

// we use sorted identifiers to loop over the resource statuses because a Go's map is unordered.
// sorting identifiers by object's name makes sure that the logs look stable for every run
sort.SliceStable(identifiers, func(i, j int) bool {
return strings.Compare(identifiers[i].Name, identifiers[j].Name) < 0
})
for _, id := range identifiers {
rs := coll.ResourceStatuses[id]
switch rs.Status {
case status.CurrentStatus:
sc.logger.Successf("%s: %s ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind))
case status.NotFoundStatus:
sc.logger.Failuref("%s: %s not found", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind))
default:
sc.logger.Failuref("%s: %s not ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind))
}
}

if coll.Error != nil || ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("timed out waiting for condition")
}
return nil
}

// desiredStatusNotifierFunc returns an Observer function for the
// ResourceStatusCollector that will cancel the context (using the cancelFunc)
// when all resources have reached the desired status.
func desiredStatusNotifierFunc(cancelFunc context.CancelFunc,
desired status.Status) collector.ObserverFunc {
return func(rsc *collector.ResourceStatusCollector, _ event.Event) {
var rss []*event.ResourceStatus
for _, rs := range rsc.ResourceStatuses {
rss = append(rss, rs)
}
aggStatus := aggregator.AggregateStatus(rss, desired)
if aggStatus == desired {
cancelFunc()
}
}
}