Skip to content

Commit

Permalink
KAFKA-16560 Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig (a…
Browse files Browse the repository at this point in the history
…pache#15761)

* Make ClusterConfig immutable
* Make BrokerNode immutable
* Refactor out build argument in ControllerNode
* Add setPrefix and replace put property with set map in ClusterConfig
* Remove rollingBrokerRestart from ClusterInstance interface
* Refactor KRaftClusterTest#doOnStartedKafkaCluster

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
brandboat authored Apr 27, 2024
1 parent 4060d43 commit 5de5d96
Show file tree
Hide file tree
Showing 20 changed files with 873 additions and 465 deletions.
209 changes: 151 additions & 58 deletions core/src/test/java/kafka/test/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
import org.apache.kafka.server.common.MetadataVersion;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

/**
* Represents a requested configuration of a Kafka cluster for integration testing
* Represents an immutable requested configuration of a Kafka cluster for integration testing.
*/
public class ClusterConfig {

Expand All @@ -44,26 +46,36 @@ public class ClusterConfig {
private final File trustStoreFile;
private final MetadataVersion metadataVersion;

private final Properties serverProperties = new Properties();
private final Properties producerProperties = new Properties();
private final Properties consumerProperties = new Properties();
private final Properties adminClientProperties = new Properties();
private final Properties saslServerProperties = new Properties();
private final Properties saslClientProperties = new Properties();
private final Map<Integer, Properties> perBrokerOverrideProperties = new HashMap<>();
private final Map<String, String> serverProperties;
private final Map<String, String> producerProperties;
private final Map<String, String> consumerProperties;
private final Map<String, String> adminClientProperties;
private final Map<String, String> saslServerProperties;
private final Map<String, String> saslClientProperties;
private final Map<Integer, Map<String, String>> perBrokerOverrideProperties;

ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
@SuppressWarnings("checkstyle:ParameterNumber")
private ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
MetadataVersion metadataVersion) {
this.type = type;
MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perBrokerOverrideProperties) {
this.type = Objects.requireNonNull(type);
this.brokers = brokers;
this.controllers = controllers;
this.name = name;
this.autoStart = autoStart;
this.securityProtocol = securityProtocol;
this.securityProtocol = Objects.requireNonNull(securityProtocol);
this.listenerName = listenerName;
this.trustStoreFile = trustStoreFile;
this.metadataVersion = metadataVersion;
this.metadataVersion = Objects.requireNonNull(metadataVersion);
this.serverProperties = Objects.requireNonNull(serverProperties);
this.producerProperties = Objects.requireNonNull(producerProperties);
this.consumerProperties = Objects.requireNonNull(consumerProperties);
this.adminClientProperties = Objects.requireNonNull(adminClientProperties);
this.saslServerProperties = Objects.requireNonNull(saslServerProperties);
this.saslClientProperties = Objects.requireNonNull(saslClientProperties);
this.perBrokerOverrideProperties = Objects.requireNonNull(perBrokerOverrideProperties);
}

public Type clusterType() {
Expand All @@ -86,27 +98,27 @@ public boolean isAutoStart() {
return autoStart;
}

public Properties serverProperties() {
public Map<String, String> serverProperties() {
return serverProperties;
}

public Properties producerProperties() {
public Map<String, String> producerProperties() {
return producerProperties;
}

public Properties consumerProperties() {
public Map<String, String> consumerProperties() {
return consumerProperties;
}

public Properties adminClientProperties() {
public Map<String, String> adminClientProperties() {
return adminClientProperties;
}

public Properties saslServerProperties() {
public Map<String, String> saslServerProperties() {
return saslServerProperties;
}

public Properties saslClientProperties() {
public Map<String, String> saslClientProperties() {
return saslClientProperties;
}

Expand All @@ -126,8 +138,8 @@ public MetadataVersion metadataVersion() {
return metadataVersion;
}

public Properties brokerServerProperties(int brokerId) {
return perBrokerOverrideProperties.computeIfAbsent(brokerId, __ -> new Properties());
public Map<Integer, Map<String, String>> perBrokerOverrideProperties() {
return perBrokerOverrideProperties;
}

public Map<String, String> nameTags() {
Expand All @@ -139,28 +151,69 @@ public Map<String, String> nameTags() {
return tags;
}

public ClusterConfig copyOf() {
ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion);
copy.serverProperties.putAll(serverProperties);
copy.producerProperties.putAll(producerProperties);
copy.consumerProperties.putAll(consumerProperties);
copy.saslServerProperties.putAll(saslServerProperties);
copy.saslClientProperties.putAll(saslClientProperties);
perBrokerOverrideProperties.forEach((brokerId, props) -> {
Properties propsCopy = new Properties();
propsCopy.putAll(props);
copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
});
return copy;
@SuppressWarnings({"CyclomaticComplexity"})
@Override
public boolean equals(Object object) {
if (this == object) return true;
if (object == null || getClass() != object.getClass()) return false;
ClusterConfig clusterConfig = (ClusterConfig) object;
return Objects.equals(type, clusterConfig.type)
&& Objects.equals(brokers, clusterConfig.brokers)
&& Objects.equals(controllers, clusterConfig.controllers)
&& Objects.equals(name, clusterConfig.name)
&& Objects.equals(autoStart, clusterConfig.autoStart)
&& Objects.equals(securityProtocol, clusterConfig.securityProtocol)
&& Objects.equals(listenerName, clusterConfig.listenerName)
&& Objects.equals(trustStoreFile, clusterConfig.trustStoreFile)
&& Objects.equals(metadataVersion, clusterConfig.metadataVersion)
&& Objects.equals(serverProperties, clusterConfig.serverProperties)
&& Objects.equals(producerProperties, clusterConfig.producerProperties)
&& Objects.equals(consumerProperties, clusterConfig.consumerProperties)
&& Objects.equals(adminClientProperties, clusterConfig.adminClientProperties)
&& Objects.equals(saslServerProperties, clusterConfig.saslServerProperties)
&& Objects.equals(saslClientProperties, clusterConfig.saslClientProperties)
&& Objects.equals(perBrokerOverrideProperties, clusterConfig.perBrokerOverrideProperties);
}

public static Builder defaultClusterBuilder() {
return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting());
@Override
public int hashCode() {
return Objects.hash(type, brokers, controllers, name, autoStart, securityProtocol, listenerName,
trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
adminClientProperties, saslServerProperties, saslClientProperties, perBrokerOverrideProperties);
}

public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart,
SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion);
public static Builder defaultBuilder() {
return new Builder()
.setType(Type.ZK)
.setBrokers(1)
.setControllers(1)
.setAutoStart(true)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
.setMetadataVersion(MetadataVersion.latestTesting());
}

public static Builder builder() {
return new Builder();
}

public static Builder builder(ClusterConfig clusterConfig) {
return new Builder()
.setType(clusterConfig.type)
.setBrokers(clusterConfig.brokers)
.setControllers(clusterConfig.controllers)
.setName(clusterConfig.name)
.setAutoStart(clusterConfig.autoStart)
.setSecurityProtocol(clusterConfig.securityProtocol)
.setListenerName(clusterConfig.listenerName)
.setTrustStoreFile(clusterConfig.trustStoreFile)
.setMetadataVersion(clusterConfig.metadataVersion)
.setServerProperties(clusterConfig.serverProperties)
.setProducerProperties(clusterConfig.producerProperties)
.setConsumerProperties(clusterConfig.consumerProperties)
.setAdminClientProperties(clusterConfig.adminClientProperties)
.setSaslServerProperties(clusterConfig.saslServerProperties)
.setSaslClientProperties(clusterConfig.saslClientProperties)
.setPerBrokerProperties(clusterConfig.perBrokerOverrideProperties);
}

public static class Builder {
Expand All @@ -173,63 +226,103 @@ public static class Builder {
private String listenerName;
private File trustStoreFile;
private MetadataVersion metadataVersion;
private Map<String, String> serverProperties = Collections.emptyMap();
private Map<String, String> producerProperties = Collections.emptyMap();
private Map<String, String> consumerProperties = Collections.emptyMap();
private Map<String, String> adminClientProperties = Collections.emptyMap();
private Map<String, String> saslServerProperties = Collections.emptyMap();
private Map<String, String> saslClientProperties = Collections.emptyMap();
private Map<Integer, Map<String, String>> perBrokerOverrideProperties = Collections.emptyMap();

Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
this.type = type;
this.brokers = brokers;
this.controllers = controllers;
this.autoStart = autoStart;
this.securityProtocol = securityProtocol;
this.metadataVersion = metadataVersion;
}
private Builder() {}

public Builder type(Type type) {
public Builder setType(Type type) {
this.type = type;
return this;
}

public Builder brokers(int brokers) {
public Builder setBrokers(int brokers) {
this.brokers = brokers;
return this;
}

public Builder controllers(int controllers) {
public Builder setControllers(int controllers) {
this.controllers = controllers;
return this;
}

public Builder name(String name) {
public Builder setName(String name) {
this.name = name;
return this;
}

public Builder autoStart(boolean autoStart) {
public Builder setAutoStart(boolean autoStart) {
this.autoStart = autoStart;
return this;
}

public Builder securityProtocol(SecurityProtocol securityProtocol) {
public Builder setSecurityProtocol(SecurityProtocol securityProtocol) {
this.securityProtocol = securityProtocol;
return this;
}

public Builder listenerName(String listenerName) {
public Builder setListenerName(String listenerName) {
this.listenerName = listenerName;
return this;
}

public Builder trustStoreFile(File trustStoreFile) {
public Builder setTrustStoreFile(File trustStoreFile) {
this.trustStoreFile = trustStoreFile;
return this;
}

public Builder metadataVersion(MetadataVersion metadataVersion) {
public Builder setMetadataVersion(MetadataVersion metadataVersion) {
this.metadataVersion = metadataVersion;
return this;
}

public Builder setServerProperties(Map<String, String> serverProperties) {
this.serverProperties = Collections.unmodifiableMap(new HashMap<>(serverProperties));
return this;
}

public Builder setConsumerProperties(Map<String, String> consumerProperties) {
this.consumerProperties = Collections.unmodifiableMap(new HashMap<>(consumerProperties));
return this;
}

public Builder setProducerProperties(Map<String, String> producerProperties) {
this.producerProperties = Collections.unmodifiableMap(new HashMap<>(producerProperties));
return this;
}

public Builder setAdminClientProperties(Map<String, String> adminClientProperties) {
this.adminClientProperties = Collections.unmodifiableMap(new HashMap<>(adminClientProperties));
return this;
}

public Builder setSaslServerProperties(Map<String, String> saslServerProperties) {
this.saslServerProperties = Collections.unmodifiableMap(new HashMap<>(saslServerProperties));
return this;
}

public Builder setSaslClientProperties(Map<String, String> saslClientProperties) {
this.saslClientProperties = Collections.unmodifiableMap(new HashMap<>(saslClientProperties));
return this;
}

public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerOverrideProperties) {
this.perBrokerOverrideProperties = Collections.unmodifiableMap(
perBrokerOverrideProperties.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
return this;
}

public ClusterConfig build() {
return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion);
return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName,
trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
adminClientProperties, saslServerProperties, saslClientProperties,
perBrokerOverrideProperties);
}
}
}
75 changes: 75 additions & 0 deletions core/src/test/java/kafka/test/ClusterConfigTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.test;

import kafka.test.annotation.Type;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

public class ClusterConfigTest {

private static Map<String, Object> fields(ClusterConfig config) {
return Arrays.stream(config.getClass().getDeclaredFields()).collect(Collectors.toMap(Field::getName, f -> {
f.setAccessible(true);
return Assertions.assertDoesNotThrow(() -> f.get(config));
}));
}

@Test
public void testCopy() throws IOException {
File trustStoreFile = TestUtils.tempFile();

ClusterConfig clusterConfig = ClusterConfig.builder()
.setType(Type.KRAFT)
.setBrokers(3)
.setControllers(2)
.setName("builder-test")
.setAutoStart(true)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
.setListenerName("EXTERNAL")
.setTrustStoreFile(trustStoreFile)
.setMetadataVersion(MetadataVersion.IBP_0_8_0)
.setServerProperties(Collections.singletonMap("broker", "broker_value"))
.setConsumerProperties(Collections.singletonMap("consumer", "consumer_value"))
.setProducerProperties(Collections.singletonMap("producer", "producer_value"))
.setAdminClientProperties(Collections.singletonMap("admin_client", "admin_client_value"))
.setSaslClientProperties(Collections.singletonMap("sasl_client", "sasl_client_value"))
.setSaslServerProperties(Collections.singletonMap("sasl_server", "sasl_server_value"))
.setPerBrokerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value")))
.build();

ClusterConfig copy = ClusterConfig.builder(clusterConfig).build();
Assertions.assertEquals(clusterConfig, copy);
Assertions.assertEquals(clusterConfig.hashCode(), copy.hashCode());

Map<String, Object> clusterConfigFields = fields(clusterConfig);
Map<String, Object> copyFields = fields(clusterConfig);
Assertions.assertEquals(clusterConfigFields, copyFields);
}
}
Loading

0 comments on commit 5de5d96

Please sign in to comment.