Skip to content

Commit

Permalink
Improve test coverage of workspace_resource_event_loop.go
Browse files Browse the repository at this point in the history
  • Loading branch information
chetan-rns committed Sep 20, 2023
1 parent ad82ba5 commit 5a17ce8
Show file tree
Hide file tree
Showing 2 changed files with 468 additions and 65 deletions.
166 changes: 101 additions & 65 deletions backend/eventloop/workspace_resource_event_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type workspaceResourceLoopMessageType string
const (
workspaceResourceLoopMessageType_processRepositoryCredential workspaceResourceLoopMessageType = "processRepositoryCredential"
workspaceResourceLoopMessageType_processManagedEnvironment workspaceResourceLoopMessageType = "processManagedEnvironment"

retry = true
noRetry = false
)

func (werl *workspaceResourceEventLoop) processRepositoryCredential(ctx context.Context, req ctrl.Request, apiNamespaceClient client.Client) {
Expand Down Expand Up @@ -81,14 +84,26 @@ func newWorkspaceResourceLoop(sharedResourceLoop *shared_resource_loop.SharedRes
inputChannel: make(chan workspaceResourceLoopMessage),
}

go internalWorkspaceResourceEventLoop(workspaceResourceEventLoop.inputChannel, sharedResourceLoop, workspaceEventLoopInputChannel)
go internalWorkspaceResourceEventLoop(workspaceResourceEventLoop.inputChannel, sharedResourceLoop, workspaceEventLoopInputChannel, shared_resource_loop.DefaultK8sClientFactory{})

return workspaceResourceEventLoop
}

func newWorkspaceResourceLoopWithFactory(sharedResourceLoop *shared_resource_loop.SharedResourceEventLoop,
workspaceEventLoopInputChannel chan workspaceEventLoopMessage, k8sClientFactory shared_resource_loop.SRLK8sClientFactory) *workspaceResourceEventLoop {

workspaceResourceEventLoop := &workspaceResourceEventLoop{
inputChannel: make(chan workspaceResourceLoopMessage),
}

go internalWorkspaceResourceEventLoop(workspaceResourceEventLoop.inputChannel, sharedResourceLoop, workspaceEventLoopInputChannel, k8sClientFactory)

return workspaceResourceEventLoop
}

func internalWorkspaceResourceEventLoop(inputChan chan workspaceResourceLoopMessage,
sharedResourceLoop *shared_resource_loop.SharedResourceEventLoop,
workspaceEventLoopInputChannel chan workspaceEventLoopMessage) {
workspaceEventLoopInputChannel chan workspaceEventLoopMessage, k8sClientFactory shared_resource_loop.SRLK8sClientFactory) {

ctx := context.Background()
l := log.FromContext(ctx).
Expand Down Expand Up @@ -142,6 +157,7 @@ func internalWorkspaceResourceEventLoop(inputChan chan workspaceResourceLoopMess
log: l,
sharedResourceLoop: sharedResourceLoop,
workspaceEventLoopInputChannel: workspaceEventLoopInputChannel,
k8sClientFactory: k8sClientFactory,
}

taskRetryLoop.AddTaskIfNotPresent(mapKey, task, sharedutil.ExponentialBackoff{Factor: 2, Min: time.Millisecond * 200, Max: time.Second * 10, Jitter: true})
Expand All @@ -154,12 +170,13 @@ type workspaceResourceEventTask struct {
log logr.Logger
sharedResourceLoop *shared_resource_loop.SharedResourceEventLoop
workspaceEventLoopInputChannel chan workspaceEventLoopMessage
k8sClientFactory shared_resource_loop.SRLK8sClientFactory
}

// Returns true if the task should be retried, false otherwise, plus an error
func (wert *workspaceResourceEventTask) PerformTask(taskContext context.Context) (bool, error) {

retry, err := internalProcessWorkspaceResourceMessage(taskContext, wert.msg, wert.sharedResourceLoop, wert.workspaceEventLoopInputChannel, wert.dbQueries, wert.log)
retry, err := internalProcessWorkspaceResourceMessage(taskContext, wert.msg, wert.sharedResourceLoop, wert.workspaceEventLoopInputChannel, wert.dbQueries, wert.k8sClientFactory, wert.log)

// If we recognize this error is a connection error due to the user providing us invalid credentials, don't bother to log it.
if isManagedEnvironmentConnectionUserError(err, wert.log) {
Expand All @@ -171,8 +188,7 @@ func (wert *workspaceResourceEventTask) PerformTask(taskContext context.Context)
// Returns true if the task should be retried, false otherwise, plus an error
func internalProcessWorkspaceResourceMessage(ctx context.Context, msg workspaceResourceLoopMessage,
sharedResourceLoop *shared_resource_loop.SharedResourceEventLoop, workspaceEventLoopInputChannel chan workspaceEventLoopMessage,
dbQueries db.DatabaseQueries, log logr.Logger) (bool, error) {
const retry, noRetry = true, false
dbQueries db.DatabaseQueries, k8sClientFactory shared_resource_loop.SRLK8sClientFactory, log logr.Logger) (bool, error) {

log.V(logutil.LogLevel_Debug).Info("processWorkspaceResource received message: " + string(msg.messageType))

Expand All @@ -182,89 +198,109 @@ func internalProcessWorkspaceResourceMessage(ctx context.Context, msg workspaceR

// When the event is related to 'GitOpsDeploymentRepositoryCredential' resource, we need to process the event
if msg.messageType == workspaceResourceLoopMessageType_processRepositoryCredential {
req, ok := (msg.payload).(ctrl.Request)
if !ok {
return noRetry, fmt.Errorf("invalid payload in processWorkspaceResourceMessage")
shouldRetry, err := handleResourceLoopRepositoryCredential(ctx, msg, sharedResourceLoop, k8sClientFactory, log)
if err != nil {
return shouldRetry, fmt.Errorf("failed to process workspace resource message: %v", err)
}

// Retrieve the namespace that the repository credential is contained within
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: req.Namespace,
},
return noRetry, nil
} else if msg.messageType == workspaceResourceLoopMessageType_processManagedEnvironment {

shouldRetry, err := handleResourceLoopManagedEnvironment(ctx, msg, sharedResourceLoop, k8sClientFactory, workspaceEventLoopInputChannel, log)
if err != nil {
return shouldRetry, fmt.Errorf("failed to process workspace resource message: %v", err)
}
if err := msg.apiNamespaceClient.Get(ctx, client.ObjectKeyFromObject(namespace), namespace); err != nil {

if !apierr.IsNotFound(err) {
return retry, fmt.Errorf("unexpected error in retrieving repo credentials: %v", err)
}
return noRetry, nil

log.V(logutil.LogLevel_Warn).Info("Received a message for a repository credential in a namepace that doesn't exist", "namespace", namespace)
return noRetry, nil
}
}
return noRetry, fmt.Errorf("SEVERE: unrecognized sharedResourceLoopMessageType: %s", msg.messageType)

// Request that the shared resource loop handle the GitOpsDeploymentRepositoryCredential resource:
// - If the GitOpsDeploymentRepositoryCredential doesn't exist, delete the corresponding database table
// - If the GitOpsDeploymentRepositoryCredential does exist, but not in the DB, then create a RepositoryCredential DB entry
// - If the GitOpsDeploymentRepositoryCredential does exist, and also in the DB, then compare and change a RepositoryCredential DB entry
// Then, in all 3 cases, create an Operation to update the cluster-agent
_, err := sharedResourceLoop.ReconcileRepositoryCredential(ctx, msg.apiNamespaceClient, *namespace, req.Name, shared_resource_loop.DefaultK8sClientFactory{})
}

if err != nil {
return retry, fmt.Errorf("unable to reconcile repository credential. Error: %v", err)
func handleResourceLoopRepositoryCredential(ctx context.Context, msg workspaceResourceLoopMessage, sharedResourceLoop *shared_resource_loop.SharedResourceEventLoop, k8sClientFactory shared_resource_loop.SRLK8sClientFactory, log logr.Logger) (bool, error) {

req, ok := (msg.payload).(ctrl.Request)
if !ok {
return noRetry, fmt.Errorf("invalid RepositoryCredential payload in processWorkspaceResourceMessage")
}

// Retrieve the namespace that the repository credential is contained within
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: req.Namespace,
},
}
if err := msg.apiNamespaceClient.Get(ctx, client.ObjectKeyFromObject(namespace), namespace); err != nil {

if !apierr.IsNotFound(err) {
return retry, fmt.Errorf("unexpected error in retrieving repo credentials: %v", err)
}

log.V(logutil.LogLevel_Warn).Info("Received a message for a repository credential in a namepace that doesn't exist", "namespace", namespace)
return noRetry, nil
} else if msg.messageType == workspaceResourceLoopMessageType_processManagedEnvironment {
}

evlMessage, ok := (msg.payload).(eventlooptypes.EventLoopMessage)
if !ok {
return noRetry, fmt.Errorf("invalid payload in processWorkspaceResourceMessage")
}
// Request that the shared resource loop handle the GitOpsDeploymentRepositoryCredential resource:
// - If the GitOpsDeploymentRepositoryCredential doesn't exist, delete the corresponding database table
// - If the GitOpsDeploymentRepositoryCredential does exist, but not in the DB, then create a RepositoryCredential DB entry
// - If the GitOpsDeploymentRepositoryCredential does exist, and also in the DB, then compare and change a RepositoryCredential DB entry
// Then, in all 3 cases, create an Operation to update the cluster-agent
_, err := sharedResourceLoop.ReconcileRepositoryCredential(ctx, msg.apiNamespaceClient, *namespace, req.Name, k8sClientFactory)

if evlMessage.Event == nil { // Sanity test the message
log.Error(nil, "SEVERE: process managed env event is nil")
}
if err != nil {
return retry, fmt.Errorf("unable to reconcile repository credential. Error: %v", err)
}

req := evlMessage.Event.Request
return noRetry, nil
}

// Retrieve the namespace that the managed environment is contained within
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: req.Namespace,
},
}
if err := msg.apiNamespaceClient.Get(ctx, client.ObjectKeyFromObject(namespace), namespace); err != nil {
func handleResourceLoopManagedEnvironment(ctx context.Context, msg workspaceResourceLoopMessage, sharedResourceLoop *shared_resource_loop.SharedResourceEventLoop, k8sClientFactory shared_resource_loop.SRLK8sClientFactory, workspaceEventLoopInputChannel chan workspaceEventLoopMessage, log logr.Logger) (bool, error) {

if !apierr.IsNotFound(err) {
return retry, fmt.Errorf("unexpected error in retrieving namespace of managed env CR: %v", err)
}
evlMessage, ok := (msg.payload).(eventlooptypes.EventLoopMessage)
if !ok {
return noRetry, fmt.Errorf("invalid ManagedEnvironment payload in processWorkspaceResourceMessage")
}

log.V(logutil.LogLevel_Warn).Info("Received a message for a managed end CR in a namespace that doesn't exist", "namespace", namespace)
return noRetry, nil
}
if evlMessage.Event == nil { // Sanity test the message
log.Error(nil, "SEVERE: process managed env event is nil")
}

// Ask the shared resource loop to ensure the managed environment is reconciled
_, err := sharedResourceLoop.ReconcileSharedManagedEnv(ctx, msg.apiNamespaceClient, *namespace, req.Name, req.Namespace,
false, shared_resource_loop.DefaultK8sClientFactory{}, log)
if err != nil {
return retry, fmt.Errorf("unable to reconcile shared managed env: %v", err)
}
req := evlMessage.Event.Request

// Once we finish processing the managed environment, send it back to the workspace event loop, so it can be passed to GitOpsDeployments.
// - Send it on another go routine to keep from blocking this one
go func() {
workspaceEventLoopInputChannel <- workspaceEventLoopMessage{
messageType: workspaceEventLoopMessageType_managedEnvProcessed_Event,
payload: evlMessage,
}
}()
// Retrieve the namespace that the managed environment is contained within
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: req.Namespace,
},
}
if err := msg.apiNamespaceClient.Get(ctx, client.ObjectKeyFromObject(namespace), namespace); err != nil {

if !apierr.IsNotFound(err) {
return retry, fmt.Errorf("unexpected error in retrieving namespace of managed env CR: %v", err)
}

log.V(logutil.LogLevel_Warn).Info("Received a message for a managed end CR in a namespace that doesn't exist", "namespace", namespace)
return noRetry, nil
}

// Ask the shared resource loop to ensure the managed environment is reconciled
_, err := sharedResourceLoop.ReconcileSharedManagedEnv(ctx, msg.apiNamespaceClient, *namespace, req.Name, req.Namespace,
false, k8sClientFactory, log)
if err != nil {
return retry, fmt.Errorf("unable to reconcile shared managed env: %v", err)
}
return noRetry, fmt.Errorf("SEVERE: unrecognized sharedResourceLoopMessageType: %s " + string(msg.messageType))

// Once we finish processing the managed environment, send it back to the workspace event loop, so it can be passed to GitOpsDeployments.
// - Send it on another go routine to keep from blocking this one
go func() {
workspaceEventLoopInputChannel <- workspaceEventLoopMessage{
messageType: workspaceEventLoopMessageType_managedEnvProcessed_Event,
payload: evlMessage,
}
}()

return noRetry, nil
}

// isManagedEnvironmentConnectionUserError returns true if the error is likely an error in the cluster credentials provided by the user
Expand Down
Loading

0 comments on commit 5a17ce8

Please sign in to comment.