Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[improve][client] add physicalAddress as part of connection pool key (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Mar 5, 2024
1 parent 207335a commit e2f94dc
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,14 @@
*/
package org.apache.pulsar.client.impl;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -71,16 +66,8 @@ protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws Pu
protected void trigReleaseConnection(PulsarClientImpl pulsarClient)
throws InterruptedException, NoSuchFieldException, IllegalAccessException {
// Wait for every request has been response.
Field field = ConnectionPool.class.getDeclaredField("pool");
field.setAccessible(true);
ConcurrentHashMap<InetSocketAddress,ConcurrentMap<Integer,
CompletableFuture<ClientCnx>>> pool =
(ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer,
CompletableFuture<ClientCnx>>>) field.get(pulsarClient.getCnxPool());
final List<CompletableFuture<ClientCnx>> clientCnxWrapList =
pool.values().stream().flatMap(c -> c.values().stream()).collect(Collectors.toList());
Awaitility.waitAtMost(Duration.ofSeconds(5)).until(() -> {
for (CompletableFuture<ClientCnx> clientCnxWrapFuture : clientCnxWrapList){
for (CompletableFuture<ClientCnx> clientCnxWrapFuture : pulsarClient.getCnxPool().getConnections()){
if (!clientCnxWrapFuture.isDone()){
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Value;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
Expand All @@ -64,7 +65,7 @@ public class ConnectionPool implements AutoCloseable {

public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 60;

protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
protected final ConcurrentMap<Key, CompletableFuture<ClientCnx>> pool;

private final Bootstrap bootstrap;
private final PulsarChannelInitializer channelInitializerHandler;
Expand All @@ -87,6 +88,14 @@ public class ConnectionPool implements AutoCloseable {
/** Async release useless connections task. **/
private ScheduledFuture asyncReleaseUselessConnectionsTask;


@Value
private static class Key {
InetSocketAddress logicalAddress;
InetSocketAddress physicalAddress;
int randomKey;
}

public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
}
Expand Down Expand Up @@ -185,7 +194,7 @@ public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress addres
}

void closeAllConnections() {
pool.values().forEach(map -> map.values().forEach(future -> {
pool.values().forEach(future -> {
if (future.isDone()) {
if (!future.isCompletedExceptionally()) {
// Connection was already created successfully, the join will not throw any exception
Expand All @@ -198,10 +207,9 @@ void closeAllConnections() {
// succeed
future.thenAccept(ClientCnx::close);
}
}));
});
}

/**
/**
* Get a connection from the pool.
* <p>
* The connection can either be created or be coming from the pool itself.
Expand All @@ -222,59 +230,52 @@ public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddre
InetSocketAddress physicalAddress, final int randomKey) {
if (maxConnectionsPerHosts == 0) {
// Disable pooling
return createConnection(logicalAddress, physicalAddress, -1);
return createConnection(new Key(logicalAddress, physicalAddress, -1));
}

final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
CompletableFuture<ClientCnx> completableFuture = innerPool
.computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
Key key = new Key(logicalAddress, physicalAddress, randomKey);
CompletableFuture<ClientCnx> completableFuture = pool.computeIfAbsent(key, k -> createConnection(key));
if (completableFuture.isCompletedExceptionally()) {
// we cannot cache a failed connection, so we remove it from the pool
// there is a race condition in which
// cleanupConnection is called before caching this result
// and so the clean up fails
cleanupConnection(logicalAddress, randomKey, completableFuture);
pool.remove(key, completableFuture);
return completableFuture;
}

return completableFuture.thenCompose(clientCnx -> {
// If connection already release, create a new one.
if (clientCnx.getIdleState().isReleased()) {
cleanupConnection(logicalAddress, randomKey, completableFuture);
return innerPool
.computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
pool.remove(key, completableFuture);
return pool.computeIfAbsent(key, k -> createConnection(key));
}
// Try use exists connection.
if (clientCnx.getIdleState().tryMarkUsingAndClearIdleTime()) {
return CompletableFuture.completedFuture(clientCnx);
} else {
// If connection already release, create a new one.
cleanupConnection(logicalAddress, randomKey, completableFuture);
return innerPool
.computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
pool.remove(key, completableFuture);
return pool.computeIfAbsent(key, k -> createConnection(key));
}
});
}

private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress, int connectionKey) {
private CompletableFuture<ClientCnx> createConnection(Key key) {
if (log.isDebugEnabled()) {
log.debug("Connection for {} not found in cache", logicalAddress);
log.debug("Connection for {} not found in cache", key.logicalAddress);
}

final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();

// Trigger async connect to broker
createConnection(logicalAddress, physicalAddress).thenAccept(channel -> {
createConnection(key.logicalAddress, key.physicalAddress).thenAccept(channel -> {
log.info("[{}] Connected to server", channel);

channel.closeFuture().addListener(v -> {
// Remove connection from pool when it gets closed
if (log.isDebugEnabled()) {
log.debug("Removing closed connection from pool: {}", v);
}
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
pool.remove(key, cnxFuture);
});

// We are connected to broker, but need to wait until the connect/connected handshake is
Expand All @@ -300,14 +301,14 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
// CompletableFuture is cached into the "pool" map,
// it is not enough to clean it here, we need to clean it
// in the "pool" map when the CompletableFuture is cached
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
pool.remove(key, cnxFuture);
cnx.ctx().close();
return null;
});
}).exceptionally(exception -> {
eventLoopGroup.execute(() -> {
log.warn("Failed to open connection to {} : {}", physicalAddress, exception.getMessage());
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
log.warn("Failed to open connection to {} : {}", key.physicalAddress, exception.getMessage());
pool.remove(key, cnxFuture);
cnxFuture.completeExceptionally(new PulsarClientException(exception));
});
return null;
Expand Down Expand Up @@ -439,17 +440,9 @@ public void close() throws Exception {
}
}

private void cleanupConnection(InetSocketAddress address, int connectionKey,
CompletableFuture<ClientCnx> connectionFuture) {
ConcurrentMap<Integer, CompletableFuture<ClientCnx>> map = pool.get(address);
if (map != null) {
map.remove(connectionKey, connectionFuture);
}
}

@VisibleForTesting
int getPoolSize() {
return pool.values().stream().mapToInt(Map::size).sum();
return pool.size();
}

private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
Expand All @@ -459,11 +452,8 @@ public void doMarkAndReleaseUselessConnections(){
return;
}
List<Runnable> releaseIdleConnectionTaskList = new ArrayList<>();
for (Map.Entry<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> entry :
pool.entrySet()){
ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool = entry.getValue();
for (Map.Entry<Integer, CompletableFuture<ClientCnx>> entry0 : innerPool.entrySet()) {
CompletableFuture<ClientCnx> future = entry0.getValue();
for (Map.Entry<Key, CompletableFuture<ClientCnx>> entry : pool.entrySet()) {
CompletableFuture<ClientCnx> future = entry.getValue();
// Ensure connection has been connected.
if (!future.isDone()) {
continue;
Expand All @@ -481,18 +471,17 @@ public void doMarkAndReleaseUselessConnections(){
if (clientCnx.getIdleState().isReleasing()) {
releaseIdleConnectionTaskList.add(() -> {
if (clientCnx.getIdleState().tryMarkReleasedAndCloseConnection()) {
cleanupConnection(entry.getKey(), entry0.getKey(), future);
pool.remove(entry.getKey(), future);
}
});
}
}
}
// Do release idle connections.
releaseIdleConnectionTaskList.forEach(Runnable::run);
}

public Set<CompletableFuture<ClientCnx>> getConnections() {
return Collections.unmodifiableSet(
pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet()));
pool.values().stream().collect(Collectors.toSet()));
}
}

0 comments on commit e2f94dc

Please sign in to comment.