Skip to content

Commit

Permalink
Kafka 2.4 KIP-455 - Update TopicFactory Interface and DefaultTopicFac…
Browse files Browse the repository at this point in the history
…tory Implementation to use AdminClient

Kafka 2.4 KIP-455 - Update TopicFactory Interface and DefaultTopicFactory Implementation to use AdminClient
Signed-off-by: Andrew Choi <li_andchoi@microsoft.com>
  • Loading branch information
Andrew Choi authored Oct 26, 2020
1 parent f9194e8 commit 0693dc4
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private void createDeleteClusterTopic() {
try {
int brokerCount = _adminClient.describeCluster().nodes().get().size();

Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_zkConnect);
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient);
Set<BrokerMetadata> brokers = new HashSet<>();
for (Node broker : _adminClient.describeCluster().nodes().get()) {
BrokerMetadata brokerMetadata = new BrokerMetadata(broker.id(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ void maybeAddPartitions(int minPartitionNum) throws ExecutionException, Interrup
if (partitionNum < minPartitionNum) {
LOGGER.info("{} will increase partition of the topic {} in the cluster from {}" + " to {}.",
this.getClass().toString(), _topic, partitionNum, minPartitionNum);
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_zkConnect);
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient);
Set<BrokerMetadata> brokers = new HashSet<>();
for (Node broker : _adminClient.describeCluster().nodes().get()) {
BrokerMetadata brokerMetadata = new BrokerMetadata(broker.id(), null);
Expand Down Expand Up @@ -409,7 +409,7 @@ int numPartitions() throws InterruptedException, ExecutionException {

private Set<Node> getAvailableBrokers() throws ExecutionException, InterruptedException {
Set<Node> brokers = new HashSet<>(_adminClient.describeCluster().nodes().get());
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_zkConnect);
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient);
brokers.removeIf(broker -> blackListedBrokers.contains(broker.id()));
return brokers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public int createTopicIfNotExist(String topic, short replicationFactor, double p
}

@Override
public Set<Integer> getBlackListedBrokers(String zkUrl) {
public Set<Integer> getBlackListedBrokers(AdminClient adminClient) {
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ int createTopicIfNotExist(String topic, short replicationFactor, double partitio
throws ExecutionException, InterruptedException;

/**
* @param zkUrl zookeeper connection url
* @param adminClient AdminClient object
* @return A set of brokers that don't take new partitions or reassigned partitions for topics.
*/
Set<Integer> getBlackListedBrokers(String zkUrl);
Set<Integer> getBlackListedBrokers(AdminClient adminClient);

}

0 comments on commit 0693dc4

Please sign in to comment.