Add configurable wait time between restart and preferred leader election #8

Open · wants to merge 1 commit into base: rackrolling-update
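In outline, the commit threads a new waitBetweenRestartAndPreferredLeaderElection parameter (passed as 10000L at the call site in the first hunk below) from rollingRestart(...) down to restartInParallel(...), which now pauses for that long after a restarted broker reaches the SERVING state and before awaitPreferred(...) tries to move preferred partition leadership back to it. The following is a rough, hypothetical sketch of that pattern only; the Broker interface and its methods are stand-ins for the operator's PlatformClient/RollClient/AgentClient plumbing, not APIs from this codebase.

import java.util.Set;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified illustration of the delay added in RackRolling#restartInParallel.
public class RestartThenElectSketch {

    // Stand-in for the operator's node/client abstractions.
    public interface Broker {
        void restart();
        boolean isServing();
        void electPreferredLeaders();
    }

    private final long waitBetweenRestartAndPreferredLeaderElectionMs;

    public RestartThenElectSketch(long waitBetweenRestartAndPreferredLeaderElectionMs) {
        this.waitBetweenRestartAndPreferredLeaderElectionMs = waitBetweenRestartAndPreferredLeaderElectionMs;
    }

    public void restartBatch(Set<Broker> batch) throws InterruptedException {
        // Restart every broker in the batch first (restartNode in the real loop) ...
        for (Broker broker : batch) {
            broker.restart();
        }
        for (Broker broker : batch) {
            // ... wait until each one reports SERVING again (awaitState in the real code),
            while (!broker.isServing()) {
                TimeUnit.MILLISECONDS.sleep(100L);
            }
            // then apply the new configurable pause before touching leadership,
            TimeUnit.MILLISECONDS.sleep(waitBetweenRestartAndPreferredLeaderElectionMs);
            // and only then trigger preferred leader election (awaitPreferred in the real code).
            broker.electPreferredLeaders();
        }
    }
}

In the real change the pause and the election are applied only to nodes whose current roles include broker, since controller-only nodes do not lead data partitions.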
@@ -505,6 +505,7 @@ private Future<Void> maybeRollKafkaKraft(Set<NodeRef> nodes,
kafka.getKafkaVersion(),
logging,
operationTimeoutMs,
10000L,
1,
3,
3,
@@ -44,7 +44,7 @@ private Alarm(Time time, long deadline, Supplier<String> timeoutMessageSupplier)
}

/**
* Creates an Alerm
* Creates an Alarm
* @param time The source of time
* @param timeoutMs The timeout for this alarm.
* @param timeoutMessageSupplier The exception message
@@ -348,6 +348,7 @@ private void restartInParallel(Reconciliation reconciliation,
AgentClient agentClient,
Set<Context> batch,
long timeoutMs,
long waitBetweenRestartAndPreferredLeaderElection,
int maxRestarts) throws TimeoutException {
for (Context context : batch) {
restartNode(reconciliation, time, platformClient, context, maxRestarts);
@@ -357,6 +358,7 @@
try {
remainingTimeoutMs = awaitState(reconciliation, time, platformClient, agentClient, context, State.SERVING, remainingTimeoutMs);
if (context.currentRoles().broker()) {
time.sleep(waitBetweenRestartAndPreferredLeaderElection, 0);
awaitPreferred(reconciliation, time, rollClient, context, remainingTimeoutMs);
}
} catch (TimeoutException e) {
@@ -507,6 +509,7 @@ public static RackRolling rollingRestart(PodOperator podOperator,
KafkaVersion kafkaVersion,
String kafkaLogging,
long postOperationTimeoutMs,
long waitBetweenRestartAndPreferredLeaderElection,
int maxRestartBatchSize,
int maxRestarts,
int maxReconfigs,
@@ -530,6 +533,7 @@
kafkaConfigProvider,
kafkaLogging,
postOperationTimeoutMs,
waitBetweenRestartAndPreferredLeaderElection,
Collaborator comment:
Just wondering if we can have a smaller name? Something like restartAndPreferredLeaderElectionDelay.

maxRestartBatchSize,
maxRestarts,
maxReconfigs,
@@ -551,6 +555,7 @@ protected static RackRolling rollingRestart(Time time,
Function<Integer, String> kafkaConfigProvider,
String desiredLogging,
long postOperationTimeoutMs,
long waitBetweenRestartAndPreferredLeaderElection,
int maxRestartBatchSize,
int maxRestarts,
int maxReconfigs,
@@ -568,6 +573,7 @@ protected static RackRolling rollingRestart(Time time,
kafkaConfigProvider,
desiredLogging,
postOperationTimeoutMs,
waitBetweenRestartAndPreferredLeaderElection,
maxRestartBatchSize,
maxRestarts,
maxReconfigs,
@@ -590,6 +596,7 @@ protected static RackRolling rollingRestart(Time time,
private final Function<Integer, String> kafkaConfigProvider;
private final String desiredLogging;
private final long postOperationTimeoutMs;
private final long waitBetweenRestartAndPreferredLeaderElectionMs;
private final int maxRestartBatchSize;
private final int maxRestarts;
private final int maxReconfigs;
@@ -625,6 +632,7 @@ public RackRolling(Time time,
Function<Integer, String> kafkaConfigProvider,
String desiredLogging,
long postOperationTimeoutMs,
long waitBetweenRestartAndPreferredLeaderElection,
int maxRestartBatchSize,
int maxRestarts,
int maxReconfigs,
@@ -640,6 +648,7 @@ public RackRolling(Time time,
this.kafkaConfigProvider = kafkaConfigProvider;
this.desiredLogging = desiredLogging;
this.postOperationTimeoutMs = postOperationTimeoutMs;
this.waitBetweenRestartAndPreferredLeaderElectionMs = waitBetweenRestartAndPreferredLeaderElection;
this.maxRestartBatchSize = maxRestartBatchSize;
this.maxRestarts = maxRestarts;
this.maxReconfigs = maxReconfigs;
@@ -726,7 +735,7 @@ public List<Integer> loop() throws TimeoutException, InterruptedException, Execu
// We want to give nodes a chance to get ready before we try to connect to them or consider them for rolling.
// This is especially important for nodes which were just started.
LOGGER.debugCr(reconciliation, "Waiting for nodes {} to become ready before initialising plan in case they just started", unreadyNodes);
waitForUnreadyNodes(unreadyNodes, true);
awaitReadiness(unreadyNodes, true);
}

var byPlan = initialPlan(contexts, rollClient);
@@ -774,7 +783,7 @@ public List<Integer> loop() throws TimeoutException, InterruptedException, Execu
// from taking out a node each time (due, e.g. to a configuration error).
LOGGER.debugCr(reconciliation, "Nodes {} do not need to be restarted", unreadyNodes);
LOGGER.debugCr(reconciliation, "Waiting for non-restarted nodes {} to become ready", unreadyNodes);
return waitForUnreadyNodes(unreadyNodes, false);
return awaitReadiness(unreadyNodes, false);
}
}

@@ -830,34 +839,22 @@ private List<Integer> restartNodes(List<Context> nodesToRestart, int totalNumOfC
var batchOfContexts = nodesToRestart.stream().filter(context -> batchOfIds.contains(context.nodeId())).collect(Collectors.toSet());
LOGGER.debugCr(reconciliation, "Restart batch: {}", batchOfContexts);
// restart a batch
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, batchOfContexts, postOperationTimeoutMs, maxRestarts);
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, batchOfContexts, postOperationTimeoutMs, waitBetweenRestartAndPreferredLeaderElectionMs, maxRestarts);

return batchOfIds.stream().toList();
}

private List<Integer> reconfigureNodes(List<Context> contexts) {
List<Integer> reconfiguredNode = List.of();
for (var context : contexts) {
// TODO decide whether to support canary reconfiguration for cluster-scoped configs (nice to have)
try {
reconfigureNode(reconciliation, time, rollClient, context, maxReconfigs);
} catch (RuntimeException e) {
return List.of(context.nodeId());
}

time.sleep(postOperationTimeoutMs / 2, 0);
// TODO decide whether we need an explicit healthcheck here
// or at least to know that the kube health check probe will have failed at the time
// we break to OUTER (We need to test a scenario of breaking configuration change, does this sleep catch it?)
awaitPreferred(reconciliation, time, rollClient, context, postOperationTimeoutMs / 2);
// termination condition
if (contexts.stream().allMatch(context2 -> context2.state().equals(State.LEADING_ALL_PREFERRED))) {
LOGGER.debugCr(reconciliation, "Terminate: All nodes leading preferred replicas after reconfigure");
break;
}
reconfiguredNode = List.of(context.nodeId());
}
return reconfiguredNode;
awaitReadiness(contexts, false);
return contexts.stream().map(Context::nodeId).collect(Collectors.toList());
}

private List<Integer> waitForLogRecovery(List<Context> contexts) {
@@ -877,7 +874,7 @@ private List<Integer> waitForLogRecovery(List<Context> contexts) {
return contexts.stream().map(Context::nodeId).collect(Collectors.toList());
}

private List<Integer> waitForUnreadyNodes(List<Context> contexts, boolean ignoreTimeout) {
private List<Integer> awaitReadiness(List<Context> contexts, boolean ignoreTimeout) {
long remainingTimeoutMs = postOperationTimeoutMs;
for (Context context : contexts) {
try {
@@ -915,7 +912,7 @@ private List<Integer> restartUnReadyNodes(List<Context> contexts, int totalNumOf
LOGGER.warnCr(reconciliation, "All controller nodes are combined and they are not running, therefore restarting them all now");
// If all controller nodes (except a single-node quorum) are combined and none of them are running (e.g. Pending), we need to restart them all at the same time to form the quorum.
// This is because until the quorum has been formed and the broker process can connect to it, the combined nodes do not become ready.
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, maxRestarts);
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, waitBetweenRestartAndPreferredLeaderElectionMs, maxRestarts);
return combinedNodesToRestart.stream().map(Context::nodeId).toList();
}

@@ -987,14 +984,8 @@ private Map<Plan, List<Context>> initialPlan(List<Context> contexts, RollClient
// If a pure controller's configuration has changed, it should have non-empty reasons to restart.
return Plan.NOP;
} else {
if (context.numReconfigs() > 0
&& context.state() == State.LEADING_ALL_PREFERRED) {
LOGGER.debugCr(reconciliation, "{} has already been reconfigured", context.nodeRef());
return Plan.NOP;
} else {
LOGGER.debugCr(reconciliation, "{} may need to be reconfigured", context.nodeRef());
return Plan.MAYBE_RECONFIGURE;
}
LOGGER.debugCr(reconciliation, "{} may need to be reconfigured", context.nodeRef());
return Plan.MAYBE_RECONFIGURE;
}
}
}));
@@ -306,18 +306,21 @@ public int tryElectAllPreferredLeaders(NodeRef nodeRef) {
for (TopicPartitionInfo topicPartitionInfo : td.partitions()) {
if (!topicPartitionInfo.replicas().isEmpty()
&& topicPartitionInfo.replicas().get(0).id() == nodeRef.nodeId() // this node is preferred leader
&& topicPartitionInfo.leader().id() != nodeRef.nodeId()) { // this onde is not current leader
&& topicPartitionInfo.leader().id() != nodeRef.nodeId()) { // this node is not current leader
toElect.add(new TopicPartition(td.name(), topicPartitionInfo.partition()));
}
}
}

var electionResults = brokerAdmin.electLeaders(ElectionType.PREFERRED, toElect).partitions().get();

long count = electionResults.values().stream()
.filter(Optional::isPresent)
.count();
return count > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) count;
if (toElect.size() > 0) {
var electionResults = brokerAdmin.electLeaders(ElectionType.PREFERRED, toElect).partitions().get();

long count = electionResults.values().stream()
.filter(Optional::isPresent)
.count();
return count > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) count;
} else {
return 0;
}
} catch (InterruptedException e) {
throw new UncheckedInterruptedException(e);
} catch (ExecutionException e) {
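The election step itself goes through Kafka's Admin API, and the new guard above skips the call when there are no partitions whose preferred leader is the restarted node. Below is a minimal, hypothetical standalone sketch of the same guarded call against the upstream Admin client; the bootstrap address and topic partition are placeholders, not values from this PR.

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

// Hypothetical illustration of the guarded preferred-leader election; the PR's real
// implementation lives in tryElectAllPreferredLeaders above.
public class PreferredElectionSketch {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

        // Partitions whose preferred leader is this node but whose current leader is another node.
        Set<TopicPartition> toElect = Set.of(new TopicPartition("my-topic", 0)); // placeholder partition

        if (toElect.isEmpty()) {
            // Nothing to do; skipping the admin call mirrors the new size check in the diff.
            return;
        }
        try (Admin admin = Admin.create(props)) {
            // Ask the brokers to move leadership back to the preferred replicas.
            Map<TopicPartition, Optional<Throwable>> results =
                    admin.electLeaders(ElectionType.PREFERRED, toElect).partitions().get();
            results.forEach((tp, error) ->
                    System.out.println(tp + " -> " + (error.isPresent() ? error.get() : "elected")));
        }
    }
}

In the result map an empty Optional means the election succeeded for that partition, while a present Throwable carries the per-partition failure.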