Skip to content

Commit

Permalink
support version notice to nodes (aws#293)
Browse files Browse the repository at this point in the history
  • Loading branch information
haouc authored Aug 24, 2023
1 parent a6e4cd7 commit 28e332d
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 12 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func main() {
nodeManagerWorkers := asyncWorkers.NewDefaultWorkerPool("node async workers",
10, 1, ctrl.Log.WithName("node async workers"), ctx)
nodeManager, err := manager.NewNodeManager(ctrl.Log.WithName("node manager"), resourceManager,
apiWrapper, nodeManagerWorkers, controllerConditions, healthzHandler)
apiWrapper, nodeManagerWorkers, controllerConditions, version.GitVersion, healthzHandler)

if err != nil {
ctrl.Log.Error(err, "failed to init node manager")
Expand Down
21 changes: 12 additions & 9 deletions pkg/node/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type manager struct {
// wrapper around the clients for all APIs used by controller
wrapper api.Wrapper
// worker for performing async operation on node APIs
worker asyncWorker.Worker
conditions condition.Conditions
worker asyncWorker.Worker
conditions condition.Conditions
controllerVersion string
}

// Manager to perform operation on list of managed/un-managed node
Expand Down Expand Up @@ -94,15 +95,16 @@ type AsyncOperationJob struct {

// NewNodeManager returns a new node manager
func NewNodeManager(logger logr.Logger, resourceManager resource.ResourceManager,
wrapper api.Wrapper, worker asyncWorker.Worker, conditions condition.Conditions, healthzHandler *rcHealthz.HealthzHandler) (Manager, error) {
wrapper api.Wrapper, worker asyncWorker.Worker, conditions condition.Conditions, controllerVersion string, healthzHandler *rcHealthz.HealthzHandler) (Manager, error) {

manager := &manager{
resourceManager: resourceManager,
Log: logger,
dataStore: make(map[string]node.Node),
wrapper: wrapper,
worker: worker,
conditions: conditions,
resourceManager: resourceManager,
Log: logger,
dataStore: make(map[string]node.Node),
wrapper: wrapper,
worker: worker,
conditions: conditions,
controllerVersion: controllerVersion,
}

// add health check on subpath for node manager
Expand Down Expand Up @@ -386,6 +388,7 @@ func (m *manager) performAsyncOperation(job interface{}) (ctrl.Result, error) {
var err error
switch asyncJob.op {
case Init:
utils.SendNodeEventWithNodeName(m.wrapper.K8sAPI, asyncJob.nodeName, utils.VersionNotice, fmt.Sprintf("The node is managed by VPC resource controller version %s", m.controllerVersion), v1.EventTypeNormal, m.Log)
err = asyncJob.node.InitResources(m.resourceManager)
if err != nil {
log.Error(err, "removing the node from cache as it failed to initialize")
Expand Down
8 changes: 6 additions & 2 deletions pkg/node/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func Test_GetNewManager(t *testing.T) {
mock := NewMock(ctrl, map[string]node.Node{})

mock.MockWorker.EXPECT().StartWorkerPool(gomock.Any()).Return(nil)
manager, err := NewNodeManager(zap.New(), nil, api.Wrapper{}, mock.MockWorker, mock.MockConditions, healthzHandler)
manager, err := NewNodeManager(zap.New(), nil, api.Wrapper{}, mock.MockWorker, mock.MockConditions, "v1.3.1", healthzHandler)

assert.NotNil(t, manager)
assert.NoError(t, err)
Expand All @@ -179,7 +179,7 @@ func Test_GetNewManager_Error(t *testing.T) {
mock := NewMock(ctrl, map[string]node.Node{})

mock.MockWorker.EXPECT().StartWorkerPool(gomock.Any()).Return(mockError)
manager, err := NewNodeManager(zap.New(), nil, api.Wrapper{}, mock.MockWorker, mock.MockConditions, healthzHandler)
manager, err := NewNodeManager(zap.New(), nil, api.Wrapper{}, mock.MockWorker, mock.MockConditions, "v1.3.1", healthzHandler)

assert.NotNil(t, manager)
assert.Error(t, err, mockError)
Expand Down Expand Up @@ -640,6 +640,8 @@ func Test_performAsyncOperation(t *testing.T) {
job.op = Init

mock.MockK8sAPI.EXPECT().AddLabelToManageNode(v1Node, config.HasTrunkAttachedLabel, config.BooleanTrue).Return(true, nil).AnyTimes()
mock.MockK8sAPI.EXPECT().GetNode(nodeName).Return(v1Node, nil)
mock.MockK8sAPI.EXPECT().BroadcastEvent(v1Node, utils.VersionNotice, fmt.Sprintf("The node is managed by VPC resource controller version %s", mock.Manager.controllerVersion), v1.EventTypeNormal).Times(1)
mock.MockNode.EXPECT().InitResources(mock.MockResourceManager).Return(nil)
mock.MockNode.EXPECT().UpdateResources(mock.MockResourceManager).Return(nil)
_, err := mock.Manager.performAsyncOperation(job)
Expand Down Expand Up @@ -674,6 +676,8 @@ func Test_performAsyncOperation_fail(t *testing.T) {
}

mock.MockNode.EXPECT().InitResources(mock.MockResourceManager).Return(&node.ErrInitResources{})
mock.MockK8sAPI.EXPECT().GetNode(nodeName).Return(v1Node, nil)
mock.MockK8sAPI.EXPECT().BroadcastEvent(v1Node, utils.VersionNotice, fmt.Sprintf("The node is managed by VPC resource controller version %s", mock.Manager.controllerVersion), v1.EventTypeNormal).Times(1)

_, err := mock.Manager.performAsyncOperation(job)
assert.NotContains(t, mock.Manager.dataStore, nodeName) // It should be cleared from cache
Expand Down
1 change: 1 addition & 0 deletions pkg/utils/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
NodeTrunkInitiatedReason = "NodeTrunkInitiated"
NodeTrunkFailedInitializationReason = "NodeTrunkFailedInit"
EniConfigNameNotFoundReason = "EniConfigNameNotFound"
VersionNotice = "ControllerVersionNotice"
)

func SendNodeEventWithNodeName(client k8s.K8sWrapper, nodeName, reason, msg, eventType string, logger logr.Logger) {
Expand Down

0 comments on commit 28e332d

Please sign in to comment.