diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java index e03b170913751..c9f478969a614 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java @@ -18,19 +18,14 @@ */ package org.apache.pulsar.client.impl; -import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.HashSet; -import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.pulsar.broker.MultiBrokerBaseTest; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -71,16 +66,8 @@ protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws Pu protected void trigReleaseConnection(PulsarClientImpl pulsarClient) throws InterruptedException, NoSuchFieldException, IllegalAccessException { // Wait for every request has been response. - Field field = ConnectionPool.class.getDeclaredField("pool"); - field.setAccessible(true); - ConcurrentHashMap>> pool = - (ConcurrentHashMap>>) field.get(pulsarClient.getCnxPool()); - final List> clientCnxWrapList = - pool.values().stream().flatMap(c -> c.values().stream()).collect(Collectors.toList()); Awaitility.waitAtMost(Duration.ofSeconds(5)).until(() -> { - for (CompletableFuture clientCnxWrapFuture : clientCnxWrapList){ + for (CompletableFuture clientCnxWrapFuture : pulsarClient.getCnxPool().getConnections()){ if (!clientCnxWrapFuture.isDone()){ continue; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 9750911b37c21..850e805067d12 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -49,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; +import lombok.Value; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; @@ -64,7 +65,7 @@ public class ConnectionPool implements AutoCloseable { public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 60; - protected final ConcurrentHashMap>> pool; + protected final ConcurrentMap> pool; private final Bootstrap bootstrap; private final PulsarChannelInitializer channelInitializerHandler; @@ -87,6 +88,14 @@ public class ConnectionPool implements AutoCloseable { /** Async release useless connections task. **/ private ScheduledFuture asyncReleaseUselessConnectionsTask; + + @Value + private static class Key { + InetSocketAddress logicalAddress; + InetSocketAddress physicalAddress; + int randomKey; + } + public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup)); } @@ -185,7 +194,7 @@ public CompletableFuture getConnection(final InetSocketAddress addres } void closeAllConnections() { - pool.values().forEach(map -> map.values().forEach(future -> { + pool.values().forEach(future -> { if (future.isDone()) { if (!future.isCompletedExceptionally()) { // Connection was already created successfully, the join will not throw any exception @@ -198,10 +207,9 @@ void closeAllConnections() { // succeed future.thenAccept(ClientCnx::close); } - })); + }); } - - /** + /** * Get a connection from the pool. *

* The connection can either be created or be coming from the pool itself. @@ -222,51 +230,44 @@ public CompletableFuture getConnection(InetSocketAddress logicalAddre InetSocketAddress physicalAddress, final int randomKey) { if (maxConnectionsPerHosts == 0) { // Disable pooling - return createConnection(logicalAddress, physicalAddress, -1); + return createConnection(new Key(logicalAddress, physicalAddress, -1)); } - - final ConcurrentMap> innerPool = - pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>()); - CompletableFuture completableFuture = innerPool - .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey)); + Key key = new Key(logicalAddress, physicalAddress, randomKey); + CompletableFuture completableFuture = pool.computeIfAbsent(key, k -> createConnection(key)); if (completableFuture.isCompletedExceptionally()) { // we cannot cache a failed connection, so we remove it from the pool // there is a race condition in which // cleanupConnection is called before caching this result // and so the clean up fails - cleanupConnection(logicalAddress, randomKey, completableFuture); + pool.remove(key, completableFuture); return completableFuture; } return completableFuture.thenCompose(clientCnx -> { // If connection already release, create a new one. if (clientCnx.getIdleState().isReleased()) { - cleanupConnection(logicalAddress, randomKey, completableFuture); - return innerPool - .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey)); + pool.remove(key, completableFuture); + return pool.computeIfAbsent(key, k -> createConnection(key)); } // Try use exists connection. if (clientCnx.getIdleState().tryMarkUsingAndClearIdleTime()) { return CompletableFuture.completedFuture(clientCnx); } else { // If connection already release, create a new one. - cleanupConnection(logicalAddress, randomKey, completableFuture); - return innerPool - .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey)); + pool.remove(key, completableFuture); + return pool.computeIfAbsent(key, k -> createConnection(key)); } }); } - private CompletableFuture createConnection(InetSocketAddress logicalAddress, - InetSocketAddress physicalAddress, int connectionKey) { + private CompletableFuture createConnection(Key key) { if (log.isDebugEnabled()) { - log.debug("Connection for {} not found in cache", logicalAddress); + log.debug("Connection for {} not found in cache", key.logicalAddress); } final CompletableFuture cnxFuture = new CompletableFuture<>(); - // Trigger async connect to broker - createConnection(logicalAddress, physicalAddress).thenAccept(channel -> { + createConnection(key.logicalAddress, key.physicalAddress).thenAccept(channel -> { log.info("[{}] Connected to server", channel); channel.closeFuture().addListener(v -> { @@ -274,7 +275,7 @@ private CompletableFuture createConnection(InetSocketAddress logicalA if (log.isDebugEnabled()) { log.debug("Removing closed connection from pool: {}", v); } - cleanupConnection(logicalAddress, connectionKey, cnxFuture); + pool.remove(key, cnxFuture); }); // We are connected to broker, but need to wait until the connect/connected handshake is @@ -300,14 +301,14 @@ private CompletableFuture createConnection(InetSocketAddress logicalA // CompletableFuture is cached into the "pool" map, // it is not enough to clean it here, we need to clean it // in the "pool" map when the CompletableFuture is cached - cleanupConnection(logicalAddress, connectionKey, cnxFuture); + pool.remove(key, cnxFuture); cnx.ctx().close(); return null; }); }).exceptionally(exception -> { eventLoopGroup.execute(() -> { - log.warn("Failed to open connection to {} : {}", physicalAddress, exception.getMessage()); - cleanupConnection(logicalAddress, connectionKey, cnxFuture); + log.warn("Failed to open connection to {} : {}", key.physicalAddress, exception.getMessage()); + pool.remove(key, cnxFuture); cnxFuture.completeExceptionally(new PulsarClientException(exception)); }); return null; @@ -439,17 +440,9 @@ public void close() throws Exception { } } - private void cleanupConnection(InetSocketAddress address, int connectionKey, - CompletableFuture connectionFuture) { - ConcurrentMap> map = pool.get(address); - if (map != null) { - map.remove(connectionKey, connectionFuture); - } - } - @VisibleForTesting int getPoolSize() { - return pool.values().stream().mapToInt(Map::size).sum(); + return pool.size(); } private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class); @@ -459,11 +452,8 @@ public void doMarkAndReleaseUselessConnections(){ return; } List releaseIdleConnectionTaskList = new ArrayList<>(); - for (Map.Entry>> entry : - pool.entrySet()){ - ConcurrentMap> innerPool = entry.getValue(); - for (Map.Entry> entry0 : innerPool.entrySet()) { - CompletableFuture future = entry0.getValue(); + for (Map.Entry> entry : pool.entrySet()) { + CompletableFuture future = entry.getValue(); // Ensure connection has been connected. if (!future.isDone()) { continue; @@ -481,18 +471,17 @@ public void doMarkAndReleaseUselessConnections(){ if (clientCnx.getIdleState().isReleasing()) { releaseIdleConnectionTaskList.add(() -> { if (clientCnx.getIdleState().tryMarkReleasedAndCloseConnection()) { - cleanupConnection(entry.getKey(), entry0.getKey(), future); + pool.remove(entry.getKey(), future); } }); } } - } // Do release idle connections. releaseIdleConnectionTaskList.forEach(Runnable::run); } public Set> getConnections() { return Collections.unmodifiableSet( - pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet())); + pool.values().stream().collect(Collectors.toSet())); } }