diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
index d1421fcd5f3..198415bf663 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
@@ -505,6 +505,7 @@ private Future maybeRollKafkaKraft(Set nodes,
                 kafka.getKafkaVersion(),
                 logging,
                 operationTimeoutMs,
+                10000L,
                 1,
                 3,
                 3,
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/Alarm.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/Alarm.java
index 19f65ac9ede..b3403c24314 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/Alarm.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/Alarm.java
@@ -44,7 +44,7 @@ private Alarm(Time time, long deadline, Supplier timeoutMessageSupplier)
     }
 
     /**
-     * Creates an Alerm
+     * Creates an Alarm
      * @param time The source of time
      * @param timeoutMs The timeout for this alarm.
      * @param timeoutMessageSupplier The exception message
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java
index 2a8f899d370..38e638747be 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java
@@ -348,6 +348,7 @@ private void restartInParallel(Reconciliation reconciliation,
                                    AgentClient agentClient,
                                    Set batch,
                                    long timeoutMs,
+                                   long waitBetweenRestartAndPreferredLeaderElection,
                                    int maxRestarts) throws TimeoutException {
         for (Context context : batch) {
             restartNode(reconciliation, time, platformClient, context, maxRestarts);
@@ -357,6 +358,7 @@ private void restartInParallel(Reconciliation reconciliation,
             try {
                 remainingTimeoutMs = awaitState(reconciliation, time, platformClient, agentClient, context, State.SERVING, remainingTimeoutMs);
                 if (context.currentRoles().broker()) {
+                    time.sleep(waitBetweenRestartAndPreferredLeaderElection, 0);
                     awaitPreferred(reconciliation, time, rollClient, context, remainingTimeoutMs);
                 }
             } catch (TimeoutException e) {
@@ -507,6 +509,7 @@ public static RackRolling rollingRestart(PodOperator podOperator,
                                              KafkaVersion kafkaVersion,
                                              String kafkaLogging,
                                              long postOperationTimeoutMs,
+                                             long waitBetweenRestartAndPreferredLeaderElection,
                                              int maxRestartBatchSize,
                                              int maxRestarts,
                                              int maxReconfigs,
@@ -530,6 +533,7 @@ public static RackRolling rollingRestart(PodOperator podOperator,
                 kafkaConfigProvider,
                 kafkaLogging,
                 postOperationTimeoutMs,
+                waitBetweenRestartAndPreferredLeaderElection,
                 maxRestartBatchSize,
                 maxRestarts,
                 maxReconfigs,
@@ -551,6 +555,7 @@ protected static RackRolling rollingRestart(Time time,
                                                 Function kafkaConfigProvider,
                                                 String desiredLogging,
                                                 long postOperationTimeoutMs,
+                                                long waitBetweenRestartAndPreferredLeaderElection,
                                                 int maxRestartBatchSize,
                                                 int maxRestarts,
                                                 int maxReconfigs,
@@ -568,6 +573,7 @@ protected static RackRolling rollingRestart(Time time,
                 kafkaConfigProvider,
                 desiredLogging,
                 postOperationTimeoutMs,
+                waitBetweenRestartAndPreferredLeaderElection,
                 maxRestartBatchSize,
                 maxRestarts,
                 maxReconfigs,
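The new `waitBetweenRestartAndPreferredLeaderElection` value (wired as `10000L`, i.e. 10 seconds, from `KafkaReconciler`) is only consumed in `restartInParallel`: once a restarted broker reaches `SERVING`, the roller now pauses for that long before asking it to take back its preferred partition leaderships. A minimal, self-contained sketch of that sequence, not Strimzi code; the functional interfaces are stand-ins for `awaitState(..., State.SERVING, ...)` and `tryElectAllPreferredLeaders(nodeRef)`:

```java
import java.time.Duration;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Sketch of the post-restart sequence introduced by this patch: wait until the restarted
// broker is serving again, pause for the newly introduced delay, and only then trigger
// preferred leader election for it.
public class PostRestartSequenceSketch {

    static void afterRestart(List<Integer> restartedBrokers,
                             IntPredicate isServing,                 // stand-in for awaitState(..., State.SERVING, ...)
                             Duration waitBeforeElection,            // the new waitBetweenRestartAndPreferredLeaderElection
                             IntUnaryOperator electPreferredLeaders  // stand-in for tryElectAllPreferredLeaders(nodeRef)
    ) throws InterruptedException {
        for (int brokerId : restartedBrokers) {
            while (!isServing.test(brokerId)) {
                Thread.sleep(1_000);                     // poll until the broker reports SERVING
            }
            Thread.sleep(waitBeforeElection.toMillis()); // give replicas a moment to catch up before the election
            int remaining = electPreferredLeaders.applyAsInt(brokerId);
            System.out.println("Broker " + brokerId + ": preferred-leader elections still outstanding: " + remaining);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The operator passes 10000L (10 s); use Duration.ZERO here to keep the demo instant.
        afterRestart(List.of(0), id -> true, Duration.ZERO, id -> 0);
    }
}
```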
@@ -590,6 +596,7 @@ protected static RackRolling rollingRestart(Time time,
     private final Function kafkaConfigProvider;
     private final String desiredLogging;
     private final long postOperationTimeoutMs;
+    private final long waitBetweenRestartAndPreferredLeaderElectionMs;
     private final int maxRestartBatchSize;
     private final int maxRestarts;
     private final int maxReconfigs;
@@ -625,6 +632,7 @@ public RackRolling(Time time,
                        Function kafkaConfigProvider,
                        String desiredLogging,
                        long postOperationTimeoutMs,
+                       long waitBetweenRestartAndPreferredLeaderElection,
                        int maxRestartBatchSize,
                        int maxRestarts,
                        int maxReconfigs,
@@ -640,6 +648,7 @@ public RackRolling(Time time,
         this.kafkaConfigProvider = kafkaConfigProvider;
         this.desiredLogging = desiredLogging;
         this.postOperationTimeoutMs = postOperationTimeoutMs;
+        this.waitBetweenRestartAndPreferredLeaderElectionMs = waitBetweenRestartAndPreferredLeaderElection;
         this.maxRestartBatchSize = maxRestartBatchSize;
         this.maxRestarts = maxRestarts;
         this.maxReconfigs = maxReconfigs;
@@ -726,7 +735,7 @@ public List loop() throws TimeoutException, InterruptedException, Execu
             // We want to give nodes chance to get ready before we try to connect to the or consider them for rolling.
             // This is important especially for nodes which were just started.
             LOGGER.debugCr(reconciliation, "Waiting for nodes {} to become ready before initialising plan in case they just started", unreadyNodes);
-            waitForUnreadyNodes(unreadyNodes, true);
+            awaitReadiness(unreadyNodes, true);
         }
 
         var byPlan = initialPlan(contexts, rollClient);
@@ -774,7 +783,7 @@ public List loop() throws TimeoutException, InterruptedException, Execu
             // from taking out a node each time (due, e.g. to a configuration error).
             LOGGER.debugCr(reconciliation, "Nodes {} do not need to be restarted", unreadyNodes);
             LOGGER.debugCr(reconciliation, "Waiting for non-restarted nodes {} to become ready", unreadyNodes);
-            return waitForUnreadyNodes(unreadyNodes, false);
+            return awaitReadiness(unreadyNodes, false);
         }
     }
 
@@ -830,34 +839,22 @@ private List restartNodes(List nodesToRestart, int totalNumOfC
         var batchOfContexts = nodesToRestart.stream().filter(context -> batchOfIds.contains(context.nodeId())).collect(Collectors.toSet());
         LOGGER.debugCr(reconciliation, "Restart batch: {}", batchOfContexts);
         // restart a batch
-        restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, batchOfContexts, postOperationTimeoutMs, maxRestarts);
+        restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, batchOfContexts, postOperationTimeoutMs, waitBetweenRestartAndPreferredLeaderElectionMs, maxRestarts);
         return batchOfIds.stream().toList();
     }
 
     private List reconfigureNodes(List contexts) {
-        List reconfiguredNode = List.of();
         for (var context : contexts) {
-            // TODO decide whether to support canary reconfiguration for cluster-scoped configs (nice to have)
             try {
                 reconfigureNode(reconciliation, time, rollClient, context, maxReconfigs);
             } catch (RuntimeException e) {
                 return List.of(context.nodeId());
            }
 
-            time.sleep(postOperationTimeoutMs / 2, 0);
-            // TODO decide whether we need an explicit healthcheck here
-            // or at least to know that the kube health check probe will have failed at the time
-            // we break to OUTER (We need to test a scenario of breaking configuration change, does this sleep catch it?)
-            awaitPreferred(reconciliation, time, rollClient, context, postOperationTimeoutMs / 2);
-            // termination condition
-            if (contexts.stream().allMatch(context2 -> context2.state().equals(State.LEADING_ALL_PREFERRED))) {
-                LOGGER.debugCr(reconciliation, "Terminate: All nodes leading preferred replicas after reconfigure");
-                break;
-            }
-            reconfiguredNode = List.of(context.nodeId());
         }
-        return reconfiguredNode;
+        awaitReadiness(contexts, false);
+        return contexts.stream().map(Context::nodeId).collect(Collectors.toList());
     }
 
     private List waitForLogRecovery(List contexts) {
@@ -877,7 +874,7 @@ private List waitForLogRecovery(List contexts) {
         return contexts.stream().map(Context::nodeId).collect(Collectors.toList());
     }
 
-    private List waitForUnreadyNodes(List contexts, boolean ignoreTimeout) {
+    private List awaitReadiness(List contexts, boolean ignoreTimeout) {
         long remainingTimeoutMs = postOperationTimeoutMs;
         for (Context context : contexts) {
             try {
@@ -915,7 +912,7 @@ private List restartUnReadyNodes(List contexts, int totalNumOf
             LOGGER.warnCr(reconciliation, "All controller nodes are combined and they are not running, therefore restarting them all now");
             // if all controller nodes (except a single node quorum) are combined and all of them are not running e.g. Pending, we need to restart them all at the same time to form the quorum.
             // This is because until the quorum has been formed and broker process can connect to it, the combined nodes do not become ready.
-            restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, maxRestarts);
+            restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, waitBetweenRestartAndPreferredLeaderElectionMs, maxRestarts);
             return combinedNodesToRestart.stream().map(Context::nodeId).toList();
         }
 
@@ -987,14 +984,8 @@ private Map> initialPlan(List contexts, RollClient
                         // If a pure controller's configuration has changed, it should have non-empty reasons to restart.
                         return Plan.NOP;
                     } else {
-                        if (context.numReconfigs() > 0
-                                && context.state() == State.LEADING_ALL_PREFERRED) {
-                            LOGGER.debugCr(reconciliation, "{} has already been reconfigured", context.nodeRef());
-                            return Plan.NOP;
-                        } else {
-                            LOGGER.debugCr(reconciliation, "{} may need to be reconfigured", context.nodeRef());
-                            return Plan.MAYBE_RECONFIGURE;
-                        }
+                        LOGGER.debugCr(reconciliation, "{} may need to be reconfigured", context.nodeRef());
+                        return Plan.MAYBE_RECONFIGURE;
                     }
                 }
             }));
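With the `LEADING_ALL_PREFERRED` shortcut removed from `initialPlan`, a broker with no restart reasons is always planned as `MAYBE_RECONFIGURE`, and `reconfigureNodes` now applies the change to every candidate, returns early on the first failure, and then waits for readiness instead of for preferred leadership. A rough, self-contained sketch of that control flow, not the real Strimzi signatures; the `IntConsumer` parameters are simplified stand-ins for `reconfigureNode` and `awaitReadiness`:

```java
import java.util.List;
import java.util.function.IntConsumer;

// Sketch of the reshaped reconfigure step: apply the dynamic config change to every
// candidate node, bail out on the first failure so the caller can restart that node
// instead, then simply wait for readiness.
public class ReconfigureLoopSketch {

    static List<Integer> reconfigureAll(List<Integer> nodeIds,
                                        IntConsumer reconfigure,      // stand-in for rollClient.reconfigureNode(...)
                                        IntConsumer awaitReadiness) { // stand-in for awaitReadiness(contexts, false)
        for (int nodeId : nodeIds) {
            try {
                reconfigure.accept(nodeId);
            } catch (RuntimeException e) {
                return List.of(nodeId);      // surface only the failed node; the caller decides what to do next
            }
        }
        nodeIds.forEach(awaitReadiness::accept); // no preferred-leader check any more, just readiness
        return nodeIds;
    }

    public static void main(String[] args) {
        System.out.println(reconfigureAll(List.of(0, 1, 2), id -> { }, id -> { }));
    }
}
```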
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RollClientImpl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RollClientImpl.java
index 1bfe7a7c4b2..b9cb02e4397 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RollClientImpl.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RollClientImpl.java
@@ -306,18 +306,21 @@ public int tryElectAllPreferredLeaders(NodeRef nodeRef) {
                 for (TopicPartitionInfo topicPartitionInfo : td.partitions()) {
                     if (!topicPartitionInfo.replicas().isEmpty()
                             && topicPartitionInfo.replicas().get(0).id() == nodeRef.nodeId() // this node is preferred leader
-                            && topicPartitionInfo.leader().id() != nodeRef.nodeId()) { // this onde is not current leader
+                            && topicPartitionInfo.leader().id() != nodeRef.nodeId()) { // this node is not current leader
                         toElect.add(new TopicPartition(td.name(), topicPartitionInfo.partition()));
                     }
                 }
             }
-
-            var electionResults = brokerAdmin.electLeaders(ElectionType.PREFERRED, toElect).partitions().get();
-
-            long count = electionResults.values().stream()
-                    .filter(Optional::isPresent)
-                    .count();
-            return count > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) count;
+            if (toElect.size() > 0) {
+                var electionResults = brokerAdmin.electLeaders(ElectionType.PREFERRED, toElect).partitions().get();
+
+                long count = electionResults.values().stream()
+                        .filter(Optional::isPresent)
+                        .count();
+                return count > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) count;
+            } else {
+                return 0;
+            }
         } catch (InterruptedException e) {
             throw new UncheckedInterruptedException(e);
         } catch (ExecutionException e) {
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java
index 9b8d9c31979..87801f17f49 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java
@@ -45,6 +45,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
 
 public class RackRollingTest {
 
@@ -215,6 +216,17 @@ MockBuilder mockDescribeConfigs(RollClient rollClient, Set nodeConf
             return this;
         }
 
+        MockBuilder mockDescribeConfigsWithUpdatedResult(RollClient rollClient, Configs currentConfigs, Configs updatedConfigs, int nodeId) {
+            when(rollClient.describeControllerConfigs(any()))
+                    .thenReturn(Map.of(nodeId, currentConfigs))
+                    .thenReturn(Map.of(nodeId, updatedConfigs));
+
+            when(rollClient.describeBrokerConfigs(any()))
+                    .thenReturn(Map.of(nodeId, currentConfigs))
+                    .thenReturn(Map.of(nodeId, updatedConfigs));
+            return this;
+        }
+
         MockBuilder mockQuorumLastCaughtUpTimestamps(RollClient rollClient, Map quorumState) {
             doReturn(quorumState)
                 .when(rollClient)
@@ -305,6 +317,7 @@ private RackRolling newRollingRestart(PlatformClient platformClient,
                 kafkaConfigProvider,
                 null,
                 120_000,
+                0,
                 maxRestartsBatchSize,
                 1,
                 1,
@@ -318,6 +331,7 @@ private void doRollingRestart(PlatformClient platformClient,
                                       Collection nodeRefList,
                                       Function reason,
                                       Function kafkaConfigProvider,
+                                      String desiredLogging,
                                       int maxRestartsBatchSize,
                                       int maxRestart) throws ExecutionException, InterruptedException, TimeoutException {
 
@@ -332,8 +346,9 @@ private void doRollingRestart(PlatformClient platformClient,
                 KafkaVersionTestUtils.getLatestVersion(),
                 true,
                 kafkaConfigProvider,
-                null,
+                desiredLogging,
                 120_000,
+                0,
                 maxRestartsBatchSize,
                 maxRestart,
                 1,
@@ -358,7 +373,7 @@ void shouldNotRestartBrokersWithNoTopicsIfAllHealthyAndNoReason() throws Executi
                 .done().get(0);
 
         // when
-        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 1);
+        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, null, 1, 1);
 
         // then
         Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any());
@@ -382,7 +397,7 @@ void shouldRestartBrokerWithNoTopicIfReasonManualRolling() throws ExecutionExcep
                 .done().get(0);
 
         // when
-        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 1, 1);
+        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 1, 1);
 
         // then
         Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any());
@@ -406,7 +421,7 @@ void shouldRestartBrokerIfReasonManualRolling() throws ExecutionException, Inter
                 .done().get(0);
 
         // when
-        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 1, 1);
+        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 1, 1);
 
         // then
         Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any());
@@ -432,7 +447,7 @@ void shouldThrowMaxRestartsExceededIfBrokerRestartsMoreThanMaxRestarts() {
 
         // when
         var ex = assertThrows(MaxRestartsExceededException.class,
-                () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 1));
+                () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, null, 1, 1));
 
         //then
         assertEquals("Node pool-kafka-0/0 has been restarted 1 times", ex.getMessage());
@@ -451,7 +466,6 @@ void shouldRestartIfMaxReconfigExceeded() throws ExecutionException, Interrupted
                 .addTopic("topic-A", 0)
                 .mockTopics(rollClient)
                 .mockDescribeConfigs(rollClient, Set.of(new ConfigEntry("compression.type", "zstd")), Set.of(), 0)
-                .mockElectLeaders(rollClient, List.of(0), 0)
                 .done().get(0);
 
         var rr = newRollingRestart(platformClient,
@@ -494,7 +508,7 @@ void shouldNotThrowExceptionIfAllPreferredLeaderNotElectedAfterRestart() throws
                         0)
                 .done();
 
-        doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 1, 1);
+        doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 1, 1);
 
         Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRefs.get(0)), any(), any());
         Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRefs.get(1)), any(), any());
@@ -526,7 +540,7 @@ void shouldNotThrowExceptionIfAllPreferredLeadersNotElectedAfterReconfig() throw
                 .mockElectLeaders(rollClient, List.of(1), 0)
                 .done();
 
-        doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, serverId -> "compression.type=snappy", 1, 1);
+        doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, serverId -> "compression.type=snappy", null, 1, 1);
 
         Mockito.verify(rollClient, times(1)).reconfigureNode(eq(nodeRefs.get(0)), any(), any());
         Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRefs.get(1)), any(), any());
@@ -556,7 +570,7 @@ void shouldRepeatAllPreferredLeaderElectionCallsUntilAllPreferredLeaderElected()
                 .mockElectLeaders(rollClient, List.of(1, 1, 1, 1, 0), 0)
                 .done().get(0);
 
-        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 1);
+        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, null, 1, 1);
 
         Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any());
         Mockito.verify(platformClient, times(1)).restartNode(eq(nodeRef), any());
@@ -580,7 +594,7 @@ void shouldThrowExceptionIfNodeNotAbleToRecoverAfterRestart() {
                 .done().get(0);
 
         var te = assertThrows(UnrestartableNodesException.class,
-                () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 3));
+                () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, null, 1, 3));
 
         assertEquals("The max attempts (2) to wait for this node pool-kafka-0/0 to finish performing log recovery has been reached. There are 0 logs and 0 segments left to recover.",
                 te.getMessage());
@@ -607,7 +621,7 @@ void shouldThrowExceptionWithRecoveryProgressIfNodeIsInRecovery() {
                 .done().get(0);
 
         var te = assertThrows(RuntimeException.class,
-                () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 1));
+                () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, null, 1, 1));
 
         assertEquals("The max attempts (2) to wait for this node pool-kafka-0/0 to finish performing log recovery has been reached. There are 100 logs and 300 segments left to recover.",
                 te.getMessage());
@@ -633,7 +647,7 @@ void shouldNotRestartUnreadyNodes() {
                 .done().get(0);
 
         var te = assertThrows(RuntimeException.class,
-                () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 1));
+                () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, null, 1, 1));
 
         assertEquals("java.util.concurrent.TimeoutException: Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:00:00Z, reason=[], numRestarts=0, numReconfigs=0, numAttempts=2]",
                 te.getMessage());
@@ -676,7 +690,6 @@ void shouldRestartUnresponsiveNode() throws ExecutionException, InterruptedExcep
 
     @Test
     void shouldReconfigureBrokerIfChangedReconfigurableParameter() throws ExecutionException, InterruptedException, TimeoutException {
-
         // given
         PlatformClient platformClient = mock(PlatformClient.class);
         RollClient rollClient = mock(RollClient.class);
@@ -687,21 +700,18 @@ void shouldReconfigureBrokerIfChangedReconfigurableParameter() throws ExecutionE
                 .mockCanConnectToNodes(rollClient, true, 0)
                 .mockBrokerState(agentClient, List.of(BrokerState.RUNNING, BrokerState.NOT_RUNNING, BrokerState.STARTING, BrokerState.RECOVERY, BrokerState.RUNNING), 0)
                 .addTopic("topic-A", 0)
-                .mockDescribeConfigs(rollClient,
-                        Set.of(new ConfigEntry("compression.type", "zstd")),
-                        Set.of(),
-                        0)
-                .mockElectLeaders(rollClient, 0)
+                .mockDescribeConfigsWithUpdatedResult(rollClient,
+                        new Configs(new Config(Set.of(new ConfigEntry("compression.type", "zstd"))), new Config(Set.of())),
+                        new Configs(new Config(Set.of(new ConfigEntry("compression.type", "snappy"))), new Config(Set.of())),
+                        0)
                 .done().get(0);
 
         // when
-        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "compression.type=snappy", 1, 1);
+        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "compression.type=snappy", null, 0, 0);
 
         // then
         Mockito.verify(rollClient, times(1)).reconfigureNode(eq(nodeRef), any(), any());
-        Mockito.verify(platformClient, never()).restartNode(eq(nodeRef), any());
-        Mockito.verify(rollClient, times(1)).tryElectAllPreferredLeaders(eq(nodeRef));
-    }
+        Mockito.verify(platformClient, never()).restartNode(eq(nodeRef), any());}
 
     @Test
     void shouldRestartBrokerIfChangedNonReconfigurableParameter() throws ExecutionException, InterruptedException, TimeoutException {
@@ -724,7 +734,7 @@ void shouldRestartBrokerIfChangedNonReconfigurableParameter() throws ExecutionEx
                 .done().get(0);
 
         // when
-        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "auto.leader.rebalance.enable=false", 1, 1);
+        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "auto.leader.rebalance.enable=false", null, 1, 1);
 
         // then
         Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRef), any(), any());
@@ -746,19 +756,18 @@ void shouldReconfigureBrokerIfChangedReconfigurableLoggingParameter() throws Exe
                 .mockCanConnectToNodes(rollClient, true, 0)
                 .addTopic("topic-A", 0)
                 .mockTopics(rollClient)
-                .mockDescribeConfigs(rollClient,
-                        Set.of(),
-                        Set.of(new ConfigEntry("org.apache.kafka", "DEBUG")), 0)
-                .mockElectLeaders(rollClient, 0)
+                .mockDescribeConfigsWithUpdatedResult(rollClient,
+                        new Configs(new Config(Set.of()), new Config(Set.of())),
+                        new Configs(new Config(Set.of()), new Config(Set.of(new ConfigEntry("org.apache.kafka", "DEBUG"), new ConfigEntry("root", "WARN")))),
+                        0)
                 .done().get(0);
 
         // when
-        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "log.retention.ms=1000", 1, 1);
+        doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, "log4j.logger.org.apache.kafka=DEBUG", 1, 1);
 
         // then
         Mockito.verify(rollClient, times(1)).reconfigureNode(eq(nodeRef), any(), any());
         Mockito.verify(platformClient, never()).restartNode(eq(nodeRef), any());
-        Mockito.verify(rollClient, times(1)).tryElectAllPreferredLeaders(eq(nodeRef));
     }
 
     @Test
@@ -780,7 +789,7 @@ void shouldNotRestartBrokersIfHealthyAndNoReason() throws ExecutionException, In
                 .done();
 
         // when
-        doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 3, 1);
+        doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, null, 3, 1);
 
         // then
         for (var nodeRef : nodeRefs.values()) {
@@ -812,7 +821,7 @@ void shouldRestartBrokersIfReasonManualRolling() throws ExecutionException, Inte
 
         // when
         doRollingRestart(platformClient, rollClient, agentClient,
-                nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1);
+                nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1);
 
         // then
         for (var nodeRef : nodeRefs.values()) {
@@ -1186,7 +1195,7 @@ public void shouldRollOddSizedQuorumOneControllerBehind() throws ExecutionExcept
                 .done();
 
         assertThrows(UnrestartableNodesException.class, () ->
-                doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1),
+                doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1),
                 "Expect UnrestartableNodesException because neither controller 0 nor 1 can be restarted without impacting the quorum health");
 
         // We should be able to restart only the controller that is behind
@@ -1221,7 +1230,7 @@ public void shouldNotRollEvenSizedQuorumTwoControllersBehind() {
 
         // we should not restart any controllers as the majority have not caught up to the leader
         assertThrows(UnrestartableNodesException.class, () ->
-                doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1),
+                doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1),
                 "Expect UnrestartableNodesException because none of the controllers can be restarted without impacting the quorum health");
 
         for (var nodeRef : nodeRefs.values()) {
@@ -1295,7 +1304,7 @@ public void shouldNotRollControllersWithInvalidTimestamp() {
                 .done();
 
         assertThrows(UnrestartableNodesException.class, () ->
-                doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1),
+                doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1),
                 "Expect UnrestartableNodesException because of invalid timestamps for controller 0 and 2");
 
         for (var nodeRef : nodeRefs.values()) {
@@ -1323,7 +1332,7 @@ public void shouldNotRollControllersWithInvalidLeader() {
                 .done();
 
         assertThrows(UnrestartableNodesException.class, () ->
-                doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1),
+                doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1),
                 "Expect UnrestartableNodesException because of invalid quorum leader");
 
         for (var nodeRef : nodeRefs.values()) {
@@ -1385,7 +1394,7 @@ public void shouldRollTwoNodesQuorumOneControllerBehind() throws ExecutionExcept
                 .done();
 
         assertThrows(UnrestartableNodesException.class, () ->
-                doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1),
+                doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1),
                 "Expect UnrestartableNodesException because of controller 2 has fallen behind therefore controller 1 cannot be restarted");
         //only the controller that has fallen behind should be restarted
         Mockito.verify(platformClient, times(1)).restartNode(eq(nodeRefs.get(2)), any());
@@ -1516,7 +1525,7 @@ void shouldFailReconciliationIfControllerNodeNeverBecomeReady() {
                 .done();
 
         var ex = assertThrows(TimeoutException.class,
-                () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 3));
+                () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, null, 1, 3));
 
         assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-1/1, currentRoles=NodeRoles[controller=true, broker=false], state=NOT_RUNNING, lastTransition=1970-01-01T00:00:00Z, reason=[POD_STUCK], numRestarts=0, numReconfigs=0, numAttempts=2]",
                 ex.getMessage());
@@ -1541,7 +1550,7 @@ void shouldFailReconciliationIfBrokerNodeNeverBecomeReady() {
 
         //If nodes never got into running state, we have to exit the inner loop somehow.
         var ex = assertThrows(TimeoutException.class,
-                () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 3));
+                () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, null, 1, 3));
 
         assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:10:19Z, reason=[POD_STUCK], numRestarts=2, numReconfigs=0, numAttempts=2]",
                 ex.getMessage());
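The new `mockDescribeConfigsWithUpdatedResult` helper relies on Mockito's consecutive stubbing: chaining `thenReturn` calls makes the first `describe*Configs` invocation report the pre-reconfiguration values and later invocations report the updated ones, so the test can observe the roller noticing the change it just applied. A small, self-contained illustration of that mechanism, assuming only mockito-core on the classpath; the `ConfigSource` interface is a made-up stand-in, not `RollClient`:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;

// Demonstrates consecutive stubbing: the same call on the same mock returns different
// values on successive invocations, mirroring "current configs, then updated configs".
public class ConsecutiveStubbingSketch {

    interface ConfigSource {                              // hypothetical stand-in for RollClient
        Map<Integer, String> describeBrokerConfigs(int nodeId);
    }

    public static void main(String[] args) {
        ConfigSource source = mock(ConfigSource.class);
        when(source.describeBrokerConfigs(0))
                .thenReturn(Map.of(0, "compression.type=zstd"))    // first call: pre-reconfiguration value
                .thenReturn(Map.of(0, "compression.type=snappy")); // later calls: post-reconfiguration value

        System.out.println(source.describeBrokerConfigs(0)); // {0=compression.type=zstd}
        System.out.println(source.describeBrokerConfigs(0)); // {0=compression.type=snappy}
    }
}
```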