Skipping health check on nodes if EC2 returns throttling errors
haouc committed Oct 21, 2024
1 parent 21961a7 commit 0296f7f
Showing 5 changed files with 81 additions and 3 deletions.
controllers/core/node_controller.go: 5 additions & 0 deletions
@@ -168,6 +168,11 @@ func (r *NodeReconciler) Check() healthz.Checker {
return nil
}

if r.Manager.SkipHealthCheck() {
// the node manager observed an EC2 error while processing a node; pause the reconciler check to avoid stressing the system
return nil
}

err := rcHealthz.PingWithTimeout(func(c chan<- error) {
// when the reconciler is ready, testing the reconciler with a fake node request
pingRequest := &ctrl.Request{
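
For context on why returning nil is enough here: a controller-runtime healthz.Checker reports the probe as healthy whenever it returns nil, so pausing the ping keeps the liveness endpoint passing instead of stressing a throttled controller. A minimal sketch of that pattern in Go (illustrative wiring and names, not the controller's actual code):

package sketch

import (
    "net/http"

    "sigs.k8s.io/controller-runtime/pkg/healthz"
)

// pausableChecker wraps a real check and short-circuits it while paused()
// returns true, mirroring the SkipHealthCheck guard added above.
func pausableChecker(paused func() bool, real healthz.Checker) healthz.Checker {
    return func(req *http.Request) error {
        if paused() {
            return nil // nil means "healthy": skip the ping while EC2 throttles us
        }
        return real(req)
    }
}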

Some generated files are not rendered by default.

pkg/node/manager/manager.go: 35 additions & 3 deletions
@@ -57,6 +57,7 @@ type manager struct {
worker asyncWorker.Worker
conditions condition.Conditions
controllerVersion string
stopHealthCheckAt time.Time
}

// Manager to perform operation on list of managed/un-managed node
@@ -66,6 +67,7 @@ type Manager interface {
UpdateNode(nodeName string) error
DeleteNode(nodeName string) error
CheckNodeForLeakedENIs(nodeName string)
SkipHealthCheck() bool
}

// AsyncOperation is operation on a node after the lock has been released.
@@ -96,6 +98,8 @@ type AsyncOperationJob struct {
nodeName string
}

const pausingHealthCheckDuration = 30 * time.Minute

// NewNodeManager returns a new node manager
func NewNodeManager(logger logr.Logger, resourceManager resource.ResourceManager,
wrapper api.Wrapper, worker asyncWorker.Worker, conditions condition.Conditions, controllerVersion string, healthzHandler *rcHealthz.HealthzHandler) (Manager, error) {
@@ -425,6 +429,10 @@ func (m *manager) performAsyncOperation(job interface{}) (ctrl.Result, error) {
utils.SendNodeEventWithNodeName(m.wrapper.K8sAPI, asyncJob.nodeName, utils.VersionNotice, fmt.Sprintf("The node is managed by VPC resource controller version %s", m.controllerVersion), v1.EventTypeNormal, m.Log)
err = asyncJob.node.InitResources(m.resourceManager)
if err != nil {
if pauseHealthCheckOnError(err) {
m.setStopHealthCheck()
log.Info("node manager sets a pause on health check due to observing an EC2 error", "error", err.Error())
}
log.Error(err, "removing the node from cache as it failed to initialize")
m.removeNodeSafe(asyncJob.nodeName)
// if initializing node failed, we want to make this visible although the manager will retry
@@ -565,12 +573,36 @@ func (m *manager) check() healthz.Checker {
randomName := uuid.New().String()
_, found := m.GetNode(randomName)
m.Log.V(1).Info("health check tested ping GetNode to check on datastore cache in node manager successfully", "TestedNodeName", randomName, "NodeFound", found)
var ping interface{}
m.worker.SubmitJob(ping)
m.Log.V(1).Info("health check tested ping SubmitJob with a nil job to check on worker queue in node manager successfully")
if m.SkipHealthCheck() {
m.Log.Info("due to an EC2 error, node manager skips the worker queue health check for now")
} else {
var ping interface{}
m.worker.SubmitJob(ping)
m.Log.V(1).Info("health check tested ping SubmitJob with a nil job to check on worker queue in node manager successfully")
}
c <- nil
}, m.Log)

return err
}
}

func (m *manager) SkipHealthCheck() bool {
m.lock.RLock()
defer m.lock.RUnlock()

return time.Since(m.stopHealthCheckAt) < pausingHealthCheckDuration
}

func (m *manager) setStopHealthCheck() {
m.lock.Lock()
defer m.lock.Unlock()

m.stopHealthCheckAt = time.Now()
}

func pauseHealthCheckOnError(err error) bool {
return lo.ContainsBy(utils.PauseHealthCheckErrors, func(e string) bool {
return strings.Contains(err.Error(), e)
})
}
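
The pause itself is just a timestamp guarded by the manager's lock: SkipHealthCheck returns true while less than pausingHealthCheckDuration has elapsed since the last throttling error, and the zero time.Time means health checks run normally until an error is first observed. A small runnable sketch of that behavior (the pauser type is illustrative, not the real manager):

package main

import (
    "fmt"
    "sync"
    "time"
)

const pausingHealthCheckDuration = 30 * time.Minute

type pauser struct {
    lock              sync.RWMutex
    stopHealthCheckAt time.Time // zero value: the pause window is long expired
}

func (p *pauser) SkipHealthCheck() bool {
    p.lock.RLock()
    defer p.lock.RUnlock()
    return time.Since(p.stopHealthCheckAt) < pausingHealthCheckDuration
}

func (p *pauser) setStopHealthCheck() {
    p.lock.Lock()
    defer p.lock.Unlock()
    p.stopHealthCheckAt = time.Now()
}

func main() {
    p := &pauser{}
    fmt.Println(p.SkipHealthCheck()) // false: no throttling error observed yet
    p.setStopHealthCheck()
    fmt.Println(p.SkipHealthCheck()) // true: inside the 30-minute pause window
}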
pkg/node/manager/manager_test.go: 26 additions & 0 deletions
@@ -17,6 +17,7 @@ import (
"errors"
"fmt"
"testing"
"time"

"github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1"
rcV1alpha1 "github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
@@ -684,6 +685,31 @@ func Test_performAsyncOperation_fail(t *testing.T) {
assert.NoError(t, err)
}

func Test_performAsyncOperation_fail_pausingHealthCheck(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mock := NewMock(ctrl, map[string]node.Node{nodeName: managedNode})

job := AsyncOperationJob{
node: mock.MockNode,
nodeName: nodeName,
op: Init,
}

mock.MockNode.EXPECT().InitResources(mock.MockResourceManager).Return(&node.ErrInitResources{
Err: errors.New("RequestLimitExceeded: Request limit exceeded.\n\tstatus code: 503, request id: 123-123-123-123-123"),
})
mock.MockK8sAPI.EXPECT().GetNode(nodeName).Return(v1Node, nil)
mock.MockK8sAPI.EXPECT().BroadcastEvent(v1Node, utils.VersionNotice, fmt.Sprintf("The node is managed by VPC resource controller version %s", mock.Manager.controllerVersion), v1.EventTypeNormal).Times(1)

_, err := mock.Manager.performAsyncOperation(job)
time.Sleep(time.Second * 1)
assert.True(t, mock.Manager.SkipHealthCheck())
assert.NotContains(t, mock.Manager.dataStore, nodeName) // It should be cleared from cache
assert.NoError(t, err)
}

// Test_isPodENICapacitySet test if the pod-eni capacity then true is returned
func Test_isPodENICapacitySet(t *testing.T) {
ctrl := gomock.NewController(t)
pkg/utils/errors.go: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ var (
ErrInsufficientCidrBlocks = errors.New("InsufficientCidrBlocks: The specified subnet does not have enough free cidr blocks to satisfy the request")
ErrMsgProviderAndPoolNotFound = "cannot find the instance provider and pool from the cache"
NotRetryErrors = []string{InsufficientCidrBlocksReason}
PauseHealthCheckErrors = []string{"RequestLimitExceeded"}
)

// ShouldRetryOnError returns true if the error is retryable, else returns false
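
For reference, pauseHealthCheckOnError above is a plain substring match against PauseHealthCheckErrors, so any EC2 response containing "RequestLimitExceeded" triggers the pause. A standalone sketch using strings.Contains in place of the lo helper, with a sample message modeled on the new test:

package main

import (
    "errors"
    "fmt"
    "strings"
)

var pauseHealthCheckErrors = []string{"RequestLimitExceeded"}

// shouldPause reports whether the error message contains any of the markers
// that indicate EC2 throttling.
func shouldPause(err error) bool {
    for _, marker := range pauseHealthCheckErrors {
        if strings.Contains(err.Error(), marker) {
            return true
        }
    }
    return false
}

func main() {
    throttled := errors.New("RequestLimitExceeded: Request limit exceeded.\n\tstatus code: 503")
    fmt.Println(shouldPause(throttled))                            // true
    fmt.Println(shouldPause(errors.New("InsufficientCidrBlocks"))) // false
}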
