Skip to content

Commit

Permalink
chore: Do not shutdown externally provided executor (#2057)
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Ivanov <ivanivanov.ii726@gmail.com>
  • Loading branch information
0xivanov authored Nov 6, 2024
1 parent 3c1912b commit 0144164
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 84 deletions.
100 changes: 44 additions & 56 deletions sdk/src/main/java/com/hedera/hashgraph/sdk/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public final class Client implements AutoCloseable {
private volatile Duration minBackoff = DEFAULT_MIN_BACKOFF;
private boolean autoValidateChecksums = false;
private boolean defaultRegenerateTransactionId = true;
private final boolean shouldShutdownExecutor;
// If networkUpdatePeriod is null, any network updates in progress will not complete
@Nullable
private Duration networkUpdatePeriod;
Expand All @@ -103,21 +104,23 @@ public final class Client implements AutoCloseable {
/**
* Constructor.
*
* @param executor the executor
* @param network the network
* @param mirrorNetwork the mirror network
* @param executor the executor
* @param network the network
* @param mirrorNetwork the mirror network
* @param shouldShutdownExecutor
*/
@VisibleForTesting
Client(
ExecutorService executor,
Network network,
MirrorNetwork mirrorNetwork,
@Nullable Duration networkUpdateInitialDelay,
@Nullable Duration networkUpdateInitialDelay, boolean shouldShutdownExecutor,
@Nullable Duration networkUpdatePeriod
) {
this.executor = executor;
this.network = network;
this.mirrorNetwork = mirrorNetwork;
this.shouldShutdownExecutor = shouldShutdownExecutor;
this.networkUpdatePeriod = networkUpdatePeriod;
scheduleNetworkUpdate(networkUpdateInitialDelay);
}
Expand Down Expand Up @@ -149,15 +152,14 @@ static ExecutorService createExecutor() {
* chose nodes to send transactions to. For one transaction, at most 1/3 of the nodes will be tried.
*
* @param networkMap the map of node IDs to node addresses that make up the network.
* @param executor runs the grpc requests asynchronously. Note that calling `close()` method on one of the
* clients will close the executor for all the other clients sharing this executor
* @param executor runs the grpc requests asynchronously.
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forNetwork(Map<String, AccountId> networkMap, ExecutorService executor) {
var network = Network.forNetwork(executor, networkMap);
var mirrorNetwork = MirrorNetwork.forNetwork(executor, new ArrayList<>());

return new Client(executor, network, mirrorNetwork, null, null);
return new Client(executor, network, mirrorNetwork, null, false, null);
}


Expand All @@ -175,7 +177,10 @@ public static Client forNetwork(Map<String, AccountId> networkMap, ExecutorServi
*/
public static Client forNetwork(Map<String, AccountId> networkMap) {
var executor = createExecutor();
return forNetwork(networkMap, executor);
var network = Network.forNetwork(executor, networkMap);
var mirrorNetwork = MirrorNetwork.forNetwork(executor, new ArrayList<>());

return new Client(executor, network, mirrorNetwork, null, true, null);
}

/**
Expand All @@ -197,49 +202,46 @@ public static Client forName(String name) {
* Construct a Hedera client pre-configured for <a
* href="https://docs.hedera.com/guides/mainnet/address-book#mainnet-address-book">Mainnet access</a>.
*
* @param executor runs the grpc requests asynchronously. Note that calling `close()` method on one of the
* clients will close the executor for all the other clients sharing this executor
* @param executor runs the grpc requests asynchronously.
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forMainnet(ExecutorService executor) {
var network = Network.forMainnet(executor);
var mirrorNetwork = MirrorNetwork.forMainnet(executor);

return new Client(executor, network, mirrorNetwork, NETWORK_UPDATE_INITIAL_DELAY,
DEFAULT_NETWORK_UPDATE_PERIOD);
false, DEFAULT_NETWORK_UPDATE_PERIOD);
}

/**
* Construct a Hedera client pre-configured for <a href="https://docs.hedera.com/guides/testnet/nodes">Testnet
* access</a>.
*
* @param executor runs the grpc requests asynchronously. Note that calling `close()` method on one of the
* clients will close the executor for all the other clients sharing this executor
* @param executor runs the grpc requests asynchronously.
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forTestnet(ExecutorService executor) {
var network = Network.forTestnet(executor);
var mirrorNetwork = MirrorNetwork.forTestnet(executor);

return new Client(executor, network, mirrorNetwork, NETWORK_UPDATE_INITIAL_DELAY,
DEFAULT_NETWORK_UPDATE_PERIOD);
false, DEFAULT_NETWORK_UPDATE_PERIOD);
}

/**
* Construct a Hedera client pre-configured for <a
* href="https://docs.hedera.com/guides/testnet/testnet-nodes#previewnet-node-public-keys">Preview Testnet
* nodes</a>.
*
* @param executor runs the grpc requests asynchronously. Note that calling `close()` method on one of the
* clients will close the executor for all the other clients sharing this executor
* @param executor runs the grpc requests asynchronously.
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forPreviewnet(ExecutorService executor) {
var network = Network.forPreviewnet(executor);
var mirrorNetwork = MirrorNetwork.forPreviewnet(executor);

return new Client(executor, network, mirrorNetwork, NETWORK_UPDATE_INITIAL_DELAY,
DEFAULT_NETWORK_UPDATE_PERIOD);
false, DEFAULT_NETWORK_UPDATE_PERIOD);
}


Expand All @@ -251,7 +253,11 @@ public static Client forPreviewnet(ExecutorService executor) {
*/
public static Client forMainnet() {
var executor = createExecutor();
return forMainnet(executor);
var network = Network.forMainnet(executor);
var mirrorNetwork = MirrorNetwork.forMainnet(executor);

return new Client(executor, network, mirrorNetwork, NETWORK_UPDATE_INITIAL_DELAY,
true, DEFAULT_NETWORK_UPDATE_PERIOD);
}

/**
Expand All @@ -262,7 +268,11 @@ public static Client forMainnet() {
*/
public static Client forTestnet() {
var executor = createExecutor();
return forTestnet(executor);
var network = Network.forTestnet(executor);
var mirrorNetwork = MirrorNetwork.forTestnet(executor);

return new Client(executor, network, mirrorNetwork, NETWORK_UPDATE_INITIAL_DELAY,
true, DEFAULT_NETWORK_UPDATE_PERIOD);
}

/**
Expand All @@ -274,7 +284,11 @@ public static Client forTestnet() {
*/
public static Client forPreviewnet() {
var executor = createExecutor();
return forPreviewnet(executor);
var network = Network.forPreviewnet(executor);
var mirrorNetwork = MirrorNetwork.forPreviewnet(executor);

return new Client(executor, network, mirrorNetwork, NETWORK_UPDATE_INITIAL_DELAY,
true, DEFAULT_NETWORK_UPDATE_PERIOD);
}

/**
Expand Down Expand Up @@ -1353,34 +1367,6 @@ public synchronized void close() throws TimeoutException {
close(closeTimeout);
}

/**
* Initiates an orderly shutdown of all channels (to the Hedera network),
* without closing the ExecutorService {@link #executor}
*
* @throws TimeoutException if the network doesn't close in time
*/
public synchronized void closeChannels() throws TimeoutException {
var closeDeadline = Instant.now().plus(closeTimeout);

networkUpdatePeriod = null;
cancelScheduledNetworkUpdate();
cancelAllSubscriptions();

network.beginClose();
mirrorNetwork.beginClose();

var networkError = network.awaitClose(closeDeadline, null);
var mirrorNetworkError = mirrorNetwork.awaitClose(closeDeadline, networkError);

if (mirrorNetworkError != null) {
if (mirrorNetworkError instanceof TimeoutException ex) {
throw ex;
} else {
throw new RuntimeException(mirrorNetworkError);
}
}
}

/**
* Initiates an orderly shutdown of all channels (to the Hedera network) in which preexisting transactions or
* queries continue but more would be immediately cancelled.
Expand All @@ -1405,17 +1391,19 @@ public synchronized void close(Duration timeout) throws TimeoutException {
var mirrorNetworkError = mirrorNetwork.awaitClose(closeDeadline, networkError);

// https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
try {
executor.shutdown();
if (!executor.awaitTermination(timeout.getSeconds() / 2, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (shouldShutdownExecutor) {
try {
executor.shutdown();
if (!executor.awaitTermination(timeout.getSeconds() / 2, TimeUnit.SECONDS)) {
logger.warn("Pool did not terminate");
executor.shutdownNow();
if (!executor.awaitTermination(timeout.getSeconds() / 2, TimeUnit.SECONDS)) {
logger.warn("Pool did not terminate");
}
}
} catch (InterruptedException ex) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
} catch (InterruptedException ex) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}

if (mirrorNetworkError != null) {
Expand Down
55 changes: 39 additions & 16 deletions sdk/src/test/java/com/hedera/hashgraph/sdk/ClientCloseTest.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
package com.hedera.hashgraph.sdk;

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -18,14 +10,45 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;

public class ClientCloseTest {

@Test
void doesNotCloseExternalExecutor() throws TimeoutException {
var executor = Client.createExecutor();
var network = new HashMap<String, AccountId>();

var client = Client.forNetwork(network, executor);
client.close();
assertThat(executor.isShutdown()).isFalse();

client = Client.forMainnet(executor);
client.close();
assertThat(executor.isShutdown()).isFalse();

client = Client.forTestnet(executor);
client.close();
assertThat(executor.isShutdown()).isFalse();

client = Client.forPreviewnet(executor);
client.close();
assertThat(executor.isShutdown()).isFalse();
}

@Test
void closeHandlesNetworkTimeout() {
var executor = Client.createExecutor();
var network = mock(Network.class);
when(network.awaitClose(any(), any())).thenReturn(new TimeoutException("network timeout"));
var mirrorNetwork = MirrorNetwork.forNetwork(executor, Collections.emptyList());
var client = new Client(executor, network, mirrorNetwork, null, null);
var client = new Client(executor, network, mirrorNetwork, null, true, null);

assertThatExceptionOfType(TimeoutException.class).isThrownBy(client::close).withMessage("network timeout");
assertThat(mirrorNetwork.hasShutDownNow).isTrue();
Expand All @@ -38,7 +61,7 @@ void closeHandlesNetworkInterrupted() {
var network = mock(Network.class);
when(network.awaitClose(any(), any())).thenReturn(interruptedException);
var mirrorNetwork = MirrorNetwork.forNetwork(executor, Collections.emptyList());
var client = new Client(executor, network, mirrorNetwork, null, null);
var client = new Client(executor, network, mirrorNetwork, null, true, null);

assertThatExceptionOfType(RuntimeException.class).isThrownBy(client::close).withCause(interruptedException);
assertThat(mirrorNetwork.hasShutDownNow).isTrue();
Expand All @@ -50,7 +73,7 @@ void closeHandlesMirrorNetworkTimeout() {
var network = Network.forNetwork(executor, Collections.emptyMap());
var mirrorNetwork = mock(MirrorNetwork.class);
when(mirrorNetwork.awaitClose(any(), any())).thenReturn(new TimeoutException("mirror timeout"));
var client = new Client(executor, network, mirrorNetwork, null, null);
var client = new Client(executor, network, mirrorNetwork, null, true, null);

assertThatExceptionOfType(TimeoutException.class).isThrownBy(client::close).withMessage("mirror timeout");
assertThat(network.hasShutDownNow).isFalse();
Expand All @@ -63,7 +86,7 @@ void closeHandlesMirrorNetworkInterrupted() {
var network = Network.forNetwork(executor, Collections.emptyMap());
var mirrorNetwork = mock(MirrorNetwork.class);
when(mirrorNetwork.awaitClose(any(), any())).thenReturn(interruptedException);
var client = new Client(executor, network, mirrorNetwork, null, null);
var client = new Client(executor, network, mirrorNetwork, null, true, null);

assertThatExceptionOfType(RuntimeException.class).isThrownBy(client::close).withCause(interruptedException);
assertThat(network.hasShutDownNow).isFalse();
Expand All @@ -74,7 +97,7 @@ void closeHandlesExecutorShutdown() throws TimeoutException {
var executor = Client.createExecutor();
var network = Network.forNetwork(executor, Collections.emptyMap());
var mirrorNetwork = MirrorNetwork.forNetwork(executor, Collections.emptyList());
var client = new Client(executor, network, mirrorNetwork, null, null);
var client = new Client(executor, network, mirrorNetwork, null, true, null);

client.close();
assertThat(executor.isShutdown()).isTrue();
Expand All @@ -86,7 +109,7 @@ void closeHandlesExecutorTerminatingInTime() throws InterruptedException, Timeou
var executor = mock(ThreadPoolExecutor.class);
var network = Network.forNetwork(executor, Collections.emptyMap());
var mirrorNetwork = MirrorNetwork.forNetwork(executor, Collections.emptyList());
var client = new Client(executor, network, mirrorNetwork, null, null);
var client = new Client(executor, network, mirrorNetwork, null, true, null);

doReturn(true).when(executor).awaitTermination(30 / 2, TimeUnit.SECONDS);

Expand All @@ -100,7 +123,7 @@ void closeHandlesExecutorNotTerminatingInTime() throws InterruptedException, Tim
var executor = mock(ThreadPoolExecutor.class);
var network = Network.forNetwork(executor, Collections.emptyMap());
var mirrorNetwork = MirrorNetwork.forNetwork(executor, Collections.emptyList());
var client = new Client(executor, network, mirrorNetwork, null, null);
var client = new Client(executor, network, mirrorNetwork, null, true, null);

doReturn(false).when(executor).awaitTermination(30 / 2, TimeUnit.SECONDS);

Expand All @@ -114,7 +137,7 @@ void closeHandlesExecutorWhenThreadIsInterrupted() throws InterruptedException,
var executor = mock(ThreadPoolExecutor.class);
var network = Network.forNetwork(executor, Collections.emptyMap());
var mirrorNetwork = MirrorNetwork.forNetwork(executor, Collections.emptyList());
var client = new Client(executor, network, mirrorNetwork, null, null);
var client = new Client(executor, network, mirrorNetwork, null, true, null);

doThrow(new InterruptedException()).when(executor).awaitTermination(30 / 2, TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,17 @@ void loadTest() throws Exception {
for (int i = 0; i < nThreads; i++) {
int finalI = i;
threadPoolExecutor.submit(() -> {
var client = Client.forNetwork(testEnv.client.getNetwork(), clientExecutor);
client.setOperator(operatorId, operatorPrivateKey);
client.setMaxAttempts(10);
try {
try (var client = Client.forNetwork(testEnv.client.getNetwork(), clientExecutor);
) {
client.setOperator(operatorId, operatorPrivateKey);
client.setMaxAttempts(10);
new AccountCreateTransaction()
.setKey(PrivateKey.generateED25519())
.execute(client)
.getReceipt(client);
System.out.println(finalI);
} catch (Exception e) {
fail("AccountCreateTransaction failed, " + e);
} finally {
try {
client.closeChannels();
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
});
}
Expand All @@ -90,7 +84,6 @@ void loadTest() throws Exception {

long endTime = System.currentTimeMillis();
long executionTime = endTime - startTime;
System.out.println();
System.out.println("All tasks have finished execution in " + executionTime + "ms");
clientExecutor.shutdownNow();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public SetupResponse setup(final SetupParams params) throws Exception {

@JSONRPC2Method("reset")
public SetupResponse reset() throws Exception {
client.closeChannels();
client.close();
client = null;
return new SetupResponse("");
}
Expand Down

0 comments on commit 0144164

Please sign in to comment.