Skip to content

Commit

Permalink
Forget failing nodes which are no longer needed (#17)
Browse files Browse the repository at this point in the history
* Add method for getting failed nodes in the cluster
* Add method to forget a node in cluster
* Forget failing nodes on reconcile
  • Loading branch information
chris-cmsoft authored May 27, 2022
1 parent 2975723 commit a784f63
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 9 deletions.
30 changes: 21 additions & 9 deletions controllers/rediscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"
"github.com/containersolutions/redis-cluster-operator/internal/kubernetes"
redis_internal "github.com/containersolutions/redis-cluster-operator/internal/redis"
"github.com/containersolutions/redis-cluster-operator/internal/utils"
Expand Down Expand Up @@ -231,17 +232,16 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
// This can also be augmented by doing cluster meet for all ready nodes, and ignoring any non-ready ones.
// If the amount of ready pods is equal to the amount of nodes needed, we probably have some additional nodes we need to remove.
// We can forget these additional nodes, as they are probably nodes which pods got killed.
if len(clusterNodes.Nodes) == int(redisCluster.NodesNeeded()) {
logger.Info("Meeting Redis nodes")
err = clusterNodes.ClusterMeet(ctx)
if err != nil {
return r.RequeueError(ctx, "Could not meet all nodes together", err)
}
// We'll wait for 10 seconds to ensure the meet is propagated
time.Sleep(time.Second * 5)
logger.Info("Meeting Redis nodes")
err = clusterNodes.ClusterMeet(ctx)
if err != nil {
return r.RequeueError(ctx, "Could not meet all nodes together", err)
}
// We'll wait for 5 seconds to ensure the meet is propagated
time.Sleep(time.Second * 5)
// endregion

logger.Info("Checking Cluster Master Replica Ratio")
// region Ensure Cluster Replication Ratio
err = clusterNodes.EnsureClusterReplicationRatio(ctx, redisCluster)
if err != nil {
Expand All @@ -255,7 +255,7 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

// region Assign Slots
logger.Info("Assigning missing slots")
logger.Info("Assigning Missing Slots")
slotsAssignments := clusterNodes.CalculateSlotAssignment()
for node, slots := range slotsAssignments {
if len(slots) == 0 {
Expand All @@ -271,6 +271,18 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}
// endregion

logger.Info("Forgetting Failed Nodes No Longer Valid")
failingNodes, err := clusterNodes.GetFailingNodes(ctx)
if err != nil {
return r.RequeueError(ctx, "could not fetch failing nodes", err)
}
for _, node := range failingNodes {
err = clusterNodes.ForgetNode(ctx, node)
if err != nil {
return r.RequeueError(ctx, fmt.Sprintf("could not forget node %s", node.NodeAttributes.ID), err)
}
}
}

return ctrl.Result{
Expand Down
47 changes: 47 additions & 0 deletions internal/redis/cluster_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package redis

import (
"context"
"errors"
"github.com/containersolutions/redis-cluster-operator/api/v1alpha1"
"math"
"sort"
Expand All @@ -21,6 +22,52 @@ func (c *ClusterNodes) ReloadNodes(ctx context.Context) error {
return nil
}

// GetCommandingNode returns a master node which is reachable and can be used
// to issue cluster-wide commands (e.g. CLUSTER NODES, CLUSTER FORGET).
// Masters that do not respond to PING are skipped, so a single dead master
// does not prevent finding a usable commanding node. If no master responds,
// the last PING error (or a "no commanding nodes found" error when no master
// exists at all) is returned.
func (c *ClusterNodes) GetCommandingNode(ctx context.Context) (*Node, error) {
	var lastErr error
	for _, node := range c.Nodes {
		if !node.IsMaster() {
			continue
		}
		// Verify the master is actually reachable before handing it out.
		if err := node.Ping(ctx).Err(); err != nil {
			lastErr = err
			continue
		}
		return node, nil
	}
	if lastErr != nil {
		return nil, lastErr
	}
	return nil, errors.New("no commanding nodes found")
}

// ForgetNode issues CLUSTER FORGET for forgetNode's ID on every node in the
// cluster, removing the (typically failed) node from each node table.
// The target node itself is skipped if present in c.Nodes, because Redis
// rejects a CLUSTER FORGET of a node's own ID with an error.
func (c *ClusterNodes) ForgetNode(ctx context.Context, forgetNode *Node) error {
	for _, node := range c.Nodes {
		// A node cannot forget itself; asking it to do so returns an error.
		if node.NodeAttributes.ID == forgetNode.NodeAttributes.ID {
			continue
		}
		if err := node.ClusterForget(ctx, forgetNode.NodeAttributes.ID).Err(); err != nil {
			return err
		}
	}
	return nil
}

// GetFailingNodes returns all cluster nodes currently flagged as "fail".
// The flags come from the `cluster nodes` output of a reachable commanding
// node; the failing nodes themselves are usually unreachable, as they tend
// to be the remnants of restarted pods.
func (c *ClusterNodes) GetFailingNodes(ctx context.Context) ([]*Node, error) {
	commander, err := c.GetCommandingNode(ctx)
	if err != nil {
		return nil, err
	}

	friends, err := commander.GetFriends(ctx)
	if err != nil {
		return nil, err
	}

	var failing []*Node
	for _, friend := range friends {
		if friend.NodeAttributes.HasFlag("fail") {
			failing = append(failing, friend)
		}
	}
	return failing, nil
}

func (c *ClusterNodes) ClusterMeet(ctx context.Context) error {
for _, node := range c.Nodes {
for _, joinNode := range c.Nodes {
Expand Down
105 changes: 105 additions & 0 deletions internal/redis/cluster_nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,108 @@ func TestClusterNodes_EnsureClusterReplicationRatioIfTooFewMasters(t *testing.T)
}
}
}

// TestClusterNodes_GetFailingNodes verifies that GetFailingNodes returns
// exactly the nodes flagged "fail" in the `cluster nodes` output of the
// commanding node.
func TestClusterNodes_GetFailingNodes(t *testing.T) {
	var nodes []*Node
	mocks := map[string]*redismock.ClientMock{}
	for i := 0; i <= 0; i++ {
		node, err := NewNode(context.TODO(), &redis.Options{
			Addr: "10.20.30.40:6379",
		}, func(opt *redis.Options) *redis.Client {
			client, mock := redismock.NewClientMock()
			switch i {
			case 0:
				mock.MatchExpectationsInOrder(false)
				mock.ExpectPing().SetVal("PONG")
				clusterNodeString := `c9d83f035342c51c8d23b32339f37656becd14c9 10.20.30.40:6379@16379 myself,master - 0 1653647426553 3 connected 0-5461
1a4c602fc868c69b74fc13f9b0410a20241c7197 10.20.30.41:6379@16379 master,fail - 1653646405584 1653646403000 4 connected
`
				// Cluster nodes will be called twice. Once for creating the nodes, the next for getting friends.
				mock.ExpectClusterNodes().SetVal(clusterNodeString)
				mock.ExpectClusterNodes().SetVal(clusterNodeString)
				mocks["c9d83f035342c51c8d23b32339f37656becd14c9"] = &mock
			}
			return client
		})
		if err != nil {
			t.Fatalf("Got error while trying to create node. %v", err)
		}
		nodes = append(nodes, node)
	}
	clusterNodes := ClusterNodes{
		Nodes: nodes,
	}
	failingNodes, err := clusterNodes.GetFailingNodes(context.TODO())
	if err != nil {
		t.Fatalf("Failed to get failing nodes. %v", err)
	}
	if len(failingNodes) != 1 {
		t.Fatalf("incorrect amount of failing nodes returned")
	}
	if failingNodes[0].NodeAttributes.ID != "1a4c602fc868c69b74fc13f9b0410a20241c7197" {
		t.Fatalf("Incorrect node returned for failing nodes. Expected 1a4c602fc868c69b74fc13f9b0410a20241c7197. Got %s", failingNodes[0].NodeAttributes.ID)
	}
	// Ensure every queued expectation on each mocked node was consumed.
	for node, mock := range mocks {
		realMock := *mock
		if err = realMock.ExpectationsWereMet(); err != nil {
			t.Fatalf("Not all expectations from redis were met. Node %s. Err: %v", node, err)
		}
	}
}

// TestClusterNodes_ForgetNode verifies that ForgetNode issues CLUSTER FORGET
// for the target node's ID on every node in the cluster.
func TestClusterNodes_ForgetNode(t *testing.T) {
	var nodes []*Node
	mocks := map[string]*redismock.ClientMock{}
	for i := 0; i <= 1; i++ {
		node, err := NewNode(context.TODO(), &redis.Options{
			Addr: "10.20.30.40:6379",
		}, func(opt *redis.Options) *redis.Client {
			client, mock := redismock.NewClientMock()
			switch i {
			case 0:
				mock.MatchExpectationsInOrder(false)
				clusterNodeString := `1cbbfae6453680475e523e4d28438b1c1acf8cd3 10.20.30.40:6379@16379 myself,master - 0 1653647424000 2 connected 5462-10923
c9d83f035342c51c8d23b32339f37656becd14c9 10.20.30.41:6379@16379 master - 0 1653647426553 3 connected 0-5461
1a4c602fc868c69b74fc13f9b0410a20241c7197 10.20.30.42:6379@16379 master,fail - 1653646405584 1653646403000 4 connected`
				// Cluster nodes will be called twice. Once for creating the nodes, the next for getting friends.
				mock.ExpectClusterNodes().SetVal(clusterNodeString)
				mock.ExpectClusterForget("1a4c602fc868c69b74fc13f9b0410a20241c7197").SetVal("OK")
				mocks["1cbbfae6453680475e523e4d28438b1c1acf8cd3"] = &mock
			case 1:
				mock.MatchExpectationsInOrder(false)
				clusterNodeString := `1cbbfae6453680475e523e4d28438b1c1acf8cd3 10.20.30.40:6379@16379 master - 0 1653647424000 2 connected 5462-10923
c9d83f035342c51c8d23b32339f37656becd14c9 10.20.30.41:6379@16379 myself,master - 0 1653647426553 3 connected 0-5461
1a4c602fc868c69b74fc13f9b0410a20241c7197 10.20.30.42:6379@16379 master,fail - 1653646405584 1653646403000 4 connected`
				// Cluster nodes will be called twice. Once for creating the nodes, the next for getting friends.
				mock.ExpectClusterNodes().SetVal(clusterNodeString)
				mock.ExpectClusterForget("1a4c602fc868c69b74fc13f9b0410a20241c7197").SetVal("OK")
				mocks["c9d83f035342c51c8d23b32339f37656becd14c9"] = &mock
			}
			return client
		})
		if err != nil {
			t.Fatalf("Got error while trying to create node. %v", err)
		}
		nodes = append(nodes, node)
	}
	clusterNodes := ClusterNodes{
		Nodes: nodes,
	}
	removeAbleNode := &Node{
		NodeAttributes: NodeAttributes{
			ID:   "1a4c602fc868c69b74fc13f9b0410a20241c7197",
			host: "10.20.30.42",
			port: "6379",
		},
	}
	err := clusterNodes.ForgetNode(context.TODO(), removeAbleNode)
	if err != nil {
		t.Fatalf("Failed to forget node. %v", err)
	}
	// Ensure every queued expectation on each mocked node was consumed.
	for node, mock := range mocks {
		realMock := *mock
		if err = realMock.ExpectationsWereMet(); err != nil {
			t.Fatalf("Not all expectations from redis were met. Node %s. Err: %v", node, err)
		}
	}
}

0 comments on commit a784f63

Please sign in to comment.