Skip to content

Commit

Permalink
Added multisite func
Browse files Browse the repository at this point in the history
  • Loading branch information
tgarg-splunk committed Jul 24, 2023
1 parent b83b304 commit 207d9b6
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/splunk/enterprise/clustermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func isClusterManagerReadyForUpgrade(ctx context.Context, c splcommon.Controller
return false, err
}

lmImage, err := getCurrentImage(ctx, c, cr, SplunkLicenseManager)
lmImage, err := getCurrentImage(ctx, c, licenseManager, SplunkLicenseManager)
if err != nil {
eventPublisher.Warning(ctx, "isClusterManagerReadyForUpgrade", fmt.Sprintf("Could not get the License Manager Image. Reason %v", err))
scopedLog.Error(err, "Unable to get licenseManager current image")
Expand Down
145 changes: 145 additions & 0 deletions pkg/splunk/enterprise/indexercluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
return result, err
}
} else {
// check if the IndexerCluster is ready for version upgrade
continueReconcile, err := mgr.isIndexerClusterReadyForUpgrade(ctx, client, cr)
if err != nil || !continueReconcile {
return result, err
}
// Delete the statefulset and recreate new one
err = client.Delete(ctx, statefulSet)
if err != nil {
Expand Down Expand Up @@ -1069,3 +1074,143 @@ func RetrieveCMSpec(ctx context.Context, client splcommon.ControllerClient, cr *

return "", nil
}

func getIndexerClusterSortedSiteList(ctx context.Context, c splcommon.ControllerClient, ref corev1.ObjectReference, indexerList enterpriseApi.IndexerClusterList) (enterpriseApi.IndexerClusterList, error) {

namespaceList := enterpriseApi.IndexerClusterList{}

for _, v := range indexerList.Items {
if v.Spec.ClusterManagerRef == ref {
namespaceList.Items = append(namespaceList.Items, v)
}
}

listLen := len(namespaceList.Items)

for i := 0; i < listLen-1; i++ {
for j := 0; j < listLen-i-1; j++ {
curr := &namespaceList.Items[j]
post := &namespaceList.Items[j+1]
if getSiteName(ctx, c, curr) > getSiteName(ctx, c, post) {
namespaceList.Items[j], namespaceList.Items[j+1] = namespaceList.Items[j+1], namespaceList.Items[j]
}
}
}

return namespaceList, nil

}

func getSiteName(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IndexerCluster) string {
defaults := cr.Spec.Defaults
pattern := `site:\s+(\w+)`

// Compile the regular expression pattern
re := regexp.MustCompile(pattern)

// Find the first match in the input string
match := re.FindStringSubmatch(defaults)

var extractedValue string
if len(match) > 1 {
// Extracted value is stored in the second element of the match array
extractedValue := match[1]
return extractedValue
}

return extractedValue
}

// isIndexerClusterReadyForUpgrade checks if IndexerCluster can be upgraded if a version upgrade is in-progress
// No-operation otherwise; returns bool, err accordingly
func (mgr *indexerClusterPodManager) isIndexerClusterReadyForUpgrade(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IndexerCluster) (bool, error) {
reqLogger := log.FromContext(ctx)
scopedLog := reqLogger.WithName("isIndexerClusterReadyForUpgrade").WithValues("name", cr.GetName(), "namespace", cr.GetNamespace())
eventPublisher, _ := newK8EventPublisher(c, cr)

// get the clusterManagerRef attached to the instance
clusterManagerRef := cr.Spec.ClusterManagerRef

cm := mgr.getClusterManagerClient(ctx)
clusterInfo, err := cm.GetClusterInfo(false)
if err != nil {
return false, fmt.Errorf("could not get cluster info from cluster manager")
}
if clusterInfo.MultiSite == "true" {
opts := []rclient.ListOption{
rclient.InNamespace(cr.GetNamespace()),
}
indexerList, err := getIndexerClusterList(ctx, c, cr, opts)
if err != nil {
return false, err
}
sortedList, err := getIndexerClusterSortedSiteList(ctx, c, cr.Spec.ClusterManagerRef, indexerList)

preIdx := enterpriseApi.IndexerCluster{}

for i, v := range sortedList.Items {
if &v == cr {
if i > 0 {
preIdx = sortedList.Items[i-1]
}
break

}
}
if len(preIdx.Name) != 0 {
image, _ := getCurrentImage(ctx, c, &preIdx, SplunkIndexer)
if preIdx.Status.Phase != enterpriseApi.PhaseReady || image != cr.Spec.Image {
return false, nil
}
}

}

// check if a search head cluster exists with the same ClusterManager instance attached
searchHeadClusterInstance := enterpriseApi.SearchHeadCluster{}
opts := []rclient.ListOption{
rclient.InNamespace(cr.GetNamespace()),
}
searchHeadList, err := getSearchHeadClusterList(ctx, c, cr, opts)
if err != nil {
if err.Error() == "NotFound" {
return true, nil
}
return false, err
}
if len(searchHeadList.Items) == 0 {
return true, nil
}

// check if instance has the required ClusterManagerRef
for _, shc := range searchHeadList.Items {
if shc.Spec.ClusterManagerRef.Name == clusterManagerRef.Name {
searchHeadClusterInstance = shc
break
}
}
if len(searchHeadClusterInstance.GetName()) == 0 {
return true, nil
}

shcImage, err := getCurrentImage(ctx, c, &searchHeadClusterInstance, SplunkSearchHead)
if err != nil {
eventPublisher.Warning(ctx, "isIndexerClusterReadyForUpgrade", fmt.Sprintf("Could not get the Search Head Cluster Image. Reason %v", err))
scopedLog.Error(err, "Unable to get SearchHeadCluster current image")
return false, err
}

idxImage, err := getCurrentImage(ctx, c, cr, SplunkIndexer)
if err != nil {
eventPublisher.Warning(ctx, "isIndexerClusterReadyForUpgrade", fmt.Sprintf("Could not get the Indexer Cluster Image. Reason %v", err))
scopedLog.Error(err, "Unable to get IndexerCluster current image")
return false, err
}

// check if an image upgrade is happening and whether SHC has finished updating yet, return false to stop
// further reconcile operations on IDX until SHC is ready
if (cr.Spec.Image != idxImage) && (searchHeadClusterInstance.Status.Phase != enterpriseApi.PhaseReady || shcImage != cr.Spec.Image) {
return false, nil
}
return true, nil
}
66 changes: 66 additions & 0 deletions pkg/splunk/enterprise/indexercluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1875,3 +1875,69 @@ func TestIndexerClusterWithReadyState(t *testing.T) {
debug.PrintStack()
}
}

func TestGetIndexerClusterSortedSiteList(t *testing.T) {

ctx := context.TODO()

builder := fake.NewClientBuilder()
client := builder.Build()
utilruntime.Must(enterpriseApi.AddToScheme(clientgoscheme.Scheme))

namespaceList := enterpriseApi.IndexerClusterList{}

siteCount := 3
name := "test"
clusterManagerRef := "test"
siteName := []string{"sitea", "siteb", "sitec"}

for site := 1; site <= siteCount; site++ {
siteName := fmt.Sprintf("site%c", rune('a'+3-site))
siteDefaults := fmt.Sprintf(`splunk:
multisite_master: splunk-%s-%s-service
site: %s
`, name, "cluster-manager", siteName)

idx := enterpriseApi.IndexerCluster{
TypeMeta: metav1.TypeMeta{
Kind: "IndexerCluster",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "test",
Finalizers: []string{"enterprise.splunk.com/delete-pvc"},
},

Spec: enterpriseApi.IndexerClusterSpec{
CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{
Volumes: []corev1.Volume{},
Spec: enterpriseApi.Spec{
ImagePullPolicy: "Always",
},
ClusterManagerRef: corev1.ObjectReference{
Name: clusterManagerRef,
},
Defaults: siteDefaults,
},
Replicas: int32(1),
},
}

namespaceList.Items = append(namespaceList.Items, idx)

}

l, err := getIndexerClusterSortedSiteList(ctx, client, namespaceList.Items[0].Spec.ClusterManagerRef, namespaceList)
if err != nil {
t.Errorf("getIndexerClusterSortedSiteList should not have returned error; err=%v", err)
}

for i, v := range l.Items {
if getSiteName(ctx, client, &v) != siteName[i] {
t.Errorf("Unexpected error while sorting indexer clusters using siteName %v", err)
debug.PrintStack()

}
}

}

0 comments on commit 207d9b6

Please sign in to comment.