Skip to content

Commit

Permalink
Added SHC functions
Browse files Browse the repository at this point in the history
  • Loading branch information
tgarg-splunk committed Jul 19, 2023
1 parent b83b304 commit fe0555b
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 4 deletions.
1 change: 1 addition & 0 deletions .github/workflows/helm-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ name: Helm Test WorkFlow
on:
push:
branches:
- cspl-2344-shc
- develop
- main
jobs:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/int-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ name: Integration Test WorkFlow
on:
push:
branches:
- cspl-2344-shc
- develop
- main
- feature**
Expand Down
2 changes: 1 addition & 1 deletion pkg/splunk/enterprise/clustermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ func TestAppFrameworkApplyClusterManagerShouldNotFail(t *testing.T) {
}
}

func TestApplyCLusterManagerDeletion(t *testing.T) {
func TestApplyClusterManagerDeletion(t *testing.T) {
ctx := context.TODO()
cm := enterpriseApi.ClusterManager{
ObjectMeta: metav1.ObjectMeta{
Expand Down
6 changes: 6 additions & 0 deletions pkg/splunk/enterprise/monitoringconsole.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ func ApplyMonitoringConsole(ctx context.Context, client splcommon.ControllerClie
if cr.Status.Phase == enterpriseApi.PhaseReady {
finalResult := handleAppFrameworkActivity(ctx, client, cr, &cr.Status.AppContext, &cr.Spec.AppFrameworkConfig)
result = *finalResult

// trigger SearchHeadCluster reconcile by changing the splunk/image-tag annotation
err = changeSearchHeadAnnotations(ctx, client, cr)
if err != nil {
return result, err
}
}
// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
// Implies that Requeue is true, there is no need to set Requeue to true at the same time as RequeueAfter.
Expand Down
124 changes: 122 additions & 2 deletions pkg/splunk/enterprise/searchheadcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import (
splutil "github.com/splunk/splunk-operator/pkg/splunk/util"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
"sigs.k8s.io/controller-runtime/pkg/client"
rclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -160,6 +162,11 @@ func ApplySearchHeadCluster(ctx context.Context, client splcommon.ControllerClie
return result, err
}

continueReconcile, err := isSearchHeadReadyForUpgrade(ctx, client, cr)
if err != nil || !continueReconcile {
return result, err
}

deployerManager := splctrl.DefaultStatefulSetPodManager{}
phase, err := deployerManager.Update(ctx, client, statefulSet, 1)
if err != nil {
Expand All @@ -179,7 +186,7 @@ func ApplySearchHeadCluster(ctx context.Context, client splcommon.ControllerClie
return result, err
}

mgr := newSerachHeadClusterPodManager(client, scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient)
mgr := newSearchHeadClusterPodManager(client, scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient)
phase, err = mgr.Update(ctx, client, statefulSet, cr.Spec.Replicas)
if err != nil {
return result, err
Expand Down Expand Up @@ -247,7 +254,7 @@ type searchHeadClusterPodManager struct {
}

// newSerachHeadClusterPodManager function to create pod manager this is added to write unit test case
var newSerachHeadClusterPodManager = func(client splcommon.ControllerClient, log logr.Logger, cr *enterpriseApi.SearchHeadCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) searchHeadClusterPodManager {
var newSearchHeadClusterPodManager = func(client splcommon.ControllerClient, log logr.Logger, cr *enterpriseApi.SearchHeadCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) searchHeadClusterPodManager {
return searchHeadClusterPodManager{
log: log,
cr: cr,
Expand Down Expand Up @@ -667,3 +674,116 @@ func getSearchHeadClusterList(ctx context.Context, c splcommon.ControllerClient,

return objectList, nil
}

// isSearchHeadReadyForUpgrade checks if SearchHeadCluster can be upgraded if a version upgrade is in-progress
// No-operation otherwise; returns bool, err accordingly
func isSearchHeadReadyForUpgrade(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.SearchHeadCluster) (bool, error) {
reqLogger := log.FromContext(ctx)
scopedLog := reqLogger.WithName("isSearchHeadReadyForUpgrade").WithValues("name", cr.GetName(), "namespace", cr.GetNamespace())
eventPublisher, _ := newK8EventPublisher(c, cr)

// check if a MonitoringConsole is attached to the instance
monitoringConsoleRef := cr.Spec.MonitoringConsoleRef
if monitoringConsoleRef.Name == "" {
return true, nil
}

namespacedName := types.NamespacedName{
Namespace: cr.GetNamespace(),
Name: GetSplunkStatefulsetName(SplunkSearchHead, cr.GetName()),
}

// check if the stateful set is created at this instance
statefulSet := &appsv1.StatefulSet{}
err := c.Get(ctx, namespacedName, statefulSet)
if err != nil && k8serrors.IsNotFound(err) {
return true, nil
}

namespacedName = types.NamespacedName{Namespace: cr.GetNamespace(), Name: monitoringConsoleRef.Name}
monitoringConsole := &enterpriseApi.MonitoringConsole{}

// get the monitoring console referred in search head cluster
err = c.Get(ctx, namespacedName, monitoringConsole)
if err != nil {
if k8serrors.IsNotFound(err) {
return true, nil
}
eventPublisher.Warning(ctx, "isSearchHeadReadyForUpgrade", fmt.Sprintf("Could not find the Monitoring Console. Reason %v", err))
scopedLog.Error(err, "Unable to get Monitoring Console")
return false, err
}

mcImage, err := getCurrentImage(ctx, c, monitoringConsole, SplunkMonitoringConsole)
if err != nil {
eventPublisher.Warning(ctx, "isSearchHeadReadyForUpgrade", fmt.Sprintf("Could not get the Monitoring Console Image. Reason %v", err))
scopedLog.Error(err, "Unable to get Monitoring Console current image")
return false, err
}

shcImage, err := getCurrentImage(ctx, c, cr, SplunkSearchHead)
if err != nil {
eventPublisher.Warning(ctx, "isSearchHeadReadyForUpgrade", fmt.Sprintf("Could not get the Search Head Image. Reason %v", err))
scopedLog.Error(err, "Unable to get Search Head current image")
return false, err
}

// check if an image upgrade is happening and whether the SearchHeadCluster is ready for the upgrade
if (cr.Spec.Image != shcImage) && (monitoringConsole.Status.Phase != enterpriseApi.PhaseReady || mcImage != cr.Spec.Image) {
return false, nil
}

return true, nil
}

// changeSearchHeadAnnotations updates the splunk/image-tag field of the SearchHeadCluster annotations to trigger the reconcile loop
// on update, and returns error if something is wrong.
func changeSearchHeadAnnotations(ctx context.Context, client splcommon.ControllerClient, cr *enterpriseApi.MonitoringConsole) error {
reqLogger := log.FromContext(ctx)
scopedLog := reqLogger.WithName("changeSearchHeadAnnotations").WithValues("name", cr.GetName(), "namespace", cr.GetNamespace())
eventPublisher, _ := newK8EventPublisher(client, cr)

searchHeadClusterInstance := enterpriseApi.SearchHeadCluster{}

// List out all the SearchHeadCluster instances in the namespace
opts := []rclient.ListOption{
rclient.InNamespace(cr.GetNamespace()),
}
objectList := enterpriseApi.SearchHeadClusterList{}
err := client.List(ctx, &objectList, opts...)
if err != nil {
if err.Error() == "NotFound" {
return nil
}
return err
}
if len(objectList.Items) == 0 {
return nil
}

// check if instance has the required MonitoringConsoleRef
for _, shc := range objectList.Items {
if shc.Spec.MonitoringConsoleRef.Name == cr.GetName() {
searchHeadClusterInstance = shc
}
}
if len(searchHeadClusterInstance.GetName()) == 0 {
return nil
}

image, err := getCurrentImage(ctx, client, cr, SplunkMonitoringConsole)
if err != nil {
eventPublisher.Warning(ctx, "changeSearchHeadAnnotations", fmt.Sprintf("Could not get the MonitoringConsole Image. Reason %v", err))
scopedLog.Error(err, "Get MonitoringConsole Image failed with", "error", err)
return err
}

err = changeAnnotations(ctx, client, image, &searchHeadClusterInstance)
if err != nil {
eventPublisher.Warning(ctx, "changeSearchHeadAnnotations", fmt.Sprintf("Could not update annotations. Reason %v", err))
scopedLog.Error(err, "SearchHeadCluster types update after changing annotations failed with", "error", err)
return err
}

return nil
}
170 changes: 169 additions & 1 deletion pkg/splunk/enterprise/searchheadcluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ func TestSearchHeadClusterWithReadyState(t *testing.T) {
}

// mock new search pod manager
newSerachHeadClusterPodManager = func(client splcommon.ControllerClient, log logr.Logger, cr *enterpriseApi.SearchHeadCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) searchHeadClusterPodManager {
newSearchHeadClusterPodManager = func(client splcommon.ControllerClient, log logr.Logger, cr *enterpriseApi.SearchHeadCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) searchHeadClusterPodManager {
return searchHeadClusterPodManager{
log: log,
cr: cr,
Expand Down Expand Up @@ -1873,3 +1873,171 @@ func TestSearchHeadClusterWithReadyState(t *testing.T) {
debug.PrintStack()
}
}

func TestIsSearchHeadReadyForUpgrade(t *testing.T) {
ctx := context.TODO()

builder := fake.NewClientBuilder()
client := builder.Build()
utilruntime.Must(enterpriseApi.AddToScheme(clientgoscheme.Scheme))

// Create License Manager
mc := enterpriseApi.MonitoringConsole{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
Spec: enterpriseApi.MonitoringConsoleSpec{
CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{
Spec: enterpriseApi.Spec{
ImagePullPolicy: "Always",
Image: "splunk/splunk:latest",
},
Volumes: []corev1.Volume{},
},
},
}

err := client.Create(ctx, &mc)
_, err = ApplyMonitoringConsole(ctx, client, &mc)
if err != nil {
t.Errorf("applyMonitoringConsole should not have returned error; err=%v", err)
}
mc.Status.Phase = enterpriseApi.PhaseReady
err = client.Status().Update(ctx, &mc)
if err != nil {
t.Errorf("Unexpected status update %v", err)
debug.PrintStack()
}

// Create Search Head Cluster
shc := enterpriseApi.SearchHeadCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
Spec: enterpriseApi.SearchHeadClusterSpec{
CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{
Spec: enterpriseApi.Spec{
ImagePullPolicy: "Always",
Image: "splunk/splunk:latest",
},
Volumes: []corev1.Volume{},
MonitoringConsoleRef: corev1.ObjectReference{
Name: "test",
},
},
Replicas: int32(3),
},
}

err = client.Create(ctx, &shc)
_, err = ApplySearchHeadCluster(ctx, client, &shc)
if err != nil {
t.Errorf("applySearchHeadCluster should not have returned error; err=%v", err)
}

mc.Spec.Image = "splunk2"
shc.Spec.Image = "splunk2"
_, err = ApplyMonitoringConsole(ctx, client, &mc)

searchHeadCluster := &enterpriseApi.SearchHeadCluster{}
namespacedName := types.NamespacedName{
Name: shc.Name,
Namespace: shc.Namespace,
}
err = client.Get(ctx, namespacedName, searchHeadCluster)
if err != nil {
t.Errorf("Get Search Head Cluster should not have returned error=%v", err)
}

check, err := isSearchHeadReadyForUpgrade(ctx, client, searchHeadCluster)

if err != nil {
t.Errorf("Unexpected upgradeScenario error %v", err)
}

if !check {
t.Errorf("isSearchHeadReadyForUpgrade: SHC should be ready for upgrade")
}
}

func TestChangeSearchHeadAnnotations(t *testing.T) {
ctx := context.TODO()

// define MC and SHC
mc := &enterpriseApi.MonitoringConsole{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
Spec: enterpriseApi.MonitoringConsoleSpec{
CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{
Spec: enterpriseApi.Spec{
ImagePullPolicy: "Always",
},
Volumes: []corev1.Volume{},
},
},
}

shc := &enterpriseApi.SearchHeadCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
Spec: enterpriseApi.SearchHeadClusterSpec{
CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{
Spec: enterpriseApi.Spec{
ImagePullPolicy: "Always",
},
Volumes: []corev1.Volume{},
MonitoringConsoleRef: corev1.ObjectReference{
Name: "test",
},
},
},
}
mc.Spec.Image = "splunk/splunk:latest"

builder := fake.NewClientBuilder()
client := builder.Build()
utilruntime.Must(enterpriseApi.AddToScheme(clientgoscheme.Scheme))

// Create the instances
client.Create(ctx, mc)
_, err := ApplyMonitoringConsole(ctx, client, mc)
if err != nil {
t.Errorf("applyMonitoringConsole should not have returned error; err=%v", err)
}
mc.Status.Phase = enterpriseApi.PhaseReady
err = client.Status().Update(ctx, mc)
if err != nil {
t.Errorf("Unexpected update pod %v", err)
debug.PrintStack()
}
client.Create(ctx, shc)
_, err = ApplySearchHeadCluster(ctx, client, shc)
if err != nil {
t.Errorf("applySearchHeadCluster should not have returned error; err=%v", err)
}

err = changeSearchHeadAnnotations(ctx, client, mc)
if err != nil {
t.Errorf("changeSearchHeadAnnotations should not have returned error=%v", err)
}
searchHeadCluster := &enterpriseApi.SearchHeadCluster{}
namespacedName := types.NamespacedName{
Name: shc.Name,
Namespace: shc.Namespace,
}
err = client.Get(ctx, namespacedName, searchHeadCluster)
if err != nil {
t.Errorf("changeSearchHeadAnnotations should not have returned error=%v", err)
}

annotations := searchHeadCluster.GetAnnotations()
if annotations["splunk/image-tag"] != mc.Spec.Image {
t.Errorf("changeSearchHeadAnnotations should have set the splunk/image-tag annotation field to the current image")
}
}

0 comments on commit fe0555b

Please sign in to comment.