Skip to content

Commit

Permalink
Disable http client connection reuse to prevent memory leak (opensear…
Browse files Browse the repository at this point in the history
…ch-project#842)

The operator pod is suffering from memory leaks. After some analysis I
think I have narrowed it down to connections for the http client being
kept for reuse but never being used due to a new client being created in
every reconcile run.
This PR disables the connection keepalive/reuse and (at least in my
experiments) prevents the memory leak.

Fixes opensearch-project#700

- [x] Commits are signed per the DCO using --signoff
- [-] Unittest added for the new/changed functionality and all unit
tests are successful
- [-] Customer-visible features documented
- [x] No linter warnings (`make lint`)

If CRDs are changed:
- [-] CRD YAMLs updated (`make manifests`) and also copied into the helm
chart
- [-] Changes to CRDs documented

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and
signing off your commits, please check
[here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin).

Signed-off-by: Sebastian Woehrl <sebastian.woehrl@maibornwolff.de>
(cherry picked from commit 56c9c8f)
  • Loading branch information
swoehrl-mw committed Jul 2, 2024
1 parent bcdb0d2 commit 70dca28
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ spec:
dataStream:
description: The dataStream config that should be applied
properties:
timestampField:
timestamp_field:
description: TimestampField for dataStream
properties:
name:
Expand Down
4 changes: 4 additions & 0 deletions opensearch-operator/opensearch-gateway/services/os_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func NewOsClusterClient(clusterUrl string, username string, password string, opt
}
return &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
// These options are needed as otherwise connections would be kept and leak memory
// Connection reuse is not really possible due to each reconcile run being independent
DisableKeepAlives: true,
MaxIdleConns: 1,
}
}(),
Addresses: []string{clusterUrl},
Expand Down
33 changes: 11 additions & 22 deletions opensearch-operator/pkg/reconcilers/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/builders"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/helpers"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/k8s"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/util"
"github.com/cisco-open/operator-tools/pkg/reconciler"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/tools/record"
Expand All @@ -25,6 +26,7 @@ type ScalerReconciler struct {
recorder record.EventRecorder
reconcilerContext *ReconcilerContext
instance *opsterv1.OpenSearchCluster
ReconcilerOptions
}

func NewScalerReconciler(
Expand All @@ -33,14 +35,17 @@ func NewScalerReconciler(
recorder record.EventRecorder,
reconcilerContext *ReconcilerContext,
instance *opsterv1.OpenSearchCluster,
opts ...reconciler.ResourceReconcilerOption,
opts ...ReconcilerOption,
) *ScalerReconciler {
options := ReconcilerOptions{}
options.apply(opts...)
return &ScalerReconciler{
client: k8s.NewK8sClient(client, ctx, append(opts, reconciler.WithLog(log.FromContext(ctx).WithValues("reconciler", "scaler")))...),
client: k8s.NewK8sClient(client, ctx, reconciler.WithLog(log.FromContext(ctx).WithValues("reconciler", "scaler"))),
ctx: ctx,
recorder: recorder,
reconcilerContext: reconcilerContext,
instance: instance,
ReconcilerOptions: options,
}
}

Expand Down Expand Up @@ -187,11 +192,7 @@ func (r *ScalerReconciler) decreaseOneNode(currentStatus opsterv1.ComponentStatu
if !smartDecrease {
return false, err
}
username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
if err != nil {
return true, err
}
clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
if err != nil {
lg.Error(err, "failed to create os client")
r.recorder.AnnotatedEventf(r.instance, annotations, "WARN", "failed to remove node exclude", "Group-%s . failed to remove node exclude %s", nodePoolGroupName, lastReplicaNodeName)
Expand All @@ -209,13 +210,9 @@ func (r *ScalerReconciler) decreaseOneNode(currentStatus opsterv1.ComponentStatu

func (r *ScalerReconciler) excludeNode(currentStatus opsterv1.ComponentStatus, currentSts appsv1.StatefulSet, nodePoolGroupName string) error {
lg := log.FromContext(r.ctx)
username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
annotations := map[string]string{"cluster-name": r.instance.GetName()}
if err != nil {
return err
}

clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
if err != nil {
lg.Error(err, "failed to create os client")
r.recorder.AnnotatedEventf(r.instance, annotations, "Warning", "Scaler", "Failed to create os client for scaling")
Expand Down Expand Up @@ -272,12 +269,8 @@ func (r *ScalerReconciler) drainNode(currentStatus opsterv1.ComponentStatus, cur
lg := log.FromContext(r.ctx)
annotations := map[string]string{"cluster-name": r.instance.GetName()}
lastReplicaNodeName := helpers.ReplicaHostName(currentSts, *currentSts.Spec.Replicas-1)
username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
if err != nil {
return err
}

clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
if err != nil {
return err
}
Expand Down Expand Up @@ -328,12 +321,8 @@ func (r *ScalerReconciler) removeStatefulSet(sts appsv1.StatefulSet) (*ctrl.Resu
}

// Gracefully remove nodes
username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
if err != nil {
return nil, err
}
annotations := map[string]string{"cluster-name": r.instance.GetName()}
clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
if err != nil {
lg.Error(err, "failed to create os client")
r.recorder.AnnotatedEventf(r.instance, annotations, "Warning", "Scaler", "Failed to create os client")
Expand Down

0 comments on commit 70dca28

Please sign in to comment.